# Creating a custom materializer
All code in this guide can be found [here](https://github.com/zenml-io/zenml/tree/main/examples/custom_materializer).

## What is a materializer?

The precise way that data passes between steps is dictated by materializers. The data that flows through steps is stored as artifacts, and artifacts are stored in artifact stores. The logic that governs reading data from and writing data to the artifact store lives in the materializers.
```python
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data."""

    ASSOCIATED_ARTIFACT_TYPES = []
    ASSOCIATED_TYPES = []

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact."""
        self.artifact = artifact

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            Any object that is to be passed into the relevant artifact in the
            step.
        """
        # read from self.artifact.uri
        ...

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: Any object that is returned as an output artifact of the step.
        """
        # write `data` to self.artifact.uri
        ...
```
Above you can see the basic definition of the `BaseMaterializer`. All other materializers inherit from this class, which defines the interface of all materializers.
Each materializer has an artifact object. The most important property of an artifact object is its `uri`, which ZenML creates at pipeline run time and which points to a directory inside the artifact store's file system.
The two methods that do the actual work are `handle_input` and `handle_return`:

- `handle_input` is responsible for reading the artifact from the artifact store.
- `handle_return` is responsible for writing the artifact to the artifact store.
Each materializer also defines `ASSOCIATED_TYPES` and `ASSOCIATED_ARTIFACT_TYPES`:

- `ASSOCIATED_TYPES` lists the data types being stored. ZenML uses this information to call the right materializer at the right time: if a ZenML step returns a `pd.DataFrame`, ZenML will try to find a materializer that has `pd.DataFrame` (or one of its subclasses) in its `ASSOCIATED_TYPES`.
- `ASSOCIATED_ARTIFACT_TYPES` defines what types of artifacts are being stored. This can be `DataArtifact`, `StatisticsArtifact`, `DriftArtifact`, etc. It is simply a tag used to query artifacts of a certain type in the post-execution workflow.
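For instance, here is a minimal sketch of how these class variables might be declared for a materializer that handles pandas DataFrames (the class name is hypothetical; ZenML already ships a built-in materializer for this type):

```python
import pandas as pd

from zenml.artifacts import DataArtifact
from zenml.materializers.base_materializer import BaseMaterializer


class SketchDataFrameMaterializer(BaseMaterializer):
    """Hypothetical materializer for steps that return a pd.DataFrame."""

    # Any step output of type pd.DataFrame (or a subclass of it) can be
    # dispatched to this materializer.
    ASSOCIATED_TYPES = [pd.DataFrame]
    # The stored artifact is tagged as a DataArtifact so it can be queried
    # by type in the post-execution workflow.
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]
```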

## Extending the `BaseMaterializer`

To control more precisely how data flowing between steps is treated, you can extend the `BaseMaterializer` by subclassing it:
```python
class MyCustomMaterializer(BaseMaterializer):
    """Define my own materialization logic."""

    ASSOCIATED_ARTIFACT_TYPES = [...]
    ASSOCIATED_TYPES = [...]

    def handle_input(self, data_type: Type[Any]) -> Any:
        # read from self.artifact.uri
        ...

    def handle_return(self, data: Any) -> None:
        # write `data` to self.artifact.uri
        ...
```
For example, let's say you have a custom object called `MyObj` that flows between two steps in a pipeline:
```python
from zenml.steps import step
from zenml.pipelines import pipeline


class MyObj:
    def __init__(self, name: str):
        self.name = name


@step
def step1() -> MyObj:
    return MyObj("jk")


@step
def step2(my_obj: MyObj):
    print(my_obj.name)


@pipeline
def pipe(step1, step2):
    step2(step1())


pipe(
    step1=step1(),
    step2=step2()
).run()
```
Running the above without a custom materializer will result in the following error:
```
zenml.exceptions.StepInterfaceError: Unable to find materializer for output 'output' of
type `<class '__main__.MyObj'>` in step 'step1'. Please make sure to either explicitly set a materializer for step
outputs using `step.with_return_materializers(...)` or registering a default materializer for specific types by
subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable.
For more information, visit https://docs.zenml.io/guides/common-usecases/custom-materializer.
```
The error means that ZenML does not know how to persist an object of type `MyObj` between steps (how could it? We just created this class!). Therefore, we have to create our own materializer:
```python
import os
from typing import Type

from zenml.artifacts import DataArtifact
from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer


class MyMaterializer(BaseMaterializer):
    ASSOCIATED_TYPES = [MyObj]
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]

    def handle_input(self, data_type: Type[MyObj]) -> MyObj:
        """Read from artifact store."""
        super().handle_input(data_type)
        with fileio.open(os.path.join(self.artifact.uri, "data.txt"), "r") as f:
            name = f.read()
        return MyObj(name=name)

    def handle_return(self, my_obj: MyObj) -> None:
        """Write to artifact store."""
        super().handle_return(my_obj)
        with fileio.open(os.path.join(self.artifact.uri, "data.txt"), "w") as f:
            f.write(my_obj.name)
```
Pro-tip: Use the ZenML `fileio` module to ensure your materialization logic works across artifact stores (local and remote, such as S3 buckets).
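As a rough illustration (the path and contents below are made up), `fileio.open` mirrors the built-in `open` but also accepts remote object-store URIs:

```python
from zenml.io import fileio

# The same call works for a local path and for a remote object-store URI
# such as this hypothetical S3 location.
with fileio.open("s3://my-bucket/some/key.txt", "w") as f:
    f.write("materialized!")
```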
Then edit the pipeline as follows:
```python
pipe(
    step1=step1().with_return_materializers(MyMaterializer),
    step2=step2()
).run()
```
Please note that for steps with multiple outputs, a dictionary of the form `{OUTPUT_NAME: MATERIALIZER_CLASS}` can be supplied to `with_return_materializers` (see the sketch below).
Also, notice that `with_return_materializers` only needs to be called on `step1`; all downstream steps will use the same materializer by default.
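For example, a hypothetical step with two named outputs could assign one materializer per output (the step and output names below are made up for illustration):

```python
from zenml.steps import Output, step


@step
def multi_output_step() -> Output(first=MyObj, second=MyObj):
    # Hypothetical step that returns two named MyObj outputs.
    return MyObj("a"), MyObj("b")


# Map each output name to a materializer class.
multi_output_step().with_return_materializers(
    {"first": MyMaterializer, "second": MyMaterializer}
)
```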
Running the pipeline now yields the expected output:
```
Creating run for pipeline: `pipe`
Cache enabled for pipeline `pipe`
Using stack `local_stack` to run pipeline `pipe`...
Step `step1` has started.
Step `step1` has finished in 0.035s.
Step `step2` has started.
jk
Step `step2` has finished in 0.036s.
Pipeline run `pipe-24_Jan_22-23_12_18_504593` has finished in 0.080s.
```