How to create ML pipelines in ZenML
ZenML helps you standardize your ML workflows as ML Pipelines consisting of decoupled, modular Steps. This enables you to write portable code that can be moved from experimentation to production in seconds.
Steps are the atomic components of a ZenML pipeline. Each step is defined by its inputs, the logic it applies, and its outputs. Here is a very basic example of such a step, which uses a utility function to load the Digits dataset:
```python
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

from zenml.steps import Output, step


@step
def digits_data_loader() -> Output(
    X_train=np.ndarray, X_test=np.ndarray, y_train=np.ndarray, y_test=np.ndarray
):
    """Loads the digits dataset as a tuple of flattened numpy arrays."""
    digits = load_digits()
    data = digits.images.reshape((len(digits.images), -1))
    X_train, X_test, y_train, y_test = train_test_split(
        data, digits.target, test_size=0.2, shuffle=False
    )
    return X_train, X_test, y_train, y_test
```
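Since the step body is plain sklearn code, you can sanity-check the loading logic outside of ZenML. The Digits dataset contains 1797 8x8 images, so flattening yields 64 features per sample (a standalone sketch, independent of ZenML):

```python
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

# Same logic as the step body, run directly for a quick sanity check.
digits = load_digits()
data = digits.images.reshape((len(digits.images), -1))  # flatten 8x8 images to 64 features
X_train, X_test, y_train, y_test = train_test_split(
    data, digits.target, test_size=0.2, shuffle=False
)
print(X_train.shape, X_test.shape)  # (1437, 64) (360, 64)
```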
As this step has multiple outputs, we need to use the `zenml.steps.step_output.Output` class to indicate the name of each output. These names can be used to directly access the outputs of steps after running a pipeline, as we will see in a later chapter.
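Conceptually, `Output(...)` gives each element of the step's returned tuple a name, so outputs can later be looked up by name rather than by position. A rough plain-Python analogy (not ZenML's actual implementation) is a `NamedTuple`:

```python
from typing import NamedTuple

import numpy as np


# Hypothetical analogy: Output(...) names a step's outputs much like a
# NamedTuple names its fields.
class DigitsData(NamedTuple):
    X_train: np.ndarray
    X_test: np.ndarray
    y_train: np.ndarray
    y_test: np.ndarray


data = DigitsData(
    X_train=np.zeros((4, 64)),
    X_test=np.zeros((2, 64)),
    y_train=np.zeros(4),
    y_test=np.zeros(2),
)
print(data.X_train.shape)  # fields are accessed by name instead of position
```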
Let's come up with a second step that consumes the output of our first step and performs some sort of transformation on it. In this case, let's train a support vector machine classifier on the training data using sklearn:
```python
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC

from zenml.steps import step


@step
def svc_trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=0.001)
    model.fit(X_train, y_train)
    return model
```
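The training logic itself is also plain sklearn, so it can be exercised directly before running it as a pipeline step (a standalone sketch; the exact accuracy will vary, but this configuration scores well above 0.9 on the held-out split):

```python
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

# Reproduce the data loading step, then train and evaluate the classifier.
digits = load_digits()
data = digits.images.reshape((len(digits.images), -1))
X_train, X_test, y_train, y_test = train_test_split(
    data, digits.target, test_size=0.2, shuffle=False
)

model = SVC(gamma=0.001)
model.fit(X_train, y_train)
print(f"test accuracy: {model.score(X_test, y_test):.3f}")
```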
Next, let us combine our two steps into our first ML pipeline. The pipeline definition is agnostic of the concrete step implementations: it simply routes the outputs of one step into the inputs of the next. You can think of it as a recipe for how we want data to flow through our steps.
```python
from zenml.pipelines import pipeline


@pipeline
def first_pipeline(step_1, step_2):
    X_train, X_test, y_train, y_test = step_1()
    step_2(X_train=X_train, y_train=y_train)
```
With your pipeline recipe in hand, you can now specify which concrete step implementations to use when instantiating the pipeline:

```python
first_pipeline_instance = first_pipeline(
    step_1=digits_data_loader(),
    step_2=svc_trainer(),
)
```

You can then execute your pipeline instance with the `run()` method:

```python
first_pipeline_instance.run()
```
You should see the following output in your terminal:
```
Registered new pipeline with name `first_pipeline`.
Creating run `first_pipeline-03_Oct_22-14_08_44_284312` for pipeline `first_pipeline` (Caching enabled)
Using stack `default` to run pipeline `first_pipeline`...
Step `digits_data_loader` has started.
Step `digits_data_loader` has finished in 0.121s.
Step `svc_trainer` has started.
Step `svc_trainer` has finished in 0.099s.
Pipeline run `first_pipeline-03_Oct_22-14_08_44_284312` has finished in 0.236s.
Pipeline visualization can be seen in the ZenML Dashboard. Run `zenml up` to see your pipeline!
```
When running a pipeline by calling `my_pipeline.run()`, ZenML uses the current date and time as the name for the pipeline run. In order to change the name for a run, pass `run_name` as a parameter to the `run()` method, e.g. `first_pipeline_instance.run(run_name="custom_run_name")`.
Once a pipeline has been executed, it is represented by a `PipelineSpec` that uniquely identifies it. Therefore, you cannot edit a pipeline after it has been run once. In order to iterate quickly on pipelines, there are three options:
- Pipeline runs can be created without being explicitly associated with a pipeline. These are called `unlisted` runs and can be created by passing the `unlisted` parameter when running a pipeline: `first_pipeline_instance.run(unlisted=True)`.
- Pipelines can be deleted and created again using `zenml pipeline delete <PIPELINE_ID_OR_NAME>`.