Dynamic Pipelines (Experimental)

Write dynamic pipelines

Important: Before using dynamic pipelines, please review the Limitations and Known Issues section below. It contains critical information about requirements and known bugs that may affect your pipeline execution, especially when running remotely.

Why Dynamic Pipelines?

Traditional ZenML pipelines require you to define the entire DAG structure at pipeline definition time. While this works well for many use cases, there are scenarios where you need more flexibility:

  • Runtime-dependent workflows: When the number of steps or their configuration depends on data computed during pipeline execution

  • Dynamic parallelization: When you need to spawn multiple parallel step executions based on runtime conditions

  • Conditional execution: When the workflow structure needs to adapt based on intermediate results

Dynamic pipelines allow you to write pipelines that generate their DAG structure dynamically at runtime, giving you the power of Python's control flow (loops, conditionals) combined with ZenML's orchestration capabilities.

Basic Example

The simplest dynamic pipeline uses regular Python control flow to determine step execution:

from zenml import step, pipeline

@step
def generate_int() -> int:
    return 3

@step
def do_something(index: int) -> None:
    print(f"Processing index {index}")

@pipeline(dynamic=True)
def dynamic_pipeline() -> None:
    count = generate_int()
    # `count` is an artifact; load its data to use it in regular Python control flow
    count_data = count.load()

    for idx in range(count_data):
        # This will run sequentially, like regular Python code would.
        do_something(idx)

if __name__ == "__main__":
    dynamic_pipeline()

In this example, the number of do_something steps executed depends on the value returned by generate_int(), which is only known at runtime.

Key Features

Dynamic Step Configuration

You can configure steps dynamically within your pipeline using with_options():
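Here is a minimal sketch that builds on the basic example above; the option shown (enable_cache) is just one of the regular with_options() arguments, chosen for illustration:

from zenml import step, pipeline

@step
def generate_int() -> int:
    return 3

@step
def do_something(index: int) -> None:
    print(f"Processing index {index}")

@pipeline(dynamic=True)
def dynamic_pipeline() -> None:
    count = generate_int().load()

    for idx in range(count):
        # Compute step options at runtime and apply them before invoking
        # the step -- something a static DAG cannot express.
        configured_step = do_something.with_options(enable_cache=(idx % 2 == 0))
        configured_step(idx)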

This allows you to modify step behavior based on runtime conditions or data.

Step Runtime Configuration

You can control where a step executes by specifying its runtime (see the sketch after the lists below):

  • runtime="inline": The step runs in the orchestration environment (same process/container as the orchestrator)

  • runtime="isolated": The orchestrator spins up a separate step execution environment (new container/process)

Use runtime="isolated" when you need:

  • Better resource isolation

  • Different environment requirements

  • Parallel execution (see below)

Use runtime="inline" when you need:

  • Faster execution (no container startup overhead)

  • Shared resources with the orchestrator

  • Sequential execution
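
Here is a minimal sketch of mixing the two runtimes. The placement of the runtime option on the @step decorator is an assumption; depending on your ZenML version it may instead (or also) be settable via with_options(...):

from zenml import step, pipeline

# Assumption: the runtime is configured on the step decorator.
@step(runtime="isolated")
def heavy_step() -> None:
    ...  # runs in its own container/process

@step(runtime="inline")
def light_step() -> None:
    ...  # runs inside the orchestration environment

@pipeline(dynamic=True)
def mixed_runtime_pipeline() -> None:
    light_step()
    heavy_step()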

Map/Reduce over collections

Dynamic pipelines support a high-level map/reduce pattern over sequence-like step outputs. This lets you fan out a step across items of a collection and then reduce the results without manually writing loops or loading data in the orchestration environment.

Key points:

  • step.map(...) fans out a step over sequence-like inputs.

  • Steps can accept lists of artifacts directly as inputs (useful for reducers).

  • You can pass the mapped output directly to a downstream step without loading in the orchestration environment.
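
Putting these points together, here is a minimal sketch of a fan-out/fan-in over a list produced earlier in the run (the step names and the keyword-argument form of map are illustrative assumptions):

from zenml import step, pipeline

@step
def generate_numbers() -> list[int]:
    return [1, 2, 3, 4]

@step
def square(value: int) -> int:
    return value * value

@step
def sum_all(values: list[int]) -> int:
    # Reducer: receives the list of mapped outputs as a single input.
    return sum(values)

@pipeline(dynamic=True)
def map_reduce_pipeline() -> None:
    numbers = generate_numbers()
    # Fan out: one `square` step per element of `numbers`.
    squares = square.map(value=numbers)
    # Fan in: pass the mapped outputs directly to the reducer without
    # loading them in the orchestration environment.
    sum_all(values=squares)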

Mapping semantics: map vs product

  • step.map(...): If multiple sequence-like inputs are provided, all must have the same length n. ZenML creates n mapped steps where the i-th step receives the i-th element from each input.

  • step.product(...): Creates a mapped step for each combination of elements across all input sequences (cartesian product).

Example (cartesian product):
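A minimal sketch; the step names and the keyword-argument form of product are illustrative assumptions. Note that both sequences are produced by steps within the run, in line with the mapping limitations listed below:

from zenml import step, pipeline

@step
def learning_rates() -> list[float]:
    return [0.1, 0.01]

@step
def batch_sizes() -> list[int]:
    return [16, 32]

@step
def train(learning_rate: float, batch_size: int) -> float:
    print(f"Training with lr={learning_rate}, batch_size={batch_size}")
    return 0.0  # stand-in for e.g. a validation score

@pipeline(dynamic=True)
def grid_search_pipeline() -> None:
    lrs = learning_rates()
    bss = batch_sizes()
    # One `train` step per (learning_rate, batch_size) combination:
    # 2 x 2 = 4 mapped steps in this sketch.
    train.product(learning_rate=lrs, batch_size=bss)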

Broadcasting inputs with unmapped(...)

If you want to pass a sequence-like artifact as a whole to each mapped invocation (i.e., avoid splitting), wrap it with unmapped(...):
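A minimal sketch; the import location of unmapped is an assumption, and the step names are illustrative:

from zenml import step, pipeline, unmapped  # import path of `unmapped` is an assumption

@step
def generate_values() -> list[int]:
    return [1, 2, 3]

@step
def generate_weights() -> list[float]:
    return [0.5, 0.25, 0.25]

@step
def scale(value: int, weights: list[float]) -> float:
    # `weights` arrives as the whole list in every mapped invocation.
    return value * sum(weights)

@pipeline(dynamic=True)
def broadcast_pipeline() -> None:
    values = generate_values()
    weights = generate_weights()
    # `values` is split across invocations; `weights` is broadcast as-is.
    scale.map(value=values, weights=unmapped(weights))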

Unpacking mapped outputs

If a mapped step returns multiple outputs, you can split them into separate lists (one per output) using unpack(). This returns a tuple of lists of artifact futures, aligned by mapped invocation.
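A minimal sketch; the step names are illustrative, and it is assumed that the unpacked lists follow the declaration order of the step's outputs:

from typing import Annotated, Tuple

from zenml import step, pipeline

@step
def generate_values() -> list[int]:
    return [1, 2, 3]

@step
def square_and_cube(value: int) -> Tuple[Annotated[int, "square"], Annotated[int, "cube"]]:
    return value * value, value * value * value

@step
def summarize(squares: list[int], cubes: list[int]) -> None:
    print(sum(squares), sum(cubes))

@pipeline(dynamic=True)
def unpack_pipeline() -> None:
    values = generate_values()
    results = square_and_cube.map(value=values)
    # One list of artifact futures per output, aligned by mapped invocation.
    squares, cubes = results.unpack()
    summarize(squares=squares, cubes=cubes)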

Notes:

  • results is a future that refers to all outputs of all steps, and unpack() works for both .map(...) and .product(...).

  • Each list contains future objects that refer to a single artifact.

Parallel Step Execution

Dynamic pipelines support true parallel execution using step.submit(). This method returns a StepRunFuture that you can use to wait for results or pass to downstream steps:
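A minimal sketch; the step names are illustrative and the placement of the runtime option on the @step decorator is an assumption:

from zenml import step, pipeline

@step(runtime="isolated")
def train_model(seed: int) -> float:
    return float(seed)  # stand-in for e.g. a validation score

@step
def report(score: float) -> None:
    print(f"Score: {score}")

@pipeline(dynamic=True)
def parallel_pipeline() -> None:
    # submit() returns immediately with a StepRunFuture per step.
    futures = [train_model.submit(seed=i) for i in range(3)]

    # Wait for completion and load the concrete values...
    scores = [future.load() for future in futures]
    print(f"Best score: {max(scores)}")

    # ...or pass a future directly to a downstream step; ZenML waits for it.
    report(score=futures[0])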

The StepRunFuture object provides several methods:

  • result(): Wait for the step to complete and return the artifact response(s)

  • load(): Wait for the step to complete and load the actual artifact data

  • Pass directly: You can pass a StepRunFuture directly to downstream steps, and ZenML will automatically wait for it

When using step.submit(), steps with runtime="isolated" will execute in separate containers/processes, while steps with runtime="inline" will execute in separate threads within the orchestration environment.

Config Templates with depends_on

You can use YAML configuration files to provide default parameters for steps using the depends_on parameter:
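For example, a minimal sketch. The placement of depends_on on the @pipeline decorator and the YAML layout are assumptions modeled on how step configuration files work for static pipelines; the step name is illustrative.

# config.yaml
steps:
  do_something:
    parameters:
      index: 42

from zenml import step, pipeline

@step
def do_something(index: int = 0) -> None:
    print(f"Processing index {index}")

# Assumption: depends_on tells ZenML which steps may appear in the config file.
@pipeline(dynamic=True, depends_on=[do_something])
def dynamic_pipeline() -> None:
    do_something()

if __name__ == "__main__":
    dynamic_pipeline.with_options(config_path="config.yaml")()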

The depends_on parameter tells ZenML which steps can be configured via the YAML file. This is particularly useful when you want to allow users to configure pipeline behavior without modifying code.

Pass pipeline parameters when running snapshots from the server

When running a snapshot from the server (either via the UI or the SDK/REST API), you can now pass pipeline parameters to your dynamic pipelines.

For example:
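A minimal sketch; the client call and run-configuration class shown here are assumptions about the SDK surface, and the same parameter values can be supplied in the UI run dialog instead:

from zenml import step, pipeline
from zenml.client import Client
from zenml.config.pipeline_run_configuration import PipelineRunConfiguration  # assumed import path

@step
def do_something(index: int) -> None:
    print(f"Processing index {index}")

@pipeline(dynamic=True)
def dynamic_pipeline(num_items: int = 3) -> None:
    for idx in range(num_items):
        do_something(idx)

if __name__ == "__main__":
    # Assumption: triggering the snapshot with parameters via the client.
    Client().trigger_pipeline(
        "dynamic_pipeline",
        run_configuration=PipelineRunConfiguration(parameters={"num_items": 5}),
    )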

Limitations and Known Issues

Logging

Our logging storage isn't thread-safe yet, which means logs may get mixed up when multiple steps execute concurrently. This is a known limitation that we're working to address.

Error Handling

When running multiple steps concurrently using step.submit(), a failure in one step does not automatically stop the other steps; they continue running until they finish. You should implement your own error handling logic if you need coordinated failure behavior.

Orchestrator Support

Dynamic pipelines are currently only supported by:

  • local orchestrator

  • Kubernetes orchestrator

  • SageMaker orchestrator

  • Vertex orchestrator

Other orchestrators will raise an error if you try to run a dynamic pipeline with them.

Artifact Loading

When you call .load() on an artifact in a dynamic pipeline, it synchronously loads the data. For large artifacts or when you want to maintain parallelism, consider passing the step outputs (future or artifact) directly to downstream steps instead of loading them.

Mapping Limitations

  • Mapping is currently supported only over artifacts produced within the same pipeline run (mapping over raw data or external artifacts is not supported).

  • Chunk size for mapped collection loading defaults to 1 and is not yet configurable.

Best Practices

  1. Use runtime="isolated" for parallel steps: This ensures better resource isolation and prevents interference between concurrent step executions.

  2. Handle step outputs appropriately: If you need the data immediately, use .load(). If you're just passing to another step, pass the output directly.

  3. Be mindful of resource usage: Running many steps in parallel can consume significant resources. Monitor your orchestrator's resource limits.

  4. Test incrementally: Start with simple dynamic pipelines and gradually add complexity. Dynamic pipelines can be harder to debug than static ones.

  5. Use config templates for flexibility: The depends_on feature allows you to make pipelines configurable without code changes.

When to Use Dynamic Pipelines

Dynamic pipelines are ideal for:

  • AI agent orchestration: Coordinating multiple autonomous agents (e.g., retrieval or reasoning agents) whose interactions or number of invocations are determined at runtime

  • Hyperparameter tuning: Spawning multiple training runs with different configurations

  • Data processing: Processing variable numbers of data chunks in parallel

  • Conditional workflows: Adapting pipeline structure based on runtime data

  • Dynamic batching: Creating batches based on available data

  • Multi-agent and collaborative AI workflows: Building flexible, adaptive workflows where agents or LLM-driven components can be dynamically spawned, routed, or looped based on outputs, results, or user input

For most standard ML workflows, traditional static pipelines are simpler and more maintainable. Use dynamic pipelines when you specifically need runtime flexibility that static pipelines cannot provide.
