Chapter 5
Materialize artifacts the way you want to consume them.
If you want to see the code for this chapter of the guide, head over to the GitHub repository.

So far, the precise way that data passes between steps has been something of a black box. Under the hood, there is a mechanism that serializes and deserializes the data flowing between steps. When we need more control, we can take charge of this mechanism ourselves.
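To make the idea concrete, here is a minimal sketch (not ZenML's actual implementation) of what such a serialization layer does: a step's output is written to a file under the artifact's URI, and the next step reads it back. The function names and the pickle format here are illustrative assumptions only.

```python
import os
import pickle
import tempfile


def write_artifact(uri: str, data) -> None:
    """Serialize a step output into the artifact store (sketch only)."""
    os.makedirs(uri, exist_ok=True)
    with open(os.path.join(uri, "data.pkl"), "wb") as f:
        pickle.dump(data, f)


def read_artifact(uri: str):
    """Deserialize a step input from the artifact store (sketch only)."""
    with open(os.path.join(uri, "data.pkl"), "rb") as f:
        return pickle.load(f)


# A float flowing from one step to the next:
with tempfile.TemporaryDirectory() as artifact_uri:
    write_artifact(artifact_uri, 0.9238)
    print(read_artifact(artifact_uri))  # 0.9238
```

A materializer is essentially a pluggable version of this pair of functions, bound to a specific data type.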

Create a custom materializer

Data that flows through steps is stored in Artifact Stores. The logic that governs the reading and writing of data to and from the Artifact Stores lives in the Materializers.
Suppose we want to write the output of our evaluator step to a SQLite table in the Artifact Store, rather than relying on the default mechanism for storing the float. That should be easy. Let's create a custom materializer:
```python
import os
import re

from sqlalchemy import Column, Integer, Float
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from zenml.materializers.base_materializer import BaseMaterializer

Base = declarative_base()


class Floats(Base):
    __tablename__ = "my_floats"

    id = Column(Integer, primary_key=True)
    value = Column(Float, nullable=False)


class SQLALchemyMaterializerForSQLite(BaseMaterializer):
    """Read/Write float to sqlalchemy table."""

    ASSOCIATED_TYPES = [float]

    def __init__(self, artifact):
        super().__init__(artifact)
        # connection
        sqlite_filepath = os.path.join(artifact.uri, "database")
        engine = create_engine(f"sqlite:///{sqlite_filepath}")

        # create metadata
        Base.metadata.create_all(engine)

        # create session
        Session = sessionmaker(bind=engine)
        self.session = Session()

        # Every artifact has a URI with a unique integer ID
        self.float_id = int(re.search(r"\d+", artifact.uri).group())

    def handle_input(self, data_type) -> float:
        """Reads float from a table"""
        super().handle_input(data_type)

        # query data
        return (
            self.session.query(Floats)
            .filter(Floats.id == self.float_id)
            .first()
        ).value

    def handle_return(self, data: float):
        """Stores float in a SQLAlchemy table"""
        super().handle_return(data)
        my_float = Floats(id=self.float_id, value=data)
        self.session.add_all([my_float])
        self.session.commit()
```
We use a bit of SQLAlchemy magic to manage the creation of the SQLite table.
We then subclass BaseMaterializer and implement the handle_input and handle_return methods, which manage reading and writing respectively.
Of course, this example is a bit contrived: you would not normally store evaluator results this way. But you can imagine many other situations where you would want to materialize data differently depending on your use-case and needs.
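The SQLAlchemy pattern the materializer relies on can be tried on its own, outside ZenML. This is a standalone sketch using an in-memory SQLite database instead of a file in the Artifact Store; the `Floats` model mirrors the one defined above:

```python
from sqlalchemy import Column, Integer, Float, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Floats(Base):
    __tablename__ = "my_floats"

    id = Column(Integer, primary_key=True)
    value = Column(Float, nullable=False)


# In-memory SQLite stands in for the database file under artifact.uri
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# What handle_return does: insert the float and commit
session.add_all([Floats(id=1, value=0.9238)])
session.commit()

# What handle_input does: query the row back by ID
row = session.query(Floats).filter(Floats.id == 1).first()
print(row.value)  # 0.9238
```

The only ZenML-specific part of the real materializer is wiring this read/write logic to the artifact's URI.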

Pipeline

Again, there is no need to change the pipeline. You can just specify in the pipeline run that the evaluator step should use the custom materializer:
```python
# Run the pipeline
scikit_p = mnist_pipeline(
    importer=importer_mnist(),
    normalizer=normalize_mnist(),
    trainer=sklearn_trainer(config=TrainerConfig()),
    evaluator=sklearn_evaluator().with_return_materializers(
        SQLALchemyMaterializerForSQLite
    ),
)
```

Run

You can run this as follows:
```shell
python chapter_5.py
```

Inspect

We can also now read data from the SQLite table with our custom materializer:
```python
from zenml.core.repo import Repository

repo = Repository()
p = repo.get_pipeline(pipeline_name="mnist_pipeline")
print(f"Pipeline `mnist_pipeline` has {len(p.get_runs())} run(s)")
eval_step = p.runs[0].get_step("evaluator")
val = eval_step.output.read(float, SQLALchemyMaterializerForSQLite)
print(f"The evaluator stored the value: {val} in a SQLite database!")
```
Which returns:
```
Pipeline `mnist_pipeline` has 1 run(s)
The evaluator stored the value: 0.9238 in a SQLite database!
```
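You could also open the database file directly, without going through the materializer at all. This sketch uses the standard library's sqlite3 module against a throwaway database with the same `my_floats` schema; the real file lives under the artifact's URI, whose exact path depends on your artifact store configuration:

```python
import os
import sqlite3
import tempfile

# Stand-in for the "database" file the materializer writes under artifact.uri
with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "database")

    # Simulate what handle_return stored
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE my_floats (id INTEGER PRIMARY KEY, value FLOAT NOT NULL)"
    )
    conn.execute("INSERT INTO my_floats (id, value) VALUES (?, ?)", (1, 0.9238))
    conn.commit()

    # Inspect it with plain SQL
    (value,) = conn.execute(
        "SELECT value FROM my_floats WHERE id = 1"
    ).fetchone()
    conn.close()
    print(value)  # 0.9238
```

This is handy for debugging: whatever a materializer writes is just ordinary data in the Artifact Store.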