LogoLogo
ProductResourcesGitHubStart free
  • Documentation
  • Learn
  • ZenML Pro
  • Stacks
  • API Reference
  • SDK Reference
  • Overview
  • Integrations
  • Stack Components
    • Orchestrators
      • Local Orchestrator
      • Local Docker Orchestrator
      • Kubeflow Orchestrator
      • Kubernetes Orchestrator
      • Google Cloud VertexAI Orchestrator
      • AWS Sagemaker Orchestrator
      • AzureML Orchestrator
      • Databricks Orchestrator
      • Tekton Orchestrator
      • Airflow Orchestrator
      • Skypilot VM Orchestrator
      • HyperAI Orchestrator
      • Lightning AI Orchestrator
      • Develop a custom orchestrator
    • Artifact Stores
      • Local Artifact Store
      • Amazon Simple Cloud Storage (S3)
      • Google Cloud Storage (GCS)
      • Azure Blob Storage
      • Develop a custom artifact store
    • Container Registries
      • Default Container Registry
      • DockerHub
      • Amazon Elastic Container Registry (ECR)
      • Google Cloud Container Registry
      • Azure Container Registry
      • GitHub Container Registry
      • Develop a custom container registry
    • Step Operators
      • Amazon SageMaker
      • AzureML
      • Google Cloud VertexAI
      • Kubernetes
      • Modal
      • Spark
      • Develop a Custom Step Operator
    • Experiment Trackers
      • Comet
      • MLflow
      • Neptune
      • Weights & Biases
      • Google Cloud VertexAI Experiment Tracker
      • Develop a custom experiment tracker
    • Image Builders
      • Local Image Builder
      • Kaniko Image Builder
      • AWS Image Builder
      • Google Cloud Image Builder
      • Develop a Custom Image Builder
    • Alerters
      • Discord Alerter
      • Slack Alerter
      • Develop a Custom Alerter
    • Annotators
      • Argilla
      • Label Studio
      • Pigeon
      • Prodigy
      • Develop a Custom Annotator
    • Data Validators
      • Great Expectations
      • Deepchecks
      • Evidently
      • Whylogs
      • Develop a custom data validator
    • Feature Stores
      • Feast
      • Develop a Custom Feature Store
    • Model Deployers
      • MLflow
      • Seldon
      • BentoML
      • Hugging Face
      • Databricks
      • vLLM
      • Develop a Custom Model Deployer
    • Model Registries
      • MLflow Model Registry
      • Develop a Custom Model Registry
  • Service Connectors
    • Introduction
    • Complete guide
    • Best practices
    • Connector Types
      • Docker Service Connector
      • Kubernetes Service Connector
      • AWS Service Connector
      • GCP Service Connector
      • Azure Service Connector
      • HyperAI Service Connector
  • Popular Stacks
    • AWS
    • Azure
    • GCP
    • Kubernetes
  • Deployment
    • 1-click Deployment
    • Terraform Modules
    • Register a cloud stack
    • Infrastructure as code
  • Contribute
    • Custom Stack Component
    • Custom Integration
Powered by GitBook
On this page
  • Base Implementation
  • Build your own custom orchestrator
  • Implementation guide
  • Optional features
  • Code sample
  • Enabling CUDA for GPU-backed hardware

Was this helpful?

Edit on GitHub
  1. Stack Components
  2. Orchestrators

Develop a custom orchestrator

Learning how to develop a custom orchestrator.

PreviousLightning AI OrchestratorNextArtifact Stores

Last updated 8 days ago

Was this helpful?

Before diving into the specifics of this component type, it is beneficial to familiarize yourself with our . This guide provides an essential understanding of ZenML's component flavor concepts.

Base Implementation

ZenML aims to enable orchestration with any orchestration tool. This is where the BaseOrchestrator comes into play. It abstracts away many of the ZenML-specific details from the actual implementation and exposes a simplified interface:

from abc import ABC, abstractmethod
from typing import Any, Dict, Type

