How to migrate your ZenML pipelines and steps from version <=0.39.1 to 0.41.0.
ZenML versions 0.40.0 to 0.41.0 introduced a new and more flexible syntax to define ZenML steps and pipelines. This page contains code samples that show you how to upgrade your steps and pipelines to the new syntax.
Newer versions of ZenML still work with pipelines and steps defined using the old syntax, but the old syntax is deprecated and will be removed in the future.
Overview
from typing import Optionalfrom zenml.steps import BaseParameters, Output, StepContext, stepfrom zenml.pipelines import pipeline# Define a StepclassMyStepParameters(BaseParameters): param_1:int param_2: Optional[float]=None@stepdefmy_step(params: MyStepParameters,context: StepContext,) ->Output(int_output=int, str_output=str): result =int(params.param_1 * (params.param_2 or1)) result_uri = context.get_output_artifact_uri()return result, result_uri# Run the Step separatelymy_step.entrypoint()# Define a Pipeline@pipelinedefmy_pipeline(my_step):my_step()step_instance =my_step(params=MyStepParameters(param_1=17))pipeline_instance =my_pipeline(my_step=step_instance)# Configure and run the Pipelinepipeline_instance.configure(enable_cache=False)schedule =Schedule(...)pipeline_instance.run(schedule=schedule)# Fetch the Pipeline Runlast_run = pipeline_instance.get_runs()[0]int_output = last_run.get_step["my_step"].outputs["int_output"].read()
from typing import Annotated, Optional, Tuplefrom zenml import get_step_context, pipeline, stepfrom zenml.client import Client# Define a Step@stepdefmy_step(param_1:int,param_2: Optional[float]=None) -> Tuple[Annotated[int,"int_output"], Annotated[str,"str_output"]]: result =int(param_1 * (param_2 or1)) result_uri =get_step_context().get_output_artifact_uri()return result, result_uri# Run the Step separatelymy_step()# Define a Pipeline@pipelinedefmy_pipeline():my_step(param_1=17)# Configure and run the Pipelinemy_pipeline = my_pipeline.with_options(enable_cache=False, schedule=schedule)my_pipeline()# Fetch the Pipeline Runlast_run = my_pipeline.last_runint_output = last_run.steps["my_step"].outputs["int_output"].load()
Defining steps
from zenml.steps import step, BaseParametersfrom zenml.pipelines import pipeline# Old: Subclass `BaseParameters` to define parameters for a stepclassMyStepParameters(BaseParameters): param_1:int param_2: Optional[float]=None@stepdefmy_step(params: MyStepParameters) ->None: ...@pipelinedefmy_pipeline(my_step):my_step()step_instance =my_step(params=MyStepParameters(param_1=17))pipeline_instance =my_pipeline(my_step=step_instance)
# New: Directly define the parameters as arguments of your step function.# In case you still want to group your parameters in a separate class,# you can subclass `pydantic.BaseModel` and use that as an argument of your# step functionfrom zenml import pipeline, step@stepdefmy_step(param_1:int,param_2: Optional[float]=None) ->None: ...@pipelinedefmy_pipeline():my_step(param_1=17)
Check out this page for more information on how to parameterize your steps.
Calling a step outside of a pipeline
from zenml.steps import step@stepdefmy_step() ->None: ...my_step.entrypoint()# Old: Call `step.entrypoint(...)`
from zenml import step@stepdefmy_step() ->None: ...my_step()# New: Call the step directly `step(...)`
Defining pipelines
from zenml.pipelines import pipeline@pipelinedefmy_pipeline(my_step): # Old: steps are arguments of the pipeline functionmy_step()
from zenml import pipeline, step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline():my_step()# New: The pipeline function calls the step directly
Configuring pipelines
from zenml.pipelines import pipelinefrom zenml.steps import step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline(my_step):my_step()# Old: Create an instance of the pipeline and then call `pipeline_instance.configure(...)`pipeline_instance =my_pipeline(my_step=my_step())pipeline_instance.configure(enable_cache=False)
from zenml import pipeline, step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline():my_step()# New: Call the `with_options(...)` method on the pipelinemy_pipeline = my_pipeline.with_options(enable_cache=False)
Running pipelines
from zenml.pipelines import pipelinefrom zenml.steps import step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline(my_step):my_step()# Old: Create an instance of the pipeline and then call `pipeline_instance.run(...)`pipeline_instance =my_pipeline(my_step=my_step())pipeline_instance.run(...)
from zenml import pipeline, step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline():my_step()my_pipeline()# New: Call the pipeline
Scheduling pipelines
from zenml.pipelines import pipeline, Schedulefrom zenml.steps import step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline(my_step):my_step()# Old: Create an instance of the pipeline and then call `pipeline_instance.run(schedule=...)`schedule =Schedule(...)pipeline_instance =my_pipeline(my_step=my_step())pipeline_instance.run(schedule=schedule)
from zenml.pipelines import Schedulefrom zenml import pipeline, step@stepdefmy_step() ->None: ...@pipelinedefmy_pipeline():my_step()# New: Set the schedule using the `pipeline.with_options(...)` method and then run itschedule =Schedule(...)my_pipeline = my_pipeline.with_options(schedule=schedule)my_pipeline()
Check out this page for more information on how to schedule your pipelines.
Check out this page for more information on how to programmatically fetch information about previous pipeline runs.
Controlling the step execution order
from zenml.pipelines import pipeline@pipelinedefmy_pipeline(step_1,step_2,step_3):step_1()step_2()step_3() step_3.after(step_1)# Old: Use the `step.after(...)` method step_3.after(step_2)
from zenml import pipeline@pipelinedefmy_pipeline():step_1()step_2()step_3(after=["step_1", "step_2"])# New: Pass the `after` argument when calling a step
Check out this page for more information on how to control the step execution order.
Defining steps with multiple outputs
# Old: Use the `Output` classfrom zenml.steps import step, Output@stepdefmy_step() ->Output(int_output=int, str_output=str): ...
Check out this page for more information on how to annotate your step outputs.
Accessing run information inside steps
from zenml.steps import StepContext, stepfrom zenml.environment import Environment@stepdefmy_step(context: StepContext) -> Any: # Old: `StepContext` class defined as arg env =Environment().step_environment output_uri = context.get_output_artifact_uri() step_name = env.step_name # Old: Run info accessible via `StepEnvironment` ...
from zenml import get_step_context, step@stepdefmy_step() -> Any: # New: StepContext is no longer an argument of the step context =get_step_context() output_uri = context.get_output_artifact_uri() step_name = context.step_name # New: StepContext now has ALL run/step info ...
Check out this page for more information on how to fetch run information inside your steps using get_step_context().