Airflow Orchestrator
How to orchestrate pipelines with Airflow
The Airflow orchestrator is an orchestrator flavor provided with the ZenML `airflow` integration that uses Airflow to run your pipelines.

You should use the Airflow orchestrator if
- you're looking for a proven production-grade orchestrator.
- you're already using Airflow.
- you want to run your pipelines locally.
- you're willing to deploy and maintain Airflow.
The Airflow orchestrator can be used to run pipelines locally as well as remotely. In the local case, no additional setup is necessary.
There are many options to use a deployed Airflow server:
- Use one of ZenML's Airflow stack recipes. This is the simplest solution to get ZenML working with Airflow, as the recipe also takes care of additional steps such as installing required Python dependencies in your Airflow server environment.
If you're not using a stack recipe to deploy Airflow, there are some additional Python packages that you'll need to install in the Python environment of your Airflow server:
- `pydantic~=1.9.2`: The Airflow DAG files that ZenML creates for you require Pydantic to parse and validate configuration files.
- `apache-airflow-providers-docker` or `apache-airflow-providers-cncf-kubernetes`, depending on which Airflow operator you'll be using to run your pipeline steps. Check out this section for more information on supported operators.
To use the Airflow orchestrator, we need:
- The ZenML `airflow` integration installed. If you haven't done so, run `zenml integration install airflow`.
- The orchestrator registered and part of our active stack:
zenml orchestrator register <ORCHESTRATOR_NAME> \
--flavor=airflow \
--local=True # set this to `False` if using a remote Airflow deployment
# Register and activate a stack with the new orchestrator
zenml stack register <STACK_NAME> -o <ORCHESTRATOR_NAME> ... --set
Local
In the local case, we need to install one additional Python package for the local Airflow server:
pip install apache-airflow-providers-docker
Once that is installed, we can start the local Airflow server by running:
zenml stack up
This command will start up an Airflow server on your local machine that's running in the same Python environment that you used to provision it. When it is finished, it will print a username and password which you can use to log in to the Airflow UI at http://localhost:8080.
As long as you didn't configure any custom value for the `dag_output_dir` attribute of your orchestrator, running a pipeline locally is as simple as calling:

python file_that_runs_a_zenml_pipeline.py
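If you don't have such a file yet, a minimal sketch could look like the following (the step and pipeline names are placeholders and not part of the Airflow integration):

# Minimal sketch of `file_that_runs_a_zenml_pipeline.py` (placeholder names)
from zenml.pipelines import pipeline
from zenml.steps import step

@step
def my_step() -> int:
    """A trivial step that just returns a number."""
    return 42

@pipeline
def my_pipeline(step_1):
    step_1()

if __name__ == "__main__":
    # Instantiating the pipeline with its steps and calling `run()` hands it
    # over to the active (Airflow) orchestrator.
    my_pipeline(step_1=my_step()).run()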
Running this file will produce a `.zip` file containing a representation of your ZenML pipeline in the Airflow DAGs directory. From there, the local Airflow server will load it and run your pipeline (it might take a few seconds until the pipeline shows up in the Airflow UI).

The ability to provision resources using the `zenml stack up` command is deprecated and will be removed in a future release. While it is still available for the Airflow orchestrator, we recommend following the steps below to set up a local Airflow server manually:
1. Install the `apache-airflow` package in your Python environment where ZenML is installed.
2. Set the Airflow environment variables to configure the behavior of the Airflow server. The following variables are particularly important to set:
   - `AIRFLOW_HOME`: This variable defines the location where the Airflow server stores its database and configuration files. The default value is `~/airflow`.
   - `AIRFLOW__CORE__DAGS_FOLDER`: This variable defines the location where the Airflow server looks for DAG files. The default value is `<AIRFLOW_HOME>/dags`.
   - `AIRFLOW__CORE__LOAD_EXAMPLES`: This variable controls whether the Airflow server should load the default set of example DAGs. Setting it to `false` means that the example DAGs will not be loaded.
   - `AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL`: This variable controls how often the Airflow scheduler checks for new or updated DAGs. By default, the scheduler checks for new DAGs every 30 seconds. This variable can be used to increase or decrease the frequency of the checks, depending on the specific needs of your pipeline.
   For example:
export AIRFLOW_HOME=...
export AIRFLOW__CORE__DAGS_FOLDER=...
export AIRFLOW__CORE__LOAD_EXAMPLES=false
export AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=10
# Prevent crashes during forking on MacOS
# https://github.com/apache/airflow/issues/28487
export no_proxy=*
3. Run `airflow standalone` to initialize the database, create a user, and start all components for you.
Remote

When using the Airflow orchestrator with a remote deployment, you'll additionally need:
- A deployed Airflow server.
- A remote artifact store as part of your stack.
- A remote container registry as part of your stack.
In the remote case, the Airflow orchestrator works differently than other ZenML orchestrators. Executing a Python file which runs a pipeline by calling `pipeline.run()` will not actually run the pipeline, but will instead create a `.zip` file containing an Airflow representation of your ZenML pipeline. In one additional step, you need to make sure this zip file ends up in the DAGs directory of your Airflow deployment.
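How the zip file gets there depends on your Airflow deployment. As a purely illustrative sketch, if the DAGs directory of your deployment is reachable as a local or mounted path, copying the most recent archive could look like this (both paths are placeholders):

import shutil
from pathlib import Path

# Placeholder paths: the directory where ZenML writes the archives
# (the orchestrator's `dag_output_dir`) and your deployment's DAGs directory.
dag_output_dir = Path("...")
airflow_dags_dir = Path("...")

# Copy the most recently generated archive into the DAGs directory.
latest_zip = max(dag_output_dir.glob("*.zip"), key=lambda p: p.stat().st_mtime)
shutil.copy(latest_zip, airflow_dags_dir / latest_zip.name)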
ZenML will build a Docker image called `<CONTAINER_REGISTRY_URI>/zenml:<PIPELINE_NAME>` which includes your code and use it to run your pipeline steps in Airflow. Check out this page if you want to learn more about how ZenML builds these images and how you can customize them.

Airflow comes with its own UI that you can use to find further details about your pipeline runs, such as the logs of your steps. For local Airflow, you can find the Airflow UI at http://localhost:8080 by default. Alternatively, you can get the orchestrator UI URL in Python using the following code snippet:
from zenml.post_execution import get_run

# Fetch the pipeline run and read the orchestrator URL from its metadata
pipeline_run = get_run("<PIPELINE_RUN_NAME>")
orchestrator_url = pipeline_run.metadata["orchestrator_url"].value
For additional configuration of the Airflow orchestrator, you can pass `AirflowOrchestratorSettings` when defining or running your pipeline. Check out the API docs for a full list of available attributes and this docs page for more information on how to specify settings.

Note that if you wish to use this orchestrator to run steps on a GPU, you will need to follow the instructions on this page to ensure that it works. This requires some extra settings customization and is essential for enabling CUDA so that the GPU can deliver its full acceleration.
Airflow operators specify how a step in your pipeline gets executed. As ZenML relies on Docker images to run pipeline steps, only operators that support executing a Docker image work in combination with ZenML. Airflow comes with two operators that support this:
- The `DockerOperator` runs the Docker images for executing your pipeline steps on the same machine that your Airflow server is running on. For this to work, the server environment needs to have the `apache-airflow-providers-docker` package installed.
- The `KubernetesPodOperator` runs the Docker image on a pod in the Kubernetes cluster that the Airflow server is deployed to. For this to work, the server environment needs to have the `apache-airflow-providers-cncf-kubernetes` package installed.
You can specify which operator to use and additional arguments to it as follows:
from zenml.pipelines import pipeline
from zenml.steps import step
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import AirflowOrchestratorSettings

airflow_settings = AirflowOrchestratorSettings(
    operator="docker",  # or "kubernetes_pod"
    # Dictionary of arguments to pass to the operator __init__ method
    operator_args={},
)

# Using the operator for a single step
@step(settings={"orchestrator.airflow": airflow_settings})
def my_step(...):
    ...

# Using the operator for all steps in your pipeline
@pipeline(settings={"orchestrator.airflow": airflow_settings})
def my_pipeline(...):
    ...
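The `operator_args` dictionary is passed to the operator's `__init__` method, so you can use it to forward any argument the operator accepts. As an illustration, setting standard Airflow `BaseOperator` arguments such as retry behavior might look like this (the values are only placeholders):

from datetime import timedelta

from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import AirflowOrchestratorSettings

airflow_settings = AirflowOrchestratorSettings(
    operator="kubernetes_pod",
    # `retries` and `retry_delay` are standard Airflow BaseOperator arguments;
    # the values below are purely illustrative.
    operator_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
)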
Custom operators
If you want to use any other operator to run your steps, you can specify the `operator` in your `AirflowOrchestratorSettings` as a path to the Python operator class:

from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import AirflowOrchestratorSettings

airflow_settings = AirflowOrchestratorSettings(
    # This could also be a reference to one of your custom classes.
    # e.g. `my_module.MyCustomOperatorClass` as long as the class
    # is importable in your Airflow server environment
    operator="airflow.providers.docker.operators.docker.DockerOperator",
    # Dictionary of arguments to pass to the operator __init__ method
    operator_args={},
)
Custom DAG generator file
To run a pipeline in Airflow, ZenML creates a Zip archive which contains two files:
- A JSON configuration file that the orchestrator creates. This file contains all the information required to create the Airflow DAG to run the pipeline.
- A Python file which reads this configuration file and actually creates the Airflow DAG. We call this file the DAG generator and you can find the implementation here.
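If you are curious what ended up in a generated archive, Python's standard `zipfile` module is enough to take a look (the path is a placeholder for an archive produced by the orchestrator):

import zipfile

# Placeholder path: point this at an archive the orchestrator produced.
with zipfile.ZipFile("path/to/generated_pipeline.zip") as archive:
    # Expect the JSON configuration file and the DAG generator module.
    print(archive.namelist())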
If you need more control over how the Airflow DAG is generated, you can provide a custom DAG generator file using the setting `custom_dag_generator`. This setting will need to reference a Python module that can be imported in your active Python environment. It will additionally need to contain the same classes (`DagConfiguration` and `TaskConfiguration`) and constants (`ENV_ZENML_AIRFLOW_RUN_ID`, `ENV_ZENML_LOCAL_STORES_PATH` and `CONFIG_FILENAME`) as the original module. For this reason, we suggest starting by copying the original module and modifying it according to your needs.
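As an illustration, wiring up such a module might look like the following sketch. The module path `my_package.my_dag_generator` is hypothetical; it only needs to be importable in your active Python environment and expose the classes and constants listed above.

from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import AirflowOrchestratorSettings

airflow_settings = AirflowOrchestratorSettings(
    # Hypothetical module containing your copy of the DAG generator
    custom_dag_generator="my_package.my_dag_generator",
)

These settings are then attached to your pipeline in the same way as the operator settings shown above, for example via `@pipeline(settings={"orchestrator.airflow": airflow_settings})`.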
For more information and a full list of configurable attributes of the Airflow orchestrator, check out the API Docs.