
Pydantic AI

Make any PydanticAI agent replayable, resumable, and observable by wrapping it once with KitaruAgent

Kitaru's PydanticAI adapter makes any PydanticAI agent durable without changing its code: wrap the agent once with KitaruAgent, and every model request, tool call, MCP invocation, and human-in-the-loop wait is persisted under a Kitaru flow.

```python
from pydantic_ai import Agent
from kitaru.adapters.pydantic_ai import KitaruAgent

agent = Agent("openai:gpt-4o", name="researcher")
durable_agent = KitaruAgent(agent)

result = durable_agent.run_sync("Summarize quantum error correction.")
print(result.output)
```

No flow decorator, no checkpoint annotations. When called outside a flow, KitaruAgent auto-opens one for you. By default, checkpoint_strategy="calls" persists model, tool, and MCP calls as separate checkpoints. The dashboard shows the run, tool calls, model responses, and wait points.

Install

Add the pydantic-ai extra — and local if you want the local dashboard:

```shell
uv add "kitaru[pydantic-ai,local]"
```

Initialize the project once:

```shell
kitaru init
kitaru login        # local server; add a URL to connect to a deployed one
kitaru status
```

Migrating an existing PydanticAI project? The zenml-io/kitaru-skills package includes /kitaru:kitaru-pydantic-ai-migration for moving to KitaruAgent with the right checkpoint strategy and human-in-the-loop guardrails. See Agent Skills.

Usage patterns

Zero-config

Wrap the agent and call it directly. When you call it outside a flow, the adapter auto-opens one and, with the default checkpoint_strategy="calls", records each model, tool, and MCP call as its own checkpoint. If you already have an explicit @kitaru.flow, call the agent directly from the flow body, not from inside another @kitaru.checkpoint, when you want model/tool calls to become separate checkpoint rows.

Best for prototyping, porting an existing agent, or single-turn interactions.

Explicit boundaries

For multi-turn workflows, named replay boundaries, or coordinated waits across turns, use @kitaru.flow and @kitaru.checkpoint yourself. Inside a checkpoint, KitaruAgent is a straight passthrough: the whole agent call belongs to the enclosing checkpoint, and internal PydanticAI model/tool activity is recorded as events and artifacts under that checkpoint instead of opening nested *_tool checkpoints.

Checkpoint an agent turn

With explicit boundaries, wrapping each agent call in its own checkpoint (for example, an ask(...) helper decorated with @kitaru.checkpoint) creates one checkpoint per ask(...) call. Human approval waits are covered in the next sections.

Replay the flow with the original run ID to serve cached outputs for completed checkpoints and re-execute only what changed. See Replay and overrides.

Ask the human from a tool body

kp.wait_for_input(...) is a thin adapter-namespaced wrapper around kitaru.wait(...). The LLM can pick the question, the tool can return the human's typed answer, and the agent can continue with that value as the tool result — but the wait still has to be created at flow scope.

Two separate safety rules matter with the default checkpoint_strategy="calls":

  1. A regular tool body usually runs inside an adapter-created *_tool checkpoint, and kitaru.wait() is intentionally rejected from checkpoint scope.

  2. Pydantic AI normally moves sync tool functions to a worker thread, while Kitaru waits must be created on the workflow thread.

If a regular sync tool body needs to call kp.wait_for_input(...), configure two separate things when you construct the durable agent: opt that tool out of per-call checkpointing with tool_checkpoint_config_by_name={"tool_name": False}, and explicitly opt into Pydantic AI sync-tool thread compatibility for the run with allow_sync_tool_body_waits=True.

Both question and schema are ordinary arguments, so the tool body can compute them, branch on agent state, prepend context, or call multiple waits. The adapter attaches identifying metadata (adapter=pydantic_ai, source=tool_body) so these waits are distinguishable from hand-written kitaru.wait() calls in flow code.

If you do not opt the tool out, the adapter fails early with an actionable KitaruUsageError rather than creating a checkpoint-contained wait that would be hard to resume safely.

The opt-out is checkpoint-only: it keeps the wait out of the synthetic *_tool checkpoint. The allow_sync_tool_body_waits=True flag separately asks Pydantic AI to keep supported sync tools on the workflow thread while the agent run is active.

That compatibility layer applies to sync tools for the whole agent run, so Kitaru only enables it when you ask for it explicitly. The trade-off is concrete: any supported sync tool in that run may execute inline instead of using Pydantic AI's normal worker-thread path, so avoid mixing this opt-in with slow/blocking sync tools if you rely on normal tool parallelism.

