Orchestrators
Orchestrating the run of pipelines
The orchestrator is one of the most critical components of your stack, as it defines where the actual pipeline job runs: it controls how and where each individual step within a pipeline is executed. The orchestrator can therefore be used to great effect to scale jobs into production.
Before reading this chapter, make sure that you are familiar with the concept of stacks, stack components and their flavors.

Base Implementation

ZenML aims to enable orchestration with any orchestration tool. This is where the BaseOrchestrator comes into play. It abstracts away many of the ZenML-specific details from the actual implementation and exposes a simplified interface:
  1.  As it is the base class for a specific type of StackComponent, it inherits from the StackComponent class. This sets the TYPE variable to the specific StackComponentType.
  2.  The FLAVOR class variable needs to be set in the particular sub-class, as it is meant to identify the implementation flavor of the particular orchestrator.
  3.  Lastly, the base class features one abstractmethod: prepare_or_run_pipeline. Every orchestrator flavor is required to implement this method with respect to the flavor at hand.
Putting all these considerations together, we end up with the following (simplified) implementation:
```python
from abc import ABC, abstractmethod
from typing import ClassVar, List, Any

from tfx.proto.orchestration.pipeline_pb2 import Pipeline as Pb2Pipeline

from zenml.enums import StackComponentType
from zenml.stack import StackComponent
from zenml.steps import BaseStep


class BaseOrchestrator(StackComponent, ABC):
    """Base class for all ZenML orchestrators"""

    # --- Class variables ---
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """Prepares and runs the pipeline outright or returns an intermediate
        pipeline representation that gets deployed.
        """
```
This is a slimmed-down version of the base implementation which aims to highlight the abstraction layer. In order to see the full implementation and get the complete docstrings, please check the source code on GitHub.

List of available orchestrators

Out of the box, ZenML comes with a LocalOrchestrator implementation, which is a simple implementation for running your pipelines locally.
Moreover, additional orchestrators can be found in specific integration modules, such as the AirflowOrchestrator in the airflow integration and the KubeflowOrchestrator in the kubeflow integration.
| Flavor | Integration |
|---|---|
| local | built-in |
| airflow | airflow |
| kubeflow | kubeflow |
| vertex | gcp |
| github | github |
If you would like to see the available orchestrator flavors, you can use the command:
```shell
zenml orchestrator flavor list
```

Build your own custom orchestrator

If you want to create your own custom flavor for an orchestrator, you can follow these steps:
  1.  Create a class which inherits from the BaseOrchestrator.
  2.  Define the FLAVOR class variable.
  3.  Implement the prepare_or_run_pipeline() method based on your desired orchestrator.
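Putting these three steps together, a custom flavor might look like the following sketch. The class name, the "sequential" flavor string, and the run_step body are all hypothetical and purely illustrative; in real code the class would inherit from BaseOrchestrator as described above:

```python
from typing import Any, ClassVar, List

# Hypothetical sketch of a custom orchestrator flavor. In practice the class
# would inherit from zenml.orchestrators.BaseOrchestrator; everything here
# is illustrative, not ZenML API.


class SequentialOrchestrator:
    """Hypothetical custom orchestrator flavor."""

    FLAVOR: ClassVar[str] = "sequential"  # step 2: identify the flavor

    def __init__(self) -> None:
        self.executed: List[str] = []

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[Any],
        pipeline: Any,
        pb2_pipeline: Any,
        stack: Any,
        runtime_configuration: Any,
    ) -> Any:
        # Step 3: direct orchestration -- run each step in topological order.
        for step in sorted_steps:
            self.run_step(step)

    def run_step(self, step: Any) -> None:
        # Stand-in for the real step execution logic.
        self.executed.append(str(step))
```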
Once you are done with the implementation, you can register it through the CLI as:
```shell
zenml orchestrator flavor register <THE-SOURCE-PATH-OF-YOUR-ORCHESTRATOR>
```

Some additional implementation details

Not all orchestrators are created equal. Here are a few basic categories that differentiate them.

Direct Orchestration

The implementation of a local orchestrator can be summarized in two lines of code:
```python
for step in sorted_steps:
    self.run_step(...)
```
The orchestrator simply iterates through the steps and executes each one directly within the same Python process. Naturally, all kinds of additional configuration can be added around this.

Python Operator based Orchestration

The airflow orchestrator has a slightly more complex implementation of the prepare_or_run_pipeline() method. Instead of executing a step immediately, a PythonOperator is created for it, containing a _step_callable. This _step_callable will ultimately execute the self.run_step(...) method of the orchestrator. The PythonOperators are assembled into an Airflow DAG, which is returned. Through some Airflow magic, this DAG is loaded by the connected Airflow instance, and orchestration of the DAG is performed either directly or on a set schedule.
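The pattern can be illustrated without Airflow itself: wrap each step in a deferred callable (standing in for a PythonOperator) and collect the wrappers into a DAG-like container that a scheduler would execute later. All names below are illustrative stand-ins, not ZenML or Airflow API:

```python
from typing import Callable, List


def make_step_callable(step_name: str) -> Callable[[], str]:
    """Stands in for the _step_callable wrapped by a PythonOperator:
    nothing executes until the scheduler invokes it."""
    def _step_callable() -> str:
        # In ZenML this would ultimately call self.run_step(...).
        return f"ran {step_name}"
    return _step_callable


class FakeDag:
    """Minimal stand-in for the DAG returned by prepare_or_run_pipeline()."""

    def __init__(self, tasks: List[Callable[[], str]]) -> None:
        self.tasks = tasks

    def execute(self) -> List[str]:
        # Airflow would trigger this directly or on a schedule.
        return [task() for task in self.tasks]


# Assemble the operators without running them -- the key difference
# from direct orchestration:
dag = FakeDag([make_step_callable(name) for name in ["load", "train"]])
```

The essential point is the inversion of control: prepare_or_run_pipeline() only builds and returns the DAG; execution happens later, driven by the scheduler.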

Container-based Orchestration

The kubeflow orchestrator is a great example of container-based orchestration. In an implementation-specific method called prepare_pipeline_deployment() a Docker image containing the complete project context is built.
Within prepare_or_run_pipeline() a yaml file is created as an intermediate representation of the pipeline and uploaded to the Kubeflow instance. To create this yaml file a callable is defined within which a dsl.ContainerOp is created for each step. This ContainerOp contains the container entrypoint command and arguments that will make the image run just the one step. The ContainerOps are assembled according to their interdependencies inside a dsl.Pipeline which can then be compiled into the yaml file.
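A library-free sketch of the idea: for every step, build a container command that makes the shared image run only that step (the role of the dsl.ContainerOp entrypoint), then assemble the per-step commands into a pipeline spec that could be serialized to yaml. The dict shapes and function names here are illustrative, not the kfp API:

```python
from typing import Dict, List

# The entrypoint command baked into the Docker image
# (see the step entrypoint section below).
ENTRYPOINT = ["python", "-m", "zenml.entrypoints.step_entrypoint"]


def container_op_for(step_name: str) -> Dict[str, List[str]]:
    """Stands in for a dsl.ContainerOp: the same image for every step,
    with arguments that select the single step to run inside it."""
    return {"command": ENTRYPOINT, "args": ["--step_source", step_name]}


def pipeline_spec(step_names: List[str]) -> Dict[str, Dict[str, List[str]]]:
    """Stands in for the compiled dsl.Pipeline; Kubeflow would compile
    this into the yaml file that gets uploaded."""
    return {name: container_op_for(name) for name in step_names}


spec = pipeline_spec(["load", "train"])
```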

Base Implementation of the Step Entrypoint Configuration

Within the base Docker images that are used for container-based orchestration, src.zenml.entrypoints.step_entrypoint.py is the default entrypoint to run a specific step. It does so by loading an orchestrator-specific StepEntrypointConfiguration object. This object is then used to parse all entrypoint arguments (e.g. --step_source). Finally, the StepEntrypointConfiguration.run() method is used to execute the step. Under the hood, this will eventually also call the orchestrator's run_step() method.
The StepEntrypointConfiguration is the base class that already implements most of the required functionality. Let's dive right into it:
  1.  The DEFAULT_SINGLE_STEP_CONTAINER_ENTRYPOINT_COMMAND is the default entrypoint command for the Docker container.
  2.  Some arguments are mandatory for the step entrypoint. These are set as constants at the top of the file and used as the minimum required arguments.
  3.  The run() method uses the parsed arguments to set up all required prerequisites before ultimately executing the step.
Here is a schematic view of what the StepEntrypointConfiguration looks like:
```python
from abc import ABC, abstractmethod
from typing import Any, List, Set

from zenml.steps import BaseStep

DEFAULT_SINGLE_STEP_CONTAINER_ENTRYPOINT_COMMAND = [
    "python",
    "-m",
    "zenml.entrypoints.step_entrypoint",
]

# Constants for all the ZenML default entrypoint options
ENTRYPOINT_CONFIG_SOURCE_OPTION = "entrypoint_config_source"
PIPELINE_JSON_OPTION = "pipeline_json"
MAIN_MODULE_SOURCE_OPTION = "main_module_source"
STEP_SOURCE_OPTION = "step_source"
INPUT_SPEC_OPTION = "input_spec"


class StepEntrypointConfiguration(ABC):

    # --- This has to be implemented by the subclass ---
    @abstractmethod
    def get_run_name(self, pipeline_name: str) -> str:
        """Returns the run name."""

    # --- These can be implemented by subclasses ---
    @classmethod
    def get_custom_entrypoint_options(cls) -> Set[str]:
        """Custom options for this entrypoint configuration."""
        return set()

    @classmethod
    def get_custom_entrypoint_arguments(
        cls, step: BaseStep, **kwargs: Any
    ) -> List[str]:
        """Custom arguments the entrypoint command should be called with."""
        return []

    # --- This will ultimately be called by the step entrypoint ---
    def run(self) -> None:
        """Prepares execution and runs the step that is specified by the
        passed arguments."""
        ...
```
This is a slimmed-down version of the base implementation which aims to highlight the abstraction layer. In order to see the full implementation and get the complete docstrings, please check the API docs.

Build your own Step Entrypoint Configuration

There is only one mandatory method get_run_name(...) that you need to implement in order to get a functioning entrypoint. Inside this method you need to return a string which has to be the same for all steps that are executed as part of the same pipeline run.
If you need to pass additional arguments to the entrypoint, there are two methods that you need to implement:
  • get_custom_entrypoint_options(): This method should return all the additional options that you require in the entrypoint.
  • get_custom_entrypoint_arguments(...): This method should return a list of arguments that should be passed to the entrypoint. The arguments need to provide values for all options defined in the get_custom_entrypoint_options() method mentioned above.
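As a sketch, a subclass that adds a single custom option might look like this. The class name, the "my_option" option, and the run-name format are hypothetical; in real code the class would inherit from the StepEntrypointConfiguration shown above:

```python
from typing import Any, List, Set

# Hypothetical sketch -- in practice this class would inherit from
# StepEntrypointConfiguration; all names here are illustrative.


class MyEntrypointConfiguration:
    """Hypothetical entrypoint configuration adding one custom option."""

    def get_run_name(self, pipeline_name: str) -> str:
        # Must return the same string for every step of one pipeline run.
        return f"{pipeline_name}-run"

    @classmethod
    def get_custom_entrypoint_options(cls) -> Set[str]:
        return {"my_option"}

    @classmethod
    def get_custom_entrypoint_arguments(
        cls, step: Any, **kwargs: Any
    ) -> List[str]:
        # Provide a value for every option declared above.
        return ["--my_option", str(kwargs["my_option"])]
```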