Pipeline Reference (v0)
This page documents the Pipeline model that connects a Source, zero or more Transforms, and one or more Sinks.
Index
Pipeline
Define and execute a linear WowData workflow.
Signature
Pipeline(start, steps=[])
Examples
from wowdata import Pipeline, Sink, Source, Transform
pipe = (
    Pipeline(Source("people.csv"))
    .then(Transform("cast", params={"types": {"age": "integer"}, "on_error": "null"}))
    .then(Transform("filter", params={"where": "age >= 18"}))
    .then(Sink("adults.csv"))
)
wowdata: 0
pipeline:
  start:
    uri: people.csv
    type: csv
  steps:
    - transform:
        op: cast
        params:
          types:
            age: integer
          on_error: "null"
    - transform:
        op: filter
        params:
          where: "age >= 18"
    - sink:
        uri: adults.csv
        type: csv
Arguments
start
Required.
The pipeline source. This must be a Source object in Python, or a valid start: descriptor in YAML.
steps
Optional. Default: [].
A list of Transform and Sink steps.
In Python you will typically build this list with .then(...) rather than passing steps directly.
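If you do pass steps directly, the form below is a minimal sketch; it assumes the same Transform and Sink constructors shown in the Examples section, and that a steps list is equivalent to chaining .then(...).

from wowdata import Pipeline, Sink, Source, Transform

# Sketch: passing steps directly; assumed equivalent to chaining .then(...).
pipe = Pipeline(
    Source("people.csv"),
    steps=[
        Transform("filter", params={"where": "age >= 18"}),
        Sink("adults.csv"),
    ],
)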
Pipeline Shape
v0 pipelines are linear only:
- one Source
- zero or more Transforms
- one or more Sinks at the end
Ordering rule:
- transforms cannot appear after a sink
If you violate this ordering, WowData raises E_PIPELINE_ORDER.
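A minimal sketch of a violation, reusing the imports from the Examples section. Whether E_PIPELINE_ORDER surfaces as soon as .then(...) is called or only at preflight/run time is not specified here, so the sketch assumes it appears during preflight():

# Sketch: a transform after a sink violates the v0 ordering rule.
bad = (
    Pipeline(Source("people.csv"))
    .then(Sink("out.csv"))
    .then(Transform("filter", params={"where": "age >= 18"}))  # transform after sink
)
bad.preflight()  # expected to fail with E_PIPELINE_ORDER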
Common Methods
then(step)
Add a Transform or Sink and return a new pipeline value.
Example:
pipe = Pipeline(Source("people.csv")).then(Transform("select", params={"columns": ["age"]}))
preflight()
Validate sources, sinks, and pipeline ordering before execution.
This is called automatically by run().
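A short sketch, assuming preflight() reports problems by raising and has no interesting return value on success:

# Sketch: validate the pipeline without executing anything.
pipe = Pipeline(Source("people.csv")).then(Sink("people_copy.csv"))
pipe.preflight()  # checks the source, sinks, and step ordering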
run()
Execute the pipeline end to end and return a PipelineContext.
During execution:
- the source schema is initialized from start.peek_schema()
- each transform updates the running table and the best-effort schema
- each sink writes the current table
- checkpoints are recorded along the way
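A sketch of a typical call, using the pipe built in the Examples section. The attribute names follow the PipelineContext description at the end of this page; the exact structure of each checkpoint entry is an assumption:

# Sketch: execute the pipeline and inspect the returned context.
ctx = pipe.run()
print(ctx.schema)              # best-effort schema after the last transform
for checkpoint in ctx.checkpoints:
    print(checkpoint)          # one artifact per executed step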
schema(sample_rows=200, force=False)
Infer the pipeline's output schema without running the full pipeline.
This uses:
- Source.peek_schema(...)
- each transform's output_schema(...)
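A minimal sketch, again using the pipe from the Examples section; the shape of the returned schema object is not documented here, so the print is only illustrative:

# Sketch: infer the output schema from a small sample, without writing any sinks.
inferred = pipe.schema(sample_rows=50)
print(inferred)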
to_ir(), from_ir(...)
Serialize to and from WowData's internal YAML-friendly dict format.
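A round-trip sketch, assuming from_ir(...) can be called as a classmethod on Pipeline:

# Sketch: round-trip through the YAML-friendly dict form.
ir = pipe.to_ir()
same_pipe = Pipeline.from_ir(ir)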
to_yaml(...), from_yaml(...)
Serialize to and from YAML text or YAML files.
save_yaml(...), load_yaml(...)
Convenience wrappers for writing or reading pipeline YAML files.
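A sketch of both pairs, assuming from_yaml(...) and load_yaml(...) are classmethods, that to_yaml() with no arguments returns YAML text, and an arbitrary file name:

# Sketch: YAML text round-trip and YAML file round-trip.
text = pipe.to_yaml()
restored = Pipeline.from_yaml(text)

pipe.save_yaml("adults_pipeline.yaml")
reloaded = Pipeline.load_yaml("adults_pipeline.yaml")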
lock_schema(sample_rows=200, force=False)
Return a new pipeline where each transform carries an explicit output_schema.
This is useful when you want schema-aware validation to remain stable after serialization, especially around joins and other transforms that add or reshape columns.
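A sketch of the intended workflow; that the locked schemas are carried through serialization follows from the description above, and the file name is arbitrary:

# Sketch: freeze per-transform output schemas, then serialize the locked pipeline.
locked = pipe.lock_schema(sample_rows=100)
locked.save_yaml("adults_locked.yaml")  # transforms now carry explicit output_schema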
YAML Notes
from_yaml(...) accepts either:
- a file path
- raw YAML text
Relative paths inside YAML are normalized relative to the YAML file location when loaded from disk.
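Both accepted forms, as a sketch that reuses the pipeline file written in the serialization example above:

# Sketch: load from a file path or from raw YAML text.
from_path = Pipeline.from_yaml("adults_pipeline.yaml")

yaml_text = open("adults_pipeline.yaml").read()
from_text = Pipeline.from_yaml(yaml_text)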
When To Use It
Use Pipeline whenever you want to:
- compose sources, transforms, and sinks in Python
- serialize a pipeline to YAML
- load and run a pipeline from YAML
- inspect execution checkpoints and validations after a run
See also
PipelineContext
Pipeline.run() returns a PipelineContext.
Main fields:
- checkpoints: step-by-step execution artifacts, including transform previews and sink records
- schema: best-effort running schema after the last executed transform
- validations: summaries produced by the validate transform
This makes the pipeline runtime more inspectable, especially for teaching, debugging, and CLI output.
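A short sketch of post-run inspection; the field names come from the list above, while the structure of each validation entry is an assumption:

# Sketch: inspect validation summaries after a run.
ctx = pipe.run()
for summary in ctx.validations:   # produced by validate transforms, if any
    print(summary)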