Another safe option is to move the human gate out of the tool body and call kitaru.wait() directly before or after the agent turn in your @kitaru.flow code.

Running locally, Kitaru prompts in the terminal. Running against a deployed server, the execution pauses and can be resumed from anywhere.

Declarative sugar: @hitl_tool

When a tool is purely a wait (nothing computed in the body, no branching), prefer the @hitl_tool decorator. The body is skipped entirely, and the adapter creates the wait from its own flow-scope code instead of from your sync tool body.

@hitl_tool(schema=..., question_arg=...) reads the question from an LLM-supplied argument at runtime; by default it looks for an argument named question. Pass question_arg=None to always use the static prompt instead.

Replay semantics

Waits belong at flow scope. kitaru.wait() is rejected inside @checkpoint bodies because the flow can pause while the enclosing checkpoint step is recorded as failed or incomplete. The default checkpoint_strategy="calls" splits each top-level model, tool, and MCP call into its own checkpoint, which improves visibility and retry isolation, but it also means regular tool-body waits need an explicit per-tool opt-out as shown above.

Runtime behavior and guardrails

Human-in-the-loop tools

The adapter bridges PydanticAI's deferred-tool patterns into kitaru.wait(). A paused flow is visible from kitaru executions list, the dashboard, and the REST API; once input is supplied, the flow resumes from the exact same point.

Besides @hitl_tool, these PydanticAI deferred patterns also route through kitaru.wait() when they run at flow scope:

  • @agent.tool(requires_approval=True) — PydanticAI's native approval flag

  • raising pydantic_ai.exceptions.ApprovalRequired or CallDeferred from a tool body

  • calling kp.wait_for_input(...) from a tool body

With checkpoint_strategy="calls", @hitl_tool stays flow-scope safe because the adapter deliberately skips the synthetic *_tool checkpoint for that call and creates the wait from adapter-managed code. Regular sync tools that call wait_for_input() need both tool_checkpoint_config_by_name={"tool_name": False} and allow_sync_tool_body_waits=True, or they should move the wait to explicit flow code. Regular tools that raise Pydantic AI approval/deferred exceptions also need the checkpoint opt-out unless they use @hitl_tool.

See Wait, Input, and Resume for how paused flows are resolved.

MCP servers

MCP servers attached to the agent are wrapped automatically. Their tool calls are tracked alongside native tools; with the default checkpoint_strategy="calls", each top-level MCP call gets its own adapter checkpoint. MCPServer.cache_tools=True is honored to skip redundant tools/list round-trips on replay.

Checkpoint strategy

The adapter offers two strategies for how agent work maps onto Kitaru checkpoints. Pick per agent based on how you want to replay and retry.

| Strategy | How it maps | Replay unit | Best for |
| --- | --- | --- | --- |
| "calls" (default) | No turn checkpoint; each replay-safe model/tool/MCP call becomes its own checkpoint when the agent runs at flow scope outside any existing checkpoint | Per call | Expensive model calls, flaky tools, long tool-call chains where one failure shouldn't rewind everything |
| "turn" | One checkpoint per agent run; model/tool/MCP calls are child events and artifacts under that checkpoint | The full turn | Agents where one aggregated checkpoint and checkpoint artifacts like event_log / run_summary are more useful than per-call boundaries |

Replay semantics in brief. If a flow crashes on the 8th model call of a turn, "turn" re-runs the whole turn, while "calls" has already given the earlier calls their own completed checkpoint boundaries, provided Kitaru was allowed to create those boundaries. If you set cache=True on adapter-created checkpoint configs, repeated runs can reuse completed checkpoints when the logical inputs are the same; changed prompts, message history, model settings, tool arguments, tool call IDs, or behavior-changing run options produce different cache keys and should miss the cache.

checkpoint_strategy="calls" cannot create nested checkpoints inside your own @kitaru.checkpoint body. If you wrap durable_agent.run_sync(...) in a checkpoint, the outer checkpoint wins and model/tool calls are recorded as adapter events/artifacts under that checkpoint.

checkpoint_strategy="calls" is the default; passing it explicitly is optional but makes the intent clear when you also set per-call checkpoint configs.

