Migration guide 0.39.1 → 0.41.0

How to migrate your ZenML pipelines and steps from version <=0.39.1 to 0.41.0.

ZenML versions 0.40.0 to 0.41.0 introduced a new and more flexible syntax to define ZenML steps and pipelines. This page contains code samples that show you how to upgrade your steps and pipelines to the new syntax.

Newer versions of ZenML still work with pipelines and steps defined using the old syntax, but the old syntax is deprecated and will be removed in the future.

Overview

from typing import Optional

from zenml.steps import BaseParameters, Output, StepContext, step
from zenml.pipelines import pipeline, Schedule

# Define a Step
class MyStepParameters(BaseParameters):
    """Parameters of `my_step`; `param_2` is optional."""

    param_1: int
    param_2: Optional[float] = None

@step
def my_step(
    params: MyStepParameters, context: StepContext,
) -> Output(int_output=int, str_output=str):
    """Return `param_1 * param_2` as an int, plus the output artifact URI.

    A falsy `param_2` (`None` or `0`) is replaced by `1`.
    """
    result = int(params.param_1 * (params.param_2 or 1))
    result_uri = context.get_output_artifact_uri()
    return result, result_uri

# Run the Step separately
my_step.entrypoint()

# Define a Pipeline
@pipeline
def my_pipeline(my_step):
    my_step()

step_instance = my_step(params=MyStepParameters(param_1=17))
pipeline_instance = my_pipeline(my_step=step_instance)

# Configure and run the Pipeline
pipeline_instance.configure(enable_cache=False)
schedule = Schedule(...)
pipeline_instance.run(schedule=schedule)

# Fetch the Pipeline Run
last_run = pipeline_instance.get_runs()[0]
# `get_step` is a method: call it with the step name, do not subscript it.
int_output = last_run.get_step("my_step").outputs["int_output"].read()

Defining steps

from typing import Optional

from zenml.steps import step, BaseParameters
from zenml.pipelines import pipeline

# Old: Subclass `BaseParameters` to define parameters for a step
class MyStepParameters(BaseParameters):
    """Parameters of `my_step`; `param_2` is optional."""

    param_1: int
    param_2: Optional[float] = None

@step
def my_step(params: MyStepParameters) -> None:
    ...

@pipeline
def my_pipeline(my_step):
    my_step()

# The parameters are bound when the step instance is created.
step_instance = my_step(params=MyStepParameters(param_1=17))
pipeline_instance = my_pipeline(my_step=step_instance)

Check out this page for more information on how to parameterize your steps.

Calling a step outside of a pipeline

from zenml.steps import step

@step
def my_step() -> None:
    ...

# Invoke the decorated step directly, without wrapping it in a pipeline.
my_step.entrypoint()  # Old: Call `step.entrypoint(...)`

Defining pipelines

from zenml.pipelines import pipeline

@pipeline
def my_pipeline(my_step):  # Old: steps are arguments of the pipeline function
    # The step received as an argument is invoked inside the pipeline body.
    my_step()

Configuring pipelines

from zenml.pipelines import pipeline
from zenml.steps import step

@step
def my_step() -> None:
    ...

@pipeline
def my_pipeline(my_step):
    my_step()

# Old: Create an instance of the pipeline and then call `pipeline_instance.configure(...)`
# The step is instantiated with `my_step()` and passed to the pipeline by keyword.
pipeline_instance = my_pipeline(my_step=my_step())
# Here the configuration turns caching off for this pipeline instance.
pipeline_instance.configure(enable_cache=False)

Running pipelines

from zenml.pipelines import pipeline
from zenml.steps import step

@step
def my_step() -> None:
    ...

@pipeline
def my_pipeline(my_step):
    my_step()

# Old: Create an instance of the pipeline and then call `pipeline_instance.run(...)`
# The step is instantiated with `my_step()` and passed to the pipeline by keyword.
pipeline_instance = my_pipeline(my_step=my_step())
pipeline_instance.run(...)

Scheduling pipelines

from zenml.pipelines import pipeline, Schedule
from zenml.steps import step

@step
def my_step() -> None:
    ...

@pipeline
def my_pipeline(my_step):
    my_step()

# Old: Create an instance of the pipeline and then call `pipeline_instance.run(schedule=...)`
# `Schedule(...)` is a placeholder; fill in the schedule arguments you need.
schedule = Schedule(...)
pipeline_instance = my_pipeline(my_step=my_step())
# The schedule is handed to `run()` through its `schedule` keyword argument.
pipeline_instance.run(schedule=schedule)

Check out this page for more information on how to schedule your pipelines.

Fetching pipelines after execution

from zenml.post_execution import (
    ArtifactView,
    PipelineRunView,
    PipelineView,
    StepView,
    get_pipeline,
)

# Look the pipeline up by name. The variable is not called `pipeline`, so it
# does not shadow the `@pipeline` decorator used elsewhere on this page.
pipeline_view: PipelineView = get_pipeline("first_pipeline")

last_run: PipelineRunView = pipeline_view.runs[0]
# OR: last_run = my_pipeline.get_runs()[0]

model_trainer_step: StepView = last_run.get_step("model_trainer")

# `.output` is the step's output artifact; `.read()` loads its value.
model: ArtifactView = model_trainer_step.output
loaded_model = model.read()

Check out this page for more information on how to programmatically fetch information about previous pipeline runs.

Controlling the step execution order

from zenml.pipelines import pipeline

@pipeline
def my_pipeline(step_1, step_2, step_3):
    step_1()
    step_2()
    step_3()
    # Order `step_3` after both `step_1` and `step_2`.
    step_3.after(step_1)  # Old: Use the `step.after(...)` method
    step_3.after(step_2)

Check out this page for more information on how to control the step execution order.

Defining steps with multiple outputs

# Old: Use the `Output` class
from zenml.steps import step, Output

@step
# The return annotation names two outputs: `int_output` (int) and `str_output` (str).
def my_step() -> Output(int_output=int, str_output=str):
    ...

Check out this page for more information on how to annotate your step outputs.

Accessing run information inside steps

from typing import Any

from zenml.steps import StepContext, step
from zenml.environment import Environment

@step
def my_step(context: StepContext) -> Any:  # Old: `StepContext` class defined as arg
    # The step environment comes from the `Environment` object, separately
    # from the `context` argument.
    env = Environment().step_environment
    output_uri = context.get_output_artifact_uri()
    step_name = env.step_name  # Old: Run info accessible via `StepEnvironment`
    ...

Check out this page for more information on how to fetch run information inside your steps using get_step_context().

Last updated