Chapter 4
Leverage caching.
If you want to see the code for this chapter of the guide, head over to the GitHub repository.

Swap out implementations of individual steps and see caching in action

What if we don't want to use TensorFlow but rather a scikit-learn model? This is easy to do.

Create steps

We add two more steps: scikit-learn versions of the trainer and the evaluator.

Trainer

```python
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.linear_model import LogisticRegression

from zenml.steps import step


@step
def sklearn_trainer(
    config: TrainerConfig,  # TrainerConfig is defined in an earlier chapter
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Train a LogisticRegression classifier from sklearn."""
    clf = LogisticRegression(penalty="l1", solver="saga", tol=0.1)
    # Flatten the 28x28 images into 784-dimensional feature vectors
    clf.fit(X_train.reshape((X_train.shape[0], -1)), y_train)
    return clf
```
A simple enough step, returning a sklearn ClassifierMixin. ZenML also knows how to store all primitive sklearn model types.
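Note that the trainer reuses the TrainerConfig introduced in an earlier chapter. If you are jumping straight into this chapter, a minimal sketch of such a config might look like the following; the import path, field names, and defaults here are assumptions for illustration, so check the earlier chapters or the GitHub repository for the real definition:

```python
from zenml.steps import BaseStepConfig


class TrainerConfig(BaseStepConfig):
    """Trainer params (illustrative sketch; see earlier chapters for the real one)."""

    epochs: int = 1  # assumed field, used by the TensorFlow trainer
    lr: float = 0.001  # assumed field
```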

Evaluator

We also add a simple evaluator:
```python
@step
def sklearn_evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: ClassifierMixin,
) -> float:
    """Calculate accuracy score with classifier."""
    # Flatten the test images the same way as in training
    test_acc = model.score(X_test.reshape((X_test.shape[0], -1)), y_test)
    return test_acc
```
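Both steps wrap plain sklearn logic, so you can sanity-check the core of them without ZenML at all. Here is a self-contained sketch that uses random, MNIST-shaped arrays rather than the real dataset:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

# Fake MNIST-shaped data: 100 28x28 "images" with random labels from 0-9
X = np.random.rand(100, 28, 28)
y = np.random.randint(0, 10, size=100)

clf = LogisticRegression(penalty="l1", solver="saga", tol=0.1)
clf.fit(X.reshape((X.shape[0], -1)), y)  # flatten images to 784-dim vectors
print(clf.score(X.reshape((X.shape[0], -1)), y))  # accuracy on the training data
```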

Pipeline

And now the cool bit: we don't need to change the pipeline at all. We just swap in the new concrete step functions:
```python
# Run the pipeline
mnist_pipeline(
    importer=importer_mnist(),
    normalizer=normalize_mnist(),
    trainer=sklearn_trainer(config=TrainerConfig()),
    evaluator=sklearn_evaluator(),
).run()
```
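For reference, the pipeline definition itself is unchanged from the previous chapters. It looks roughly like this (a sketch; the authoritative version lives in the GitHub repository):

```python
from zenml.pipelines import pipeline


@pipeline
def mnist_pipeline(importer, normalizer, trainer, evaluator):
    """Links the steps together, independent of their concrete implementations."""
    X_train, y_train, X_test, y_test = importer()
    X_train_normed, X_test_normed = normalizer(X_train=X_train, X_test=X_test)
    model = trainer(X_train=X_train_normed, y_train=y_train)
    evaluator(X_test=X_test_normed, y_test=y_test, model=model)
```

This is exactly why swapping trainers is painless: the pipeline only depends on the step interfaces, not on TensorFlow or sklearn.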

Run

You can run this as follows:
```shell
python chapter_4.py
```
The output will look as follows (note: the logs are filtered to highlight the most important ones):
```
...
Creating pipeline: mnist_pipeline
Cache enabled for pipeline `mnist_pipeline`
Using orchestrator `local_orchestrator` for pipeline `mnist_pipeline`. Running pipeline..
Step `importer_mnist` has started.
Step `importer_mnist` has finished in 0.032s.
Step `normalize_mnist` has started.
Step `normalize_mnist` has finished in 0.029s.
Step `sklearn_trainer` has started.
Step `sklearn_evaluator` has started.
Step `sklearn_evaluator` has finished in 0.191s.
```
Note that the `importer_mnist` and `normalize_mnist` steps are now 100x faster. This is because we have not changed the pipeline at all; we have just made another run with different step functions. ZenML therefore caches these unchanged steps and skips straight to the new trainer and evaluator.
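Caching is enabled by default. If you ever need a step to re-execute on every run (for example, an importer that pulls fresh data), you can opt out per step. A minimal sketch using the enable_cache flag, with a hypothetical step name:

```python
from zenml.steps import step


@step(enable_cache=False)
def always_fresh_step() -> int:
    """A hypothetical step that re-executes on every run instead of being cached."""
    return 42
```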

Inspect

If you add the following code to fetch the pipeline:
```python
from zenml.core.repo import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="mnist_pipeline")
print(f"Pipeline `mnist_pipeline` has {len(p.runs)} run(s)")
for r in p.runs[0:2]:
    eval_step = r.get_step("evaluator")
    print(
        f"For {eval_step.name}, the accuracy is: "
        f"{eval_step.output.read():.2f}"
    )
```
You get the following output:
```
Pipeline `mnist_pipeline` has 2 run(s)
For tf_evaluator, the accuracy is: 0.91
For sklearn_evaluator, the accuracy is: 0.92
```
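Since each run exposes its evaluator output, you can also compare runs programmatically. A small hypothetical convenience snippet building on the inspection code above:

```python
# Pick the run whose evaluator scored highest (illustrative, not part of the guide)
best_run = max(p.runs[0:2], key=lambda r: r.get_step("evaluator").output.read())
best_eval = best_run.get_step("evaluator")
print(f"Best evaluator: {best_eval.name} ({best_eval.output.read():.2f} accuracy)")
```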
Looks like sklearn narrowly beat TensorFlow in this one. If we want, we can keep extending this and add a PyTorch example (as we have done in the not_so_quickstart example).

Combining different complex steps with standard pipeline interfaces is a powerful tool in any MLOps setup. You can now organize, track, and manage your codebase as it grows with your use cases.