# Flows

A **flow** is the outer durable boundary in Kitaru — the unit of work your platform invokes and the runner executes. Everything inside the flow is tracked at checkpoint boundaries: persisted outputs, retry, replay, resume, and wait. Your harness (Pydantic AI, LangGraph, Claude Agent SDK, raw Python) lives inside the checkpoints. Your platform sits in front of the flow's invocation API. See [Harness, Runtime, Platform](/kitaru/core-concepts/harness-runtime-platform.md) for the bigger picture.

## The shape of a flow

The `@flow` boundary defines the durable execution. Checkpoints inside it are the persisted replay boundaries; the flow body is ordinary Python that orchestrates those checkpoints. Side effects in the flow body itself are not automatically durable — put anything that needs to survive a crash, be replayed, or be rolled back behind a `@checkpoint`.
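The replay behavior can be illustrated with a pure-Python toy model. This is not Kitaru's implementation: `ToyRun`, `checkpoint_step`, and the dict-based store are hypothetical names, and the sketch assumes checkpoint outputs are keyed by call order within one execution. It shows that on a replay, a persisted checkpoint output is returned without re-running the checkpoint body, while code in the flow body runs again.

```python
from typing import Any, Callable

# Toy model for illustration only; not Kitaru's runtime or API.


class ToyRun:
    """Hypothetical stand-in for one execution attempt."""

    def __init__(self, store: dict) -> None:
        self.store = store              # persisted checkpoint outputs, by call index
        self.index = 0                  # position of the next checkpoint call
        self.executed: list = []        # checkpoint bodies that actually ran

    def checkpoint_step(self, name: str, fn: Callable[[], Any]) -> Any:
        key = self.index
        self.index += 1
        if key in self.store:           # replay: reuse the persisted output
            return self.store[key]
        result = fn()                   # first time: execute, then persist
        self.executed.append(name)
        self.store[key] = result
        return result


def flow_body(run: ToyRun, log: list, fail_after_fetch: bool) -> str:
    log.append("flow body ran")         # not durable: repeats on every attempt
    data = run.checkpoint_step("fetch", lambda: "raw data")
    if fail_after_fetch:
        raise RuntimeError("simulated crash")
    return run.checkpoint_step("process", lambda: data.upper())


store: dict = {}
log: list = []

first = ToyRun(store)
try:
    flow_body(first, log, fail_after_fetch=True)
except RuntimeError:
    pass

second = ToyRun(store)                  # replay against the same persisted store
result = flow_body(second, log, fail_after_fetch=False)
print(result, second.executed, log)     # RAW DATA ['process'] ['flow body ran', 'flow body ran']
```

The fetch step runs once across both attempts, but the `log.append` in the flow body runs twice, which is why side effects that must not repeat belong in a checkpoint.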

<figure><img src="https://assets.kitaru.ai/docs/diagrams/flow-shape.png" alt="The flow body orchestrates checkpoints, which are the durable replay boundaries."><figcaption><p>The flow body is the orchestration layer. The checkpoints inside are the replay boundaries.</p></figcaption></figure>

## Defining a flow

Decorate your orchestration function with `@flow`:

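```python
from kitaru import checkpoint, flow

@checkpoint
def fetch_data(url: str) -> str:
    ...  # e.g. download the page; the output is persisted

@checkpoint
def process_data(data: str) -> str:
    ...  # e.g. summarize the downloaded content

@flow
def my_agent(url: str) -> str:
    data = fetch_data(url)
    return process_data(data)
```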

The decorator turns the function into a flow object that you start with `.run()` rather than calling it directly. Inside the flow body, you compose [checkpoints](/kitaru/core-concepts/checkpoints.md), the units of work whose outputs are persisted.

## Running a flow

Use `.run()` to start an execution:

```python
handle = my_agent.run(url="https://example.com")
print(handle.exec_id)   # unique execution identifier
# ... do other work ...
result = handle.wait()   # block until finished
```

`.run()` submits the execution and immediately returns a `FlowHandle`. The flow runs in the background while your code continues. Call `handle.wait()` when you need the result.

For a synchronous one-liner, chain `.wait()`:

```python
result = my_agent.run(url="https://example.com").wait()
```

To target a remote stack for one execution, pass `stack=`:

```python
handle = my_agent.run(url="https://example.com", stack="production")
```

## Deploying and invoking flows

Use `.run()` when the current source process is starting a flow directly. Use `.deploy()` when you want to save a reusable, versioned flow entrypoint that other processes can invoke later.

```python
# Producer side: create a reusable deployment version.
my_agent.deploy(url="https://example.com")

# Consumer side: start a new execution from the deployed default route.
handle = my_agent.invoke(url="https://example.com")
```

`.invoke()` is the remote invocation verb for deployed flows. If you do not pass `version=` or `tag=`, it invokes the reserved `default` tag. To pin a specific version, pass `version=2`; to route through a named tag, pass `tag="stable"`.
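The selection rule can be sketched as a small resolver over a tag table. This is a hypothetical model, not Kitaru's code: `resolve_version`, the `tags` dict, and the choice to reject passing both `version=` and `tag=` are assumptions made only for this illustration.

```python
from typing import Dict, Optional, Set

# Toy model for illustration only; not Kitaru's routing code.


def resolve_version(
    versions: Set[int],
    tags: Dict[str, int],
    version: Optional[int] = None,
    tag: Optional[str] = None,
) -> int:
    """Pick the deployment version an invocation should run."""
    if version is not None and tag is not None:
        # Assumption for this sketch only: treat passing both as ambiguous.
        raise ValueError("pass either version= or tag=, not both")
    if version is not None:
        if version not in versions:
            raise LookupError(f"no deployed version {version}")
        return version                      # explicit pin
    name = tag if tag is not None else "default"  # no pin: follow a tag
    if name not in tags:
        raise LookupError(f"no tag named {name!r}")
    return tags[name]


versions = {1, 2, 3}
tags = {"default": 2, "stable": 1}
print(resolve_version(versions, tags))               # 2 (reserved default tag)
print(resolve_version(versions, tags, version=3))    # 3 (pinned)
print(resolve_version(versions, tags, tag="stable")) # 1 (named tag)
```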

See [Deployments](/kitaru/core-concepts/deployments.md) for auto-versioning, tag semantics, auth context, and worked producer/consumer examples.

## FlowHandle

A `FlowHandle` is returned by `.run()`. It gives you access to the running execution:

| Property / Method | What it does                                                        |
| ----------------- | ------------------------------------------------------------------- |
| `handle.exec_id`  | The unique execution identifier (a string you can store or log)     |
| `handle.status`   | Current execution status (refreshed on each access)                 |
| `handle.wait()`   | Block until the execution finishes, then return the result          |
| `handle.get()`    | Return the result immediately if finished, otherwise raise an error |

{% hint style="info" %}
`handle.get()` does **not** wait. If the execution is still running, it raises a `KitaruStateError`. Use `handle.wait()` when you want to block.
{% endhint %}

### How errors surface

If the flow execution fails, `handle.wait()` raises a typed `KitaruExecutionError` (or a more specific subclass) with the execution ID, final status, and the failure origin attached:

```python
import kitaru

try:
    result = my_agent.run(url="bad-input").wait()
except kitaru.KitaruExecutionError as exc:
    print(exc.exec_id, exc.status, exc.failure_origin)
```
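The difference between `get()` and `wait()`, and the way a failure reaches the caller, can be modeled with a thread-backed toy handle. `ToyHandle`, `ToyStateError`, and `ToyExecutionError` are hypothetical stand-ins for `FlowHandle`, `KitaruStateError`, and `KitaruExecutionError`; the status strings and the `failure_origin` value are assumptions, and this sketch runs work in a local thread rather than tracking a real execution.

```python
import threading
import uuid
from typing import Any, Callable, Optional

# Toy model for illustration only; not Kitaru's FlowHandle.


class ToyStateError(Exception):
    """Stand-in for KitaruStateError: result requested too early."""


class ToyExecutionError(Exception):
    """Stand-in for KitaruExecutionError, carrying failure context."""

    def __init__(self, exec_id: str, status: str, failure_origin: str) -> None:
        super().__init__(f"{exec_id} ended with status {status}")
        self.exec_id = exec_id
        self.status = status
        self.failure_origin = failure_origin


class ToyHandle:
    def __init__(self, fn: Callable[[], Any]) -> None:
        self.exec_id = uuid.uuid4().hex
        self._done = threading.Event()
        self._result: Any = None
        self._error: Optional[BaseException] = None
        # Start the work in the background and return immediately.
        threading.Thread(target=self._work, args=(fn,), daemon=True).start()

    def _work(self, fn: Callable[[], Any]) -> None:
        try:
            self._result = fn()
        except Exception as exc:        # record the failure for the caller
            self._error = exc
        finally:
            self._done.set()

    @property
    def status(self) -> str:
        if not self._done.is_set():
            return "running"
        return "failed" if self._error is not None else "completed"

    def get(self) -> Any:
        # Never blocks: an unfinished execution is an error, not a wait.
        if not self._done.is_set():
            raise ToyStateError(f"{self.exec_id} is still {self.status}")
        return self._finish()

    def wait(self) -> Any:
        self._done.wait()               # block until the background work ends
        return self._finish()

    def _finish(self) -> Any:
        if self._error is not None:
            raise ToyExecutionError(
                self.exec_id, self.status, repr(self._error)
            ) from self._error
        return self._result


gate = threading.Event()
handle = ToyHandle(lambda: gate.wait() and "done")
try:
    handle.get()                        # work is still blocked on the gate
except ToyStateError as exc:
    print("not ready:", exc)
gate.set()
print(handle.wait())                    # done

failing = ToyHandle(lambda: 1 / 0)
try:
    failing.wait()
except ToyExecutionError as exc:
    print(exc.status, exc.failure_origin)
```

`get()` fails here only because the gate is still closed; once the work has finished, both methods return the same result or raise the same typed error.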

## Runtime options

You can configure execution behavior at the decorator level (defaults) or override per-run via `.run()`:

```python
from kitaru import flow

@flow(retries=2, cache=False)
def my_agent(url: str) -> str:
    ...

# Override at call time
handle = my_agent.run(url="https://example.com", retries=3, cache=True)
```

| Option    | Default | What it controls                                                                                                                                                                                          |
| --------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `retries` | `0`     | Number of automatic retries on failure                                                                                                                                                                    |
| `cache`   | `True`  | Whether checkpointed outputs can be reused from previous runs. Set `False` to disable.                                                                                                                    |
| `stack`   | `None`  | Target execution environment for this run (overrides the active stack default)                                                                                                                            |
| `image`   | `None`  | Container image for remote execution (string, mapping, or settings object). Supports `base_image`, `requirements`, `environment`, `apt_packages`, `replicate_local_python_environment`, and `dockerfile`. |

Per-run values override decorator defaults. If you don't pass an override, the decorator default applies.
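The override rule amounts to a field-by-field merge in which a run-level value wins only when it was actually passed. The sketch below is a hypothetical model: `FlowOptions`, `resolve_options`, and the use of `None` to mean "not passed" are assumptions, and the `image` option is omitted for brevity. Only the `retries`, `cache`, and `stack` defaults come from the table above.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional

# Toy model for illustration only; not Kitaru's option handling.


@dataclass(frozen=True)
class FlowOptions:
    retries: int = 0
    cache: bool = True
    stack: Optional[str] = None


def resolve_options(decorator: FlowOptions, **run_overrides: object) -> FlowOptions:
    """Apply per-run overrides on top of decorator defaults."""
    known = {f.name for f in fields(FlowOptions)}
    unknown = set(run_overrides) - known
    if unknown:
        raise TypeError(f"unknown options: {sorted(unknown)}")
    # None stands for "not passed", so the decorator value is kept.
    passed = {k: v for k, v in run_overrides.items() if v is not None}
    return replace(decorator, **passed)


defaults = FlowOptions(retries=2, cache=False)   # like @flow(retries=2, cache=False)
effective = resolve_options(defaults, retries=3, cache=True)
print(effective)                  # FlowOptions(retries=3, cache=True, stack=None)
kept = resolve_options(defaults, retries=None)
print(kept.retries, kept.cache)   # 2 False
```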

For `stack`, the full precedence chain is:

1. `.run(..., stack="...")`
2. `@flow(stack="...")`
3. `kitaru.configure(stack="...")`
4. `KITARU_STACK`
5. `[tool.kitaru].stack` in `pyproject.toml`
6. active stack selected via `kitaru stack use ...`

When a higher layer supplies `stack`, Kitaru binds that stack only for the submission of that execution and then restores the previous active stack. That override does **not** permanently switch your default stack.
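The precedence chain and the temporary binding can be sketched in plain Python. Everything here is hypothetical: `resolve_stack`, `bind_stack`, the `ACTIVE` dict standing in for the stack chosen with `kitaru stack use`, and the way each layer is passed in. Only the `KITARU_STACK` variable name and the layer order come from the list above.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Optional

# Toy model for illustration only; not Kitaru's stack resolution.
ACTIVE = {"stack": "local"}  # stand-in for the stack set via `kitaru stack use`


def resolve_stack(
    run_stack: Optional[str] = None,
    decorator_stack: Optional[str] = None,
    configured_stack: Optional[str] = None,
    pyproject_stack: Optional[str] = None,
) -> str:
    """Return the first stack supplied, walking from highest to lowest layer."""
    layers = [
        run_stack,                       # 1. .run(..., stack=...)
        decorator_stack,                 # 2. @flow(stack=...)
        configured_stack,                # 3. kitaru.configure(stack=...)
        os.environ.get("KITARU_STACK"),  # 4. environment variable
        pyproject_stack,                 # 5. [tool.kitaru].stack
    ]
    for candidate in layers:
        if candidate:
            return candidate
    return ACTIVE["stack"]               # 6. active stack


@contextmanager
def bind_stack(stack: str) -> Iterator[None]:
    """Switch the active stack for one submission, then restore it."""
    previous = ACTIVE["stack"]
    ACTIVE["stack"] = stack
    try:
        yield
    finally:
        ACTIVE["stack"] = previous       # the override never sticks


os.environ.pop("KITARU_STACK", None)     # keep the demo independent of the shell
chosen = resolve_stack(run_stack="production", decorator_stack="staging")
with bind_stack(chosen):
    submitted_on = ACTIVE["stack"]       # the submission would happen here
print(chosen, submitted_on, ACTIVE["stack"])  # production production local
```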

## Run, then deploy

`.run()` starts an ad-hoc execution from the current process. It's the right tool for local iteration and for calls made from your own code.

For production, a flow is **deployed**: `.deploy()` captures an immutable versioned snapshot that consumers invoke by flow name. Tags route traffic between versions (`default` is the tag your platform normally targets), so you can roll a new version out without changing the invocation surface. Auth is workspace-scoped; there are no per-deployment tokens to rotate.

```python
# Ad-hoc run from the current process
handle = my_agent.run(url="https://example.com")

# Deploy an immutable snapshot (new version each call).
# Parameterized flows take representative deployment-time inputs;
# consumers can override them at invocation time.
my_agent.deploy(url="https://example.com")

# Consumers that can import the flow object call .invoke() on it directly
handle = my_agent.invoke(url="https://example.com")

# Or invoke source-free by flow name (from Python, CLI, MCP, or HTTP)
from kitaru import KitaruClient
handle = KitaruClient().deployments.invoke(
    flow="my_agent",
    inputs={"url": "https://example.com"},
)
```
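The versioning and tag-routing model described above can be sketched as a small in-memory registry. `ToyRegistry` and its methods are hypothetical and do not mirror Kitaru's client API. The sketch only illustrates properties stated above: each `deploy` call creates a new immutable version, `default` is the tag followed when none is given, invocation-time inputs override the representative deploy-time inputs, and moving a tag changes which version runs without changing how callers invoke. Tags are moved explicitly here; the Deployments page covers how Kitaru actually assigns them.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Dict, List, Mapping, Tuple

# Toy model for illustration only; not Kitaru's deployment API.


@dataclass(frozen=True)
class Snapshot:
    version: int
    defaults: Mapping[str, Any]   # representative deploy-time inputs (read-only)


@dataclass
class ToyRegistry:
    snapshots: List[Snapshot] = field(default_factory=list)
    tags: Dict[str, int] = field(default_factory=dict)

    def deploy(self, **inputs: Any) -> int:
        # Every call appends a new read-only snapshot; old ones never change.
        snap = Snapshot(len(self.snapshots) + 1, MappingProxyType(dict(inputs)))
        self.snapshots.append(snap)
        return snap.version

    def tag(self, name: str, version: int) -> None:
        self.tags[name] = version  # retarget a route without touching callers

    def invoke(self, tag: str = "default", **inputs: Any) -> Tuple[int, Dict[str, Any]]:
        snap = self.snapshots[self.tags[tag] - 1]
        # Invocation-time inputs override the deploy-time representatives.
        return snap.version, {**snap.defaults, **inputs}


registry = ToyRegistry()
v1 = registry.deploy(url="https://example.com")
registry.tag("default", v1)
v2 = registry.deploy(url="https://example.org")
print(registry.invoke(url="https://a.example"))  # (1, {'url': 'https://a.example'})
registry.tag("default", v2)                       # roll out v2
print(registry.invoke())                          # (2, {'url': 'https://example.org'})
```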

## Rules to know

* **Flow functions should compose checkpoints.** The flow body is the orchestration layer — heavy work belongs in [checkpoints](/kitaru/core-concepts/checkpoints.md).
* **Use `.run()` to start flows directly from source.** Direct calls (`my_agent(...)`) are not supported and raise `KitaruUsageError`. Use `my_agent.run(...)` for source-backed executions, or `.invoke(...)` for deployment-backed executions.
* **Retries must be non-negative.** Passing a negative `retries` value raises a `KitaruUsageError`.
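The last two rules can be modeled with a wrapper whose `__call__` refuses direct invocation and whose `run` validates `retries` before retrying. `ToyFlow`, `ToyUsageError`, and the synchronous retry loop are hypothetical: the real `.run()` submits the execution and returns a handle rather than a result, and retry timing is not modeled here.

```python
from typing import Any, Callable, Optional

# Toy model for illustration only; not Kitaru's flow wrapper.


class ToyUsageError(Exception):
    """Stand-in for KitaruUsageError."""


class ToyFlow:
    def __init__(self, fn: Callable[..., Any], retries: int = 0) -> None:
        self.fn = fn
        self.retries = retries
        self.attempts = 0  # total attempts made by the most recent run

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        raise ToyUsageError("call .run(...) instead of calling the flow directly")

    def run(self, *args: Any, retries: Optional[int] = None, **kwargs: Any) -> Any:
        retries = self.retries if retries is None else retries  # per-run override
        if retries < 0:
            raise ToyUsageError(f"retries must be non-negative, got {retries}")
        self.attempts = 0
        for attempt in range(retries + 1):   # first try plus `retries` retries
            self.attempts += 1
            try:
                return self.fn(*args, **kwargs)
            except Exception:
                if attempt == retries:
                    raise                    # out of retries: surface the error


calls = {"n": 0}


def flaky(x: int) -> int:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return x * 2


agent = ToyFlow(flaky, retries=2)
print(agent.run(21), agent.attempts)  # 42 3
```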

## Next steps

* Learn how to break work into durable units with [Checkpoints](/kitaru/core-concepts/checkpoints.md)
* Attach structured data to your executions with [Logging and Metadata](/kitaru/core-concepts/logging.md)


