# Dynamic Pipelines

## Why Dynamic Pipelines?

Traditional ZenML pipelines require you to define the entire DAG structure at pipeline definition time. While this works well for many use cases, there are scenarios where you need more flexibility:

* **Runtime-dependent workflows**: When the number of steps or their configuration depends on data computed during pipeline execution
* **Dynamic parallelization**: When you need to spawn multiple parallel step executions based on runtime conditions
* **Conditional execution**: When the workflow structure needs to adapt based on intermediate results

Dynamic pipelines allow you to write pipelines that generate their DAG structure dynamically at runtime, giving you the power of Python's control flow (loops, conditionals) combined with ZenML's orchestration capabilities.

{% hint style="info" %}
Dynamic pipelines are powerful but easy to get wrong (e.g., `.load()` vs `.chunk()`, mapping vs submit). If you use an AI coding agent, the `zenml-pipeline-authoring` skill can guide implementation step-by-step. See [LLM tooling](/reference/llms-txt.md).
{% endhint %}

## Basic Example

The simplest dynamic pipeline uses regular Python control flow to determine step execution:

```python
from zenml import step, pipeline

@step
def generate_int() -> int:
    return 3

@step
def do_something(index: int) -> None:
    print(f"Processing index {index}")

@pipeline(dynamic=True)
def dynamic_pipeline() -> None:
    count = generate_int()
    # `count` is an artifact; call `.load()` to get the actual integer
    count_data = count.load()

    for idx in range(count_data):
        # This will run sequentially, like regular Python code would.
        do_something(idx)

if __name__ == "__main__":
    dynamic_pipeline()
```

In this example, the number of `do_something` steps executed depends on the value returned by `generate_int()`, which is only known at runtime.

## Key Features

### Dynamic Step Configuration

You can configure steps dynamically within your pipeline using `with_options()`:

```python
from zenml import pipeline, step

@step
def some_step() -> None: ...

@pipeline(dynamic=True)
def dynamic_pipeline():
    some_step.with_options(enable_cache=False)()
```

This allows you to modify step behavior based on runtime conditions or data.

### Artifact name substitutions in dynamic pipelines

Dynamic pipelines support the same artifact name substitutions as regular pipelines. This matters when a dynamically generated step has outputs whose names include runtime-friendly placeholders. The substituted artifact name is still a real output that you can pass to downstream steps.

```python
from typing import Annotated

from zenml import ArtifactConfig, pipeline, step

@step(substitutions={"suffix": "validated"})
def produce() -> Annotated[int, ArtifactConfig(name="score_{suffix}")]:
    return 1

@step
def consume(score: int) -> None:
    print(score)

@pipeline(dynamic=True)
def dynamic_pipeline() -> None:
    score = produce()
    consume(score)
```

One caveat: when you use `child_pipeline.embed(...)`, the child pipeline's own configuration is not applied. That includes child-level `substitutions`; the parent run's configuration controls the steps that execute inline.

### Step Runtime Configuration

You can control where a step executes by specifying its runtime:

* **`runtime="inline"`**: The step runs in the orchestration environment (same process/container as the orchestrator)
* **`runtime="isolated"`**: The orchestrator spins up a separate step execution environment (new container/process)

```python
@step(runtime="isolated")
def some_step() -> None:
    # This step will run in its own isolated environment
    ...

@step(runtime="inline")
def another_step() -> None:
    # This step will run in the orchestration environment
    ...
```

Use `runtime="isolated"` when you need:

* Better resource isolation
* Different environment requirements
* Parallel execution (see below)

Use `runtime="inline"` when you need:

* Faster execution (no container startup overhead)
* Shared resources with the orchestrator
* Sequential execution

### Map/Reduce over collections

Dynamic pipelines support a high-level map/reduce pattern over sequence-like step outputs. This lets you fan out a step across items of a collection and then reduce the results without manually writing loops or loading data in the orchestration environment.

```python
from zenml import pipeline, step

@step
def producer() -> list[int]:
    return [1, 2, 3]

@step
def worker(value: int) -> int:
    return value * 2

@step
def reducer(values: list[int]) -> int:
    return sum(values)

@pipeline(dynamic=True, enable_cache=False)
def map_reduce():
    values = producer()
    results = worker.map(values)   # fan out over collection
    reducer(results)               # pass list of artifacts directly
```

Key points:

* `step.map(...)` fans out a step over sequence-like inputs. Each input can be any of the following:
  * a single list-like output artifact (see the code sample above)
  * a list of output artifacts
  * the output of a `.map(...)` or `.product(...)` call, provided the mapped step returns only a single output artifact
* Steps can accept lists of artifacts directly as inputs (useful for reducers).
* You can pass the mapped output directly to a downstream step without loading in the orchestration environment.

#### Mapping semantics: map vs product

* `step.map(...)`: If multiple sequence-like inputs are provided, all must have the same length `n`. ZenML creates `n` mapped steps where the i-th step receives the i-th element from each input.
* `step.product(...)`: Creates a mapped step for each combination of elements across all input sequences (cartesian product).

Example (cartesian product):

```python
from zenml import pipeline, step

@step
def int_values() -> list[int]:
    return [1, 2]

@step
def str_values() -> list[str]:
    return ["a", "b", "c"]

@step
def do_something(a: int, b: str) -> int:
    ...

@pipeline(dynamic=True)
def cartesian_example():
    a = int_values()
    b = str_values()
    # Produces 2 * 3 = 6 mapped steps
    do_something.product(a=a, b=b)
```
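The pairing rules for `map` and `product` can be mimicked in plain Python. The sketch below is not ZenML code: `map_inputs` and `product_inputs` are hypothetical helpers that only show which argument combinations each mapped step would receive under the semantics described above.

```python
from itertools import product


def map_inputs(**sequences):
    """Pair the i-th elements of every input, as described for `step.map(...)`."""
    lengths = {len(seq) for seq in sequences.values()}
    if len(lengths) > 1:
        raise ValueError(f"inputs must share one length, got {sorted(lengths)}")
    return [dict(zip(sequences, combo)) for combo in zip(*sequences.values())]


def product_inputs(**sequences):
    """Build every combination of elements, as described for `step.product(...)`."""
    return [dict(zip(sequences, combo)) for combo in product(*sequences.values())]


mapped = map_inputs(a=[1, 2], b=["x", "y"])
crossed = product_inputs(a=[1, 2], b=["a", "b", "c"])

print(len(mapped))   # 2 invocations: (1, "x") and (2, "y")
print(len(crossed))  # 6 invocations: 2 * 3 combinations
```

Inputs of unequal length raise an error in `map_inputs`, mirroring the same-length requirement stated for `step.map(...)`.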

#### Broadcasting inputs with unmapped(...)

If you want to pass a sequence-like artifact as a whole to each mapped invocation (i.e., avoid splitting), wrap it with `unmapped(...)`:

```python
from zenml import pipeline, step, unmapped

@step
def producer(length: int) -> list[int]:
    return [1] * length

@step
def consumer(a: int, b: list[int]) -> None:
    # `b` is the full list for every mapped call
    ...

@pipeline(dynamic=True)
def unmapped_example():
    a = producer(length=3)   # list of 3 ints
    b = producer(length=4)   # list of 4 ints
    consumer.map(a=a, b=unmapped(b))
```
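To make the broadcasting behavior concrete, here is a plain-Python sketch of how the per-invocation arguments could be assembled. It is an illustration only: `Broadcast` and `expand_calls` are hypothetical names standing in for `unmapped(...)` and the mapping machinery, and they say nothing about how ZenML implements this internally.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Broadcast:
    """Hypothetical marker standing in for `unmapped(...)`."""

    value: Any


def expand_calls(**inputs):
    """Return one kwargs dict per mapped invocation."""
    mapped = {k: v for k, v in inputs.items() if not isinstance(v, Broadcast)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped input, all of equal length")
    (n,) = lengths
    calls = []
    for i in range(n):
        call = {}
        for name, value in inputs.items():
            # Broadcast inputs are passed whole; mapped inputs are split by index.
            call[name] = value.value if isinstance(value, Broadcast) else value[i]
        calls.append(call)
    return calls


calls = expand_calls(a=[1, 1, 1], b=Broadcast([1, 1, 1, 1]))
print(len(calls))     # 3 invocations, one per element of `a`
print(calls[0]["b"])  # [1, 1, 1, 1], the full list every time
```

Because `b` is broadcast rather than mapped, its length (4) does not need to match the length of `a` (3).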

#### Unpacking mapped outputs

If a mapped step returns multiple outputs, you can split them into separate lists (one per output) using `unpack()`. This returns a tuple of lists of artifact futures, aligned by mapped invocation.

```python
from zenml import pipeline, step

@step
def create_int_list() -> list[int]:
    return [1, 2]

@step
def compute(a: int) -> tuple[int, int]:
    return a * 2, a * 3

@pipeline(dynamic=True)
def map_pipeline():
    ints = create_int_list()
    results = compute.map(a=ints)  # Map over [1, 2]

    # Unpack per-output across all mapped invocations
    double, triple = results.unpack()

    # Each element is an ArtifactFuture; load to get concrete values
    doubles = [f.load() for f in double]  # [2, 4]
    triples = [f.load() for f in triple]  # [3, 6]
```

Notes:

* `results` is a future that refers to all outputs of all steps, and `unpack()` works for both `.map(...)` and `.product(...)`.
* Each list contains future objects that refer to a single artifact.
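Conceptually, unpacking is a transpose: a list of per-invocation output tuples becomes one list per output position. The plain-Python sketch below shows that reshaping with ordinary values in place of futures; it illustrates the alignment only and does not use the ZenML API.

```python
# One tuple per mapped invocation, mirroring `compute` returning (a * 2, a * 3).
per_invocation = [(a * 2, a * 3) for a in [1, 2]]

# Transpose: one list per output position, still aligned by invocation.
doubles, triples = (list(column) for column in zip(*per_invocation))

print(doubles)  # [2, 4]
print(triples)  # [3, 6]
```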

#### Manual Looping: `.chunk()` vs `.load()`

When looping over artifacts manually, you need two different operations:

| Method        | Purpose                  | When to Use                               |
| ------------- | ------------------------ | ----------------------------------------- |
| `.load()`     | Gets the **actual data** | Making decisions, filtering, control flow |
| `.chunk(idx)` | Creates a **DAG edge**   | Passing to downstream steps               |

{% hint style="info" %}
**Mental model**: `.chunk()` is for wiring (tells the orchestrator "this step depends on item X from upstream"), `.load()` is for decisions (gets values for your Python logic). You typically need both: load to iterate and decide, chunk to wire up the DAG.
{% endhint %}

```python
from zenml import pipeline, step

@step
def create_int_list() -> list[int]:
    return [1, 2, 3, 4]

@step
def compute(a: int) -> int:
    return a * 2

@pipeline(dynamic=True)
def custom_loop():
    ints = create_int_list()

    # .load() to get values for Python control flow (iteration + filtering)
    for index, value in enumerate(ints.load()):
        if value % 2 == 0:
            # .chunk() to create DAG edge (wiring to downstream step)
            chunk = ints.chunk(index=index)
            compute(chunk)
```
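The split between the two methods can be modelled without ZenML. In the sketch below, `FakeListArtifact` is a hypothetical stand-in for a list-valued step output: `load()` hands back real values for Python logic, while `chunk()` hands back only a reference to one upstream item. The real objects behave differently in detail; the point is just which calls produce data and which produce wiring.

```python
class FakeListArtifact:
    """Hypothetical stand-in for a list-valued step output."""

    def __init__(self, data):
        self._data = list(data)

    def load(self):
        # Returns the actual values, for use in Python control flow.
        return list(self._data)

    def chunk(self, index):
        # Returns a lightweight reference to one item, not the item itself.
        return {"upstream_item": index}


ints = FakeListArtifact([1, 2, 3, 4])

# Decide with loaded values, wire with chunk references.
wired = [
    ints.chunk(index=index)
    for index, value in enumerate(ints.load())
    if value % 2 == 0
]
print(wired)  # [{'upstream_item': 1}, {'upstream_item': 3}]
```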

### Parallel Step Execution

Dynamic pipelines support true parallel execution using `step.submit()`. This method returns a `StepRunFuture` that you can use to wait for results or pass to downstream steps:

```python
from zenml import step, pipeline

@step
def some_step(arg: int) -> int:
    return arg * 2

@step
def downstream_step(value: int) -> None:
    print(value)

@pipeline(dynamic=True)
def dynamic_pipeline():
    # Submit a step for parallel execution
    future = some_step.submit(arg=1)
    
    # Wait and get artifact response(s)
    artifact = future.result()
    
    # Wait and load artifact data
    data = future.load()
    
    # Pass the output to another step
    downstream_step(future)

    # Run multiple steps in parallel
    for idx in range(3):
        some_step.submit(arg=idx)
```

The `StepRunFuture` object provides several methods:

* **`result()`**: Wait for the step to complete and return the artifact response(s)
* **`load()`**: Wait for the step to complete and load the actual artifact data
* **Pass directly**: You can pass a `StepRunFuture` directly to downstream steps, and ZenML will automatically wait for it

{% hint style="info" %}
When using `step.submit()`, steps with `runtime="isolated"` will execute in separate containers/processes, while steps with `runtime="inline"` will execute in separate threads within the orchestration environment.
{% endhint %}
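With futures in general, waiting on each one right after submitting it serializes the work. The sketch below uses Python's standard `concurrent.futures` purely as an analogy (it is not ZenML's `StepRunFuture`, and `some_step` here is a plain function): submit everything first, then collect the results.

```python
from concurrent.futures import ThreadPoolExecutor


def some_step(arg: int) -> int:
    return arg * 2


with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit all work first so the calls can overlap...
    futures = [pool.submit(some_step, idx) for idx in range(3)]
    # ...then block on the results in submission order.
    results = [future.result() for future in futures]

print(results)  # [0, 2, 4]
```

The same ordering applies to the `submit()` loop shown earlier: keep the returned futures in a list and only call `result()` or `load()` once everything has been submitted.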

### Child pipelines inside dynamic pipelines

Dynamic pipelines can call other dynamic pipelines from their `@pipeline` body. This is useful for composing larger workflows out of reusable dynamic building blocks.

Key behavior:

* Only dynamic pipelines can be called as child pipelines.
* Child pipelines run on the same stack as the parent run.
* Child pipelines can run synchronously (`child(...)`) or concurrently (`child.submit(...)`).
* Child pipeline calls are only allowed in pipeline bodies, not inside step functions.
* Child pipelines reuse the parent run's Docker image — they don't trigger a new build. The child snapshot inherits the parent's build, code reference, and code path so the child runs against the exact same image and source bundle as the parent.

Child pipeline outputs are returned as artifact references:

* `None`
* A single output artifact
* A tuple of output artifacts

These outputs can be passed directly to downstream steps.

```python
from zenml import pipeline, step

@step
def produce_number() -> int:
    return 42

@pipeline(dynamic=True)
def child_pipeline():
    return produce_number()

@step
def consume_number(value: int) -> None:
    print(value)

@pipeline(dynamic=True)
def parent_pipeline():
    child_output = child_pipeline()
    consume_number(child_output)
```

For concurrent execution, use `submit()` and wait on the future:

```python
@pipeline(dynamic=True)
def parent_pipeline_concurrent():
    future = child_pipeline.submit()
    child_output = future.result()
    consume_number(child_output)
```

### Inline child pipelines with `embed(...)`

Use `child_pipeline.embed(...)` if you want to reuse another dynamic pipeline's body without creating a child pipeline run.

```python
@pipeline(dynamic=True)
def parent_pipeline_inline():
    # Executes child steps in the parent run context
    child_output = child_pipeline.embed()
    consume_number(child_output)
```

`embed(...)` behavior:

* It executes the child pipeline entrypoint inline as part of the parent run.
* It does not create a separate child run in the dashboard.
* It is only valid inside a dynamic pipeline body.
* It is not allowed inside `@step` functions.

{% hint style="warning" %}
**Limitations of `embed(...)`.** Unlike `child_pipeline(...)` and `child_pipeline.submit(...)`, the inline form does not apply the child pipeline's own configuration. The parent run's configuration governs every step that runs inline:

* Child-level `settings`, `retry`, `enable_cache`, `enable_step_logs`, `environment`, `secrets`, `tags`, `substitutions`, `model`, and `on_init` / `on_success` / `on_failure` / `on_cleanup` hooks are ignored.
* Per-step Docker overrides on the child pipeline are also ignored — the parent's image is used for any inline isolated step.
* `depends_on` config templates declared on the child pipeline are not picked up.
* There is no failure isolation: an exception inside the inline body aborts the parent run.

If any of these matter to your use case, call the child as `child_pipeline(...)` (sync) or `child_pipeline.submit(...)` (concurrent) instead. Both create a real child run with its own configuration applied.
{% endhint %}

In short, use:

* `child_pipeline(...)` for a synchronous child run
* `child_pipeline.submit(...)` for a concurrent child run
* `child_pipeline.embed(...)` for embedded execution in the parent run

{% hint style="warning" %}
**Resume idempotency depends on submit order.** Child runs are identified by the order of `child_pipeline(...)` / `child_pipeline.submit(...)` calls in the parent body: the first call to `my_pipeline` becomes `pipeline:my_pipeline`, the second becomes `pipeline:my_pipeline_2`, and so on. On resume, ZenML reuses an existing child run only if the same call appears in the same position. If you reorder, insert, or remove child pipeline calls before existing ones, every subsequent ID shifts and previously completed children are re-executed. The same caveat applies to step invocation IDs.
{% endhint %}
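The effect of call order on IDs can be illustrated with a small plain-Python sketch. `assign_child_ids` is a hypothetical helper that follows the naming pattern described above (`pipeline:my_pipeline`, `pipeline:my_pipeline_2`); the `_3` suffix for a third call is an assumption extrapolated from "and so on", and the payload strings are made up for the example.

```python
from collections import Counter


def assign_child_ids(calls):
    """Map each (pipeline_name, payload) call to an ID based on call order."""
    seen = Counter()
    ids = {}
    for name, payload in calls:
        seen[name] += 1
        suffix = "" if seen[name] == 1 else f"_{seen[name]}"
        ids[f"pipeline:{name}{suffix}"] = payload
    return ids


first_run = assign_child_ids([("my_pipeline", "A"), ("my_pipeline", "B")])

# Inserting a new call before the existing ones shifts every later ID.
edited = assign_child_ids(
    [("my_pipeline", "new"), ("my_pipeline", "A"), ("my_pipeline", "B")]
)

print(first_run)
print(edited)
```

In `first_run`, the ID `pipeline:my_pipeline` belongs to the call with payload `A`; in `edited`, the same ID now belongs to the newly inserted call, so a resume would not match the previously completed work.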

### Build, code, and Docker settings inheritance

Child runs share the parent's orchestration environment, image, and code bundle. This has two consequences worth knowing:

* **No new Docker build.** The child snapshot inherits the parent's `build`, `code_reference`, and `code_path`. The child runs against the exact same image and source bundle as the parent — there is no separate build step, and the child's code/dependencies must already be installed in the parent's image.
* **Pipeline-level Docker settings on the child are ignored.** When a child pipeline (or a child step) declares non-default `docker_settings`, those settings are silently overridden by the parent's. If you need a different image for a step inside a child pipeline, configure that step with a `step_operator` or use `runtime="isolated"` together with stack-level resource configuration on the parent.

This applies to all three call modes (`child(...)`, `child.submit(...)`, and `child.embed(...)`).

### Permissions and authentication for nested runs

Nested runs orchestrate from the parent's environment, so they share the parent's API token. The token must be scoped to the **root** run of the nesting tree — the root orchestrator can mint per-child-run tokens for any descendant. Child runs cannot mint tokens for their siblings; only descendants of the same root tree are reachable from a given parent token.

This is transparent for the default flow (the root orchestrator launches everything in the same environment). It matters if you build automation on top of `ZENML_PIPELINE_RUN_ID` tokens — those tokens give you read/update access to the run they were minted for and any of its descendants, but not to siblings or unrelated runs.
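The scoping rule (a token reaches its own run and that run's descendants, but not siblings or unrelated runs) amounts to an ancestor check on the run tree. The sketch below models it in plain Python; the run names and the `is_reachable` helper are hypothetical and do not reflect ZenML's actual token handling.

```python
# Hypothetical run tree: each run maps to its parent (None for the root).
parents = {
    "root": None,
    "child_a": "root",
    "child_b": "root",
    "grandchild": "child_a",
}


def is_reachable(token_run: str, target: str) -> bool:
    """True if `target` is `token_run` itself or one of its descendants."""
    node = target
    while node is not None:
        if node == token_run:
            return True
        node = parents[node]
    return False


print(is_reachable("root", "grandchild"))     # True: descendant of the root
print(is_reachable("child_a", "grandchild"))  # True: direct descendant
print(is_reachable("child_a", "child_b"))     # False: sibling
```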

### Config Templates with `depends_on`

You can use YAML configuration files to provide default parameters for steps using the `depends_on` parameter:

```yaml
# config.yaml
steps:
  some_step:
    parameters:
      arg: 3
```

```python
# run.py
from zenml import step, pipeline

@step
def some_step(arg: int) -> None:
    print(f"arg is {arg}")

@pipeline(dynamic=True, depends_on=[some_step])
def dynamic_pipeline():
    some_step()

if __name__ == "__main__":
    dynamic_pipeline.with_options(config_path="config.yaml")()
```

The `depends_on` parameter tells ZenML which steps can be configured via the YAML file. This is particularly useful when you want to allow users to configure pipeline behavior without modifying code.

### Pass pipeline parameters when running snapshots from the server

When running a snapshot from the server (via the UI, the SDK, or the REST API), you can pass pipeline parameters for your dynamic pipelines.

For example:

```python
from zenml.client import Client

Client().trigger_pipeline(snapshot_id=<ID>, run_configuration={"parameters": {"my_param": 3}})
```

## Limitations and Known Issues

### Execution modes and error handling

* The `CONTINUE_ON_FAILURE` execution mode is currently not supported in dynamic pipelines. Instead, you can use `try...except` to catch step exceptions and continue the pipeline.
* When using the `FAIL_FAST` execution mode, the failure of a step does not immediately cancel other **inline** steps. Instead, they continue executing until they finish. **Isolated** steps, on the other hand, are shut down immediately.
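The `try...except` workaround mentioned in the first point follows the usual Python pattern. The sketch below uses a plain function, `flaky_step`, as a hypothetical stand-in for a step call; which exception type a failing step raises inside a dynamic pipeline is not specified here, so the broad `except Exception` is an assumption you may want to narrow.

```python
def flaky_step(index: int) -> int:
    """Hypothetical stand-in for a step call that fails for one input."""
    if index == 1:
        raise RuntimeError(f"step failed for index {index}")
    return index * 10


succeeded, failed = [], []
for index in range(3):
    try:
        succeeded.append(flaky_step(index))
    except Exception as exc:  # narrow this to the exception you expect
        failed.append((index, str(exc)))

print(succeeded)  # [0, 20]
print(failed)     # [(1, 'step failed for index 1')]
```

Recording the failures instead of silently swallowing them keeps the run inspectable after the loop finishes.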

### Orchestrator Support

Dynamic pipelines are currently supported only by the following orchestrators:

| Orchestrator                                                                                        | Isolated steps | Handles orchestration environment failures |
| --------------------------------------------------------------------------------------------------- | :------------: | :----------------------------------------: |
| [LocalOrchestrator](https://docs.zenml.io/stacks/stack-components/orchestrators/local)              |        ❌       |                      ❌                     |
| [LocalDockerOrchestrator](https://docs.zenml.io/stacks/stack-components/orchestrators/local-docker) |        ❌       |                      ❌                     |
| [KubernetesOrchestrator](https://docs.zenml.io/stacks/stack-components/orchestrators/kubernetes)    |        ✅       |                      ✅                     |
| [VertexOrchestrator](https://docs.zenml.io/stacks/stack-components/orchestrators/vertex)            |        ✅       |                      ❌                     |
| [SagemakerOrchestrator](https://docs.zenml.io/stacks/stack-components/orchestrators/sagemaker)      |        ✅       |                      ❌                     |
| [AzureMLOrchestrator](https://docs.zenml.io/stacks/stack-components/orchestrators/azureml)          |        ✅       |                      ❌                     |

### Artifact Loading

When you call `.load()` on an artifact in a dynamic pipeline, it synchronously loads the data. For large artifacts or when you want to maintain parallelism, consider passing the step outputs (future or artifact) directly to downstream steps instead of loading them.

### Mapping Limitations

* Mapping is currently supported only over artifacts produced within the same pipeline run (mapping over raw data or external artifacts is not supported).
* Chunk size for mapped collection loading defaults to 1 and is not yet configurable.

## Best Practices

1. **Use `runtime="isolated"` for parallel steps**: This ensures better resource isolation and prevents interference between concurrent step executions.
2. **Handle step outputs appropriately**: If you need the data immediately, use `.load()`. If you're just passing to another step, pass the output directly.
3. **Be mindful of resource usage**: Running many steps in parallel can consume significant resources. Monitor your orchestrator's resource limits.
4. **Test incrementally**: Start with simple dynamic pipelines and gradually add complexity. Dynamic pipelines can be harder to debug than static ones.
5. **Use config templates for flexibility**: The `depends_on` feature allows you to make pipelines configurable without code changes.

## When to Use Dynamic Pipelines

Dynamic pipelines are ideal for:

* **AI agent orchestration**: Coordinating multiple autonomous agents (e.g., retrieval or reasoning agents) whose interactions or number of invocations are determined at runtime
* **Hyperparameter tuning**: Spawning multiple training runs with different configurations
* **Data processing**: Processing variable numbers of data chunks in parallel
* **Conditional workflows**: Adapting pipeline structure based on runtime data
* **Dynamic batching**: Creating batches based on available data
* **Multi-agent and collaborative AI workflows**: Building flexible, adaptive workflows where agents or LLM-driven components can be dynamically spawned, routed, or looped based on outputs, results, or user input

For most standard ML workflows, traditional static pipelines are simpler and more maintainable. Use dynamic pipelines when you specifically need runtime flexibility that static pipelines cannot provide.

## Real-World Example: Hierarchical Document Search

The [`examples/hierarchical_doc_search_agent`](https://github.com/zenml-io/zenml/tree/main/examples/hierarchical_doc_search_agent) example combines dynamic pipelines with Pydantic AI agents for intelligent document traversal. It demonstrates:

* Using `.with_options()` to pass parameters vs artifacts
* The `.chunk()` vs `.load()` pattern: chunks for wiring the DAG, loads for making traversal decisions
* Spawning steps dynamically based on AI agent decisions

Each `traverse_node` call appears as a separate step in the DAG, created at runtime based on what the agent decides to explore.

Two other examples are useful when you want to see dynamic pipelines in more specialized settings:

* [`examples/rlm_document_analysis`](https://github.com/zenml-io/zenml/tree/main/examples/rlm_document_analysis) shows a Recursive Language Model style document-analysis workflow. ZenML decides how many chunk-processing steps to create at runtime, while the LLM loop inside each chunk decides which typed search tools to use.
* [`examples/optuna_hyperparameter_tuning`](https://github.com/zenml-io/zenml/tree/main/examples/optuna_hyperparameter_tuning) combines Optuna's ask API with ZenML dynamic pipelines. Optuna decides which hyperparameters to try next; ZenML runs the trials, tracks their artifacts and metadata, and can fan the work out in parallel.



