Materializers
Control how data is persisted between steps.
A ZenML pipeline is built in a data-centric way. The outputs and inputs of steps define how steps are connected and the order in which they are executed. Each step should be considered as its very own process that reads and writes its inputs and outputs from and to the artifact store. This is where materializers come into play.

Materializers: Serializing and deserializing your artifacts

A materializer dictates how a given artifact can be written to and retrieved from the artifact store. It contains all serialization and deserialization logic.

What is a materializer?

from typing import Any, Type

from zenml.materializers.base_materializer import BaseMaterializerMeta


class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data."""

    ASSOCIATED_ARTIFACT_TYPES = ()
    ASSOCIATED_TYPES = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact."""
        self.artifact = artifact

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            Any object that is to be passed into the relevant artifact in the
            step.
        """
        # read from self.artifact.uri
        ...

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: Any object that is specified as an output artifact of the step.
        """
        # write `data` to self.artifact.uri
        ...

Above you can see the basic definition of the BaseMaterializer. All other materializers inherit from this class, and this class defines the interface of all materializers.
Each materializer has an artifact object. The most important property of an artifact object is the uri. The uri is created by ZenML at pipeline run time and points to a directory in the file system of the artifact store.
The handle_input and handle_return methods do the actual work:
  • handle_input is responsible for reading the artifact from the artifact store.
  • handle_return is responsible for writing the artifact to the artifact store.
Each materializer has ASSOCIATED_TYPES and ASSOCIATED_ARTIFACT_TYPES.
  • ASSOCIATED_TYPES lists the data types that the materializer stores. ZenML uses this information to call the right materializer at the right time. For example, if a ZenML step returns a pd.DataFrame, ZenML will try to find any materializer that has pd.DataFrame (or its subclasses) in its ASSOCIATED_TYPES.
  • ASSOCIATED_ARTIFACT_TYPES defines what type of artifact is being stored. This can be DataArtifact, StatisticsArtifact, DriftArtifact, etc. It is simply a tag used to query certain artifact types in the post-execution workflow.
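The type-based lookup can be pictured with a small sketch that does not use ZenML at all. Everything in it (SketchMaterializer, REGISTRY, register, find_materializer) is a hypothetical name invented for illustration and is not ZenML's actual registry. It assumes that a value whose type is a subclass of a registered type is handled by the parent's materializer, which is one reading of the subclass matching mentioned above.

```python
from typing import Any, Dict, Tuple, Type


class SketchMaterializer:
    """Hypothetical stand-in for a materializer; not a ZenML class."""

    ASSOCIATED_TYPES: Tuple[Type[Any], ...] = ()


class DictMaterializer(SketchMaterializer):
    ASSOCIATED_TYPES = (dict,)


class IntMaterializer(SketchMaterializer):
    ASSOCIATED_TYPES = (int,)


# Hypothetical registry mapping a data type to the materializer that handles it.
REGISTRY: Dict[Type[Any], Type[SketchMaterializer]] = {}


def register(materializer: Type[SketchMaterializer]) -> None:
    """Record the materializer under every type it declares."""
    for data_type in materializer.ASSOCIATED_TYPES:
        REGISTRY[data_type] = materializer


def find_materializer(data_type: Type[Any]) -> Type[SketchMaterializer]:
    """Return the materializer for data_type or its nearest registered parent."""
    # __mro__ lists the class itself first, then its parents in order.
    for candidate in data_type.__mro__:
        if candidate in REGISTRY:
            return REGISTRY[candidate]
    raise KeyError(f"No materializer registered for {data_type!r}")


class Config(dict):
    """A dict subclass with no materializer of its own."""


register(DictMaterializer)
register(IntMaterializer)

exact_match = find_materializer(dict)       # registered directly
subclass_match = find_materializer(Config)  # falls back to dict's materializer
```

In this sketch an unregistered type such as str raises a KeyError, which plays the same role as the "Unable to find materializer" error ZenML reports for unknown types.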

Writing a custom materializer

Let's say you have a custom class called MyObj that flows between two steps in a pipeline:
import logging

from zenml.steps import step
from zenml.pipelines import pipeline


class MyObj:
    def __init__(self, name: str):
        self.name = name


@step
def my_first_step() -> MyObj:
    """Step that returns an object of type MyObj"""
    return MyObj("my_object")


@step
def my_second_step(my_obj: MyObj) -> None:
    """Step that logs the input object and returns nothing."""
    logging.info(
        f"The following object was passed to this step: `{my_obj.name}`")


@pipeline
def first_pipeline(
    step_1,
    step_2
):
    output_1 = step_1()
    step_2(output_1)


first_pipeline(step_1=my_first_step(), step_2=my_second_step()).run()

Running the above without a custom materializer will result in the following error:
zenml.exceptions.StepInterfaceError: Unable to find materializer for output 'output' of
type `<class '__main__.MyObj'>` in step 'step1'. Please make sure to either explicitly set a materializer for step
outputs using `step.with_return_materializers(...)` or registering a default materializer for specific types by
subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable.
For more information, visit https://docs.zenml.io/guides/common-usecases/custom-materializer.
This means that ZenML does not know how to persist an object of type MyObj between steps (how could it? We just created this class!). Therefore, we have to create our own materializer by subclassing BaseMaterializer:
import os
from typing import Type

from zenml.artifacts import DataArtifact
from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer


class MyMaterializer(BaseMaterializer):
    ASSOCIATED_TYPES = (MyObj,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[MyObj]) -> MyObj:
        """Read from artifact store"""
        super().handle_input(data_type)
        with fileio.open(os.path.join(self.artifact.uri, 'data.txt'), 'r') as f:
            name = f.read()
        return MyObj(name=name)

    def handle_return(self, my_obj: MyObj) -> None:
        """Write to artifact store"""
        super().handle_return(my_obj)
        with fileio.open(os.path.join(self.artifact.uri, 'data.txt'), 'w') as f:
            f.write(my_obj.name)

Pro-tip: Use the ZenML fileio module to ensure your materialization logic works across artifact stores (local and remote like S3 buckets).
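Before wiring a materializer into a pipeline, it can help to check that what handle_return writes is exactly what handle_input reads back. The sketch below does this without ZenML installed. FakeArtifact and LocalMyMaterializer are hypothetical stand-ins that only mirror the shape of the classes above, and the built-in open replaces fileio.open, so the check covers a local directory only.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Type


class MyObj:
    """Same toy class as above, repeated so the sketch is self-contained."""

    def __init__(self, name: str):
        self.name = name


@dataclass
class FakeArtifact:
    """Hypothetical stand-in exposing only the `uri` attribute."""

    uri: str


class LocalMyMaterializer:
    """Mirrors MyMaterializer's two methods; does not subclass BaseMaterializer."""

    def __init__(self, artifact: FakeArtifact):
        self.artifact = artifact

    def handle_input(self, data_type: Type[MyObj]) -> MyObj:
        # Read the name back from the artifact directory.
        with open(os.path.join(self.artifact.uri, "data.txt"), "r") as f:
            name = f.read()
        return MyObj(name=name)

    def handle_return(self, my_obj: MyObj) -> None:
        # Write the name into the artifact directory.
        with open(os.path.join(self.artifact.uri, "data.txt"), "w") as f:
            f.write(my_obj.name)


# A temporary directory plays the role of the artifact's location in the store.
with tempfile.TemporaryDirectory() as artifact_dir:
    materializer = LocalMyMaterializer(FakeArtifact(uri=artifact_dir))
    materializer.handle_return(MyObj("my_object"))
    restored = materializer.handle_input(MyObj)
    written_files = os.listdir(artifact_dir)
```

A round trip like this only confirms the serialization logic. Whether the same code works against a remote artifact store still depends on using fileio in the real materializer.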
Now ZenML can use this materializer to handle outputs and inputs of your custom object. Edit the pipeline as follows to see this in action:
first_pipeline(
    step_1=my_first_step().with_return_materializers(MyMaterializer),
    step_2=my_second_step()).run()

Because of the type annotations on the inputs and outputs and the ASSOCIATED_TYPES attribute of the materializer, you won't necessarily have to add `.with_return_materializers(MyMaterializer)` to the step. It should be detected automatically. It doesn't hurt to be explicit, though.
For a step with multiple outputs, you can pass a dictionary of the form {OUTPUT_NAME: MATERIALIZER_CLASS} to with_return_materializers.
Also, notice that with_return_materializers is only called on step_1. All downstream steps will use the same materializer by default.
This will yield the proper response as follows:
Creating run for pipeline: `first_pipeline`
Cache enabled for pipeline `first_pipeline`
Using stack `default` to run pipeline `first_pipeline`...
Step `my_first_step` has started.
Step `my_first_step` has finished in 0.081s.
Step `my_second_step` has started.
The following object was passed to this step: `my_object`
Step `my_second_step` has finished in 0.048s.
Pipeline run `first_pipeline-22_Apr_22-10_58_51_135729` has finished in 0.153s.

Skip materialization

In most cases, materializers should be used to control how artifacts are consumed and produced by steps in a pipeline. Sometimes, however, you will need a completely non-materialized artifact in a step.
A non-materialized artifact is a BaseArtifact (or any of its subclasses) and has a property uri that points to the unique path in the artifact store where the artifact is stored. One can use a non-materialized artifact by simply specifying it as the type in the step:
from zenml.artifacts import DataArtifact
from zenml.steps import step


@step
def my_step(my_artifact: DataArtifact) -> None:  # rather than pd.DataFrame
    pass

The list of raw artifact types can be found in zenml.artifacts.* and includes ModelArtifact, DataArtifact, etc. Materializers link Python types to these artifact types implicitly, e.g., keras.Model and torch.nn.Module are Python types that are both linked to ModelArtifact via their materializers. When using artifacts directly, you must know which type they are by looking at the previous step's materializer: if the previous step produces a ModelArtifact, then you should specify ModelArtifact in a non-materialized step.
Be careful: Using artifacts directly like this might have unintended consequences for downstream tasks that rely on materialized artifacts.

A simple example

The following example shows how to use non-materialized artifacts:
from typing import Dict, List

from zenml.artifacts import DataArtifact, ModelArtifact
from zenml.pipelines import pipeline
from zenml.steps import Output, step


@step
def step_1() -> Output(dict_=Dict, list_=List):
    return {"some": "data"}, []


@step
def step_2() -> Output(dict_=Dict, list_=List):
    return {"some": "data"}, []


@step
def step_3(dict_: Dict, list_: List) -> None:
    assert type(dict_) is dict
    assert type(list_) is list


@step
def step_4(dict_: DataArtifact, list_: ModelArtifact) -> None:
    assert hasattr(dict_, "uri")
    assert hasattr(list_, "uri")


@pipeline
def p(s1, s2, s3, s4):
    s3(*s1())
    s4(*s2())


p(step_1(), step_2(), step_3(), step_4()).run()

In the example above, the pipeline looks as follows:
s1 -> s3
s2 -> s4
s1 and s2 produce identical artifacts. However, s3 consumes materialized artifacts while s4 consumes non-materialized artifacts. s4 can now use the dict_.uri and list_.uri paths directly rather than their materialized counterparts.
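What a step like s4 does with the uri is up to you. As a rough sketch that does not use ZenML, the hypothetical helper summarize_artifact_dir below lists the files under an artifact directory and their sizes without deserializing anything. It assumes the uri is a local path, and the file name data.json is made up for the demonstration. For a remote artifact store, the os calls would need to be replaced by a file system layer that understands that store.

```python
import os
import tempfile
from typing import Dict


def summarize_artifact_dir(uri: str) -> Dict[str, int]:
    """Map each file under `uri` (relative path) to its size in bytes."""
    sizes: Dict[str, int] = {}
    for root, _dirs, files in os.walk(uri):
        for file_name in files:
            full_path = os.path.join(root, file_name)
            sizes[os.path.relpath(full_path, uri)] = os.path.getsize(full_path)
    return sizes


# Simulate an artifact directory holding one serialized file.
with tempfile.TemporaryDirectory() as fake_uri:
    with open(os.path.join(fake_uri, "data.json"), "w") as f:
        f.write('{"some": "data"}')
    summary = summarize_artifact_dir(fake_uri)
```

Inside a non-materialized step, a helper like this would be called with the artifact's uri, for example summarize_artifact_dir(dict_.uri).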