Runtime Configuration
Configure Step and Pipeline Parameters for Each Run.
A ZenML pipeline clearly separates business logic from parameter configuration. The business logic defines what your steps and pipeline do, while step and pipeline configurations are used to dynamically set parameters at runtime.
You can configure your pipelines at runtime in the following ways:
  • Configure from within the code: Do this when you are quickly iterating on your code and don't want to change your actual step code. This is useful in the development phase.
  • Configure from the CLI and a YAML config: Do this when you want to launch pipeline runs without modifying the code at all. This is most useful in production scenarios.

Configuring from within code

You can easily add a configuration to a step by creating your configuration as a subclass of BaseStepConfig. When such a config object is passed to a step, it is not treated like other artifacts. Instead, it gets passed into the step when the pipeline is instantiated.
from zenml.steps import step, Output, BaseStepConfig


class SecondStepConfig(BaseStepConfig):
    """Trainer params"""
    multiplier: int = 4


@step
def my_second_step(config: SecondStepConfig, input_int: int, input_float: float
                   ) -> Output(output_int=int, output_float=float):
    """Step that multiplies the inputs"""
    return config.multiplier * input_int, config.multiplier * input_float
The default value for the multiplier is set to 4. However, when the pipeline is instantiated you can override the default like this:
first_pipeline(step_1=my_first_step(),
               step_2=my_second_step(SecondStepConfig(multiplier=3))
               ).run()
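Conversely, if you pass no config object at all, the step simply uses the declared default. A minimal sketch, reusing the same pipeline and steps as above:

first_pipeline(step_1=my_first_step(),
               step_2=my_second_step()  # no config passed, multiplier defaults to 4
               ).run()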
This functionality is based on Step Fixtures, which you will learn more about below.

Setting step parameters using a config file

In addition to setting parameters for your pipeline steps in code as seen above, ZenML also allows you to use a configuration YAML file. This configuration file must adhere to the following structure:
steps:
  step_name:
    parameters:
      parameter_name: parameter_value
      some_other_parameter_name: 2
  some_other_step_name:
    ...
For our example from above, this results in the following configuration YAML:
steps:
  step_2:
    parameters:
      multiplier: 3
Use the configuration file by calling the pipeline method with_config(...):
first_pipeline(step_1=my_first_step(),
               step_2=my_second_step()
               ).with_config("path_to_config.yaml").run()

Configuring from the CLI and a YAML config file

You may want full control to configure and run your pipeline from outside your code. For this you can use the ZenML command line interface:
zenml pipeline run <NAME-OF-PYTHONFILE> -c <NAME-OF-CONFIG-YAML-FILE>
Do not instantiate and run your pipeline within the Python file that you want to run using the CLI, otherwise your pipeline will be run twice, possibly with different configurations.
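A minimal sketch of such a file, with hypothetical names, that only defines steps and the pipeline and leaves the actual launch to the CLI:

# my_pipeline.py (hypothetical name) -- definitions only: there is no
# pipeline(...).run() call at module level, so
# `zenml pipeline run my_pipeline.py -c config.yaml` triggers exactly one run.
from zenml.pipelines import pipeline
from zenml.steps import step


@step
def my_first_step() -> int:
    """Step that returns a pre-defined integer"""
    return 7


@pipeline
def my_cli_pipeline(step_1):
    step_1()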
Running your pipeline this way requires a config file with a bit more information than described above.

Define the name of the pipeline

You will need to define which pipeline to run by its name.
name: <name_of_your_pipeline>
...
In case you defined your pipeline using decorators, this name is the name of the decorated function. If you used the Class Based API, it will be the name of your class.
from zenml.pipelines import pipeline, BasePipeline


@pipeline
def name_of_your_pipeline(...):
    ...


class ClassBasedPipelineName(BasePipeline):
    ...

Supply the names of the step functions (and materializers)

In total, the step functions can be supplied with three arguments here:
  • source:
    • name - the name of the step
    • file (optional) - the relative path to the file that contains the step
steps:
  step_1:
    source:
      name: <step_name>
      file: <relative/filepath>
  • parameters - list of parameters for the StepConfig
  • materializers - dict mapping each output name to the corresponding materializer's name and file
Materializers are responsible for reading and writing artifacts. You can learn more about materializers in the materializer section.
...
steps:
  ...
  step_2:
    source:
      name: <step_name>
    parameters:
      multiplier: 3
    materializers:
      output_obj:
        name: <MaterializerName>
        file: <relative/filepath>
Again, the step name corresponds to the function or class name of your step. The materializer name refers to the class name of your materializer.
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.steps import step


class MaterializerName(BaseMaterializer):
    ...


@step
def step_name(...):
    ...

When you put it all together, you end up with something that looks like this:

CLI Command:

zenml pipeline run run.py -c config.yaml
config.yaml:

name: first_pipeline
steps:
  step_1:
    source:
      name: my_first_step
  step_2:
    source:
      name: my_second_step
    parameters:
      multiplier: 3
    materializers:
      output_obj:
        name: MyMaterializer
run.py:

import os
from typing import Type

from zenml.artifacts import DataArtifact
from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.steps import step, Output, BaseStepConfig
from zenml.pipelines import pipeline


class MyObj:
    def __init__(self, name: str):
        self.name = name


class MyMaterializer(BaseMaterializer):
    ASSOCIATED_TYPES = (MyObj,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[MyObj]) -> MyObj:
        """Read from artifact store"""
        super().handle_input(data_type)
        with fileio.open(os.path.join(self.artifact.uri, 'data.txt'),
                         'r') as f:
            name = f.read()
        return MyObj(name=name)

    def handle_return(self, my_obj: MyObj) -> None:
        """Write to artifact store"""
        super().handle_return(my_obj)
        with fileio.open(os.path.join(self.artifact.uri, 'data.txt'),
                         'w') as f:
            f.write(my_obj.name)


@step
def my_first_step() -> Output(output_int=int, output_float=float):
    """Step that returns a pre-defined integer and float"""
    return 7, 0.1


class SecondStepConfig(BaseStepConfig):
    """Trainer params"""
    multiplier: int = 4


@step
def my_second_step(config: SecondStepConfig, input_int: int,
                   input_float: float
                   ) -> Output(output_int=int,
                               output_float=float,
                               output_obj=MyObj):
    """Step that multiplies the inputs"""
    return (config.multiplier * input_int,
            config.multiplier * input_float,
            MyObj("Custom-Object"))


@pipeline(enable_cache=False)
def first_pipeline(
    step_1,
    step_2
):
    output_1, output_2 = step_1()
    step_2(output_1, output_2)
This is what the same pipeline run would look like if triggered from within Python:
first_pipeline(
    step_1=my_first_step(),
    step_2=(my_second_step(SecondStepConfig(multiplier=3))
            .with_return_materializers({"output_obj": MyMaterializer}))
).run()
Pro-Tip: You can easily use this to configure and run your pipeline from within your GitHub Actions workflow (or comparable CI tools). This way you ensure each run is directly associated with a specific code version.
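A minimal sketch of such a GitHub Actions workflow, assuming the run.py and config.yaml from the example above live at the repository root (the workflow file, the Python version, and the dependency setup are illustrative, not part of ZenML):

# .github/workflows/run_pipeline.yml (hypothetical)
name: run-zenml-pipeline
on: push

jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: "3.8"
      - name: Install dependencies
        run: pip install zenml
      - name: Initialize the ZenML repository  # may be required depending on your setup
        run: zenml init
      - name: Run the pipeline with the checked-in config
        run: zenml pipeline run run.py -c config.yaml

Because config.yaml is versioned alongside run.py, every triggering commit pins both the code and the parameters of the resulting run.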