Pipeline & YAML config
The step runner behind Curator and the CLI, and the pydantic models that validate
YAML pipelines.
curatorkit.pipeline
Pipeline orchestration — synchronous and async runners.
run(): synchronous execution of readers, gates, normalizers, and exporters.
run_async(): async runner that executes generation tasks and LLM gates concurrently.
Generation tasks (BaseGenerationTask subclasses) are handled specially: their rejected samples are collected, and they can run asynchronously.
The synchronous runner is kept under 100 lines; async support adds roughly 50 more.
PipelineResult (dataclass)
PipelineResult(passed: list[DataSample], rejected: list[RejectedSample], stage_counts: dict[str, dict[str, int]] = dict(), wall_clock_seconds: float = 0.0, diagnostics: object = None)
Everything produced by a Pipeline.run() or Pipeline.run_async() call:
passed: DataSample objects that cleared every step.
rejected: RejectedSample objects from all stages (readers, gates, generation tasks), each with a structured rejection reason.
stage_counts: per-step counters keyed by step display name. Each value maps counter names such as input_count, output_count, rejected_count, probe_recovered, and exported_count to integers; which counters appear depends on the step kind.
wall_clock_seconds: total run time in seconds.
diagnostics: the PipelineDiagnostics accumulator when the diagnostic probe is enabled, else None.
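A minimal sketch of a dataclass shaped like the PipelineResult signature above, plus a small summary helper. DataSample, RejectedSample, PipelineResultSketch, and summarize are hypothetical stand-ins, not curatorkit's classes, and the rejection reason is simplified to a string. One detail worth noting: dataclasses reject a bare mutable default such as dict(), so a field rendered as `= dict()` is most plausibly declared with field(default_factory=dict) in the real source; the sketch assumes that.

```python
from dataclasses import dataclass, field
from typing import Any


# Hypothetical stand-ins: curatorkit's real DataSample and RejectedSample
# carry more fields than shown here.
@dataclass
class DataSample:
    text: str


@dataclass
class RejectedSample:
    sample: DataSample
    reason: str  # simplified; the real rejection reason is structured


@dataclass
class PipelineResultSketch:
    passed: list[DataSample]
    rejected: list[RejectedSample]
    # A bare `= dict()` default would raise ValueError at class creation,
    # so each instance gets its own empty dict via default_factory.
    stage_counts: dict[str, dict[str, int]] = field(default_factory=dict)
    wall_clock_seconds: float = 0.0
    diagnostics: Any = None


def summarize(result: PipelineResultSketch) -> str:
    """Hypothetical helper: one-line pass-rate summary of a run."""
    total = len(result.passed) + len(result.rejected)
    return f"{len(result.passed)}/{total} passed in {result.wall_clock_seconds:.1f}s"


result = PipelineResultSketch(
    passed=[DataSample("a"), DataSample("b")],
    rejected=[RejectedSample(DataSample("c"), "too_short")],
    stage_counts={"LengthGate": {"input_count": 3, "output_count": 2, "rejected_count": 1}},
    wall_clock_seconds=2.0,
)
print(summarize(result))  # 2/3 passed in 2.0s
```

Because stage_counts uses a factory, two results built without explicit counters never share the same dict.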
Pipeline
Pipeline runner supporting both synchronous and async execution.
Steps are executed in order:
Readers → produce (samples, reader_rejections)
Gates → filter samples into (passed, gate_rejections)
Normalizers → transform samples (including generation tasks)
Exporters → write samples to disk
Generation tasks (BaseGenerationTask subclasses) are normalizers that call LLMs. In sync mode they run sequentially. In async mode they use concurrency-controlled parallel LLM calls.
All rejections from every stage flow into PipelineResult.rejected.
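The per-kind data flow above can be sketched as a single loop. This is a minimal illustration, not curatorkit's runner: the Step class, its kind tags, and the fn callable are hypothetical, and rejections produced by generation tasks inside normalizers are left out for brevity. What it does show is the documented contract: readers yield (samples, rejections), gates split samples into (passed, rejections), normalizers return a transformed list, exporters consume samples, and every rejection lands in one list.

```python
from dataclasses import dataclass
from typing import Any, Callable


# Hypothetical step shape: a kind tag plus a callable. curatorkit's real step
# classes expose their own methods; only the per-kind data flow is modelled.
@dataclass
class Step:
    name: str
    kind: str  # "reader", "gate", "normalizer" or "exporter"
    fn: Callable[..., Any]


def run_sync(steps: list[Step]) -> tuple[list, list]:
    samples: list = []
    rejected: list = []  # rejections from every stage end up here
    for step in steps:
        if step.kind == "reader":
            # Readers produce (samples, reader_rejections).
            new_samples, reader_rejections = step.fn()
            samples.extend(new_samples)
            rejected.extend(reader_rejections)
        elif step.kind == "gate":
            # Gates split the current samples into (passed, gate_rejections).
            samples, gate_rejections = step.fn(samples)
            rejected.extend(gate_rejections)
        elif step.kind == "normalizer":
            # Normalizers return a transformed sample list.
            samples = step.fn(samples)
        elif step.kind == "exporter":
            # Exporters write samples out; the sample list is left unchanged.
            step.fn(samples)
        else:
            raise ValueError(f"unknown step kind: {step.kind!r}")
    return samples, rejected


exported: list[str] = []
steps = [
    Step("inline_reader", "reader", lambda: (["hello ", "", " world"], [])),
    Step("non_empty", "gate", lambda s: ([x for x in s if x], [x for x in s if not x])),
    Step("strip", "normalizer", lambda s: [x.strip() for x in s]),
    Step("collect", "exporter", exported.extend),
]
passed, rejected = run_sync(steps)
print(passed, rejected)  # ['hello', 'world'] ['']
```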
dry_run
Print and return the planned execution order without running anything.
Each entry carries step_num, name, kind, and config_hash, so callers can compare plans across runs. config_hash is the value returned by the step's own _config_hash() method if it defines one, else the empty string.
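A sketch of how such a plan could be built and diffed, assuming each step exposes name and kind attributes and that step numbering starts at 1 (both assumptions). LengthGate, JsonlExporter, plan, and changed_steps are hypothetical; the hashing scheme (sorted-key JSON, SHA-256, truncated to 12 hex characters) is one reasonable choice for a stable _config_hash(), not necessarily curatorkit's.

```python
import hashlib
import json
from typing import Any


class LengthGate:
    """Hypothetical step that defines _config_hash()."""

    kind = "gate"

    def __init__(self, min_chars: int):
        self.name = "LengthGate"
        self.min_chars = min_chars

    def _config_hash(self) -> str:
        # Stable digest of the step's settings: sorted-key JSON, then SHA-256.
        payload = json.dumps({"min_chars": self.min_chars}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]


class JsonlExporter:
    """Hypothetical step without _config_hash()."""

    kind = "exporter"
    name = "JsonlExporter"


def plan(steps: list[Any]) -> list[dict[str, Any]]:
    entries = []
    for num, step in enumerate(steps, start=1):
        hasher = getattr(step, "_config_hash", None)
        entries.append({
            "step_num": num,
            "name": step.name,
            "kind": step.kind,
            # Fall back to "" when the step defines no _config_hash().
            "config_hash": hasher() if callable(hasher) else "",
        })
    for e in entries:  # print the plan; nothing is executed
        print(f"{e['step_num']}. [{e['kind']}] {e['name']} {e['config_hash']}")
    return entries


def changed_steps(old: list[dict], new: list[dict]) -> list[int]:
    """Step numbers whose config_hash differs between two plans."""
    return [a["step_num"] for a, b in zip(old, new) if a["config_hash"] != b["config_hash"]]


before = plan([LengthGate(10), JsonlExporter()])
after = plan([LengthGate(20), JsonlExporter()])
print(changed_steps(before, after))  # [1]
```

Steps without a hash always compare equal (both empty strings), so a diff like this can only detect changes in steps that opt in.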
run_async (async)
Async pipeline execution.
Steps with a run_async() method (generation tasks and LLM gates) use native async with semaphore-bounded concurrency. All other steps (readers, non-LLM normalizers, exporters) run synchronously.
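A minimal sketch of the dispatch rule above, assuming steps signal async support simply by having a run_async() method and sync steps expose run(). UpperCaseTask, StripNormalizer, and run_steps_async are hypothetical, and asyncio.sleep stands in for an LLM call. The semaphore caps how many calls are in flight at once, while asyncio.gather keeps results in input order.

```python
import asyncio


class UpperCaseTask:
    """Hypothetical generation task: one simulated LLM call per sample."""

    def __init__(self, max_concurrency: int = 4):
        self.max_concurrency = max_concurrency
        self.peak_in_flight = 0  # highest number of simultaneous calls seen

    async def run_async(self, samples: list[str]) -> list[str]:
        sem = asyncio.Semaphore(self.max_concurrency)
        in_flight = 0

        async def call_llm(sample: str) -> str:
            nonlocal in_flight
            async with sem:  # waits while max_concurrency calls are running
                in_flight += 1
                self.peak_in_flight = max(self.peak_in_flight, in_flight)
                try:
                    await asyncio.sleep(0.01)  # stand-in for LLM latency
                    return sample.upper()
                finally:
                    in_flight -= 1

        # gather returns results in input order, not completion order.
        return list(await asyncio.gather(*(call_llm(s) for s in samples)))


class StripNormalizer:
    """Hypothetical non-LLM step with no run_async(): runs synchronously."""

    def run(self, samples: list[str]) -> list[str]:
        return [s.strip() for s in samples]


async def run_steps_async(steps: list, samples: list[str]) -> list[str]:
    for step in steps:
        if hasattr(step, "run_async"):
            samples = await step.run_async(samples)  # native async path
        else:
            samples = step.run(samples)  # plain synchronous call
    return samples


task = UpperCaseTask(max_concurrency=3)
out = asyncio.run(run_steps_async([StripNormalizer(), task], [f" s{i} " for i in range(10)]))
print(out[:3])  # ['S0', 'S1', 'S2']
```

Running sync steps inline keeps the runner simple; a CPU-heavy sync step would block the event loop, which is acceptable here because no LLM calls overlap with it.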
curatorkit.config.PipelineConfig
Bases: BaseModel
Root model for a pipeline YAML file.
Maps the top-level YAML keys to typed sub-configs: readers, gates,
normalizers, exporters (lists, run in order within each stage),
plus the optional llm: block (global LLM settings), generators:
(LLM generation tasks), and diagnostic: (failure-mode probe attached
to the hallucination gate). max_samples caps the total sample count
after reading, and output_split shuffles accepted samples into split
subdirectories (fractions must sum to 1.0). Unrecognised settings can
be stashed in extra.
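The output_split rule can be illustrated with a stdlib-only sketch: validate that the fractions sum to 1.0, shuffle, then cut the list into consecutive slices. split_samples and its seed parameter are hypothetical; how curatorkit rounds slice sizes, seeds the shuffle, or names split directories is not documented here, so those details are assumptions.

```python
import math
import random


def split_samples(samples: list, fractions: dict[str, float], seed: int = 0) -> dict[str, list]:
    """Hypothetical helper: shuffle samples, then partition them by fraction."""
    total = sum(fractions.values())
    # Compare with a tolerance, since sums of float fractions can be off
    # from 1.0 by rounding error.
    if not math.isclose(total, 1.0):
        raise ValueError(f"output_split fractions must sum to 1.0, got {total}")
    shuffled = list(samples)
    random.Random(seed).shuffle(shuffled)  # seeded, so reruns give the same split
    splits: dict[str, list] = {}
    names = list(fractions)
    start = 0
    for i, name in enumerate(names):
        if i == len(names) - 1:
            # The last split takes the remainder so no sample is lost to rounding.
            end = len(shuffled)
        else:
            end = start + int(len(shuffled) * fractions[name])
        splits[name] = shuffled[start:end]
        start = end
    return splits


parts = split_samples(list(range(10)), {"train": 0.8, "val": 0.1, "test": 0.1})
print({name: len(items) for name, items in parts.items()})  # {'train': 8, 'val': 1, 'test': 1}
```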
Build one with PipelineConfig.from_yaml(path) or directly via
keyword arguments.
from_yaml (classmethod)
Load and validate a pipeline YAML file.
Parses the file with yaml.safe_load and validates it eagerly
against this model, so a malformed config raises
pydantic.ValidationError before any pipeline step runs.
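The load-then-validate pattern can be sketched with PyYAML and pydantic v2 (the v2 API is an assumption; model_validate does not exist in v1). MiniPipelineConfig is a hypothetical, heavily cut-down stand-in for PipelineConfig, and the reader entry `type: jsonl` is invented for illustration. The point is that a bad value such as a non-integer max_samples fails inside from_yaml, before anything runs.

```python
import tempfile
from pathlib import Path
from typing import Optional

import yaml
from pydantic import BaseModel, Field, ValidationError


class MiniPipelineConfig(BaseModel):
    """Hypothetical cut-down stand-in for PipelineConfig (pydantic v2)."""

    readers: list[dict] = Field(default_factory=list)
    gates: list[dict] = Field(default_factory=list)
    max_samples: Optional[int] = None

    @classmethod
    def from_yaml(cls, path: str) -> "MiniPipelineConfig":
        with open(path, encoding="utf-8") as fh:
            data = yaml.safe_load(fh) or {}  # assumption: empty file -> defaults
        # model_validate checks the whole document up front and raises
        # ValidationError before any pipeline step is built or run.
        return cls.model_validate(data)


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "pipeline.yaml"
    good.write_text("readers:\n  - type: jsonl\nmax_samples: 100\n", encoding="utf-8")
    print(MiniPipelineConfig.from_yaml(str(good)).max_samples)  # 100

    bad = Path(tmp) / "bad.yaml"
    bad.write_text("max_samples: lots\n", encoding="utf-8")
    try:
        MiniPipelineConfig.from_yaml(str(bad))
    except ValidationError as exc:
        print("rejected:", exc.error_count(), "error(s)")
```

yaml.safe_load only builds plain Python objects (dicts, lists, scalars), so the YAML file cannot instantiate arbitrary classes; all typing is left to the pydantic model.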