Steps & Pipelines
Create Steps, Build a Pipeline and Run it.

Step

Steps are the atomic components of a ZenML pipeline. Each step is defined by its inputs, the logic it applies and its outputs. Here is a very simple example of such a step:
from zenml.steps import step, Output


@step
def my_first_step() -> Output(output_int=int, output_float=float):
    """Step that returns a pre-defined integer and float"""
    return 7, 0.1
As this step has multiple outputs, we need to use the zenml.steps.step_output.Output class to name each output. These names can then be used to access a specific output directly in the post-execution workflow.
Let's come up with a second step that consumes the output of our first step and performs some sort of transformation on it. In this case, let's double the input.
from zenml.steps import step, Output


@step
def my_second_step(input_int: int, input_float: float
) -> Output(output_int=int, output_float=float):
    """Step that doubles the inputs"""
    return 2 * input_int, 2 * input_float
Now we can go ahead and create a pipeline with our two steps to make sure they work.
To run the step function outside of a ZenML pipeline, call its .entrypoint() method with the same arguments the step function expects. For example:
my_second_step.entrypoint(input_int=1, input_float=0.9)
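Because the step body is ordinary Python, its logic can also be checked like any other function. The following is a minimal sketch that does not import ZenML: it assumes the body of my_second_step has been copied into an undecorated function, and double_inputs is a hypothetical name chosen only for this illustration.

```python
from typing import Tuple


def double_inputs(input_int: int, input_float: float) -> Tuple[int, float]:
    """Same logic as my_second_step, without the @step decorator (assumption)."""
    return 2 * input_int, 2 * input_float


# Same arguments as the .entrypoint() call shown above.
result_int, result_float = double_inputs(input_int=1, input_float=0.9)

# Feeding in the values returned by my_first_step (7 and 0.1).
chained_int, chained_float = double_inputs(7, 0.1)
```

Checking the logic this way keeps the feedback loop short before the step is wired into a pipeline.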

Pipeline

Here we define the pipeline. The definition is independent of any concrete step implementation: it only routes outputs from one step into the next. You can think of it as a recipe for how data should flow through the steps.
from zenml.pipelines import pipeline


@pipeline
def first_pipeline(
    step_1,
    step_2
):
    output_1, output_2 = step_1()
    step_2(output_1, output_2)
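The recipe idea can be illustrated with a plain-Python analogy. This is only a sketch of the concept, not ZenML's API or its internals: recipe, constant_source, doubler and tripler are hypothetical names, and ordinary functions stand in for steps.

```python
from typing import Callable, Tuple

StepOne = Callable[[], Tuple[int, float]]
StepTwo = Callable[[int, float], Tuple[int, float]]


def recipe(step_1: StepOne, step_2: StepTwo) -> Tuple[int, float]:
    """Route the outputs of step_1 into step_2, whatever the two steps do."""
    output_1, output_2 = step_1()
    return step_2(output_1, output_2)


def constant_source() -> Tuple[int, float]:
    """Stand-in for a first step that returns fixed values."""
    return 7, 0.1


def doubler(input_int: int, input_float: float) -> Tuple[int, float]:
    """One possible implementation of the second step."""
    return 2 * input_int, 2 * input_float


def tripler(input_int: int, input_float: float) -> Tuple[int, float]:
    """An alternative implementation with the same signature."""
    return 3 * input_int, 3 * input_float


# The same recipe runs unchanged with either second-step implementation.
doubled = recipe(constant_source, doubler)
tripled = recipe(constant_source, tripler)
```

The routing is written once, and the concrete implementations are chosen only when the recipe is invoked, which mirrors the separation between defining a pipeline and instantiating it.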

Instantiate and run your Pipeline

With your pipeline recipe in hand you can now specify which concrete step implementations are used. And with that, you are ready to run:
first_pipeline(step_1=my_first_step(), step_2=my_second_step()).run()
You should see the following output on your command line:
Creating run for pipeline: `first_pipeline`
Cache disabled for pipeline `first_pipeline`
Using stack `default` to run pipeline `first_pipeline`
Step `my_first_step` has started.
Step `my_first_step` has finished in 0.049s.
Step `my_second_step` has started.
Step `my_second_step` has finished in 0.067s.
Pipeline run `first_pipeline-20_Apr_22-16_07_14_577771` has finished in 0.128s.
You'll learn how to inspect the finished run within the chapter on our Post Execution Workflow.

Give each pipeline run a name

When you run a pipeline by calling my_pipeline.run(), ZenML generates the run name from the current date and time. To give a run a different name, pass it to the run() method via the run_name argument:
first_pipeline_instance = first_pipeline(step_1=my_first_step(), step_2=my_second_step())
first_pipeline_instance.run(run_name="custom_pipeline_run_name")
Pipeline run names must be unique, so make sure to compute the name dynamically if you plan to run your pipeline multiple times.
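One way to compute such a name is sketched below using only the Python standard library. The helper make_run_name and its naming scheme are assumptions made for illustration; ZenML does not prescribe any particular format.

```python
import uuid
from datetime import datetime, timezone


def make_run_name(prefix: str = "first_pipeline") -> str:
    """Build a run name that differs on every call (hypothetical helper)."""
    # The timestamp keeps names human-readable and roughly sortable.
    timestamp = datetime.now(timezone.utc).strftime("%Y_%m_%d-%H_%M_%S")
    # A short random suffix guards against two runs starting in the same second.
    suffix = uuid.uuid4().hex[:8]
    return f"{prefix}-{timestamp}-{suffix}"


run_name = make_run_name()
# With ZenML available, the name would be passed along as described above:
# first_pipeline_instance.run(run_name=run_name)
```

Keep the generated name if you want to look up this specific run later.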
Once the pipeline run is finished we can easily access this specific run during our post-execution workflow:
from zenml.repository import Repository


repo = Repository()
pipeline = repo.get_pipeline(pipeline_name="first_pipeline")
run = pipeline.get_run("custom_pipeline_run_name")