from zenml.models import PipelineDeploymentResponseModel, PipelineRunResponse
from zenml.enums import StackComponentType
from zenml.stack import StackComponent, StackComponentConfig, Stack, Flavor


class BaseOrchestratorConfig(StackComponentConfig):
    """Base class for all ZenML orchestrator configurations."""


class BaseOrchestrator(StackComponent, ABC):
    """Base class for all ZenML orchestrators"""

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        deployment: PipelineDeploymentResponseModel,
        stack: Stack,
        environment: Dict[str, str],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Any:
        """Prepares and runs the pipeline outright or returns an intermediate
        pipeline representation that gets deployed.
        """

    @abstractmethod
    def get_orchestrator_run_id(self) -> str:
        """Returns the run id of the active orchestrator run.

        Important: This needs to be a unique ID and return the same value for
        all steps of a pipeline run.

        Returns:
            The orchestrator run id.
        """


class BaseOrchestratorFlavor(Flavor):
    """Base orchestrator for all ZenML orchestrator flavors."""

    @property
    @abstractmethod
    def name(self):
        """Returns the name of the flavor."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type."""
        return StackComponentType.ORCHESTRATOR

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor."""
        return BaseOrchestratorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """Implementation class for this flavor."""

Build your own custom orchestrator

If you want to create your own custom flavor for an orchestrator, you can follow the following steps:

  1. Create a class that inherits from the BaseOrchestrator class and implement the abstract prepare_or_run_pipeline(...) and get_orchestrator_run_id() methods.

  2. If you need to provide any configuration, create a class that inherits from the BaseOrchestratorConfig class and add your configuration parameters.

  3. Bring both the implementation and the configuration together by inheriting from the BaseOrchestratorFlavor class. Make sure that you give a name to the flavor through its abstract property.

Once you are done with the implementation, you can register it through the CLI. Please ensure you point to the flavor class via dot notation:

zenml orchestrator flavor register <path.to.MyOrchestratorFlavor>

For example, if your flavor class MyOrchestratorFlavor is defined in flavors/my_flavor.py, you'd register it by doing:

zenml orchestrator flavor register flavors.my_flavor.MyOrchestratorFlavor

If ZenML does not find an initialized ZenML repository in any parent directory, it will default to the current working directory, but usually, it's better to not have to rely on this mechanism and initialize zenml at the root.

Afterward, you should see the new flavor in the list of available flavors:

zenml orchestrator flavor list

It is important to draw attention to when and how these base abstractions are coming into play in a ZenML workflow.

  • The CustomOrchestratorFlavor class is imported and utilized upon the creation of the custom flavor through the CLI.

  • The CustomOrchestratorConfig class is imported when someone tries to register/update a stack component with this custom flavor. Especially, during the registration process of the stack component, the config will be used to validate the values given by the user. As Config object are inherently pydantic objects, you can also add your own custom validators here.

  • The CustomOrchestrator only comes into play when the component is ultimately in use.

The design behind this interaction lets us separate the configuration of the flavor from its implementation. This way we can register flavors and components even when the major dependencies behind their implementation are not installed in our local setting (assuming the CustomOrchestratorFlavor and the CustomOrchestratorConfig are implemented in a different module/path than the actual CustomOrchestrator).

Implementation guide

  1. Create your orchestrator class: This class should either inherit from BaseOrchestrator, or more commonly from ContainerizedOrchestrator. If your orchestrator uses container images to run code, you should inherit from ContainerizedOrchestrator which handles building all Docker images for the pipeline to be executed. If your orchestator does not use container images, you'll be responsible that the execution environment contains all the necessary requirements and code files to run the pipeline.

  2. Implement the prepare_or_run_pipeline(...) method: This method is responsible for running or scheduling the pipeline. In most cases, this means converting the pipeline into a format that your orchestration tool understands and running it. To do so, you should:

    • Loop over all steps of the pipeline and configure your orchestration tool to run the correct command and arguments in the correct Docker image

    • Make sure the passed environment variables are set when the container is run

    • Make sure the containers are running in the correct order

  3. Implement the get_orchestrator_run_id() method: This must return a ID that is different for each pipeline run, but identical if called from within Docker containers running different steps of the same pipeline run. If your orchestrator is based on an external tool like Kubeflow or Airflow, it is usually best to use an unique ID provided by this tool.

Optional features

There are some additional optional features that your orchestrator can implement:

  • Running pipelines on a schedule: if your orchestrator supports running pipelines on a schedule, make sure to handle deployment.schedule if it exists. If your orchestrator does not support schedules, you should either log a warning and or even raise an exception in case the user tries to schedule a pipeline.

  • Specifying hardware resources: If your orchestrator supports setting resources like CPUs, GPUs or memory for the pipeline or specific steps, make sure to handle the values defined in step.config.resource_settings. See the code sample below for additional helper methods to check whether any resources are required from your orchestrator.

Code sample

from typing import Dict

from zenml.entrypoints import StepEntrypointConfiguration
from zenml.models import PipelineDeploymentResponseModel, PipelineRunResponse
from zenml.orchestrators import ContainerizedOrchestrator
from zenml.stack import Stack


class MyOrchestrator(ContainerizedOrchestrator):

    def get_orchestrator_run_id(self) -> str:
        # Return an ID that is different each time a pipeline is run, but the
        # same for all steps being executed as part of the same pipeline run.
        # If you're using some external orchestration tool like Kubeflow, you
        # can usually use the run ID of that tool here.
        ...

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
        environment: Dict[str, str],
        placeholder_run: Optional["PipelineRunResponse"] = None,
    ) -> None:
        # If your orchestrator supports scheduling, you should handle the schedule
        # configured by the user. Otherwise you might raise an exception or log a warning
        # that the orchestrator doesn't support scheduling
        if deployment.schedule:
            ...

        for step_name, step in deployment.step_configurations.items():
            image = self.get_image(deployment=deployment, step_name=step_name)
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            )
            # Your orchestration tool should run this command and arguments
            # in the Docker image fetched above. Additionally, the container which
            # is running the command must contain the environment variables specified
            # in the `environment` dictionary.
            
            # If your orchestrator supports parallel execution of steps, make sure
            # each step only runs after all its upstream steps finished
            upstream_steps = step.spec.upstream_steps

            # You can get the settings your orchestrator like so.
            # The settings are the "dynamic" part of your orchestrators config,
            # optionally defined when you register your orchestrator but can be
            # overridden at runtime.
            # In contrast, the "static" part of your orchestrators config is
            # always defined when you register the orchestrator and can be
            # accessed via `self.config`.
            step_settings = cast(
                MyOrchestratorSettings, self.get_settings(step)
            )

            # If your orchestrator supports setting resources like CPUs, GPUs or
            # memory for the pipeline or specific steps, you can find out whether
            # specific resources were specified for this step:
            if self.requires_resources_in_orchestration_environment(step):
                resources = step.config.resource_settings

Enabling CUDA for GPU-backed hardware

This is a slimmed-down version of the base implementation which aims to highlight the abstraction layer. In order to see the full implementation and get the complete docstrings, please check .

ZenML resolves the flavor class by taking the path where you initialized zenml (via zenml init) as the starting point of resolution. Therefore, please ensure you follow of initializing zenml at the root of your repository.

Check out the below for more details on how to fetch the Docker image, command, arguments and step order.

To see a full end-to-end worked example of a custom orchestrator, .

To see a full end-to-end worked example of a custom orchestrator, .

Note that if you wish to use your custom orchestrator to run steps on a GPU, you will need to follow to ensure that it works. It requires adding some extra settings customization and is essential to enable CUDA for the GPU to give its full acceleration.

general guide to writing custom component flavors in ZenML
the source code on GitHub
the best practice
see here
see here
the instructions on this page
code sample
ZenML Scarf