
Pipeline & YAML config

The step runner behind Curator and the CLI, and the pydantic models that validate YAML pipelines.

curatorkit.pipeline

Pipeline orchestration — synchronous and async runners.

run(): synchronous execution of readers, gates, normalizers, and exporters.
run_async(): async runner for generation tasks and LLM gates, with bounded concurrency.

Generation tasks (BaseGenerationTask subclasses) are handled specially: their rejected samples are collected, and they support async execution.

The synchronous runner is under 100 lines of code; async support adds roughly 50 more.

PipelineResult dataclass

PipelineResult(passed: list[DataSample], rejected: list[RejectedSample], stage_counts: dict[str, dict[str, int]] = dict(), wall_clock_seconds: float = 0.0, diagnostics: object = None)

Everything a Pipeline.run() / run_async() call produced.

passed: DataSample objects that cleared every step.
rejected: RejectedSample objects from all stages (readers, gates, generation tasks), each with a structured rejection reason.
stage_counts: per-step counters keyed by step display name. Each inner dict maps counter names such as input_count, output_count, rejected_count, probe_recovered, or exported_count to integers; which counters appear depends on the step kind.
wall_clock_seconds: total run time in seconds.
diagnostics: a PipelineDiagnostics accumulator when the diagnostic probe is enabled, else None.
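Because stage_counts is a plain nested dict, per-step drop rates can be computed directly from it. The sketch below uses a hypothetical stand-in dataclass that mirrors the documented fields (with plain lists in place of DataSample/RejectedSample); the step names length_gate and jsonl_exporter are invented for illustration.

```python
from dataclasses import dataclass, field


# Hypothetical stand-in mirroring the documented PipelineResult fields;
# passed/rejected hold plain objects here instead of DataSample/RejectedSample.
@dataclass
class PipelineResult:
    passed: list
    rejected: list
    stage_counts: dict[str, dict[str, int]] = field(default_factory=dict)
    wall_clock_seconds: float = 0.0
    diagnostics: object = None


def drop_rates(result: PipelineResult) -> dict[str, float]:
    """Fraction of input samples each step did not pass on.

    Only steps reporting both input_count and output_count are included,
    since the set of counters varies by step kind.
    """
    rates = {}
    for step_name, counts in result.stage_counts.items():
        if "input_count" in counts and "output_count" in counts:
            n_in = counts["input_count"]
            rates[step_name] = 0.0 if n_in == 0 else 1 - counts["output_count"] / n_in
    return rates


result = PipelineResult(
    passed=[],
    rejected=[],
    stage_counts={
        "length_gate": {"input_count": 200, "output_count": 150, "rejected_count": 50},
        "jsonl_exporter": {"exported_count": 150},
    },
    wall_clock_seconds=3.2,
)
print(drop_rates(result))  # {'length_gate': 0.25}
```

Exporter entries are skipped here because they report exported_count rather than an input/output pair.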

Pipeline

Pipeline(steps: list[Step], output_dir: Path | None = None, diagnostics: object = None)

Pipeline runner supporting both synchronous and async execution.

Steps are executed in order:

Readers → produce (samples, reader_rejections)
Gates → filter samples into (passed, gate_rejections)
Normalizers → transform samples (includes generation tasks)
Exporters → write samples to disk

Generation tasks (BaseGenerationTask subclasses) are normalizers that call LLMs. In sync mode they run sequentially. In async mode they use concurrency-controlled parallel LLM calls.

All rejections from every stage flow into PipelineResult.rejected.
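The data flow described above can be sketched as a single loop that threads samples through the steps in list order and pools every rejection. This is a minimal illustration, not curatorkit's runner: SketchStep, its kind strings, and the one-callable-per-step shape are hypothetical, multiple readers are assumed to concatenate their output, and generation-task rejections are left out for brevity.

```python
from dataclasses import dataclass
from typing import Callable


# Hypothetical step shape: each step is a kind tag plus one callable.
@dataclass
class SketchStep:
    name: str
    kind: str  # "reader" | "gate" | "normalizer" | "exporter"
    fn: Callable


def run_steps(steps: list[SketchStep]) -> tuple[list, list]:
    """Run steps in list order, threading samples through and pooling rejections."""
    samples: list = []
    rejected: list = []
    for step in steps:
        if step.kind == "reader":
            new, rej = step.fn()             # reader -> (samples, reader_rejections)
            samples.extend(new)
            rejected.extend(rej)
        elif step.kind == "gate":
            samples, rej = step.fn(samples)  # gate -> (passed, gate_rejections)
            rejected.extend(rej)
        elif step.kind == "normalizer":
            samples = step.fn(samples)       # normalizer -> transformed samples
        elif step.kind == "exporter":
            step.fn(samples)                 # exporter writes; samples unchanged
        else:
            raise ValueError(f"unknown step kind: {step.kind}")
    return samples, rejected


exported: list = []
steps = [
    SketchStep("reader", "reader", lambda: (["a", "bb", "ccc"], ["<unparseable>"])),
    SketchStep("min_len", "gate", lambda s: ([x for x in s if len(x) > 1], [x for x in s if len(x) <= 1])),
    SketchStep("upper", "normalizer", lambda s: [x.upper() for x in s]),
    SketchStep("collect", "exporter", exported.extend),
]
passed, rejected = run_steps(steps)
print(passed, rejected)  # ['BB', 'CCC'] ['<unparseable>', 'a']
```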

dry_run

dry_run() -> list[dict[str, str]]

Print and return the planned execution order without running anything.

Each entry is a dict with the keys step_num, name, kind, and config_hash, so callers can compare plans across runs. config_hash is the step's own _config_hash() if it defines one, else the empty string.
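Comparing two dry_run() plans then reduces to diffing their entries. The sketch below is an assumption throughout: config_hash() only shows one plausible way a step might implement _config_hash() (a short SHA-256 of its config as sorted-key JSON), and the plan entries are built by hand, with every value stored as a string as the dict[str, str] annotation suggests.

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Hypothetical _config_hash(): short SHA-256 digest of the config as sorted-key JSON."""
    payload = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]


def changed_steps(old_plan: list[dict[str, str]], new_plan: list[dict[str, str]]) -> list[str]:
    """Names of plan entries that differ (position, name, kind, or config_hash)."""
    changed = []
    for i in range(max(len(old_plan), len(new_plan))):
        old = old_plan[i] if i < len(old_plan) else None
        new = new_plan[i] if i < len(new_plan) else None
        if old != new:
            changed.append((new or old)["name"])
    return changed


def entry(num: int, name: str, kind: str, config: dict) -> dict[str, str]:
    # step_num stored as a string to match the dict[str, str] return annotation
    return {"step_num": str(num), "name": name, "kind": kind, "config_hash": config_hash(config)}


old_plan = [entry(1, "reader", "reader", {"path": "data.jsonl"}), entry(2, "min_len", "gate", {"min_chars": 2})]
new_plan = [entry(1, "reader", "reader", {"path": "data.jsonl"}), entry(2, "min_len", "gate", {"min_chars": 3})]
print(changed_steps(old_plan, new_plan))  # ['min_len']
```

Steps without a _config_hash() report an empty string, so config changes to those steps are invisible to a diff like this one.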

run

run() -> PipelineResult

Synchronous pipeline execution.

run_async async

run_async() -> PipelineResult

Async pipeline execution.

Steps with a run_async() method (generation tasks and LLM gates) use native async with semaphore-bounded concurrency. All other steps (readers, non-LLM normalizers, exporters) run synchronously.
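Semaphore-bounded concurrency means every LLM call is launched at once but at most N are in flight at any moment. A minimal asyncio sketch of that pattern follows; fake_llm_call and the max_concurrency parameter are hypothetical stand-ins, not curatorkit names, and the peak counter exists only to make the bound observable.

```python
import asyncio


async def fake_llm_call(prompt: str) -> str:
    """Stand-in for a real LLM request; sleeps to simulate network latency."""
    await asyncio.sleep(0.01)
    return prompt.upper()


async def generate_all(prompts: list[str], max_concurrency: int = 4) -> tuple[list[str], int]:
    """Run one call per prompt, never more than max_concurrency at once.

    Returns results in input order plus the peak number of in-flight calls.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def bounded(prompt: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:  # waits while max_concurrency calls are running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_llm_call(prompt)
            finally:
                in_flight -= 1

    # gather preserves input order regardless of completion order
    results = await asyncio.gather(*(bounded(p) for p in prompts))
    return list(results), peak


results, peak = asyncio.run(generate_all([f"p{i}" for i in range(10)], max_concurrency=3))
print(results[:2], peak)  # ['P0', 'P1'] 3
```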

curatorkit.config.PipelineConfig

Bases: BaseModel

Root model for a pipeline YAML file.

Maps the top-level YAML keys to typed sub-configs: readers, gates, normalizers, exporters (lists, run in order within each stage), plus the optional llm: block (global LLM settings), generators: (LLM generation tasks), and diagnostic: (failure-mode probe attached to the hallucination gate). max_samples caps the total sample count after reading, and output_split shuffles accepted samples into split subdirectories (fractions must sum to 1.0). Unrecognised settings can be stashed in extra.
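The output_split behaviour (shuffle accepted samples, then partition them by fractions that must sum to 1.0) can be sketched with the standard library alone. split_samples, its seed argument, and the split names are hypothetical; writing each split into its own subdirectory is left out, and the real implementation may round differently.

```python
import random


def split_samples(samples: list, fractions: dict[str, float], seed: int = 0) -> dict[str, list]:
    """Shuffle samples and partition them by split fractions that must sum to 1.0."""
    total = sum(fractions.values())
    if abs(total - 1.0) > 1e-9:
        raise ValueError(f"split fractions must sum to 1.0, got {total}")
    shuffled = list(samples)
    random.Random(seed).shuffle(shuffled)  # seeded so splits are reproducible

    splits: dict[str, list] = {}
    names = list(fractions)
    start = 0
    for i, name in enumerate(names):
        if i == len(names) - 1:
            end = len(shuffled)  # last split takes the remainder, so no sample is dropped
        else:
            end = min(len(shuffled), start + round(fractions[name] * len(shuffled)))
        splits[name] = shuffled[start:end]
        start = end
    return splits


samples = [f"s{i}" for i in range(10)]
splits = split_samples(samples, {"train": 0.8, "val": 0.1, "test": 0.1})
print({name: len(part) for name, part in splits.items()})  # {'train': 8, 'val': 1, 'test': 1}
```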

Build one with PipelineConfig.from_yaml(path) or directly via keyword arguments.

from_yaml classmethod

from_yaml(path: Path) -> PipelineConfig

Load and validate a pipeline YAML file.

Parses the file with yaml.safe_load and validates it eagerly against this model, so a malformed config raises pydantic.ValidationError before any pipeline step runs.