Chapter 1
Create your first step.
If you want to see the code for this chapter of the guide, head over to the GitHub repository.

Create an importer step to load data

The first thing to do is to load our data. We create a step that can load data from an external source (in this case, a Keras dataset). This can be done by creating a simple function and decorating it with the @step decorator.
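If you have not used decorators before: @step is plain Python syntax, equivalent to writing importer_mnist = step(importer_mnist). A toy sketch of the pattern (purely illustrative, not ZenML internals):

```python
def log_calls(func):
    """A minimal decorator: wrap a function and return the wrapper."""
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@log_calls  # same as: add = log_calls(add)
def add(a, b):
    return a + b

add(1, 2)  # prints "Calling add" before returning 3
```

ZenML's @step does something similar, except the wrapper it returns knows how to register the function's inputs and outputs with a pipeline.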

Create steps

```python
import numpy as np
import tensorflow as tf
from zenml.steps import step
from zenml.steps.step_output import Output

@step
def importer_mnist() -> Output(
    X_train=np.ndarray, y_train=np.ndarray, X_test=np.ndarray, y_test=np.ndarray
):
    """Download the MNIST data and store it as an artifact"""
    (X_train, y_train), (
        X_test,
        y_test,
    ) = tf.keras.datasets.mnist.load_data()
    return X_train, y_train, X_test, y_test
```
There are some things to note:
  • As this step has multiple outputs, we need to use the zenml.steps.step_output.Output class to name each output. If there were only one output, this would not be necessary.
  • We could have returned the tf.keras.datasets.mnist dataset object directly, but we chose to persist the actual data (for caching purposes) rather than the dataset object.
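To see why persisting the raw arrays matters for caching, here is a minimal sketch of the idea using NumPy's own np.save/np.load (not ZenML's artifact store): once the actual array bytes are on disk, a later run can read them back instead of re-downloading the dataset.

```python
import os
import tempfile

import numpy as np

# Toy-sized stand-in for X_train; the real array is (60000, 28, 28).
X_train = np.zeros((100, 28, 28), dtype=np.uint8)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "X_train.npy")
    np.save(path, X_train)     # persist the actual data...
    restored = np.load(path)   # ...so a later run can read it back

assert restored.shape == (100, 28, 28)
```

A dataset loader object, by contrast, is not the data itself, so storing it would not let a cached run skip the download.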
Now we can go ahead and create a pipeline with one step to make sure this step works:
```python
from zenml.pipelines import pipeline

@pipeline
def load_mnist_pipeline(
    importer,
):
    """The simplest possible pipeline"""
    # We just need to call the function
    importer()

# run the pipeline
load_mnist_pipeline(importer=importer_mnist()).run()
```
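Note that calling load_mnist_pipeline(...) does not execute anything by itself; it returns a pipeline object, and only .run() triggers execution. A toy sketch of this deferred-call pattern (purely illustrative, not ZenML's actual implementation):

```python
class ToyPipeline:
    """Holds the pipeline function and its configured steps."""
    def __init__(self, func, **steps):
        self.func = func
        self.steps = steps

    def run(self):
        # Only now is the pipeline body executed with its steps.
        return self.func(**self.steps)

def pipeline(func):
    """Toy decorator: calling the decorated function configures
    a pipeline instead of running it."""
    def configure(**steps):
        return ToyPipeline(func, **steps)
    return configure

executed = []

def importer():
    executed.append("importer")

@pipeline
def toy_mnist_pipeline(importer):
    importer()

toy_mnist_pipeline(importer=importer).run()  # executed == ["importer"]
```

In the real framework, the call between configuration and execution is also where ZenML builds the step graph and hands it to an orchestrator.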

Run

You can run this as follows:
```shell
python chapter_1.py
```
The output will look as follows (note: this is filtered to highlight the most important logs):
```
Creating pipeline: load_mnist_pipeline
Cache enabled for pipeline `load_mnist_pipeline`
Using orchestrator `local_orchestrator` for pipeline `load_mnist_pipeline`. Running pipeline..
Step `importer_mnist` has started.
Step `importer_mnist` has finished in 1.726s.
```

Inspect

You can add the following code to fetch the pipeline:
```python
from zenml.core.repo import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="load_mnist_pipeline")
runs = p.runs
print(f"Pipeline `load_mnist_pipeline` has {len(runs)} run(s).")
run = runs[-1]
print(f"The run you just made has {len(run.steps)} step(s).")
step = run.get_step('importer')
print(f"That step has {len(step.outputs)} output artifacts.")
for k, o in step.outputs.items():
    arr = o.read()
    print(f"Output '{k}' is an array with shape: {arr.shape}")
```
You will get the following output:
```
Pipeline `load_mnist_pipeline` has 1 run(s).
The run you just made has 1 step(s).
That step has 4 output artifacts.
Output 'X_test' is an array with shape: (10000, 28, 28)
Output 'y_test' is an array with shape: (10000,)
Output 'y_train' is an array with shape: (60000,)
Output 'X_train' is an array with shape: (60000, 28, 28)
```
We have now confirmed that the data is loaded with the right shape, and that it can be fetched again from the artifact store.