Understand and adjust how ZenML versions your data.
Data sits at the heart of every machine learning workflow. Managing and versioning this data correctly is essential for reproducibility and traceability within your ML pipelines. ZenML takes a proactive approach to data versioning, ensuring that every artifact—be it data, models, or evaluations—is automatically tracked and versioned upon pipeline execution.
This guide will delve into artifact versioning and management, showing you how to efficiently name, organize, and utilize your data with the ZenML framework.
Managing artifacts produced by ZenML pipelines
Artifacts, the outputs of your steps and pipelines, are automatically versioned and stored in the artifact store. Configuring these artifacts is pivotal for transparent and efficient pipeline development.
Giving names to your artifacts
Assigning custom names to your artifacts can greatly enhance their discoverability and manageability. As best practice, utilize the Annotated object within your steps to give precise, human-readable names to outputs:
    from typing_extensions import Annotated
    import pandas as pd
    from sklearn.datasets import load_iris
    from zenml import pipeline, step


    # Using Annotated to name our dataset
    @step
    def training_data_loader() -> Annotated[pd.DataFrame, "iris_dataset"]:
        """Load the iris dataset as pandas dataframe."""
        iris = load_iris(as_frame=True)
        return iris.get("frame")


    @pipeline
    def feature_engineering_pipeline():
        training_data_loader()


    if __name__ == "__main__":
        feature_engineering_pipeline()
Unspecified artifact outputs default to a naming pattern of {pipeline_name}::{step_name}::output. For visual exploration in the ZenML dashboard, it's best practice to give significant outputs clear custom names.
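The naming rule above can be sketched in plain Python. This is only an illustration of the rule as stated here, not ZenML's implementation: the helper `resolve_output_name` is hypothetical, and it uses the standard-library `typing.Annotated` (Python 3.9+) instead of `typing_extensions`.

```python
from typing import Annotated, get_args, get_origin, get_type_hints


def resolve_output_name(func, pipeline_name: str) -> str:
    """Return the custom output name of `func`, or the default pattern.

    Hypothetical helper that mimics the rule described in the text;
    it is not part of ZenML.
    """
    return_hint = get_type_hints(func, include_extras=True).get("return")
    if get_origin(return_hint) is Annotated:
        # get_args() yields (underlying_type, *metadata); the first string
        # found in the metadata is treated as the custom name.
        for meta in get_args(return_hint)[1:]:
            if isinstance(meta, str):
                return meta
    # No custom name: fall back to {pipeline_name}::{step_name}::output
    return f"{pipeline_name}::{func.__name__}::output"


def training_data_loader() -> Annotated[list, "iris_dataset"]:
    return [5.1, 3.5, 1.4, 0.2]


def unnamed_loader() -> list:
    return []


named = resolve_output_name(training_data_loader, "feature_engineering_pipeline")
unnamed = resolve_output_name(unnamed_loader, "feature_engineering_pipeline")
print(named)
print(unnamed)
```

The unnamed output ends up with a long, generated name, which is why explicit names are easier to spot in a list of artifacts.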
Artifacts named iris_dataset can then be found swiftly using various ZenML interfaces:
To list artifacts: zenml artifact list
The ZenML Pro dashboard offers advanced visualization features for artifact exploration.
To prevent visual clutter, make sure to assign names to your most important artifacts that you would like to explore visually.
Versioning artifacts manually
ZenML automatically versions all created artifacts using auto-incremented numbering. For example, if a step creates an artifact named iris_dataset as shown above, the first execution of the step creates version "1" of that artifact, the second execution creates version "2", and so on.
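The auto-incremented numbering can be pictured with a small in-memory counter. This is a toy model of the behavior described above, assuming one independent counter per artifact name; the `VersionCounter` class is hypothetical and does not reflect how ZenML stores versions.

```python
from collections import defaultdict


class VersionCounter:
    """Toy model: one independent counter per artifact name."""

    def __init__(self) -> None:
        self._latest = defaultdict(int)

    def next_version(self, name: str) -> str:
        # Versions are strings ("1", "2", ...), matching the text above.
        self._latest[name] += 1
        return str(self._latest[name])


counter = VersionCounter()
first = counter.next_version("iris_dataset")       # first execution
second = counter.next_version("iris_dataset")      # second execution
other = counter.next_version("iris_predictions")   # a different artifact
print(first, second, other)
```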
While ZenML handles artifact versioning automatically, you have the option to specify custom versions using the ArtifactConfig. This may come into play during critical runs like production releases.
    from typing_extensions import Annotated
    import pandas as pd
    from zenml import step, ArtifactConfig


    @step
    def training_data_loader() -> (
        Annotated[
            pd.DataFrame,
            # Add `ArtifactConfig` to control more properties of your artifact
            ArtifactConfig(name="iris_dataset", version="raw_2023"),
        ]
    ):
        ...
The next execution of this step will then create an artifact with the name iris_dataset and version raw_2023. This is primarily useful if you are making a particularly important pipeline run (such as a release) whose artifacts you want to distinguish at a glance later.
Since custom versions cannot be duplicated, the above step can only be run once successfully. To avoid altering your code frequently, consider using a YAML config for artifact versioning.
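Why a hard-coded custom version only works once can be illustrated with a minimal registry that rejects duplicates. This is a sketch under assumptions: `CustomVersionRegistry` and `DuplicateVersionError` are hypothetical names, and the error ZenML actually raises may differ.

```python
from typing import Dict, Set


class DuplicateVersionError(ValueError):
    """Hypothetical error for a custom version that already exists."""


class CustomVersionRegistry:
    """Toy registry that refuses to store the same (name, version) twice."""

    def __init__(self) -> None:
        self._versions: Dict[str, Set[str]] = {}

    def register(self, name: str, version: str) -> str:
        taken = self._versions.setdefault(name, set())
        if version in taken:
            raise DuplicateVersionError(
                f"artifact {name!r} already has a version {version!r}"
            )
        taken.add(version)
        return version


registry = CustomVersionRegistry()
first_run = registry.register("iris_dataset", "raw_2023")  # succeeds

try:
    registry.register("iris_dataset", "raw_2023")  # same version again
    second_run_failed = False
except DuplicateVersionError as err:
    second_run_failed = True
    print(f"second run rejected: {err}")
```

Keeping the version string outside the step code (for example in configuration) avoids editing the step before every important run.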
After execution, iris_dataset and its version raw_2023 can be seen using:
To list versions: zenml artifact version list
The ZenML Pro dashboard visualizes version history for your review.
Add metadata and tags to artifacts
If you would like to extend your artifacts with extra metadata or tags you can do so by following the patterns demonstrated below:
    from zenml import step, get_step_context, ArtifactConfig
    from typing_extensions import Annotated


    # below we annotate output with `ArtifactConfig` giving it a name,
    # run_metadata and tags. As a result, the created artifact
    # `artifact_name` will get configured with metadata and tags
    @step
    def annotation_approach() -> (
        Annotated[
            str,
            ArtifactConfig(
                name="artifact_name",
                run_metadata={"metadata_key": "metadata_value"},
                tags=["tag_name"],
            ),
        ]
    ):
        return "string"


    # below we annotate output using functional approach with
    # run_metadata and tags. As a result, the created artifact
    # `artifact_name` will get configured with metadata and tags
    @step
    def functional_approach() -> Annotated[str, "artifact_name"]:
        step_context = get_step_context()
        step_context.add_output_metadata(
            output_name="artifact_name", metadata={"metadata_key": "metadata_value"}
        )
        step_context.add_output_tags(output_name="artifact_name", tags=["tag_name"])
        return "string"


    # below we combine both approaches, so the artifact will get
    # metadata and tags from both sources
    @step
    def combined_approach() -> (
        Annotated[
            str,
            ArtifactConfig(
                name="artifact_name",
                run_metadata={"metadata_key": "metadata_value"},
                tags=["tag_name"],
            ),
        ]
    ):
        step_context = get_step_context()
        step_context.add_output_metadata(
            output_name="artifact_name", metadata={"metadata_key2": "metadata_value2"}
        )
        step_context.add_output_tags(output_name="artifact_name", tags=["tag_name2"])
        return "string"
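The combined approach ends with metadata and tags from both sources on the same artifact. The sketch below shows one way to picture that merge in plain Python. The helper `merge_annotations` is hypothetical, and the conflict rule it uses (values added at runtime win over configured ones when keys collide) is an assumption, not documented ZenML behavior.

```python
from typing import Any, Dict, List, Tuple


def merge_annotations(
    config_metadata: Dict[str, Any],
    config_tags: List[str],
    runtime_metadata: Dict[str, Any],
    runtime_tags: List[str],
) -> Tuple[Dict[str, Any], List[str]]:
    """Combine metadata and tags from two sources (illustrative only)."""
    # Assumption: on a key collision the runtime value replaces the configured one.
    merged_metadata = {**config_metadata, **runtime_metadata}
    # dict.fromkeys() removes duplicate tags while keeping first-seen order.
    merged_tags = list(dict.fromkeys([*config_tags, *runtime_tags]))
    return merged_metadata, merged_tags


metadata, tags = merge_annotations(
    config_metadata={"metadata_key": "metadata_value"},
    config_tags=["tag_name"],
    runtime_metadata={"metadata_key2": "metadata_value2"},
    runtime_tags=["tag_name2", "tag_name"],
)
print(metadata)
print(tags)
```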
Specify a type for your artifacts
Assigning a type to an artifact allows ZenML to highlight it differently in the dashboard and also lets you filter your artifacts more precisely.
If you don't specify a type for your artifact, ZenML will use the default artifact type provided by the materializer that is used to save the artifact.
    from typing_extensions import Annotated
    from zenml import ArtifactConfig, save_artifact, step
    from zenml.enums import ArtifactType


    # Assign an artifact type to a step output
    @step
    def trainer() -> Annotated[MyCustomModel, ArtifactConfig(artifact_type=ArtifactType.MODEL)]:
        return MyCustomModel(...)


    # Assign an artifact type when manually saving artifacts
    model = ...
    save_artifact(model, name="model", artifact_type=ArtifactType.MODEL)
Consuming external artifacts within a pipeline
While most pipelines start with a step that produces an artifact, you will often want to consume artifacts that originate outside the pipeline. The ExternalArtifact class can be used to initialize an artifact within ZenML from data of any type.
For example, let's say we have a Snowflake query that produces a dataframe, or a CSV file that we need to read. External artifacts can be used for this, to pass values to steps that are neither JSON serializable nor produced by an upstream step:
    import numpy as np
    from zenml import ExternalArtifact, pipeline, step


    @step
    def print_data(data: np.ndarray):
        print(data)


    @pipeline
    def printing_pipeline():
        # One can also pass data directly into the ExternalArtifact
        # to create a new artifact on the fly
        data = ExternalArtifact(value=np.array([0]))

        print_data(data=data)


    if __name__ == "__main__":
        printing_pipeline()
Optionally, you can configure the ExternalArtifact to use a custom materializer for your data or disable artifact metadata and visualizations. Check out the SDK docs for all available options.
Using an ExternalArtifact for your step automatically disables caching for the step.
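The distinction drawn earlier between JSON-serializable values and everything else can be checked with the standard library. The helper below is a hypothetical convenience, not a ZenML function: it only indicates whether a value could be expressed as plain JSON, which is a rough hint for when an external artifact is the better fit.

```python
import json
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if `value` can be encoded with the standard json module."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        # TypeError: unsupported type; ValueError: e.g. circular references.
        return False
    return True


plain_config = {"learning_rate": 0.001, "layers": [64, 32]}
binary_blob = bytearray(b"\x00\x01\x02")  # stands in for arbitrary in-memory data

plain_ok = is_json_serializable(plain_config)
blob_ok = is_json_serializable(binary_blob)
print(plain_ok, blob_ok)
```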
Consuming artifacts produced by other pipelines
It is also common to consume an artifact downstream after producing it in an upstream pipeline or step. As we have learned in the previous section, the Client can be used to fetch artifacts directly inside the pipeline code:
    from uuid import UUID
    import pandas as pd
    from zenml import step, pipeline
    from zenml.client import Client


    @step
    def trainer(dataset: pd.DataFrame):
        ...


    @pipeline
    def training_pipeline():
        client = Client()
        # Fetch by ID
        dataset_artifact = client.get_artifact_version(
            name_id_or_prefix=UUID("3a92ae32-a764-4420-98ba-07da8f742b76")
        )

        # Fetch by name alone - uses the latest version of this artifact
        dataset_artifact = client.get_artifact_version(name_id_or_prefix="iris_dataset")

        # Fetch by name and version
        dataset_artifact = client.get_artifact_version(
            name_id_or_prefix="iris_dataset", version="raw_2023"
        )

        # Pass into any step
        trainer(dataset=dataset_artifact)


    if __name__ == "__main__":
        training_pipeline()
Calls to Client methods such as get_artifact_version directly inside the pipeline code make use of ZenML's late materialization behind the scenes.
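The general idea behind late materialization can be sketched as a reference that defers reading the data until it is needed. This is a conceptual illustration only, assuming that "late" means the data is loaded when the consuming step runs and not when the pipeline is defined; `LazyArtifact` is a hypothetical class and says nothing about ZenML's internals.

```python
from typing import Any, Callable, List


class LazyArtifact:
    """Holds a loader and calls it only on first access (illustrative)."""

    def __init__(self, loader: Callable[[], Any]) -> None:
        self._loader = loader
        self._value: Any = None
        self._loaded = False

    def load(self) -> Any:
        if not self._loaded:
            self._value = self._loader()
            self._loaded = True
        return self._value


load_calls: List[str] = []


def read_dataset() -> List[List[float]]:
    # Records every real read so the deferral is observable.
    load_calls.append("read")
    return [[5.1, 3.5, 1.4, 0.2]]


dataset_ref = LazyArtifact(read_dataset)  # "pipeline definition": nothing is read
calls_before = len(load_calls)

rows = dataset_ref.load()                 # "step execution": data is read now
rows_again = dataset_ref.load()           # cached, no second read
calls_after = len(load_calls)
print(calls_before, calls_after)
```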
If you would like to bypass materialization entirely and just download the data or files associated with a particular artifact version, you can use the .download_files method:
    from zenml.client import Client

    client = Client()
    artifact = client.get_artifact_version(name_id_or_prefix="iris_dataset")
    artifact.download_files("path/to/save.zip")
Take note that the path must have the .zip extension, as the artifact data will be saved as a zip file. Make sure to handle any exceptions that may arise from this operation.
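Both pieces of advice, checking the .zip extension and handling failures, can be wrapped in a small helper. The sketch below runs without ZenML: `FakeArtifactVersion` is a stand-in that only imitates a `download_files(path)` call, `safe_download` is a hypothetical helper, and catching `OSError` is an assumption about which errors are worth handling.

```python
import tempfile
import zipfile
from pathlib import Path


class FakeArtifactVersion:
    """Stand-in object: writes a tiny zip archive to the given path."""

    def download_files(self, path: str) -> None:
        with zipfile.ZipFile(path, "w") as archive:
            archive.writestr("data.csv", "a,b\n1,2\n")


def safe_download(artifact, target: Path) -> bool:
    """Validate the target path, then download; return True on success."""
    if target.suffix != ".zip":
        raise ValueError(f"target must end in .zip, got {target.name!r}")
    target.parent.mkdir(parents=True, exist_ok=True)
    try:
        artifact.download_files(str(target))
    except OSError as err:  # assumption: filesystem errors are the likely failure
        print(f"download failed: {err}")
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    zip_path = Path(tmp) / "downloads" / "iris_dataset.zip"
    ok = safe_download(FakeArtifactVersion(), zip_path)
    with zipfile.ZipFile(zip_path) as archive:
        names = archive.namelist()
print(ok, names)
```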
Managing artifacts not produced by ZenML pipelines
Sometimes, artifacts can be produced completely outside of ZenML. A good example of this is the predictions produced by a deployed model.
    # A model is deployed, running in a FastAPI container
    # Let's use the ZenML client to fetch the latest model and make predictions

    from zenml.client import Client
    from zenml import save_artifact

    # Fetch the model from a registry or a previous pipeline
    model = ...

    # Let's make a prediction
    prediction = model.predict([[1, 1, 1, 1]])

    # We now store this prediction in ZenML as an artifact
    # This will create a new artifact version
    save_artifact(prediction, name="iris_predictions")
You can also load any artifact stored within ZenML using the load_artifact method:
    from zenml import load_artifact

    # Loads the latest version
    load_artifact("iris_predictions")
load_artifact is simply short-hand for the following Client call:
    from zenml.client import Client

    client = Client()
    client.get_artifact_version("iris_predictions").load()
Even if an artifact is created externally, it can be treated like any other artifact produced by ZenML steps - with all the functionalities described above!
It is also possible to use these functions inside your ZenML steps. However, it is usually cleaner to return the artifacts as outputs of your step to save them, or to use External Artifacts to load them instead.
Linking existing data as a ZenML artifact
Sometimes, data is produced completely outside of ZenML and is already stored in a suitable location. A good example is the checkpoint files created as a side effect of training a deep learning model. This intermediate data tends to be large, and there is no good reason to move it around repeatedly if it can be written directly inside the artifact store and later simply linked as a ZenML artifact. Let's explore a PyTorch Lightning example that fits a model and stores the checkpoints in a remote location.
    import os
    from zenml.client import Client
    from zenml import register_artifact
    from pytorch_lightning import Trainer
    from pytorch_lightning.callbacks import ModelCheckpoint
    from uuid import uuid4

    # Define where the model data should be saved
    # use active ArtifactStore
    prefix = Client().active_stack.artifact_store.path
    # keep data separable for future runs with uuid4 folder
    default_root_dir = os.path.join(prefix, uuid4().hex)

    # Define the model and fit it
    model = ...
    trainer = Trainer(
        default_root_dir=default_root_dir,
        callbacks=[
            ModelCheckpoint(
                every_n_epochs=1, save_top_k=-1, filename="checkpoint-{epoch:02d}"
            )
        ],
    )
    try:
        trainer.fit(model)
    finally:
        # We now link those checkpoints in ZenML as an artifact
        # This will create a new artifact version
        register_artifact(default_root_dir, name="all_my_model_checkpoints")
The artifact produced from the preexisting data will have a pathlib.Path type, once loaded or passed as input to another step.
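Because the linked artifact arrives as a pathlib.Path, a consuming step can work with it using ordinary filesystem calls. The sketch below assumes the path points to a directory of `.ckpt` files. The helper `list_checkpoints` and the file names created in the temporary folder are made up for illustration, so adapt the pattern to whatever your training run actually writes.

```python
import tempfile
from pathlib import Path
from typing import List


def list_checkpoints(checkpoint_dir: Path) -> List[Path]:
    """Return all .ckpt files below `checkpoint_dir`, sorted by path."""
    return sorted(checkpoint_dir.rglob("*.ckpt"))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Create empty placeholder files that imitate per-epoch checkpoints.
    for epoch in range(3):
        (root / f"checkpoint-epoch={epoch:02d}.ckpt").touch()
    (root / "notes.txt").touch()  # unrelated file, ignored by the pattern

    checkpoint_names = [path.name for path in list_checkpoints(root)]
    latest_name = checkpoint_names[-1]

print(checkpoint_names)
print(latest_name)
```

Sorting by name only yields the chronological order here because the epoch numbers are zero-padded.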
Even if an artifact is created and stored externally, it can be treated like any other artifact produced by ZenML steps - with all the functionalities described above!
One of the most useful ways of interacting with artifacts in ZenML is the ability to associate metadata with them. As mentioned before, artifact metadata is an arbitrary dictionary of key-value pairs that are useful for understanding the nature of the data.
As an example, one can associate the results of a model training alongside a model artifact, the shape of a table alongside a pandas dataframe, or the size of an image alongside a PNG file.
For some artifacts, ZenML automatically logs metadata. As an example, for pandas.Series and pandas.DataFrame objects, ZenML logs the shape and size of the objects:
    from zenml.client import Client

    # Get an artifact version (e.g. pd.DataFrame)
    artifact = Client().get_artifact_version('50ce903f-faa6-41f6-a95f-ff8c0ec66010')

    # Fetch its metadata
    artifact.run_metadata["storage_size"].value  # Size in bytes
    artifact.run_metadata["shape"].value  # Shape e.g. (500,20)
The information regarding the metadata of an artifact can be found within the DAG visualizer interface on the OSS dashboard.
The ZenML Pro dashboard offers advanced visualization features for artifact exploration, including a dedicated artifacts tab with metadata visualization.
A user can also add metadata to an artifact within a step directly using the log_artifact_metadata method:
    from typing import Tuple
    from typing_extensions import Annotated
    import numpy as np
    from sklearn.base import ClassifierMixin
    from zenml import ArtifactConfig, step, log_artifact_metadata


    @step
    def model_finetuner_step(
        model: ClassifierMixin, dataset: Tuple[np.ndarray, np.ndarray]
    ) -> Annotated[
        ClassifierMixin, ArtifactConfig(name="my_model", tags=["SVC", "trained"])
    ]:
        """Finetunes a given model on a given dataset."""
        model.fit(dataset[0], dataset[1])
        accuracy = model.score(dataset[0], dataset[1])

        log_artifact_metadata(
            # Artifact name can be omitted if step returns only one output
            artifact_name="my_model",
            # Passing None or omitting this will use the `latest` version
            version=None,
            # Metadata should be a dictionary of JSON-serializable values
            metadata={"accuracy": float(accuracy)}
            # A dictionary of dictionaries can also be passed to group metadata
            # in the dashboard
            # metadata = {"metrics": {"accuracy": accuracy}}
        )
        return model
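The comments in the step above mention two requirements: metadata values must be JSON-serializable, and a dictionary of dictionaries groups related values. A small helper can enforce both before the dictionary is logged. `build_grouped_metadata` is a hypothetical function, shown here without any ZenML dependency.

```python
import json
from typing import Any, Dict


def build_grouped_metadata(
    metrics: Dict[str, Any], group: str = "metrics"
) -> Dict[str, Dict[str, float]]:
    """Cast metric values to float and nest them under one group key."""
    # float() turns NumPy scalars and ints into plain Python floats.
    grouped = {group: {key: float(value) for key, value in metrics.items()}}
    # json.dumps raises TypeError if anything is still not serializable.
    json.dumps(grouped)
    return grouped


metadata = build_grouped_metadata({"accuracy": 1, "f1": 0.5})
print(metadata)
```

The resulting dictionary has the same shape as the grouped `metadata` example in the comment of the step above.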
Additionally, there is a lot more to learn about artifacts within ZenML. Please read the dedicated data management guide for more information.
Code example
The following script combines all the code from this section into one simple example that you can run directly:
    from typing import Optional, Tuple
    from typing_extensions import Annotated

    import numpy as np
    from sklearn.base import ClassifierMixin
    from sklearn.datasets import load_digits
    from sklearn.svm import SVC
    from zenml import ArtifactConfig, pipeline, step, log_artifact_metadata
    from zenml import save_artifact, load_artifact
    from zenml.client import Client


    @step
    def versioned_data_loader_step() -> (
        Annotated[
            Tuple[np.ndarray, np.ndarray],
            ArtifactConfig(
                name="my_dataset",
                tags=["digits", "computer vision", "classification"],
            ),
        ]
    ):
        """Loads the digits dataset as a tuple of flattened numpy arrays."""
        digits = load_digits()
        return (digits.images.reshape((len(digits.images), -1)), digits.target)


    @step
    def model_finetuner_step(
        model: ClassifierMixin, dataset: Tuple[np.ndarray, np.ndarray]
    ) -> Annotated[
        ClassifierMixin,
        ArtifactConfig(name="my_model", tags=["SVC", "trained"]),
    ]:
        """Finetunes a given model on a given dataset."""
        model.fit(dataset[0], dataset[1])
        accuracy = model.score(dataset[0], dataset[1])
        log_artifact_metadata(metadata={"accuracy": float(accuracy)})
        return model


    @pipeline
    def model_finetuning_pipeline(
        dataset_version: Optional[str] = None,
        model_version: Optional[str] = None,
    ):
        client = Client()
        # Either load a previous version of "my_dataset" or create a new one
        if dataset_version:
            dataset = client.get_artifact_version(
                name_id_or_prefix="my_dataset", version=dataset_version
            )
        else:
            dataset = versioned_data_loader_step()

        # Load the model to finetune
        # If no version is specified, the latest version of "my_model" is used
        model = client.get_artifact_version(
            name_id_or_prefix="my_model", version=model_version
        )

        # Finetune the model
        # This automatically creates a new version of "my_model"
        model_finetuner_step(model=model, dataset=dataset)


    def main():
        # Save an untrained model as first version of "my_model"
        untrained_model = SVC(gamma=0.001)
        save_artifact(
            untrained_model, name="my_model", version="1", tags=["SVC", "untrained"]
        )

        # Create a first version of "my_dataset" and train the model on it
        model_finetuning_pipeline()

        # Finetune the latest model on an older version of the dataset
        model_finetuning_pipeline(dataset_version="1")

        # Run inference with the latest model on an older version of the dataset
        latest_trained_model = load_artifact("my_model")
        old_dataset = load_artifact("my_dataset", version="1")
        latest_trained_model.predict(old_dataset[0])


    if __name__ == "__main__":
        main()
This would create the following pipeline run DAGs: