Manage artifacts

Understand and adjust how ZenML versions your data.

Data sits at the heart of every machine learning workflow. Managing and versioning this data correctly is essential for reproducibility and traceability within your ML pipelines. ZenML takes a proactive approach to data versioning, ensuring that every artifact—be it data, models, or evaluations—is automatically tracked and versioned upon pipeline execution.

Walkthrough of ZenML Artifact Control Plane (Dashboard available only on ZenML Pro)

This guide will delve into artifact versioning and management, showing you how to efficiently name, organize, and utilize your data with the ZenML framework.

Managing artifacts produced by ZenML pipelines

Artifacts, the outputs of your steps and pipelines, are automatically versioned and stored in the artifact store. Configuring these artifacts is pivotal for transparent and efficient pipeline development.

Giving names to your artifacts

Assigning custom names to your artifacts can greatly enhance their discoverability and manageability. As best practice, utilize the Annotated object within your steps to give precise, human-readable names to outputs:

from typing_extensions import Annotated
import pandas as pd
from sklearn.datasets import load_iris

from zenml import pipeline, step

# Using Annotated to name our dataset
@step
def training_data_loader() -> Annotated[pd.DataFrame, "iris_dataset"]:
    """Load the iris dataset as pandas dataframe."""
    iris = load_iris(as_frame=True)
    return iris.get("frame")


@pipeline
def feature_engineering_pipeline():
    training_data_loader()


if __name__ == "__main__":
    feature_engineering_pipeline()

Unspecified artifact outputs default to a naming pattern of {pipeline_name}::{step_name}::output. For visual exploration in the ZenML dashboard, it's best practice to give significant outputs clear custom names.

Artifacts named iris_dataset can then be found swiftly using various ZenML interfaces:

To list artifacts: zenml artifact list

Versioning artifacts manually

ZenML automatically versions all created artifacts using auto-incremented numbering. I.e., if you have defined a step creating an artifact named iris_dataset as shown above, the first execution of the step will create an artifact with this name and version "1", the second execution will create version "2", and so on.

While ZenML handles artifact versioning automatically, you have the option to specify custom versions using the ArtifactConfig. This may come into play during critical runs like production releases.

from zenml import step, ArtifactConfig

@step
def training_data_loader() -> (
    Annotated[
        pd.DataFrame, 
        # Add `ArtifactConfig` to control more properties of your artifact
        ArtifactConfig(
            name="iris_dataset", 
            version="raw_2023"
        ),
    ]
):
    ...

The next execution of this step will then create an artifact with the name iris_dataset and version raw_2023. This is primarily useful if you are making a particularly important pipeline run (such as a release) whose artifacts you want to distinguish at a glance later.

After execution, iris_dataset and its version raw_2023 can be seen using:

To list versions: zenml artifact version list

Add metadata and tags

If you would like to extend your artifacts and runs with extra metadata or tags you can do so by following the patterns demonstrated below:

from zenml import step, log_metadata, add_tags


# In the following step, we use the utility functions `log_metadata` and `add_tags`.
# Since we are calling these functions directly from a step, both will attach
# the additional information to the current run.
@step
def annotation_approach() -> str:
    log_metadata(metadata={"metadata_key": "metadata_value"})
    add_tags(tags=["tag_name"])
    return "string"


# There are other was to attach this information to different versions of your 
# artifacts as well. For instance, you will see a step with a single output below.
# If you modify the call to include the `infer_artifact` flag, these functions
# will attach this information to the artifact version instead.
@step
def annotation_approach() -> str:
    log_metadata(metadata={"metadata_key": "metadata_value"}, infer_artifact=True)
    add_tags(tags=["tag_name"], infer_artifact=True)
    return "string"

There are multiple ways to interact with tags and metadata in ZenML. If you would like to how to use this information in different scenarios please check the respective guides on tags and metadata.

Comparing metadata across runs (Pro)

The ZenML Pro dashboard includes an Experiment Comparison tool that allows you to visualize and analyze metadata across different pipeline runs. This feature helps you understand patterns and changes in your pipeline's behavior over time.

Using the comparison views

The tool offers two complementary views for analyzing your metadata:

Table View

The tabular view provides a structured comparison of metadata across runs:

Comparing metadata values across different pipeline runs in table view.

This view automatically calculates changes between runs and allows you to:

  • Sort and filter metadata values

  • Track changes over time

  • Compare up to 20 runs simultaneously

Parallel Coordinates View

The parallel coordinates visualization helps identify relationships between different metadata parameters:

Comparing metadata values across different pipeline runs in parallel coordinates view.

This view is particularly useful for:

  • Discovering correlations between different metrics

  • Identifying patterns across pipeline runs

  • Filtering and focusing on specific parameter ranges

Accessing the comparison tool

To compare metadata across runs:

  1. Navigate to any pipeline in your dashboard

  2. Click the "Compare" button in the top navigation

  3. Select the runs you want to compare

  4. Switch between table and parallel coordinates views using the tabs

The comparison tool works with any numerical metadata (float or int) that you've logged in your pipelines. Make sure to log meaningful metrics in your steps to make the most of this feature.

Sharing comparisons

The tool preserves your comparison configuration in the URL, making it easy to share specific views with team members. Simply copy and share the URL to allow others to see the same comparison with identical settings and filters.

Specify a type for your artifacts

Assigning a type to an artifact allows ZenML to highlight them differently in the dashboard and also lets you filter your artifacts better.

If you don't specify a type for your artifact, ZenML will use the default artifact type provided by the materializer that is used to save the artifact.

from typing_extensions import Annotated
from zenml import ArtifactConfig, save_artifact, step
from zenml.enums import ArtifactType

# Assign an artifact type to a step output
@step
def trainer() -> Annotated[MyCustomModel, ArtifactConfig(artifact_type=ArtifactType.MODEL)]:
    return MyCustomModel(...)


# Assign an artifact type when manually saving artifacts
model = ...
save_artifact(model, name="model", artifact_type=ArtifactType.MODEL)

Consuming external artifacts within a pipeline

While most pipelines start with a step that produces an artifact, it is often the case to want to consume artifacts external from the pipeline. The ExternalArtifact class can be used to initialize an artifact within ZenML with any arbitrary data type.

For example, let's say we have a Snowflake query that produces a dataframe, or a CSV file that we need to read. External artifacts can be used for this, to pass values to steps that are neither JSON serializable nor produced by an upstream step:

import numpy as np
from zenml import ExternalArtifact, pipeline, step

@step
def print_data(data: np.ndarray):
    print(data)

@pipeline
def printing_pipeline():
    # One can also pass data directly into the ExternalArtifact
    # to create a new artifact on the fly
    data = ExternalArtifact(value=np.array([0]))

    print_data(data=data)


if __name__ == "__main__":
    printing_pipeline()

Optionally, you can configure the ExternalArtifact to use a custom materializer for your data or disable artifact metadata and visualizations. Check out the SDK docs for all available options.

Using an ExternalArtifact for your step automatically disables caching for the step.

Consuming artifacts produced by other pipelines

It is also common to consume an artifact downstream after producing it in an upstream pipeline or step. As we have learned in the previous section, the Client can be used to fetch artifacts directly inside the pipeline code:

from uuid import UUID
import pandas as pd
from zenml import step, pipeline
from zenml.client import Client


@step
def trainer(dataset: pd.DataFrame):
    ...

@pipeline
def training_pipeline():
    client = Client()
    # Fetch by ID
    dataset_artifact = client.get_artifact_version(
        name_id_or_prefix=UUID("3a92ae32-a764-4420-98ba-07da8f742b76")
    )

    # Fetch by name alone - uses the latest version of this artifact
    dataset_artifact = client.get_artifact_version(name_id_or_prefix="iris_dataset")

    # Fetch by name and version
    dataset_artifact = client.get_artifact_version(
        name_id_or_prefix="iris_dataset", version="raw_2023"
    )

    # Pass into any step
    trainer(dataset=dataset_artifact)


if __name__ == "__main__":
    training_pipeline()

Calls of Client methods like get_artifact_version directly inside the pipeline code makes use of ZenML's late materialization behind the scenes.

If you would like to bypass materialization entirely and just download the data or files associated with a particular artifact version, you can use the .download_files method:

from zenml.client import Client

client = Client()
artifact = client.get_artifact_version(name_id_or_prefix="iris_dataset")
artifact.download_files("path/to/save.zip")

Take note that the path must have the .zip extension, as the artifact data will be saved as a zip file. Make sure to handle any exceptions that may arise from this operation.

Managing artifacts not produced by ZenML pipelines

Sometimes, artifacts can be produced completely outside of ZenML. A good example of this is the predictions produced by a deployed model.

# A model is deployed, running in a FastAPI container
# Let's use the ZenML client to fetch the latest model and make predictions

from zenml.client import Client
from zenml import save_artifact

# Fetch the model from a registry or a previous pipeline
model = ...

# Let's make a prediction
prediction = model.predict([[1, 1, 1, 1]])

# We now store this prediction in ZenML as an artifact
# This will create a new artifact version
save_artifact(prediction, name="iris_predictions")

You can also load any artifact stored within ZenML using the load_artifact method:

# Loads the latest version
load_artifact("iris_predictions")

load_artifact is simply short-hand for the following Client call:

from zenml.client import Client

client = Client()
client.get_artifact("iris_predictions").load()

Even if an artifact is created externally, it can be treated like any other artifact produced by ZenML steps - with all the functionalities described above!

It is also possible to use these functions inside your ZenML steps. However, it is usually cleaner to return the artifacts as outputs of your step to save them, or to use External Artifacts to load them instead.

Linking existing data as a ZenML artifact

Sometimes, data is produced completely outside of ZenML and can be conveniently stored on a given storage. A good example of this is the checkpoint files created as a side-effect of the Deep Learning model training. We know that the intermediate data of the deep learning frameworks is quite big and there is no good reason to move it around again and again, if it can be produced directly in the artifact store boundaries and later just linked to become an artifact of ZenML. Let's explore the Pytorch Lightning example to fit the model and store the checkpoints in a remote location.

import os
from zenml.client import Client
from zenml import register_artifact
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from uuid import uuid4

# Define where the model data should be saved
# use active ArtifactStore
prefix = Client().active_stack.artifact_store.path
# keep data separable for future runs with uuid4 folder
default_root_dir = os.path.join(prefix, uuid4().hex)

# Define the model and fit it
model = ...
trainer = Trainer(
    default_root_dir=default_root_dir,
    callbacks=[
        ModelCheckpoint(
            every_n_epochs=1, save_top_k=-1, filename="checkpoint-{epoch:02d}"
        )
    ],
)
try:
    trainer.fit(model)
finally:
    # We now link those checkpoints in ZenML as an artifact
    # This will create a new artifact version
    register_artifact(default_root_dir, name="all_my_model_checkpoints")

The artifact produced from the preexisting data will have a pathlib.Path type, once loaded or passed as input to another step.

Even if an artifact is created and stored externally, it can be treated like any other artifact produced by ZenML steps - with all the functionalities described above!

For more details and use-cases check-out detailed docs page Register Existing Data as a ZenML Artifact.

Logging metadata for an artifact

One of the most useful ways of interacting with artifacts in ZenML is the ability to associate metadata with them. As mentioned before, artifact metadata is an arbitrary dictionary of key-value pairs that are useful for understanding the nature of the data.

As an example, one can associate the results of a model training alongside a model artifact, the shape of a table alongside a pandas dataframe, or the size of an image alongside a PNG file.

For some artifacts, ZenML automatically logs metadata. As an example, for pandas.Series and pandas.DataFrame objects, ZenML logs the shape and size of the objects:

from zenml.client import Client

# Get an artifact version (e.g. pd.DataFrame)
artifact = Client().get_artifact_version('50ce903f-faa6-41f6-a95f-ff8c0ec66010')

