Agent SDK (Python)
The pipelines.odyssey Python package.
The recommended path for any Python agent. The SDK handles envelope parsing, inbound auth, the proxy shim, and response shaping.
Install
pip install 'pipelines-sdk[odyssey]'Add the adapter for your framework — each adapter extra implies [odyssey], so
you don't need to list it separately:
pip install 'pipelines-sdk[openai-agents]'
pip install 'pipelines-sdk[anthropic]'
pip install 'pipelines-sdk[langchain]'
pip install 'pipelines-sdk[strands]'Scaffold a project
Generate a runnable wrapper instead of hand writing one. Every command is also
available as the standalone odyssey console script.
pipelines odyssey init --framework anthropic --dir my-agentThe emitted project shape, the framework options, and the full scaffold, preflight, and tunnel loop are documented once in Local development.
Minimal example (OpenAI Agents SDK)
from agents import Agent, Runner, function_tool
from fastapi import FastAPI
from pipelines.odyssey import proxy_call, register_dispatch_route
app = FastAPI()
@function_tool
def get_order(order_id: str) -> dict:
return proxy_call("get_order", {"order_id": order_id})
@function_tool
def refund_order(order_id: str) -> dict:
return proxy_call("refund_order", {"order_id": order_id})
def build_agent() -> Agent:
return Agent(
name="orders-triage",
instructions="Triage refund requests; look up orders before deciding.",
tools=[get_order, refund_order],
model="gpt-5",
)
@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
async def run(envelope):
result = await Runner.run(build_agent(), envelope.user_instruction)
return result.final_outputDump the registration JSON for the Import JSON dialog:
from pipelines.odyssey.adapters.openai_agents import dump_tools_schema_json
print(dump_tools_schema_json(build_agent()))register_dispatch_route
The decorator validates the inbound bearer, parses the request into an
Envelope, binds it on a per-request context (so module-level tools can
call proxy_call(name, args)), and coerces your return value into the
v1 response envelope.
| You return... | Sent as... |
|---|---|
"some string" | {"final_response": "some string"} |
{"final_response": ..., "messages": [...], "metadata": {...}} | passes through |
An object with final_output / final_response / output / message | wrapped into {"final_response": "..."} |
Error → HTTP mapping:
| Failure | Status |
|---|---|
| Missing or wrong inbound bearer | 401 |
agent_token_env env var unset | 503 |
Missing odyssey_proxy_url or run token | 400 |
| Malformed JSON body | 400 |
Empty final_response | 500 |
The envelope
@dataclass(frozen=True)
class Envelope:
proxy_url: str
run_token: str
user_instruction: str
task_input: Mapping[str, Any]
task_id: int | str | None
run_id: int | str | None
agent_id: int | str | None
latest_user_prompt: str | None
session_id: int | str | None
turn_id: str | None
messages: list | None
scenario_state: Mapping | None
workspace: Mapping | None
run_token_jti: str | None
tools_schema: list[dict] | None
declared_actor_ids: frozenset[str] | None
raw: Mapping[str, Any]proxy_url— trailing-slash-stripped; append/tools/{name}.run_token— per-run bearer.user_instruction— canonical current prompt frombody["input"]["user_instruction"]; stable across single-shot and multi-turn dispatches.task_input— the row'scurrent_state, else{}.latest_user_prompt— back-compat alias for the current multi-turn prompt.session_id/turn_id— optional multi-turn identifiers for agents that explicitly key their own memory. Platform proxy correlation uses the per-run token, so these are not required for tool calls.messages— optional prior conversation history in replay memory mode.scenario_state— optional replay-mode world state carried forward by the platform.workspace— coding-agent (workspace) run context (cwd,setup_mode,setup_instructions);Noneon non-workspace runs.run_token_jti— non-secret correlation id, safe to log.tools_schema— inlined tools schema when it fits the platform's dispatch cap;Nonewhen the shim resolves it over a separate hop.declared_actor_ids— the run's declared sub-agent catalog, used byfor_actorto flag declared/runtime label drift;Nonewhen no catalog is available (single-agent runs, code mode, or older platforms).raw— full dispatch body.
Single-shot dispatches usually omit the multi-turn fields, so they parse as
None. Agents that only need the current user prompt should continue to pass
envelope.user_instruction to their runner.
Proxy Call retries
proxy_call and async_proxy_call automatically retry transient platform
errors, specifically HTTP 429 and 503 responses tagged lock_contention,
context_store_unavailable, or trace_sequence_contended. The retry policy
uses up to four attempts with exponential backoff plus jitter (roughly
0.5 to 2.5 seconds per wait, capped at 4 seconds). As a result, tool calls may occasionally take a
few additional seconds under contention. Terminal errors (401, 404, 400 or
422, and 500) raise ProxyCallError immediately, with status_code, body, and
error_class attached.
# Disable auto-retry for a latency-sensitive call:
result = proxy_call("get_order", {"order_id": "4521"}, auto_retry=False)Live trace forwarding
For long-running agents, forward intermediate events while in flight so the trace tab renders live (it polls every 3s):
from pipelines.odyssey import (
post_trace_event,
safe_post_trace_event,
async_safe_post_trace_event,
)
post_trace_event("system_prompt", {"text": SYSTEM_PROMPT, "model": "gpt-5"})
safe_post_trace_event("thinking", {"text": "Considering refund eligibility..."})
await async_safe_post_trace_event("assistant_message", {"text": "Refund approved."})| Helper | Failure mode |
|---|---|
post_trace_event / async_post_trace_event | Raises TraceForwardError. |
safe_post_trace_event / async_safe_post_trace_event | Logs a warning on failure. Recommended. |
Reserved event_type values: assistant_message, thinking,
system_prompt, custom, plus the multi-agent types handoff,
subagent_message, subagent_final — post the first with the typed
post_handoff / safe_post_handoff
helpers. See Trace events.
Per-adapter helpers
Each adapter ships a dump_tools_schema(...) (for the Import JSON
dialog) plus framework-specific glue.
OpenAI Agents SDK
from pipelines.odyssey.adapters.openai_agents import (
dump_tools_schema_json,
proxy_tool,
forward_run_result_events,
)
print(dump_tools_schema_json(build_agent()))
get_order = proxy_tool("get_order", description="Look up an order.", is_async=False)
result = await Runner.run(build_agent(), envelope.user_instruction)
forward_run_result_events(result) # post-run live-trace flush
return result.final_outputFor internally multi-agent roots, build each sub-agent's tools from
its own envelope.for_actor("<actor_id>") handle (pass it as actor= to
pipelines_function_tool / proxy_tool) so every call — including a
shared tool — is attributed deterministically; add pipelines_run_hooks
for the handoff timeline, and use extract_topology /
build_agent_manifest for the declared topology + tools. See
Multi-agent systems.
Anthropic SDK
from functools import lru_cache
from anthropic import Anthropic
from pipelines.odyssey import register_dispatch_route
from pipelines.odyssey.adapters.anthropic import run_anthropic_loop
TOOLS = [...] # in `tools_schema` shape — strip registration-only keys
# (default_execution_mode, passthrough_binding, ledger_*)
# before passing to the model; messages.create rejects them.
@lru_cache(maxsize=1)
def get_client() -> Anthropic:
# Lazy: `Anthropic()` raises without ANTHROPIC_API_KEY, so building
# it at import would crash the server before the ping probe can pass.
return Anthropic()
@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
def run(envelope): # sync def: the SDK offloads it to a worker thread
return run_anthropic_loop(
client=get_client(),
tools=TOOLS,
user_instruction=envelope.user_instruction,
model="claude-sonnet-4-5",
system_prompt=SYSTEM_PROMPT,
thinking_budget_tokens=1024,
)run_anthropic_loop runs the full tool-use loop and returns a v1 dict.
Kwargs: system_prompt, thinking_budget_tokens, max_tokens (2048),
max_turns (10), forward_live_trace (True).
Declare the handler as a plain def (not async def) whenever its
body is synchronous — run_anthropic_loop, Runner.run_sync, sync
httpx. The SDK runs sync handlers on a worker thread so the blocking
work doesn't freeze the uvicorn event loop (which would serialize
every concurrent dispatch). Use async def only when the body is
genuinely await-based.
Anthropic model IDs are dash-delimited: claude-sonnet-4-5, not claude-sonnet-4.5.
LangChain
from fastapi import FastAPI
from pydantic import BaseModel, Field
from pipelines.odyssey import register_dispatch_route
from pipelines.odyssey.adapters.langchain import dump_tools_schema, proxy_tool
app = FastAPI()
class GetOrderArgs(BaseModel):
order_id: str = Field(description="The order id to look up.")
class RefundOrderArgs(BaseModel):
order_id: str
get_order = proxy_tool("get_order", description="Look up an order.", args_schema=GetOrderArgs)
refund_order = proxy_tool("refund_order", description="Refund an order.", args_schema=RefundOrderArgs)
TOOLS = [get_order, refund_order]
@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
async def run(envelope):
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(ChatOpenAI(model="gpt-5"), TOOLS)
result = await agent.ainvoke({"messages": [("user", envelope.user_instruction)]})
return result["messages"][-1].contentDump the registration JSON:
print(dump_tools_schema(TOOLS))The adapter targets langchain_core.tools, which is stable across
both langchain 0.x and the langgraph-based stack. If you're on
LangChain 1.x+, AgentExecutor has moved to langgraph — the
adapter works with either.
LangGraph approval gates. interrupt_before is an interactive
pause/resume mechanism, while a Pipelines dispatch is one HTTP request
that must return a final response. If your local graph uses
interrupt_before=["sensitive_tools"] for human approval, compile the
hosted dispatch graph without those interrupts, or route approvals
through an explicit application flow before the dispatch runs. The
platform does not resume a paused LangGraph thread inside a single
dispatch.
@pipelines_proxy — keep your existing tool bodies. Wraps an
existing @tool function in-place: during a dispatch the body is
replaced with a proxy call; outside a dispatch (local CLI, tests) the
original body runs normally.
from langchain_core.tools import tool
from pipelines.odyssey.adapters.langchain import pipelines_proxy
@pipelines_proxy()
@tool
def get_order(order_id: str) -> dict:
"""Look up an order by id."""
return db.orders.find(order_id) # runs locally; proxied at dispatch timeThe LangChain import handles Tool/StructuredTool objects. For any
other framework (or plain functions), import from the top level:
from pipelines.odyssey import pipelines_proxyPass tool_name="registered_name" if the platform name differs from
the function name.
Strands
from pipelines.odyssey.adapters.strands import (
dump_tools_schema,
proxy_tool,
forward_agent_result_events,
)
get_order = proxy_tool("get_order", description="Look up an order by id.")
print(dump_tools_schema(my_strands_agent))
result = agent(envelope.user_instruction)
forward_agent_result_events(result) # post-run live-trace flush
return str(result.message) if result.message else str(result)Tool endpoints from local modules
Ship local Python tool bodies as a code tool endpoint so passthrough
runs them verbatim in a sandbox — no drift between local execution and
the platform path.
import os
from pipelines.odyssey import tool_endpoints
from research_agent import tools_network
endpoint = tool_endpoints.create_code(
name="research-tools",
functions=[
tools_network.web_search,
tools_network.search_wikipedia,
tools_network.fetch_url,
],
requirements=["httpx>=0.27", "trafilatura"],
org_id=42,
api_key=os.environ["PIPELINES_API_KEY"],
)
print(endpoint["id"], endpoint["name"])The helper reads module source, derives input_schema from type hints,
generates a per-tool execute(args) wrapper, and POSTs to
/api/organizations/{org_id}/tool-endpoints.
For dry runs / custom flows, use the lower-level builder:
payload = tool_endpoints.build_code_endpoint_payload(
name="research-tools",
functions=[tools_network.web_search],
requirements=["httpx>=0.27"],
)Bind the agent's tool by name (no UUID required):
{
"tools_schema": [
{
"name": "search_wikipedia",
"input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
"default_execution_mode": "passthrough",
"passthrough_binding": {
"endpoint_name": "research-tools",
"tool_name": "search_wikipedia"
}
}
]
}Cold-start cost per call is ~150 ms + pip install time for
requirements. If your tool bodies are decorated with
@function_tool / @tool, the SDK unwraps the decorator chain
automatically; keep tool bodies in a module without the framework
decorator if you want to avoid that dependency in the sandbox.
Secrets via env_vars → org credentials
A code tool that calls an authenticated upstream (a search API, your
own backend) needs a key at runtime. Don't bake it into functions —
pass env_vars, a list of org-credential names the platform
decrypts and injects as environment variables inside the sandbox:
endpoint = tool_endpoints.create_code(
name="research-tools",
functions=[tools_network.web_search],
requirements=["httpx>=0.27"],
env_vars=["TAVILY_API_KEY"], # ← name of an org credential
org_id=42,
api_key=os.environ["PIPELINES_API_KEY"],
)Your tool body then reads it as a normal env var:
def web_search(query: str) -> list[dict]:
key = os.environ["TAVILY_API_KEY"] # injected by the platform
...Each name in env_vars must match an org credential of the same
credential_type. The platform injects that credential into the sandbox
under the same name, so env_vars=["TAVILY_API_KEY"] resolves the
TAVILY_API_KEY org credential and exposes it as $TAVILY_API_KEY.
Create the credential before the first run. Otherwise dispatch fails with
an error reporting that the credential was not found, prompting you to add
it on the Credentials page. The credentials endpoint accepts your
pk_live_ org API key, so the full setup (registering the agent,
configuring tool endpoints, and adding credentials) can be scripted
end-to-end:
curl -X PUT \
"$PIPELINES_BASE_URL/api/organizations/42/credentials/TAVILY_API_KEY" \
-H "Authorization: Bearer $PIPELINES_API_KEY" \
-H "Content-Type: application/json" \
-d '{"value": "tvly-..."}'Managing credentials requires an org-admin principal. An API key
minted by a non-admin resolves to that user and is rejected — mint the
key as an org admin, or add the credential from the dashboard
(Settings → Credentials → Add credential), naming it exactly as it
appears in env_vars.
Credential values are decrypted only at dispatch time, never logged, and
masked in run output. The same env_vars surface works for
hand-authored code endpoints.
Register an External HTTP agent
create_http_agent registers the agent you host yourself — the
external_http analogue of create_code_agent, so you never hand-roll
the POST /api/agents body. This is exactly what the scaffolded
register.py runs:
import json
import os
from pipelines.odyssey import agents
agent = agents.create_http_agent(
name="orders-triage",
endpoint_url="https://your-agent.example.com/dispatch",
api_key=os.environ["PIPELINES_API_KEY"],
auth_header_name="Authorization",
auth_credential_value=f"Bearer {os.environ['AGENT_TOKEN']}",
tools_schema=json.load(open("tools_schema.json")),
)
print(agent["id"])update_http_agent(agent_id=..., ...) edits one in place (e.g. after
changing tools).
Single-source the Tools Schema
Mount a GET /tools_schema route so your declared tools live in exactly
one place and can't drift from what the platform was told to expect:
from pipelines.odyssey import register_tools_schema_route
register_tools_schema_route(app, TOOLS) # GET /tools_schema -> {"tools_schema": [...]}Then push changes from the running wrapper to the platform without a manual re-paste:
pipelines odyssey sync --from-app http://localhost:8000 --agent-id 42pipelines odyssey doctor also reads this route to flag a schema mismatch
before a run.
Shipping a multi-file code agent
For code-mode agents larger than one file (up to 50 files, 200 KB per file, 1 MB total):
from pathlib import Path
from pipelines.odyssey import agents
payload = agents.build_code_agent_payload(
name="research-agent",
entrypoint="run",
entrypoint_file="main.py",
source_dir=Path("./my_agent"),
requirements=["httpx>=0.27", "trafilatura"],
)
agent = agents.create_code_agent(
name="research-agent",
entrypoint="run",
entrypoint_file="main.py",
source_dir=Path("./my_agent"),
requirements=["httpx>=0.27", "trafilatura"],
api_key="pk_live_...",
)walk_agent_source_dir reads a .pipelinesignore (fnmatch globs, one
per line) and skips the usual __pycache__/, .venv/, .git/,
node_modules/, dist/, build/, .pytest_cache/ directories.
Updating an existing agent without resetting version history:
agents.update_code_agent(
agent_id=42,
api_key="pk_live_...",
source_dir=Path("./my_agent"),
entrypoint="run",
entrypoint_file="main.py",
requirements=["httpx>=0.27"],
)Ledger schema helpers
build_code_agent_payload / create_code_agent / update_code_agent
take an optional ledger_schema, and generate_ledger_schema drafts
one from your tools (the generate → review → save flow):
draft = agents.generate_ledger_schema(api_key="pk_live_...", tools_schema=my_tools)
agents.create_code_agent(..., tools_schema=my_tools, ledger_schema=draft)The draft is un-persisted until you save it on an agent. Pass
ledger_schema=None to update_code_agent to clear it. See
Ledger schema.
Custom HTTP framework
The framework-agnostic helpers compose into ~20 lines:
from pipelines.odyssey import (
Envelope,
EnvelopeError,
InboundAuthError,
build_response,
require_pipelines_auth,
set_current,
)
def handle_dispatch(headers, body, agent_token):
try:
require_pipelines_auth(headers.get("Authorization"), expected_token=agent_token)
except InboundAuthError:
return 401, {"detail": "missing or invalid bearer token"}
try:
envelope = Envelope.parse(body, headers)
except EnvelopeError as exc:
return 400, {"detail": str(exc)}
with set_current(envelope):
result = your_agent_runtime(envelope)
return 200, build_response(result)Custom auth header
If you registered with X-API-Key instead of Authorization:
@register_dispatch_route(
app,
auth_header_name="X-API-Key",
agent_token_env="AGENT_TOKEN",
)
async def run(envelope):
...Unit-testing tools
set_current stages an envelope for tests:
from pipelines.odyssey import Envelope, set_current
def test_get_order_calls_proxy(httpx_mock):
envelope = Envelope(
proxy_url="http://stub/proxy",
run_token="t",
user_instruction="",
task_input={},
)
httpx_mock.add_response(
url="http://stub/proxy/tools/get_order",
json={"tool_name": "get_order", "response": {"id": "1"}, "source": "odyssey"},
)
with set_current(envelope):
result = get_order("1")
assert result == {"id": "1"}Tested adapter floors
| Adapter | Pinned floor |
|---|---|
openai-agents | 0.0.7 |
anthropic | 0.39 |
langchain-core | 0.3 |
strands-agents | 0.4 |