Each config is a CheckpointConfig TypedDict accepting:

  • cache: bool | None — passed through to @kitaru.checkpoint(cache=...). Use True to opt adapter-created checkpoints into step caching, False to disable caching for that boundary, or omit it / use None to inherit the stack default.

  • runtime: "inline" — run in-process. runtime="isolated" is not yet supported on adapter-managed checkpoints and raises KitaruUsageError.

  • retries: int — auto-retry the call on failure.

  • type: str — dashboard grouping. Defaults to "llm_call", "tool_call", or "mcp_call" so adapter checkpoints group with native kitaru.llm() / @kitaru.checkpoint(type="tool_call") calls.

The turn checkpoint itself is configured via turn_checkpoint_config= with checkpoint_strategy="turn". To opt into one checkpoint per agent run, pass checkpoint_strategy="turn" to KitaruAgent.

Looking for granular_checkpoints? It still works as a backwards-compatible alias, not a removed feature. Prefer checkpoint_strategy in new code: granular_checkpoints=True maps to checkpoint_strategy="calls", and granular_checkpoints=False maps to checkpoint_strategy="turn".

Streaming exception. checkpoint_strategy="calls" cannot apply to streamed turns — per-call checkpointing around an async context manager would require draining and replaying the stream inside a sync checkpoint. When an event_stream_handler is supplied, KitaruAgent transparently falls back to a turn checkpoint for that call. That fallback disables turn-checkpoint caching for the call, because a cached final result would skip the handler's progress side effects. run_stream() and iter() always require an explicit @kitaru.checkpoint.

Cross-adapter vocabulary

All adapters use checkpoint_strategy, but the values name the real boundary each framework exposes to Kitaru:

| Adapter | Per-call strategy | Coarse strategy | What the coarse name means |
| --- | --- | --- | --- |
| PydanticAI | "calls" | "turn" | One PydanticAI agent run/turn |
| OpenAI Agents | "calls" | "runner_call" | One outer OpenAI Runner.run(...) call |
| LangGraph | "calls" where sync middleware owns the handler call | "graph_call" | One outer graph invocation; LangGraph still owns graph-internal state |
| Claude Agent SDK | Not supported | "invocation" | One Claude SDK query/invocation |

So "calls" is a shared idea, not a promise of identical mechanics. It means Kitaru can create per-call checkpoints only where the adapter physically owns a replay-safe call body.

Streaming

PydanticAI streaming has two records in Kitaru:

  1. Live events are the radio chatter while the agent is running. They are useful for a dashboard, terminal watcher, or progress log.

  2. Checkpoint outputs and artifacts are the saved truth. If you need to resume, inspect, or replay a run later, read the final result plus artifacts such as pydantic_ai_events, pydantic_ai_run_summaries, and stream_transcript.

Kitaru publishes these adapter-specific live event kinds when the backend supports live event streaming:

  • pydantic_ai.stream.started

  • pydantic_ai.stream.event

  • pydantic_ai.stream.completed

  • pydantic_ai.stream.failed

The recommended PydanticAI path is event_stream_handler on run() / run_sync(). PydanticAI drives the full agent graph, including tool calls, while Kitaru watches the same event stream and publishes privacy-preserving live updates.

Watch those events from another thread or process with the normal execution watcher, and import the event-kind tuple instead of hard-coding strings.

run_stream() and iter() are still available, but they return async context managers. Kitaru cannot safely auto-open a function-shaped checkpoint around a context manager, so those surfaces require an explicit @kitaru.checkpoint. PydanticAI's run_stream() can stop after the first output that matches the agent output type; if you want full graph completion plus live observation, use run() / run_sync() with event_stream_handler instead.

Replay and cache behavior is the same as other live events: if the checkpoint body re-executes, live events may be published again; if Kitaru reuses a cached checkpoint result, the body does not run and there may be no live stream events. Use the durable result and artifacts for saved state.

Live payloads are deliberately small. Kitaru includes safe fields such as event category, event type, tool names or IDs, short display text, and clipped text deltas when stream transcripts are enabled. It does not publish raw prompts, full tool arguments, full tool results, final outputs, raw upstream event dumps, or reasoning content.

Stream transcripts are persisted as artifacts when CapturePolicy.save_stream_transcripts=True (the default). Set CapturePolicy.emit_child_events=False to turn off adapter-owned child/live events while keeping normal PydanticAI execution behavior.

Capture policy

CapturePolicy controls what the adapter stores per run. Defaults favor full observability. Wait records always keep minimal routing metadata (adapter, tool_name, tool_call_id), but tool args and exception payloads are only stored in wait metadata when tool_capture="full".

