How to create ML pipelines in ZenML
ZenML helps you standardize your ML workflows as ML Pipelines consisting of decoupled, modular Steps. This enables you to write portable code that can be moved from experimentation to production in seconds.
If you are new to MLOps and would like to learn more about ML pipelines in general, checkout ZenBytes, our lesson series on practical MLOps, where we introduce ML pipelines in more detail in ZenBytes lesson 1.1.


Steps are the atomic components of a ZenML pipeline. Each step is defined by its inputs, the logic it applies and its outputs. Here is a very basic example of such a step, which uses a utility function to load the Digits dataset:
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from zenml.steps import Output, step
def digits_data_loader() -> Output(
X_train=np.ndarray, X_test=np.ndarray, y_train=np.ndarray, y_test=np.ndarray
"""Loads the digits dataset as a tuple of flattened numpy arrays."""
digits = load_digits()
data = digits.images.reshape((len(digits.images), -1))
X_train, X_test, y_train, y_test = train_test_split(
data, digits.target, test_size=0.2, shuffle=False
return X_train, X_test, y_train, y_test
As this step has multiple outputs, we need to use the zenml.steps.step_output.Output class to indicate the names of each output. These names can be used to directly access the outputs of steps after running a pipeline, as we will see in a later chapter.
Let's come up with a second step that consumes the output of our first step and performs some sort of transformation on it. In this case, let's train a support vector machine classifier on the training data using sklearn:
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from zenml.steps import step
def svc_trainer(
X_train: np.ndarray,
y_train: np.ndarray,
) -> ClassifierMixin:
"""Train a sklearn SVC classifier."""
model = SVC(gamma=0.001)
model.fit(X_train, y_train)
return model
Next, we will combine our two steps into our first ML pipeline.
In case you want to run the step function outside the context of a ZenML pipeline, all you need to do is call the .entrypoint() method with the same input signature. For example:
svc_trainer.entrypoint(X_train=..., y_train=...)
Using the Class-based API


Let us now define our first ML pipeline. This is agnostic of the implementation and can be done by routing outputs through the steps within the pipeline. You can think of this as a recipe for how we want data to flow through our steps.
from zenml.pipelines import pipeline
def first_pipeline(step_1, step_2):
X_train, X_test, y_train, y_test = step_1()
step_2(X_train, y_train)
Using the Class-based API

Instantiate and run your Pipeline

With your pipeline recipe in hand you can now specify which concrete step implementations to use when instantiating the pipeline:
first_pipeline_instance = first_pipeline(
You can then execute your pipeline instance with the .run() method:
You should see the following output in your terminal:
Registered new pipeline with name `first_pipeline`.
Creating run `first_pipeline-03_Oct_22-14_08_44_284312` for pipeline `first_pipeline` (Caching enabled)
Using stack `default` to run pipeline `first_pipeline`...
Step `digits_data_loader` has started.
Step `digits_data_loader` has finished in 0.121s.
Step `svc_trainer` has started.
Step `svc_trainer` has finished in 0.099s.
Pipeline run `first_pipeline-03_Oct_22-14_08_44_284312` has finished in 0.236s.
Pipeline visualization can be seen in the ZenML Dashboard. Run `zenml up` to see your pipeline!
We will dive deeper into how to inspect the finished run within the chapter on Accessing Pipeline Runs.

Inspect your pipeline in the dashboard

Notice the last log, that indicates running a command to view the dashboard. Check out the dashboard guide in the next section to inspect your pipeline there.

Give each pipeline run a name

When running a pipeline by calling my_pipeline.run(), ZenML uses the current date and time as the name for the pipeline run. In order to change the name for a run, pass run_name as a parameter to the run() function:
Pipeline run names must be unique, so make sure to compute it dynamically if you plan to run your pipeline multiple times.

Unlisted runs

Once a pipeline has been executed, it is represented by a PipelineSpec that uniquely identifies it. Therefore, you cannot edit a pipeline after it has been run once. In order to iterate quickly pipelines, there are three options:
  • Pipeline runs can be created without being associated with a pipeline explicitly. These are called unlisted runs and can be created by passing the unlisted parameter when running a pipeline: pipeline_instance.run(unlisted=True).
  • Pipelines can be deleted and created again using zenml pipeline delete <PIPELINE_ID_OR_NAME>.
  • Pipelines can be given unique names each time they are run to uniquely identify them.
We will dive into quickly iterating over pipelines later in this section.

Code Summary

Code Example for this Section