# Fetch it's metadata
artifact.run_metadata["storage_size"].value  # Size in bytes
artifact.run_metadata["shape"].value  # Shape e.g. (500,20)

A user can also add metadata to an artifact within a step directly using the log_artifact_metadata method:

from zenml import step, log_artifact_metadata

@step
def model_finetuner_step(
    model: ClassifierMixin, dataset: Tuple[np.ndarray, np.ndarray]
) -> Annotated[
    ClassifierMixin, ArtifactConfig(name="my_model", tags=["SVC", "trained"])
]:
    """Finetunes a given model on a given dataset."""
    model.fit(dataset[0], dataset[1])
    accuracy = model.score(dataset[0], dataset[1])

    
    log_artifact_metadata(
        # Artifact name can be omitted if step returns only one output
        artifact_name="my_model",
        # Passing None or omitting this will use the `latest` version
        version=None,
        # Metadata should be a dictionary of JSON-serializable values
        metadata={"accuracy": float(accuracy)}
        # A dictionary of dictionaries can also be passed to group metadata
        #  in the dashboard
        # metadata = {"metrics": {"accuracy": accuracy}}
    )
    return model

For further depth, there is an advanced metadata logging guide that goes more into detail about logging metadata in ZenML.

Additionally, there is a lot more to learn about artifacts within ZenML. Please read the dedicated data management guide for more information.

Code example

This section combines all the code from this section into one simple script that you can use easily:

Code Example of this Section
from typing import Optional, Tuple
from typing_extensions import Annotated

import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from zenml import ArtifactConfig, pipeline, step, log_artifact_metadata
from zenml import save_artifact, load_artifact
from zenml.client import Client


@step
def versioned_data_loader_step() -> (
    Annotated[
        Tuple[np.ndarray, np.ndarray],
        ArtifactConfig(
            name="my_dataset",
            tags=["digits", "computer vision", "classification"],
        ),
    ]
):
    """Loads the digits dataset as a tuple of flattened numpy arrays."""
    digits = load_digits()
    return (digits.images.reshape((len(digits.images), -1)), digits.target)


@step
def model_finetuner_step(
    model: ClassifierMixin, dataset: Tuple[np.ndarray, np.ndarray]
) -> Annotated[
    ClassifierMixin,
    ArtifactConfig(name="my_model", tags=["SVC", "trained"]),
]:
    """Finetunes a given model on a given dataset."""
    model.fit(dataset[0], dataset[1])
    accuracy = model.score(dataset[0], dataset[1])
    log_artifact_metadata(metadata={"accuracy": float(accuracy)})
    return model


@pipeline
def model_finetuning_pipeline(
    dataset_version: Optional[str] = None,
    model_version: Optional[str] = None,
):
    client = Client()
    # Either load a previous version of "my_dataset" or create a new one
    if dataset_version:
        dataset = client.get_artifact_version(
            name_id_or_prefix="my_dataset", version=dataset_version
        )
    else:
        dataset = versioned_data_loader_step()

    # Load the model to finetune
    # If no version is specified, the latest version of "my_model" is used
    model = client.get_artifact_version(
        name_id_or_prefix="my_model", version=model_version
    )

    # Finetune the model
    # This automatically creates a new version of "my_model"
    model_finetuner_step(model=model, dataset=dataset)


def main():
    # Save an untrained model as first version of "my_model"
    untrained_model = SVC(gamma=0.001)
    save_artifact(
        untrained_model, name="my_model", version="1", tags=["SVC", "untrained"]
    )

    # Create a first version of "my_dataset" and train the model on it
    model_finetuning_pipeline()

    # Finetune the latest model on an older version of the dataset
    model_finetuning_pipeline(dataset_version="1")

    # Run inference with the latest model on an older version of the dataset
    latest_trained_model = load_artifact("my_model")
    old_dataset = load_artifact("my_dataset", version="1")
    latest_trained_model.predict(old_dataset[0])


if __name__ == "__main__":
    main()

This would create the following pipeline run DAGs:

Run 1:

Run 2:

ZenML Scarf

Last updated

Was this helpful?