| Option | Default | Description |
| --- | --- | --- |
| emit_child_events | True | Track per-request / per-tool events. False disables tool-wait correlation. |
| save_prompts | True | Persist prompts sent to the model. |
| save_responses | True | Persist final model responses. |
| save_stream_transcripts | True | Persist serialized stream events + final response. |
| tool_capture | "full" | "full" (args + result), "metadata" (timing only), or None (skip entirely). |
| tool_capture_overrides | {} | Per-tool overrides keyed by tool name. |
| correlate_otel_spans | True | Attach Kitaru event IDs to the current OTel span. |

Capture policy is observability-only — it never changes tool execution.

Usage and cost statistics

When emit_child_events=True (the default) and the wrapped model runs inside a Kitaru checkpoint, the adapter records each observed Pydantic AI model request as a canonical llm_usage_v1 record. The record uses the model event ID as its stable identity and includes the Pydantic AI RequestUsage payload when the model response exposes one.

The Pydantic AI adapter does not run a separate cost calculator and does not receive provider-reported dollar cost through this path. In practice, these records are usually token records with empty cost fields. If a model response has no usage payload, Kitaru can still record that the model request happened; the token fields simply remain empty.

In checkpoint_strategy="calls", cached model checkpoint results are recorded as reused_not_incurred, so replay and cache hits do not look like fresh spend in the execution summary. The canonical records roll up after FlowHandle.wait() or FlowHandle.get() observes the terminal execution. Setting emit_child_events=False disables the model/tool event tracking that produces these per-model usage records.

Message history

Pass message_history explicitly like any PydanticAI agent, or let the adapter thread it for you.

With persist_message_history=True, the adapter remembers result.all_messages() on the instance after each run and auto-injects it as message_history on the next call when the caller doesn't pass one. Each KitaruAgent instance therefore holds one conversation, so create separate instances for separate conversations. An explicit message_history= on a single call overrides the remembered history for that call only.

Constraints

  • Concrete model at construction time. The wrapped agent must have a bound Model — late model binding and per-run model= overrides are not supported. To use a different model, wrap a different agent.

  • Stable agent name. name= is required; the adapter uses it for artifact keys and auto-created flow/checkpoint names. Changing it orphans existing executions.

  • No nested checkpoints. Kitaru forbids opening a checkpoint inside another, so checkpoint_strategy="calls" cannot coexist with an enclosing turn checkpoint — the adapter runs the agent body inline at flow scope when per-call checkpoints are enabled.

Advanced composition

Most users only need KitaruAgent. For custom durable surfaces, the lower-level wrappers are exported:

  • KitaruModel — wrap a PydanticAI Model directly.

  • KitaruToolset / KitaruFunctionToolset / KitaruMCPServer — wrap toolsets or MCP servers independently.

  • kitaruify_toolset(toolset, capture=..., ...) — dispatch helper that picks the right wrapper class.

  • KitaruRunContext: a RunContext subclass that survives isolated-runtime serialization boundaries.

Troubleshooting

  • "KitaruAgent requires the wrapped agent to define a concrete model" — pass model= to the Agent() constructor, not to run().

  • "requires an explicit @kitaru.checkpoint": run_stream() and iter() return async context managers; wrap them in a checkpoint yourself.

  • Auto-flow fails on a remote stack — the in-process registry doesn't cross process boundaries. Use @kitaru.flow explicitly.

  • Too many per-call checkpoints — pass checkpoint_strategy="turn" to group a whole agent run into one turn checkpoint. Existing granular_checkpoints=False code still works as a compatibility alias.

  • Replay cost control: checkpoint_strategy="calls" gives per-call checkpoint boundaries, not a billing guarantee. Pair it with provider-side caching or idempotency for expensive calls.

  • Tool calls show up only under artifacts or metadata — check whether durable_agent.run(...) / run_sync(...) is inside your own @kitaru.checkpoint. That checkpoints the whole agent turn, so model/tool activity is recorded under that checkpoint instead of as separate *_tool rows. Move the agent call directly into a @kitaru.flow body to get per-call checkpoints.

  • Checkpoints not appearing in the dashboard — verify kitaru status shows a running server and that kitaru init has been run in the project root.

Runnable examples

The base adapter example uses PydanticAI's TestModel, so it needs no provider key.

The streaming example uses a real OpenAI-backed PydanticAI model so users can watch live provider events. Set OPENAI_API_KEY first.

Set PYDANTIC_AI_MODEL to override the default openai:gpt-5-nano model. The example submits a flow, watches pydantic_ai.stream.* events, and then prints the durable final answer from .wait(). For the broader catalog, see Examples.
