The agent runtime
for builders who ship.
Production-grade reasoning, durable workflows, cost-aware routing, and full observability — out of the box. Continuum is the bridge from notebook to enterprise.
What you get
Smart inference routing
One OpenAI-compatible endpoint, 250+ models. A classifier scores every prompt; the router picks the cheapest model that clears the quality bar. Per-agent strict / modest / quality tiers.
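The following minimal sketch shows the selection rule described above. It assumes each candidate model already has a predicted quality score for the prompt; the classifier that produces those scores is not modelled. The model names, prices, and scores are hypothetical. The per-agent tier names are not mapped, so the sketch takes a numeric quality bar directly. Falling back to the best model when nothing clears the bar is also an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelOption:
    name: str
    cost_per_1k_tokens: float  # hypothetical price in USD
    quality: float             # predicted quality for this prompt, 0..1


def pick_cheapest(options: list[ModelOption], quality_bar: float) -> ModelOption:
    """Cheapest model whose predicted quality clears the bar.

    If no model clears it, fall back to the highest-quality option (assumption).
    """
    eligible = [m for m in options if m.quality >= quality_bar]
    if not eligible:
        return max(options, key=lambda m: m.quality)
    return min(eligible, key=lambda m: m.cost_per_1k_tokens)


options = [
    ModelOption("small-model", 0.15, 0.70),
    ModelOption("medium-model", 0.60, 0.85),
    ModelOption("large-model", 3.00, 0.95),
]
print(pick_cheapest(options, 0.80).name)  # medium-model
print(pick_cheapest(options, 0.99).name)  # large-model (nothing clears the bar)
```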
Two-tier persistent memory
Long-term semantic recall via mem0 + Milvus/Qdrant; short-term session in Redis. Four isolation scopes (USER, AGENT, SHARED, CONVERSATION) for multi-tenant safety.
9 composable workflow patterns
Sequential · Parallel · Loop · Reflection · Router · Planner · Debate · Scatter · SupervisedSequential. Mix and nest freely — every step is independently traceable.
MCP-native tooling
Stdio, SSE, or StreamableHTTP — connect any MCP server with zero adapters. Top-k Tool Attention promotes only the relevant tools each turn, cutting prompt tokens 30–60%.
Durable workflows · Temporal
Long-running multi-agent jobs that survive crashes, restarts, and deploys. Human-in-the-loop approval gates with timeout escalation, signals, and full audit trails.
Observability is non-negotiable
Every LLM call, tool invocation, handoff, and memory op auto-traced in Langfuse. Custom spans via @observe. Build golden eval sets from production traces.
Safe by default
Input/output PII redaction. Configurable scrubbers on memory writes. Cycle detection on agent handoffs. Graceful shutdown with in-flight trace flush. Bearer-scoped budgets at the gateway.
Production primitives
Dependency-injection container, health checks for every dependency, lifecycle hooks, FastAPI helpers, fakeredis for tests. Async-native, protocol-based, code-first.
Quickstart
Install
```bash
# Python 3.13 required
python3.13 -m venv .venv
source .venv/bin/activate
pip install shyftlabs-continuum
```
Set up the environment and spin up the infrastructure
```bash
cp .env.template .env
```
Next, configure your .env file:

```dotenv
# ── LLM Provider Keys ──────────────────────────────────────────
OPENAI_API_KEY=your-openai-api-key          # required (also used for embeddings)
GEMINI_API_KEY=your-gemini-api-key          # if using Gemini
# ANTHROPIC_API_KEY=your-anthropic-api-key  # if using Claude

# ── Default LLM ────────────────────────────────────────────────
DEFAULT_LLM_MODEL=gemini/gemini-2.5-flash
FALLBACK_LLM_MODEL=gpt-4o-mini
DEFAULT_LLM_TEMPERATURE=0.7
DEFAULT_LLM_MAX_TOKENS=4096
LLM_REQUEST_TIMEOUT=60
LLM_MAX_RETRIES=3
LLM_ENABLE_FALLBACK=true

# ── Embeddings ─────────────────────────────────────────────────
EMBEDDER_PROVIDER=openai
EMBEDDER_MODEL=text-embedding-3-small
EMBEDDING_DIMS=1536

# ── Memory (mem0 + Milvus) ─────────────────────────────────────
MEMORY_ENABLED=true
VECTOR_STORE_PROVIDER=milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_TOKEN=
MILVUS_COLLECTION=orchestrator_memories
MEMORY_LLM_MODEL=gemini/gemini-2.5-flash
MEMORY_LLM_TEMPERATURE=0.1
MEMORY_ISOLATION=user
MEMORY_SEARCH_LIMIT=5
MEMORY_HISTORY_DB_PATH=~/.orchestrator/memory_history.db

# ── Session (Redis) ────────────────────────────────────────────
SESSION_ENABLED=true
SESSION_REDIS_HOST=localhost
SESSION_REDIS_PORT=6380
SESSION_REDIS_PASSWORD=sdk123456789
SESSION_REDIS_DB=0
SESSION_REDIS_SSL=false
SESSION_TTL_SECONDS=172800
SESSION_MAX_MESSAGES=1000
SESSION_KEY_PREFIX=orchestrator:session

# ── Langfuse Observability ─────────────────────────────────────
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
LANGFUSE_HOST=http://localhost:3000
LANGFUSE_BASE_URL=http://localhost:3000
NEXTAUTH_SECRET=your-nextauth-secret
LANGFUSE_SAMPLE_RATE=1.0
LANGFUSE_FLUSH_INTERVAL=1
LANGFUSE_FLUSH_AT=15
LANGFUSE_DEBUG=false

# ── Temporal (durable workflows) ───────────────────────────────
TEMPORAL_ENABLED=true
TEMPORAL_HOST=localhost:7233
TEMPORAL_NAMESPACE=default
TEMPORAL_TASK_QUEUE=orchestrator-agents
TEMPORAL_ENABLE_HUMAN_IN_LOOP=true
TEMPORAL_APPROVAL_TIMEOUT_SECONDS=86400
TEMPORAL_WORKFLOW_EXECUTION_TIMEOUT=604800
TEMPORAL_ACTIVITY_START_TO_CLOSE_TIMEOUT=300
TEMPORAL_ACTIVITY_RETRY_MAX_ATTEMPTS=3

# ── Misc ───────────────────────────────────────────────────────
ENVIRONMENT=development
LOG_LEVEL=INFO
SHARED_SERVICES_ENABLED=true
MEM0_TELEMETRY=false
ANONYMIZED_TELEMETRY=false
TOKENIZERS_PARALLELISM=false
```
Finally, spin up the infrastructure:
```bash
docker compose up -d   # Redis (:6380) + Milvus (:19530)
```
Create and run your first agent
```python
import asyncio

from orchestrator.agent import BaseAgent
from orchestrator.agent.runner import AgentRunner

agent = BaseAgent(
    name="assistant",
    instructions="You are a helpful assistant.",
    model="gpt-4o-mini",
)


async def main():
    runner = AgentRunner()
    response = await runner.run(agent, "Hello!", user_id="u-001")
    print(response.content)


asyncio.run(main())
```
Architecture
Continuum is structured as layered modules, each independently usable:
| Module | Import path | Purpose |
|---|---|---|
| agent | orchestrator.agent | BaseAgent, AgentRunner, 9 workflow patterns, handoffs |
| llm | orchestrator.llm | LLMClient, provider routing, structured output, context compression |
| memory | orchestrator.memory | mem0 + Qdrant/Milvus long-term memory |
| session | orchestrator.session | Redis-backed conversation history |
| tools | orchestrator.tools | MCP servers, ToolExecutor, run artifacts |
| observability | orchestrator.observability | Langfuse tracing, metrics, error reporting |
| temporal | orchestrator.temporal | Durable workflows, approval gates |
| core | orchestrator.core | DI Container, lifecycle, health checks |
Full Documentation
Each module has a detailed markdown reference in the repository:
| Doc | Covers |
|---|---|
| agent.md | BaseAgent fields, AgentRunner, 9 workflow patterns, handoffs |
| llm.md | LLMClient, LLMConfig, provider routing, structured output, context compression |
| memory.md & memory-issue-analysis.md | Memory settings, how the memory mechanism works, and known issues |
| session.md | SessionClient, Redis conversation history |
| tools.md | MCP servers, ToolExecutor, run artifacts |
| observability.md | Langfuse tracing, @observe decorator, metrics |
| temporal/ | Durable workflows, approval gates, human-in-the-loop |
| core.md | Container, OrchestratorLifecycle, health checks, protocols |
| installation.md | Full env var reference, troubleshooting |
| features.md | Complete feature inventory organized by layer |
| GUIDE.md | Developer guide: config defaults, session/memory behaviour, practical patterns |
Build Agents
From a minimal single agent to complex multi-agent workflows — everything starts with BaseAgent.
- playground/local-shop: one agent with MCP tools over HTTP.
- playground/multi-agent-shop
Base Agent
```python
from orchestrator.agent import BaseAgent

agent = BaseAgent(
    name="support-bot",
    instructions="You are a support agent for Acme Corp. Be concise.",
    model="gpt-4o-mini",
    temperature=0.3,
)
```
BaseAgent is a dataclass. Every field has a sensible default — only name and instructions are required in practice.
| Parameter | Type | Description |
|---|---|---|
| name | str | Unique identifier (alphanumeric, hyphens, underscores) |
| instructions | str | System prompt. Supports {slot} placeholders. |
| model | str | LLM model string, defaults to DEFAULT_LLM_MODEL env var |
| temperature | float | Default 0.7 |
| max_tokens | int \| None | Response token cap |
| tools | list[ToolDefinition] | Static tool definitions for the LLM |
| mcp_servers | list[MCPServer] | MCP servers whose tools are dynamically loaded |
| handoffs | list[Handoff] | Agents this agent can delegate to |
| memory_config | AgentMemoryConfig | Long-term memory search/store behavior |
| config | AgentConfig | Execution config — turns, timeouts, retries, react_mode |
| output_schema | type[BaseModel] \| None | Pydantic model for structured output |
| input_schema | type[BaseModel] \| None | Validates input before execution |
| template_vars | dict | Static values injected into {slot} placeholders |
| examples | list[dict] | Few-shot examples — each needs input and output keys |
| instruction_modifiers | list[Callable] | Dynamic prompt modifiers applied at runtime |
| metadata | dict | Arbitrary key-value metadata |
| tags | list[str] | Categorization tags for filtering / routing |
Stateless vs Stateful
Depending on your needs, an agent can be stateless or stateful. A stateless agent disables both long-term memory (Memory) and short-term memory (Session). A stateful agent may need only long-term memory, only short-term memory, or both. Control long-term memory with AgentMemoryConfig and session history with AgentConfig.
```python
from orchestrator.agent import BaseAgent
from orchestrator.agent.config import AgentConfig, AgentMemoryConfig

# Stateless: no Redis, no mem0 calls (fastest for prototypes)
agent = BaseAgent(
    name="stateless",
    instructions="...",
    memory_config=AgentMemoryConfig(search_memories=False, store_memories=False),
    config=AgentConfig(log_to_session=False, session_history_turns=0),
)

# Stateful: loads the last 20 turns + long-term memories (default)
agent = BaseAgent(
    name="stateful",
    instructions="...",
    memory_config=AgentMemoryConfig(search_memories=True, store_memories=True),
    config=AgentConfig(log_to_session=True, session_history_turns=None),
)
```
Memory (default):

- search_memories=True: looks up long-term memories before responding
- store_memories=True: saves long-term memories after responding

Session (default):

- log_to_session=True: saves to session history (short-term memory)
- session_history_turns=None: loads the last 20 turns of history (short-term memory)
session_history_turns behaviour
| Value | Behaviour |
|---|---|
| None (default) | Load last 20 turns from Redis |
| 0 | Skip Redis entirely; load nothing |
| 5 | Load last 5 turns from Redis |
Lifecycle Hooks
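Hooks are callables attached directly to BaseAgent. Each receives the agent instance and the run context dict. on_tool_call, on_error, and on_handoff also receive an event-specific argument before the context: the tool name, the exception, or the handoff target.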
```python
async def on_start(agent, context):
    print(f"Starting {agent.name}")


async def on_tool(agent, tool_name, context):
    print(f"Calling tool: {tool_name}")


agent = BaseAgent(
    name="my-agent",
    instructions="...",
    on_start=on_start,
    on_end=lambda agent, ctx: print("Done"),
    on_error=lambda agent, exc, ctx: print(f"Error: {exc}"),
    on_tool_call=on_tool,
    on_handoff=lambda agent, target, ctx: print(f"Handing off to {target}"),
)
```
Prompt Engineering
Template Variables
```python
agent = BaseAgent(
    name="regional-agent",
    instructions="You serve customers in {region}. Currency: {currency}.",
    template_vars={"region": "North America", "currency": "USD"},
)
```
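One way to picture {slot} substitution is Python's `str.format_map`. The sketch below uses it, together with a check that fails loudly on unfilled slots. Whether Continuum uses `str.format` internally is an assumption. If it does, literal braces in instructions would need doubling (`{{` and `}}`).

```python
import string


def render_instructions(template: str, template_vars: dict[str, str]) -> str:
    """Fill {slot} placeholders, raising KeyError if any slot has no value."""
    # Formatter.parse yields (literal_text, field_name, format_spec, conversion)
    slots = {name for _, name, _, _ in string.Formatter().parse(template) if name}
    missing = slots - set(template_vars)
    if missing:
        raise KeyError(f"missing template_vars: {sorted(missing)}")
    return template.format_map(template_vars)


rendered = render_instructions(
    "You serve customers in {region}. Currency: {currency}.",
    {"region": "North America", "currency": "USD"},
)
print(rendered)  # You serve customers in North America. Currency: USD.
```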
Few-shot Examples
```python
agent = BaseAgent(
    name="classifier",
    instructions="Classify the sentiment of the input.",
    examples=[
        {"input": "I love this product!", "output": "positive"},
        {"input": "This is terrible.", "output": "negative"},
    ],
)
```
Instruction Modifiers
Modifiers are called at runtime and receive (instructions: str, context: dict) → str.
```python
def add_date(instructions, context):
    return instructions + f"\n\nToday is {context.get('date', 'unknown')}."


agent = BaseAgent(
    name="date-aware",
    instructions="You are an assistant.",
    instruction_modifiers=[add_date],
)
```
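The sketch below chains several modifiers with the `(instructions, context) → str` signature. It assumes they run in list order, with each one receiving the previous one's output; the source does not state the order. `add_locale` is a hypothetical second modifier.

```python
from typing import Callable

Modifier = Callable[[str, dict], str]


def apply_modifiers(instructions: str, modifiers: list[Modifier], context: dict) -> str:
    """Apply each modifier in list order, feeding its output to the next one."""
    for modify in modifiers:
        instructions = modify(instructions, context)
    return instructions


def add_date(instructions: str, context: dict) -> str:
    return instructions + f"\n\nToday is {context.get('date', 'unknown')}."


def add_locale(instructions: str, context: dict) -> str:  # hypothetical modifier
    return instructions + f"\nReply in {context.get('locale', 'English')}."


print(apply_modifiers("You are an assistant.", [add_date, add_locale], {"date": "2025-01-01"}))
```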
Structured Output
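```python
from pydantic import BaseModel

from orchestrator.agent import BaseAgent
from orchestrator.agent.runner import AgentRunner


class Review(BaseModel):
    sentiment: str
    score: float
    summary: str


agent = BaseAgent(
    name="reviewer",
    instructions="Analyze the review and return structured output.",
    output_schema=Review,
)

runner = AgentRunner()
# Inside an async function (or a notebook with top-level await):
response = await runner.run(agent, "The hotel was fantastic but expensive.")
review: Review = response.structured_output
print(review.score)  # e.g. 0.75
```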
Multi-Agent Workflows
Continuum ships 9 built-in workflow patterns. Each has a class-based form and a factory shorthand. All patterns are composable — you can nest them or combine them with handoffs.
Custom Workflow
We strongly recommend building custom workflows using BaseAgent directly for anything closely tied to your project's business logic. A custom agent gives you full control over the flow, session saving, and memory behaviour — you decide which sub-agents are stateless and which are stateful.
Example: ParallelCoordinatorAgent in playground/multi-agent-shop/workflows.py
Instead of using ParallelAgent directly, the playground defines a custom BaseAgent subclass that orchestrates parallel search and synthesis manually:
```python
class ParallelCoordinatorAgent(BaseAgent):
    synthesiser: BaseAgent | None = None
    parallel: ParallelAgent | None = None

    async def execute(self, input_text, runner, context, llm_client=None) -> AgentResponse:
        context.suppress_session_log = True

        # Step 1: parallel workers with a fresh stateless context (no history, no save)
        parallel_ctx = create_run_context(
            user_id=context.user_id,
            conversation_id=context.conversation_id,
        )
        parallel_result = await self.parallel.execute(input_text, runner, parallel_ctx)

        # Step 2: synthesiser uses session history + memory. suppress_session_log
        # blocks auto-save, but context carries session_id so history loads normally.
        # (Illustrative prompt; the playground may build synthesis_input differently.)
        synthesis_input = f"{input_text}\n\nWorker results:\n{parallel_result.content}"
        final = await runner.run(
            agent=self.synthesiser,
            input=synthesis_input,
            context=context,
        )

        # Step 3: save exactly one clean turn
        await runner.save_turn(
            session_id=context.session_id,
            user_message=input_text,
            assistant_message=final.content,
        )
        return final
```
This gives the parallel workers a clean stateless context (no Redis calls) while the synthesiser still loads session history and memory — something the built-in ParallelAgent cannot do out of the box.
Built-in Workflows
Sequential Workflow
Executes agents as a pipeline — each agent receives the previous agent's output.
```python
from orchestrator.agent.workflow import SequentialAgent

pipeline = SequentialAgent(
    name="research-pipeline",
    instructions="Research pipeline coordinator.",
    agents=[researcher, summarizer, formatter],
)
```
Or use the factory shorthand:
```python
from orchestrator.agent import BaseAgent, AgentRunner, create_sequential_agent

researcher = BaseAgent(name="researcher", instructions="Research the topic. Output key facts.", model="gpt-4o-mini")
writer = BaseAgent(name="writer", instructions="Write a short report from the facts.", model="gpt-4o-mini")
editor = BaseAgent(name="editor", instructions="Polish the report for clarity.", model="gpt-4o-mini")

pipeline = create_sequential_agent(
    name="research-pipeline",
    agents=[researcher, writer, editor],
)

response = await AgentRunner().run(pipeline, "AI in healthcare")
print(response.content)  # editor's final output
```
Parallel Workflow
All agents receive the same input and run concurrently. Results are merged by strategy.
```python
from orchestrator.agent.workflow import ParallelAgent
from orchestrator.agent.config import ParallelConfig, MergeStrategy

fan_out = ParallelAgent(
    name="multi-analyst",
    instructions="Run multiple analyses in parallel.",
    agents=[sentiment_agent, topic_agent, entity_agent],
    parallel_config=ParallelConfig(
        merge_strategy=MergeStrategy.LLM_SUMMARIZE,
    ),
)
```
| MergeStrategy | Behavior |
|---|---|
| CONCATENATE | All outputs joined with newlines |
| LLM_SUMMARIZE | A secondary LLM call synthesizes all outputs |
| STRUCTURED_DICT | Returns a dict keyed by agent name |
| FIRST_SUCCESS | Returns the first successful agent result |
Factory shorthand:
```python
from orchestrator.agent import create_parallel_agent
from orchestrator.agent.types import MergeStrategy

parallel = create_parallel_agent(
    name="parallel-analysts",
    agents=[analyst_a, analyst_b, analyst_c],
    merge_strategy=MergeStrategy.CONCATENATE,
)
```
Loop Workflow
Iterates an agent until a termination condition is satisfied.
```python
from orchestrator.agent.workflow import LoopAgent
from orchestrator.agent.config import TerminationConfig, TerminationType

loop = LoopAgent(
    name="refinement-loop",
    instructions="Iteratively refine output.",
    agent=writer_agent,
    termination=TerminationConfig(
        type=TerminationType.LLM_DECISION,  # LLM decides when done
        max_iterations=5,
    ),
)
```
| TerminationType | When it stops |
|---|---|
| LLM_DECISION | The inner agent's LLM signals completion |
| TOOL_CALL | A specific tool name is called |
| OUTPUT_MATCH | Output matches a regex pattern |
| MAX_ITERATIONS | Always runs exactly N times |
| CUSTOM | A user-supplied callable returns True |
Factory shorthand with OUTPUT_MATCH:
```python
from orchestrator.agent import create_loop_agent
from orchestrator.agent.types import TerminationType

loop = create_loop_agent(
    name="refinement-loop",
    agent=refiner,
    termination_type=TerminationType.OUTPUT_MATCH,
    termination_pattern=r"\bDONE\b",
    max_iterations=3,
)
```
Reflection Workflow
The agent runs, a critic evaluates the output, and the agent retries while the critique is NEEDS IMPROVEMENT, up to max_reflections times.
```python
from orchestrator.agent.workflow import ReflectionAgent
from orchestrator.agent.config import ReflectionConfig

reflective = ReflectionAgent(
    name="quality-writer",
    instructions="Improve output quality via self-critique.",
    agent=writer_agent,
    reflection_config=ReflectionConfig(
        max_reflections=3,
        critique_prompt="Is this response accurate and complete? Reply APPROVED or NEEDS IMPROVEMENT.",
    ),
)
```
Router Workflow
Routes requests to specialist agents based on content. Three strategies: LLM, rule-based, or hybrid.
```python
from orchestrator.agent.workflow import RouterAgent, Route

router = RouterAgent(
    name="triage",
    instructions="Route customer requests to the right specialist.",
    routes=[
        Route(agent_name="billing-agent", description="Billing, payments, invoices"),
        Route(agent_name="tech-support", description="Technical issues and bugs"),
        Route(agent_name="sales-agent", description="Pricing and upgrades"),
    ],
    fallback_agent_name="general-agent",
)
```
Or use the factory with tuple-based routes (recommended — the old Route(target=...) API is removed):
```python
from orchestrator.agent import BaseAgent, AgentRunner, create_router_agent

billing = BaseAgent(name="billing-agent", instructions="Handle billing questions.")
technical = BaseAgent(name="technical-agent", instructions="Handle technical support.")
general = BaseAgent(name="general-agent", instructions="Handle general questions.")

router = create_router_agent(
    name="triage",
    routes=[
        ("billing-agent", "billing, invoice, payment, subscription, refund"),
        ("technical-agent", "bug, error, crash, not working, how to"),
    ],
    fallback="general-agent",
    strategy="hybrid",
)

runner = AgentRunner(agent_registry={
    "billing-agent": billing,
    "technical-agent": technical,
    "general-agent": general,
})
response = await runner.run(router, "My payment failed twice this week")
```
Planner Workflow
Decomposes a goal into sub-tasks and executes them — either with a single agent or by routing each step to a specialist from a pool.
```python
from orchestrator.agent.workflow import PlannerAgent
from orchestrator.agent.config import PlanningConfig

# Agent-pool mode: LLM routes each step to a specialist
planner = PlannerAgent(
    name="research-planner",
    instructions="Decompose research goals into steps.",
    agents=[web_researcher, analyst, writer],
    planning_config=PlanningConfig(
        max_steps=8,
        enable_replanning=True,
    ),
)
```
Debate, Scatter & SupervisedSequential
Handoffs
An agent can delegate to another agent mid-conversation. Handoffs appear to the LLM as callable tools.
```python
from orchestrator.agent.handoff import Handoff

triage = BaseAgent(
    name="triage",
    instructions="Triage requests and hand off to specialists.",
    handoffs=[
        Handoff(
            target_agent="billing",
            description="Transfer to billing for payment questions.",
            return_to_parent=True,
        ),
        Handoff(
            target_agent="tech-support",
            description="Transfer for technical issues.",
        ),
    ],
)
```
Handoff History Modes
Control how much context is passed when handing off between agents.
| Mode | What's passed | Best for |
|---|---|---|
| FULL | Complete conversation history | Short conversations, full context needed |
| SUMMARY | LLM-generated abstract of the conversation | Long conversations, context window limits |
| RECENT_N | Last N turns only | When only recent context matters |
| HYBRID | Summary of older messages + full recent N turns | Best of both; default recommendation |
```python
from orchestrator.agent.handoff import Handoff, HistorySummarizationMode

Handoff(
    target_agent="specialist",
    description="Escalate complex issues.",
    summarization_mode=HistorySummarizationMode.HYBRID,
    recent_turns=4,
)
```
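Here is a sketch of what HYBRID with `recent_turns=4` could hand to the target agent. It assumes older messages collapse into a single system message. `summarize` is a hypothetical callable standing in for the LLM summary. For simplicity, each message counts as one turn.

```python
from typing import Callable

Message = dict[str, str]  # {"role": ..., "content": ...}


def hybrid_history(messages: list[Message], recent_turns: int,
                   summarize: Callable[[list[Message]], str]) -> list[Message]:
    """Summarize everything except the last `recent_turns` messages, keep those verbatim."""
    if recent_turns <= 0:
        older, recent = messages, []
    else:
        older, recent = messages[:-recent_turns], messages[-recent_turns:]
    if not older:
        return list(recent)  # nothing old enough to summarize
    summary = {"role": "system", "content": "Summary of earlier conversation: " + summarize(older)}
    return [summary, *recent]


messages = [{"role": "user", "content": f"m{i}"} for i in range(6)]
passed = hybrid_history(messages, 4, summarize=lambda ms: f"{len(ms)} messages")
print([m["content"] for m in passed])
# ['Summary of earlier conversation: 2 messages', 'm2', 'm3', 'm4', 'm5']
```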
Run Agents
AgentRunner is the execution engine — it orchestrates LLM calls, tool invocations, memory retrieval, and handoffs.
- playground/local-shop: one agent with MCP tools over HTTP.
- playground/multi-agent-shop
AgentRunner
Create one AgentRunner instance per application. It manages internal service clients and can be shared across concurrent runs.
```python
from orchestrator.agent.runner import AgentRunner
from orchestrator.agent.config import RunnerConfig

runner = AgentRunner(
    config=RunnerConfig(
        default_max_turns=20,
        parallel_tool_calls=True,
        max_parallel_tools=5,
        circuit_breaker_threshold=5,
    )
)
```
runner.run()
```python
response = await runner.run(
    agent,
    "What is my account balance?",
    user_id="user-123",
    session_id="sess-456",  # optional: loads Redis history
    conversation_id="conv-789",
    max_turns=15,
    metadata={"channel": "web"},
    tags=["prod"],
)

print(response.content)             # main text output
print(response.status)              # SUCCESS, ERROR, MAX_TURNS_REACHED …
print(response.usage.total_tokens)
print(response.latency_ms)
print(response.trace_id)            # Langfuse trace link
```
Streaming
Use run_stream() to yield tokens and events as they occur. Ideal for WebSocket or SSE endpoints.
```python
from orchestrator.agent.types import EventType

async for event in runner.run_stream(agent, "Explain quantum entanglement.", user_id="u-1"):
    if event.type == EventType.CONTENT_DELTA:
        print(event.data["content"], end="", flush=True)
    elif event.type == EventType.TOOL_CALL_START:
        print(f"\n[calling tool: {event.data['tool_name']}]")
    elif event.type == EventType.RUN_END:
        print(f"\nDone: {event.data['usage']['total_tokens']} tokens")
```
EventType Reference
| EventType | data keys |
|---|---|
| RUN_START | run_id, agent_name |
| CONTENT_DELTA | content (text chunk) |
| CONTENT_COMPLETE | content (full response) |
| TOOL_CALL_START | tool_name, arguments |
| TOOL_CALL_END | tool_name, result |
| TOOL_CALL_ERROR | tool_name, error |
| HANDOFF_START | from_agent, to_agent, reason |
| HANDOFF_END | to_agent, result |
| MEMORY_RETRIEVAL | query, results |
| RUN_END | status, usage, latency_ms |
| RUN_ERROR | error |
Session History
Pass a session_id to automatically load and save conversation history from Redis.
```python
# Turn 1: user's first message
response1 = await runner.run(agent, "My name is Alice.", session_id="sess-1", user_id="u-1")

# Turn 2: agent remembers context from Redis
response2 = await runner.run(agent, "What's my name?", session_id="sess-1", user_id="u-1")
# → "Your name is Alice."
```
Control how many turns are loaded with AgentConfig.session_history_turns. Set log_to_session=False in AgentConfig to disable session writes for intermediate pipeline agents.
Creating sessions with get_or_create_session()
If you want session history to persist across requests, create a session before calling runner.run(). Passing a session_id that was never created will silently fail to save or load history.
```python
# Step 1: create or retrieve the session
session_id = await session_client.get_or_create_session(
    session_id=session_id,        # pass existing ID to resume
    user_id="user-123",
    conversation_id="conv-456",   # optional; see below
)

# Step 2: run with that session_id
response = await runner.run(
    agent=agent,
    input="Hello!",
    session_id=session_id,
    user_id="user-123",
)
```
How session_id is computed
get_or_create_session() derives a deterministic key from the arguments you pass:
| Arguments passed | Computed session_id |
|---|---|
| explicit session_id | used as-is |
| conversation_id + user_id | c:{conversation_id}:u:{user_id} |
| user_id only | u:{user_id} |
| neither | random UUID |
When to use conversation_id
Use conversation_id when a single user can have multiple independent chat windows. Without it, all conversations for a user share one session (u:{user_id}). With it, each window gets its own isolated session (c:{conversation_id}:u:{user_id}).
- Chat UI projects: generate a conversation_id on the backend when the user opens a new chat window, and pass it back with each request.
- Task-based or webhook projects: use your natural entity ID (ticket ID, invoice ID, job ID) as the conversation_id. Never reuse IDs across unrelated tasks.
Multi-agent session saving pattern
Every workflow agent calls runner.run() once or multiple times internally. Without intervention, each sub-agent call auto-saves a turn — creating noisy intermediate history the user never saw. Prevent this with suppress_session_log + save_turn():
```python
async def execute(self, input_text, runner, context) -> AgentResponse:
    context.suppress_session_log = True  # blocks auto-save for ALL sub-agent runs

    response = await runner.run(
        agent=sub_agent,
        input=current_input,
        context=context,  # same context object passed every time
    )
    # ... more sub-agent calls ...
    final_output = response.content  # output of whichever agent produces the final answer

    # Save exactly one clean turn at the end
    await runner.save_turn(
        session_id=context.session_id,
        user_message=input_text,         # what the user originally sent
        assistant_message=final_output,  # what the user actually sees
    )
    return response
```
If you build a custom workflow on BaseAgent, you must follow this pattern yourself. Forgetting suppress_session_log = True will save every sub-agent turn to session history.
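To see why the flag matters, the sketch below runs a three-agent pipeline against a hypothetical `FakeRunner` that records what it would write to session history. `RunContext` and `FakeRunner` are stand-ins invented for this example, not Continuum classes. Their `run`/`save_turn` shapes only loosely mirror the real calls.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class RunContext:  # hypothetical stand-in for the real run context
    session_id: str
    suppress_session_log: bool = False


@dataclass
class FakeRunner:  # hypothetical runner that records session writes instead of using Redis
    saved: list[tuple[str, str]] = field(default_factory=list)

    async def run(self, agent: str, input_text: str, context: RunContext) -> str:
        output = f"{agent}({input_text})"
        if not context.suppress_session_log:
            self.saved.append((input_text, output))  # default behaviour: auto-save each run
        return output

    async def save_turn(self, session_id: str, user_message: str, assistant_message: str) -> None:
        self.saved.append((user_message, assistant_message))


async def pipeline(runner: FakeRunner, context: RunContext, user_input: str, suppress: bool) -> str:
    context.suppress_session_log = suppress
    current = user_input
    for agent in ("researcher", "writer", "editor"):
        current = await runner.run(agent, current, context)
    if suppress:
        # Save exactly one turn: the original input and the last agent's output
        await runner.save_turn(context.session_id, user_input, current)
    return current


noisy, clean = FakeRunner(), FakeRunner()
asyncio.run(pipeline(noisy, RunContext("sess-1"), "AI in healthcare", suppress=False))
asyncio.run(pipeline(clean, RunContext("sess-1"), "AI in healthcare", suppress=True))
print(len(noisy.saved), len(clean.saved))  # 3 1
```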
Deciding which agent's output is the final response
In a multi-agent workflow, you must explicitly decide which agent's output is the final response; this is what you pass to save_turn(). Here are two examples:
Sequential: agents run one after another, each passing output to the next. The last agent may produce the final response:
```python
await runner.save_turn(session_id, user_input, last_agent_response.content)
```
Handoff / Router: sub-agents do work and their results are injected back into the top-level agent's message list. The top-level agent then synthesizes its own final response — so the top-level agent's output is what to save, not the sub-agents' intermediate results:
```python
await runner.save_turn(session_id, user_input, top_level_agent_response.content)
```
Context Management
When a conversation approaches the model's context window, Continuum automatically compresses older messages.
```python
from orchestrator import AgentConfig, ContextManagementConfig, CompressionStrategy
from orchestrator.agent import BaseAgent

agent = BaseAgent(
    name="long-conv",
    instructions="...",
    config=AgentConfig(
        context_management=ContextManagementConfig(
            compression_strategy=CompressionStrategy.SMART,  # SMART / SUMMARIZE_OLD / TRUNCATE_OLDEST
        )
    ),
)
```
Set CONTEXT_COMPRESSION_THRESHOLD=0.8 (the default) to trigger compression at 80% of the model's context window. CONTEXT_KEEP_RECENT_MESSAGES=10 ensures the last 10 messages are never truncated.
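Here is the arithmetic behind the two settings. Take a hypothetical 128,000-token window. Compression would start once the prompt reaches 0.8 × 128,000 = 102,400 tokens, and only messages older than the last 10 would be candidates. Whether the comparison is ≥ or > is an assumption. The functions below are illustrative, not Continuum's implementation.

```python
def should_compress(prompt_tokens: int, context_window: int, threshold: float = 0.8) -> bool:
    """True once the prompt fills at least `threshold` of the context window."""
    return prompt_tokens >= threshold * context_window


def split_for_compression(messages: list[str], keep_recent: int = 10) -> tuple[list[str], list[str]]:
    """Return (compressible, protected); the last `keep_recent` messages are protected."""
    cut = max(len(messages) - keep_recent, 0)
    return messages[:cut], messages[cut:]


print(should_compress(110_000, 128_000), should_compress(90_000, 128_000))  # True False
older, recent = split_for_compression([f"msg-{i}" for i in range(25)])
print(len(older), len(recent))  # 15 10
```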
App Lifecycle
Use OrchestratorLifecycle to initialise and cleanly shut down all shared services (Redis, Langfuse, vector store). Use Container to inject custom clients — useful in tests and multi-tenant setups.
```python
from orchestrator.core import OrchestratorLifecycle, Container

lifecycle = OrchestratorLifecycle()
await lifecycle.startup()  # connects Redis, Langfuse, vector store

# Health checks
health = await lifecycle.health_check()
# → {"redis": "ok", "qdrant": "ok", "langfuse": "ok", "llm": "ok"}

await lifecycle.shutdown()  # flushes Langfuse, closes Redis connections
```
```python
# Inject custom clients (e.g. in tests or multi-tenant setups)
from orchestrator.agent.runner import AgentRunner
from orchestrator.core import Container

container = Container()
container.set_llm_client(my_llm_client)
container.set_memory_client(my_memory_client)
container.set_session_client(my_session_client)

runner = AgentRunner(container=container)
```
FastAPI Server
```python
from fastapi import FastAPI

from orchestrator.agent import BaseAgent
from orchestrator.agent.runner import AgentRunner

app = FastAPI()
runner = AgentRunner()
agent = BaseAgent(name="api-agent", instructions="You are an API assistant.")


@app.post("/chat")
async def chat(body: dict):
    response = await runner.run(
        agent,
        body["message"],
        user_id=body["user_id"],
        session_id=body.get("session_id"),
    )
    return {"reply": response.content, "session_id": body.get("session_id")}
```
Temporal Workers
```python
from orchestrator.temporal import WorkerManager, AgentRegistry

registry = AgentRegistry()
registry.register(my_agent)

worker_manager = WorkerManager(agent_registry=registry)
await worker_manager.start_worker()  # connects to TEMPORAL_HOST
```
Input / Output Scanning
Attach scanner callables to AgentConfig to detect prompt injection, PII, or unsafe content before/after the LLM call.
```python
import re

from orchestrator.agent import BaseAgent
from orchestrator.agent.config import AgentConfig


def pii_scanner(text: str) -> str:
    # Crude email redaction: replace any token containing "@" with [REDACTED]
    return re.sub(r"\S+@\S+", "[REDACTED]", text)


agent = BaseAgent(
    name="safe-agent",
    instructions="...",
    config=AgentConfig(
        input_scanners=[pii_scanner],
        output_scanners=[pii_scanner],
        injection_detection=True,
    ),
)
```
Testing Guide
Continuum favors integration tests over mocks. The [dev] extra ships fakeredis, respx, and pytest-asyncio.
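```python
# conftest.py
import fakeredis.aioredis as fakeredis
import pytest

from orchestrator.core import Container
from orchestrator.session import SessionClient  # assumed export; see session.md


@pytest.fixture
def container():
    # A plain (sync) fixture: nothing here needs awaiting, so it works
    # in both pytest-asyncio "strict" and "auto" modes
    c = Container()
    c.set_session_client(SessionClient(redis=fakeredis.FakeRedis()))
    return c
```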
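```python
# test_agent.py
import pytest

from orchestrator.agent import BaseAgent
from orchestrator.agent.runner import AgentRunner

agent = BaseAgent(name="test-agent", instructions="You are a helpful assistant.")


@pytest.mark.asyncio
async def test_basic_response(container, real_llm_client):
    # real_llm_client is a fixture you provide (e.g. in conftest.py)
    container.set_llm_client(real_llm_client)
    runner = AgentRunner(container=container)
    response = await runner.run(agent, "Say hello.", user_id="test-user")
    assert response.status.value == "success"
    assert len(response.content) > 0
```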
Components
Continuum's building blocks — tools, memory, sessions, durable workflows.
MCP Servers
Every tool is exposed via the Model Context Protocol. Three transport types are supported:
```python
from orchestrator.tools import MCPServerStdio, MCPServerSse, MCPServerStreamableHttp

# Spawn a subprocess (local Python script or shell command)
fs_server = MCPServerStdio(command="python", args=["-m", "mcp_filesystem"])

# Server-Sent Events (remote server)
sse_server = MCPServerSse(url="https://tools.example.com/mcp/sse")

# StreamableHTTP (recommended for production)
http_server = MCPServerStreamableHttp(url="https://tools.example.com/mcp")

agent = BaseAgent(
    name="tool-agent",
    instructions="You have access to the filesystem.",
    mcp_servers=[fs_server],
)
```
Passing tools explicitly with MCPUtil
If you need the tool definitions as Python objects (e.g. to inspect, filter, or pass them manually), use MCPUtil.get_function_tools():
```python
from orchestrator.tools import MCPUtil, ToolExecutor

# Get tool definitions from a connected server
tool_defs = await MCPUtil.get_function_tools(server)
tools = [t.model_dump() for t in tool_defs]

executor = ToolExecutor({server: None})  # None = expose all tools
await executor.initialize()

agent = BaseAgent(
    name="agent",
    instructions="...",
    tools=tools,
    tool_executor=executor,
)
```
Tool Filtering
When you have many MCP tools, semantic tool filtering sends only the relevant subset to the LLM each turn — reducing token cost and noise.
```python
from orchestrator.agent.config import AgentConfig, ToolAttentionConfig

agent = BaseAgent(
    name="commerce-agent",
    instructions="...",
    mcp_servers=[shop_server],  # 50+ tools
    config=AgentConfig(
        tool_attention=ToolAttentionConfig(
            enabled=True,
            max_tools=10,  # send at most 10 tools per turn
        )
    ),
)
```
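Continuum's ranking method is not described here. The sketch below therefore swaps in a plain bag-of-words cosine similarity between the user turn and each tool description to illustrate top-k selection. A production version would more likely use embeddings. The tool names and descriptions are hypothetical.

```python
import math
import re
from collections import Counter


def _vector(text: str) -> Counter:
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def _cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_k_tools(query: str, tools: dict[str, str], max_tools: int) -> list[str]:
    """Rank tools by similarity between the user turn and each description; keep the top k."""
    q = _vector(query)
    ranked = sorted(tools, key=lambda name: _cosine(q, _vector(tools[name])), reverse=True)
    return ranked[:max_tools]


tools = {
    "search_products": "search the product catalog by keyword",
    "add_to_cart": "add a product to the shopping cart",
    "get_order_status": "look up the shipping status of an order",
    "refund_order": "issue a refund for an order",
}
print(top_k_tools("where is my order shipping status", tools, max_tools=2))
# ['get_order_status', 'refund_order']
```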
Tool Context Injection
Some tools return a value (e.g. session_id, cart_token) that subsequent tool calls need as input. ToolContextState captures these values automatically and injects them into later calls — no agent prompt changes required.
```python
from orchestrator.tools import MCPServerStreamableHttp

# Capture session_id from the login tool result, inject into every subsequent call
shop_server = MCPServerStreamableHttp(
    url="https://shop.example.com/mcp",
    tool_context={
        "capture": {"login": "session_id"},      # tool name → result field to capture
        "inject": {"session_id": "session_id"},  # param name → captured key
    },
)
```
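Here is a sketch of the capture/inject bookkeeping that the tool_context mapping describes. `ToolContextSketch` is a hypothetical class, not ToolContextState. The precedence rule is an assumption: arguments the LLM already supplied win over injected values.

```python
from typing import Any


class ToolContextSketch:
    """Hypothetical re-implementation of the capture/inject idea, not the library class."""

    def __init__(self, capture: dict[str, str], inject: dict[str, str]):
        self.capture = capture  # tool name -> result field to capture
        self.inject = inject    # param name -> captured key
        self.state: dict[str, Any] = {}

    def after_call(self, tool_name: str, result: dict[str, Any]) -> None:
        result_field = self.capture.get(tool_name)
        if result_field is not None and result_field in result:
            self.state[result_field] = result[result_field]

    def before_call(self, arguments: dict[str, Any]) -> dict[str, Any]:
        merged = dict(arguments)
        for param, key in self.inject.items():
            if key in self.state and param not in merged:
                merged[param] = self.state[key]
        return merged


ctx = ToolContextSketch(capture={"login": "session_id"}, inject={"session_id": "session_id"})
print(ctx.before_call({"query": "shoes"}))  # {'query': 'shoes'} (nothing captured yet)
ctx.after_call("login", {"session_id": "abc-123", "user": "alice"})
print(ctx.before_call({"query": "shoes"}))  # {'query': 'shoes', 'session_id': 'abc-123'}
```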
Run Artifacts
MCP tool responses can contain rich structured data (widgets, tables, charts). Access them via response.run_artifacts.
```python
response = await runner.run(agent, "Show me the product catalog.")

if response.run_artifacts:
    for artifact_id, artifact in response.run_artifacts.items():
        widget_meta = artifact.get("meta")  # widget template
        structured = artifact.get("structured_content")
        text = artifact.get("text_content")
```
Long-term Memory
Continuum uses mem0 with Milvus (default) or Qdrant for persistent semantic memory. Facts are automatically extracted from conversations and stored as embeddings.
```python
from orchestrator.agent.config import AgentMemoryConfig
from orchestrator.memory.scopes import MemoryScope

agent = BaseAgent(
    name="memory-agent",
    instructions="Remember user preferences.",
    memory_config=AgentMemoryConfig(
        search_memories=True,
        search_scope=MemoryScope.USER,
        search_limit=5,
        store_memories=True,
        store_scope=MemoryScope.USER,
        broadcast_learnings=True,  # share useful facts with other agents
    ),
)
```
Controlling what gets stored
Use extraction_prompt to tell mem0 exactly which facts to extract. Use pre_store_filter to remove PII or irrelevant facts after they are stored.
```python
# extraction_prompt: override mem0's default extraction logic
AgentMemoryConfig(
    store_memories=True,
    extraction_prompt=(
        "Only extract long-term facts about the user's pets, animal preferences, "
        "and dietary needs. Do NOT store transient actions like adding to cart or searches."
    ),
)

# pre_store_filter: runs after storage; facts not returned are deleted
def remove_pii(facts: list[str]) -> list[str]:
    return [f for f in facts if "credit card" not in f]

AgentMemoryConfig(store_memories=True, pre_store_filter=remove_pii)
```
Memory management API
```python
from orchestrator.core.container import get_container

memory_client = get_container().memory_client

# View all memories for a user
memories = await memory_client.get_all(user_id="user-123")
for m in memories:
    print(m.memory, m.id)

# Search by query
results = await memory_client.search("pet preferences", user_id="user-123")

# Delete a specific memory
await memory_client.delete(memory_id="abc-123")

# Delete all memories for a user (GDPR right to be forgotten)
await memory_client.delete_all(user_id="user-123")
```
Memory Scopes
| Scope | Isolation | Use case |
|---|---|---|
| USER | Per user_id | User preferences, personal context |
| AGENT | Per agent name | Agent-specific domain knowledge |
| SHARED | Cross-user, cross-agent | Global facts, product knowledge |
| CONVERSATION | Per conversation_id | Ephemeral conversation context |
IntelligentMemoryClient
A drop-in replacement for MemoryClient that adds importance scoring, time-based decay, entity extraction, and user profiles. Low-relevance or stale memories are down-weighted before being injected into the prompt.
```python
from orchestrator.memory import IntelligentMemoryClient, IntelligenceConfig

memory = IntelligentMemoryClient(
    intelligence_config=IntelligenceConfig(
        enable_scoring=True,   # LLM scores each memory at store time
        enable_decay=True,     # recent memories get a relevance boost
        prune_threshold=0.15,  # delete memories below this score
    )
)

# Accepts strings, list of strings, or message dicts
await memory.add(
    [{"role": "user", "content": "I prefer dark mode."}],
    user_id="user-123",
)
results = await memory.search("user preferences", user_id="user-123")
```
add() accepts three forms: a plain string, a list of strings, or a list of {"role", "content"} message dicts. The messages-style form is recommended because it gives mem0 more context for fact extraction.
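To make scoring and decay concrete, here is a hypothetical ranking rule. It multiplies an importance score in [0, 1] by an exponential decay factor and drops anything below the prune threshold. The 30-day half-life and the formula itself are assumptions for illustration. The source does not say how IntelligentMemoryClient combines its signals.

```python
from dataclasses import dataclass


@dataclass
class Memory:
    text: str
    importance: float  # 0..1, e.g. assigned by an LLM scorer at store time
    age_days: float


def effective_score(m: Memory, half_life_days: float = 30.0) -> float:
    # Exponential decay: the weight halves every half_life_days.
    return m.importance * 0.5 ** (m.age_days / half_life_days)


def rank_and_prune(memories: list[Memory], prune_threshold: float = 0.15) -> list[Memory]:
    """Drop memories scoring below the threshold, then sort best-first."""
    kept = [m for m in memories if effective_score(m) >= prune_threshold]
    return sorted(kept, key=effective_score, reverse=True)


memories = [
    Memory("prefers dark mode", importance=0.9, age_days=2),
    Memory("searched for leashes", importance=0.3, age_days=60),
    Memory("has a beagle", importance=0.8, age_days=30),
]
print([m.text for m in rank_and_prune(memories)])  # ['prefers dark mode', 'has a beagle']
```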
Sessions (Redis)
Short-term conversation history stored in Redis. AgentRunner handles loading and saving automatically — you only use SessionClient directly when you need to manage sessions outside a run (e.g. building a chat history UI, clearing history, debugging).
```python
from orchestrator.session import SessionClient

session = SessionClient()

# Create or resume a session
session_id = await session.get_or_create_session(user_id="user-123")

# Read history (e.g. to display in a chat UI)
messages = await session.get_conversation_history(session_id)

# Clear messages but keep the session
await session.clear_session(session_id)

# Delete the session entirely
await session.delete_session(session_id)
```
Temporal Integration
Build durable workflows that survive process restarts, with automatic retries and audit trails.
```python
from orchestrator.temporal import TemporalClient
from orchestrator.temporal.workflows import AgentWorkflow
from orchestrator.temporal.types import AgentStep, ApprovalStep, ParallelStep

steps = [
    AgentStep(agent_name="researcher", input="Analyze market trends"),
    ApprovalStep(
        description="Review analysis before proceeding",
        approvers=["manager@acme.com"],
        timeout=86400,  # 24 hours
    ),
    AgentStep(agent_name="writer"),  # receives researcher output
]

client = TemporalClient()
handle = await client.execute_workflow(AgentWorkflow, {"steps": steps})
```
Step Types
| Step | Purpose |
|---|---|
| AgentStep | Run a registered agent |
| ApprovalStep | Pause for human approval (email notification) |
| ParallelStep | Run multiple agents concurrently |
| ConditionalStep | Branch based on a condition agent's output |
| WaitStep | Delay execution (1 second to 7 days) |
Loop Workflow (Temporal)
For iterative agentic work that must survive restarts, use LoopAgentWorkflow. The loop runs on Temporal and persists state between iterations.
```python
from orchestrator.temporal.workflows import LoopAgentWorkflow

handle = await client.execute_workflow(
    LoopAgentWorkflow,
    {
        "agent_name": "refinement-agent",
        "initial_input": "Draft a product description.",
        "max_iterations": 5,
        "termination_condition": "output_match",
        "termination_pattern": "APPROVED",
    },
)
```
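One reading of the output_match termination condition is "stop as soon as the output contains the pattern". Below is a minimal, Temporal-free sketch of that loop. It assumes regex matching; the source does not say whether the pattern is a regex or a plain substring. run_loop is a hypothetical helper, and the sketch models only the termination logic, not durability.

```python
import re
from typing import Callable


def run_loop(
    step: Callable[[str], str],
    initial_input: str,
    max_iterations: int = 5,
    termination_pattern: str = "APPROVED",
) -> tuple[str, int]:
    """Feed each output back in as the next input until the pattern matches."""
    current = initial_input
    for iteration in range(1, max_iterations + 1):
        current = step(current)
        if re.search(termination_pattern, current):
            return current, iteration
    return current, max_iterations  # budget exhausted without a match


# A fake agent that approves its own draft on the third pass.
drafts = iter(["draft v1", "draft v2", "draft v3 APPROVED"])
output, n = run_loop(lambda _: next(drafts), "Draft a product description.")
print(output, n)  # draft v3 APPROVED 3
```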
Human-in-the-Loop
Workflows pause at ApprovalStep and send a notification to approvers. The workflow resumes only when approved — or auto-approves after timeout.
```python
ApprovalStep(
    description="Approve the generated email before sending to customers",
    approvers=["alice@company.com", "bob@company.com"],
    timeout=3600,                # 1 hour, then auto-expires
    auto_approve_if="low_risk",  # skip approval if condition agent returns this
)
```
Integrations
Continuum calls LLM providers directly via their official SDKs.
Provider Routing
Continuum routes to providers automatically based on the model string prefix — no configuration needed.
| Model name prefix | Provider | Examples |
|---|---|---|
| claude-… or anthropic/… | Anthropic | claude-sonnet-4-5, claude-opus-4-5 |
| gemini/… | Google Gemini | gemini/gemini-2.0-flash, gemini/gemini-1.5-pro |
| anything else | OpenAI | gpt-4o, gpt-4o-mini, gpt-5, o3-mini |
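The routing table reduces to a few prefix checks. This stand-alone sketch mirrors the table, plus the azure/ rule described under Azure OpenAI. resolve_provider is a hypothetical name, not a Continuum function.

```python
def resolve_provider(model: str) -> str:
    """Map a model string to a provider using the prefix rules in the table."""
    if model.startswith(("claude-", "anthropic/")):
        return "anthropic"
    if model.startswith("gemini/"):
        return "gemini"
    # Everything else (gpt-*, o3-mini, azure/...) goes through the OpenAI SDK.
    return "openai"


for name in ("claude-sonnet-4-5", "gemini/gemini-2.5-flash", "o3-mini", "azure/gpt-4o"):
    print(name, "->", resolve_provider(name))
```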
OpenAI
Default provider. Any model string without a claude-, anthropic/, or gemini/ prefix routes here, e.g. gpt-4o or o3-mini.
agent = BaseAgent(name="gpt-agent", instructions="...", model="gpt-4o")
| Model | Context | Notes |
|---|---|---|
| gpt-5 | — | Latest, highest capability |
| gpt-4o | 128k | Strong overall quality |
| gpt-4o-mini | 128k | Default model, fast & cheap |
| o3-mini | — | Reasoning model |
Required env: OPENAI_API_KEY — also used by mem0's default embedder.
Anthropic
Route by using any claude-* or anthropic/... prefixed model string.
agent = BaseAgent(name="claude-agent", instructions="...", model="claude-sonnet-4-5")
| Model | Notes |
|---|---|
| claude-sonnet-4-5 | Recommended: best balance |
| claude-opus-4-5 | Highest capability |
| claude-haiku-4-5 | Fastest & cheapest |
Required env: ANTHROPIC_API_KEY
Google Gemini
Route by using a gemini/-prefixed model string.
agent = BaseAgent(name="gemini-agent", instructions="...", model="gemini/gemini-2.5-flash")
| Model | Notes |
|---|---|
| gemini/gemini-2.5-flash | Fast, cost-effective |
| gemini/gemini-2.0-pro | Best Gemini quality |
| gemini/gemini-1.5-pro | 1M context window |
Required env: GEMINI_API_KEY
Azure OpenAI
```python
import os

from orchestrator.agent import BaseAgent
from orchestrator.llm import LLMConfig

config = LLMConfig(
    model="azure/gpt-4o",
    api_key=os.environ["AZURE_API_KEY"],
    api_base=os.environ["AZURE_API_BASE"],
    api_version=os.environ["AZURE_API_VERSION"],
)

agent = BaseAgent(name="azure-agent", instructions="...", llm_config=config)
```
The azure/ prefix routes to OpenAI's SDK with your Azure endpoint. Required env: AZURE_API_KEY, AZURE_API_BASE, AZURE_API_VERSION.
Automatic Fallback
When LLM_ENABLE_FALLBACK=true (default), any provider error transparently retries on FALLBACK_LLM_MODEL. Set it to a different provider to get cross-provider resilience:
```bash
# .env: primary OpenAI, fallback to Gemini
DEFAULT_LLM_MODEL=gpt-4o
FALLBACK_LLM_MODEL=gemini/gemini-1.5-flash
LLM_ENABLE_FALLBACK=true
```

Langfuse Tracing
All agent runs, LLM calls, tool invocations, and memory operations are automatically traced to Langfuse. No code changes required.
```bash
# docker-compose.yml ships Langfuse at http://localhost:3000
# .env
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=http://localhost:3000
```
Each response.trace_id identifies the run's Langfuse trace, so you can jump straight to it when debugging.
Tracing Decorators
Three decorators create Langfuse spans for custom functions — all imported from orchestrator.observability.
@observe — generic span
Add custom spans to any async function:
```python
from orchestrator.observability import observe

@observe("preprocess_input")
async def preprocess(text: str) -> str:
    return text.strip().lower()
```
@trace_tool — for tool / API functions
```python
from orchestrator.observability import trace_tool

@trace_tool("search_products")
async def search_products(query: str):
    # creates a tool-type span with input/output captured
    return await db.search(query)
```
@trace_agent — for custom agent wrappers
```python
from orchestrator.observability import trace_agent

@trace_agent("my-specialist")
async def run_specialist(input: str):
    # span tagged as agent-type in Langfuse
    ...
```
| Decorator | Langfuse span type | Best for |
|---|---|---|
| @observe | span | Generic business logic |
| @trace_tool | tool | Tool / external API call functions |
| @trace_agent | agent | Custom agent wrapper functions |
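For intuition about what these decorators do, here is a self-contained sketch of a span-recording decorator for async functions. It appends spans to an in-memory list instead of sending them to Langfuse. The names traced and SPANS are hypothetical, and the source does not describe the real decorators' internals.

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable

SPANS: list[dict[str, Any]] = []  # in-memory stand-in for a tracing backend


def traced(name: str, span_type: str = "span"):
    """Hypothetical decorator: record one span per call of an async function."""
    def decorator(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            span: dict[str, Any] = {"name": name, "type": span_type,
                                    "input": {"args": args, "kwargs": kwargs}}
            start = time.perf_counter()
            try:
                result = await fn(*args, **kwargs)
                span["output"] = result
                return result
            except Exception as exc:
                span["error"] = repr(exc)  # failed calls are traced too
                raise
            finally:
                span["duration_s"] = time.perf_counter() - start
                SPANS.append(span)
        return wrapper
    return decorator


@traced("preprocess_input")
async def preprocess(text: str) -> str:
    return text.strip().lower()


print(asyncio.run(preprocess("  Hello ")))  # hello
print(SPANS[0]["name"], SPANS[0]["type"], SPANS[0]["output"])  # preprocess_input span hello
```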
Qdrant
```bash
# .env
VECTOR_STORE_PROVIDER=qdrant
QDRANT_HOST=localhost
QDRANT_PORT=6333
QDRANT_COLLECTION=orchestrator_memories
```

For Qdrant Cloud, also set QDRANT_API_KEY.
Milvus (default)

```bash
# .env (default; no change needed if using docker compose)
VECTOR_STORE_PROVIDER=milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530
```

For Zilliz Cloud, set MILVUS_TOKEN and point MILVUS_HOST to your cloud endpoint.
Smart Inference
A cost-aware, classifier-driven routing layer that picks the optimal model per prompt. Continuum agents call one OpenAI-compatible endpoint; Smart Inference dispatches across 250+ models on 45+ providers with per-1M-token pricing, budget ledger, and dynamic output caps baked into the request path.
In short: set SMART_GATEWAY_URL, set model="auto" on the agent, and the gateway picks the cheapest model that meets the prompt's quality threshold. Switch tiers per agent with gateway_mode="strict" | "modest" | "quality".
How it works
Every POST /v1/chat/completions request flows through a fixed middleware pipeline. Each stage is independently observable and skipped cleanly when its feature flag is off.
```text
# Request lifecycle (Continuum agent → Smart Inference gateway → provider)
inbound request
  ─► requireValidKey      # bearer → virtualKey lookup, fail-closed 401
  ─► requestValidator     # content-type, custom-host checks
  ─► hooks (pre)          # plugin pre-hooks (PII, prompt-shield…)
  ─► memoryCache          # semantic cache (Redis, opt-in)
  ─► classifier           # stamp complexity + domain
  ─► budget               # pre-flight cost + reservation
  ─► router               # model namespace → candidate list → top pick
  ─► provider handler     # native API call (OpenAI / Anthropic / Google / …)
  ─► response transform   # normalise to OpenAI shape
  ─► hooks (post)         # plugin post-hooks
  ─► observability        # flush trace to Langfuse
client ◄┘
```
The classifier and the router consume two signals: the model namespace from the request body, and the metadata block (session_id, trace_id, optional complexity / domain overrides).
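The rule that each stage is "skipped cleanly when its feature flag is off" can be illustrated with an ordered list of stages and a flag lookup. The gateway itself is not written in Python (its code lives in .ts files), so this Python sketch only models the control flow. Request, Reject, the stage functions, and the demo bearer are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Request:
    bearer: str
    model: str
    log: list[str] = field(default_factory=list)


class Reject(Exception):
    """Raised by a stage to stop the pipeline with an HTTP status."""

    def __init__(self, status: int, reason: str) -> None:
        super().__init__(reason)
        self.status = status


KNOWN_BEARERS = {"ck-demo"}  # hypothetical virtual-key index


def require_valid_key(req: Request) -> None:
    if req.bearer not in KNOWN_BEARERS:
        raise Reject(401, "unknown bearer")  # fail closed


def classifier(req: Request) -> None:
    req.log.append("classified")


def router(req: Request) -> None:
    req.log.append(f"routed:{req.model}")


PIPELINE: list[tuple[str, Callable[[Request], None]]] = [
    ("requireValidKey", require_valid_key),
    ("classifier", classifier),
    ("router", router),
]


def handle(req: Request, flags: dict[str, bool]) -> int:
    """Run enabled stages in order; a stage with no flag counts as enabled."""
    for name, stage in PIPELINE:
        if not flags.get(name, True):
            continue  # feature flag off: skip the stage entirely
        try:
            stage(req)
        except Reject as exc:
            return exc.status
    return 200


ok = Request("ck-demo", "auto")
print(handle(ok, {"classifier": False}), ok.log)  # 200 ['routed:auto']
print(handle(Request("nope", "auto"), {}))         # 401
```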
Routing modes
Three preset modes resolve to three quality tiers. The mode is picked per-agent via gateway_mode and per-request by the model field (auto/cheap, auto/mid, auto/quality).
| Mode | Tier | Optimises for | Typical pick |
|---|---|---|---|
| strict | cheap | lowest cost | smallest model that clears the capability gate (gpt-4o-mini / gemini-flash / haiku class) |
| modest | mid | quality / cost balance | mid-tier with the best q/cost: claude-sonnet, gpt-4o |
| quality | quality | highest quality | top-tier candidates available in the registry |
Candidate models come from src/services/router/registry.json. Models not in the registry are unreachable via auto; pin them explicitly with <provider>/<model_id> if needed.
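Here is a hedged illustration of how three modes can select from one candidate pool. The per-mode quality floors, the costs, and the quality scores are made-up numbers. The real router's ranking may differ; it reports a category or pareto picker in its response headers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    name: str
    cost_per_1m: float  # illustrative USD per 1M tokens
    quality: float      # illustrative 0..1 quality score


# Hypothetical minimum quality each mode will accept.
QUALITY_FLOOR = {"strict": 0.5, "modest": 0.7, "quality": 0.0}


def pick(pool: list[Candidate], mode: str) -> Candidate:
    eligible = [c for c in pool if c.quality >= QUALITY_FLOOR[mode]]
    if not eligible:
        raise ValueError(f"no candidate clears the {mode} floor")
    if mode == "strict":  # lowest cost that clears the gate
        return min(eligible, key=lambda c: c.cost_per_1m)
    if mode == "modest":  # best quality per dollar
        return max(eligible, key=lambda c: c.quality / c.cost_per_1m)
    return max(eligible, key=lambda c: c.quality)  # "quality": best available


POOL = [
    Candidate("tiny-model", 0.10, 0.30),
    Candidate("small-model", 0.50, 0.55),
    Candidate("mid-model", 2.00, 0.75),
    Candidate("large-model", 15.00, 0.90),
]

for mode in ("strict", "modest", "quality"):
    print(mode, pick(POOL, mode).name)
# strict small-model
# modest mid-model
# quality large-model
```

In this sketch, raising the floor is what keeps "modest" from collapsing into "strict": a pure quality-per-dollar ranking over the whole pool would favour the cheapest acceptable model.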
Wire Continuum to the Gateway
Continuum routes all LLM calls through Smart Inference when SMART_GATEWAY_URL is set. There is nothing else to import or subclass — GatewayProvider automatically replaces the per-provider clients.
Environment variables
```bash
# Continuum side: .env
SMART_GATEWAY_URL=http://localhost:8787/v1   # gateway base URL
SMART_GATEWAY_API_KEY=ck-prod-2026-05-19     # bearer (matches a virtual key)
SMART_GATEWAY_DEFAULT_MODE=modest            # strict | modest | quality
```
Virtual key (bearer)
The gateway authenticates the client by bearer and looks up the upstream provider key, budget, and allowed-models from conf.integrations[].
```jsonc
// Smart Inference side: conf.json (excerpt)
{
  "integrations": [
    {
      "provider": "anthropic",
      "slug": "dev_team_anthropic",
      "bearer_token": "ck-prod-2026-05-19",
      "credentials": { "api_key_env": "ANTHROPIC_API_KEY" },
      "budget_usd": 100,
      "allowed_models": [
        "auto",
        "auto/cheap",
        "auto/mid",
        "auto/quality",
        "openai/gpt-4o-mini",
        "anthropic/claude-haiku-4-5-20251001",
        "anthropic/claude-opus-4-7",
        "google/gemini-2.0-flash"
      ]
    }
  ]
}
```
The gateway does not read its .env file on its own: npm run start:node does not dotenv-load the gateway. Either run set -a; source .env; set +a before starting, or pin secrets in the environment: block of docker-compose.yaml. A virtual key whose api_key_env resolves to undefined is silently dropped from the index, so every request to it returns 401.
Model namespace
The model field in the request body is parsed by modelResolver.ts. Five grammars are supported:
| Form | Meaning |
|---|---|
| auto | Gateway-wide auto-routing, default tier |
| auto/<tier> | Gateway-wide, specific tier (cheap / mid / quality) |
| <provider>/auto | Provider-scoped auto-routing, default tier |
| <provider>/auto/<tier> | Provider-scoped, specific tier |
| <provider>/<model_id> | Explicit pin; bypasses model selection |
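The five forms can be told apart by splitting on "/" and checking where auto appears. The real parser is modelResolver.ts. This Python sketch only mirrors the grammar in the table and assumes the tiers are limited to cheap, mid, and quality. Route and parse_model are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional

TIERS = {"cheap", "mid", "quality"}


@dataclass(frozen=True)
class Route:
    provider: Optional[str]             # None = gateway-wide
    tier: Optional[str]                 # None = default tier
    pinned_model: Optional[str] = None  # set only for explicit pins


def _tier(name: str) -> str:
    if name not in TIERS:
        raise ValueError(f"unknown tier: {name!r}")
    return name


def parse_model(model: str) -> Route:
    parts = model.split("/")
    if parts == ["auto"]:
        return Route(None, None)                 # auto
    if len(parts) == 2 and parts[0] == "auto":
        return Route(None, _tier(parts[1]))      # auto/<tier>
    if len(parts) == 2 and parts[1] == "auto":
        return Route(parts[0], None)             # <provider>/auto
    if len(parts) == 3 and parts[1] == "auto":
        return Route(parts[0], _tier(parts[2]))  # <provider>/auto/<tier>
    if len(parts) >= 2 and all(parts):
        provider, model_id = model.split("/", 1)
        return Route(provider, None, model_id)   # <provider>/<model_id>
    raise ValueError(f"unrecognised model string: {model!r}")


print(parse_model("google/auto/cheap"))
print(parse_model("anthropic/claude-haiku-4-5-20251001"))
```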
Agent code
From the agent's perspective, nothing changes. Set model to a routing intent such as "auto", and optionally pick a mode with gateway_mode.
```python
from orchestrator.agent import BaseAgent, AgentRunner

# Auto-routing: the gateway picks the model per turn.
agent = BaseAgent(
    name="shop-assistant",
    instructions="You are a friendly pet shop assistant.",
    model="auto",
    gateway_mode="modest",  # "strict" | "modest" | "quality"
)

response = await AgentRunner().run(agent, "Show me dog leashes", user_id="alice")
print(response.content)
```
For multi-agent flows, mix and match modes per role:
```python
# Triage routes fast; specialist reasons carefully.
triage = BaseAgent(name="triage", model="auto", gateway_mode="strict")
specialist = BaseAgent(name="specialist", model="auto", gateway_mode="quality")
```
Classifier output
When smart_inference.classifier.enabled is true (the default in conf.defaults.json), every prompt is tagged with a complexity and a domain. The router uses these tags to filter candidates before mode/tier ranking.
| Field | Values | Source |
|---|---|---|
| complexity | simple · medium · complex | classifier LLM (gpt-4o-mini by default) with rule fallback |
| domain | general · code · health · math · analysis · reasoning · finance | classifier LLM |
You can override either via metadata:
```json
{
  "model": "auto",
  "messages": [{"role": "user", "content": "…"}],
  "metadata": {
    "session_id": "s-001",
    "complexity": "complex",
    "domain": "code"
  }
}
```

Response headers
The gateway echoes its routing decision on every response. Useful for tracing and dashboards.
| Header | Type | Example |
|---|---|---|
| x-portkey-router-mode | string | modest |
| x-portkey-router-complexity | string | medium |
| x-portkey-router-domain | string | general |
| x-portkey-router-picker | string | category · pareto |
| x-portkey-router-pool-size-before | number | 20 |
| x-portkey-router-pool-size-after | number | 14 |
| x-portkey-router-attempts | list | claude-sonnet-4-6@anthropic:200 |
| x-portkey-router-handover | string | none · injected |
| x-portkey-budget-state | string | ok · warn |
| x-portkey-cache-status | string | HIT · MISS · DISABLED |
| x-portkey-trace-id | uuid | — |
Tips & gotchas
- Handover: when the router picks a different model between turns in the same session_id, the gateway injects a one-line system note so the new model keeps prior context. Always pass metadata.session_id for multi-turn agents.
- Cooldown: a 429 or 503 from a candidate temporarily removes it from the pool (default 15s, honoring upstream Retry-After up to 5 min).
- Semantic cache: near-duplicate prompts with cosine similarity ≥ 0.95 are served from the cache. Streams and tool calls bypass the cache by design. Enable it via conf.cache.semantic.enabled = true.
- Streaming tool calls: current versions of Smart Inference forward provider chunks 1:1. If you stream and use tools, aggregate tool_call argument fragments client-side until finish_reason="tool_calls".
- Budgets: gated by budgets.enabled. When on, requests are pre-checked against the bearer's budget_usd and the integration's session/project caps.
Continuum Research
The mechanisms behind reliable agents at scale — what makes Continuum more than a thin LLM wrapper. Each topic links a runtime concern to the module that implements it, with citations to the source files.
Design philosophy
Continuum is a runtime, not a framework. Four principles guide every module:
- Code-first, no YAML. Agents are Python dataclasses. The compiler is your friend.
- Async-native. Every I/O hop is non-blocking — LLM, MCP tools, Redis, vector store, Langfuse.
- Protocol-based abstractions. Replace any layer (LLM, memory, session, observability) by swapping the protocol implementation. No deep inheritance trees.
- Trace everything. Auto-tracing isn't a feature — it's a constraint. If a behaviour can't be traced, it shouldn't ship.
Tool Attention
An agent with 50+ MCP tools dilutes the LLM's function-calling accuracy and burns tokens on schemas it never uses. Tool Attention is a top-k semantic promotion mechanism: every turn, only the most relevant tools are sent to the LLM.
| Field | Type | Default | Effect |
|---|---|---|---|
| k | int | 3 | How many tools to promote per turn |
| min_tools | int | 5 | Skip attention when the agent has fewer tools than this |
| NEED_TOOL fallback | — | auto | If the LLM signals a missing tool, expand the candidate set and retry |
```python
from orchestrator.agent import BaseAgent
from orchestrator.agent.config import AgentConfig
from orchestrator.tools.tool_attention.config import ToolAttentionConfig

agent = BaseAgent(
    name="ops",
    instructions="…",
    tools=large_tool_set,  # 30+ tools
    config=AgentConfig(
        tool_attention=ToolAttentionConfig(k=5, min_tools=10),
    ),
)
```
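To show the top-k mechanism without an embedding model, this sketch swaps in bag-of-words cosine similarity between the user message and each tool's name plus description. Continuum's Tool Attention is described as semantic promotion, so this ranking is a deliberate simplification. select_tools is hypothetical and omits the NEED_TOOL fallback.

```python
import math
import re
from collections import Counter


def _vec(text: str) -> Counter[str]:
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def _cosine(a: Counter[str], b: Counter[str]) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def select_tools(query: str, tools: dict[str, str], k: int = 3, min_tools: int = 5) -> list[str]:
    """Return the names of the k tools whose name + description best match the query."""
    if len(tools) < min_tools:
        return list(tools)  # small tool sets are sent whole
    q = _vec(query)
    ranked = sorted(tools, key=lambda name: _cosine(q, _vec(name + " " + tools[name])), reverse=True)
    return ranked[:k]


TOOLS = {
    "search_products": "Find products in the catalog by keyword or animal",
    "add_to_cart": "Add a product to the shopping cart",
    "view_cart": "Show the items in the shopping cart",
    "checkout": "Place an order for the cart",
    "get_weather": "Current weather for a city",
    "send_email": "Send an email message",
}
print(select_tools("add the tennis balls to my cart", TOOLS, k=2))  # ['add_to_cart', 'view_cart']
```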
Context compression
When the running message array approaches the model's context window, Continuum compresses older turns into a summary while preserving the most recent N exchanges verbatim.
| Env | Default | Meaning |
|---|---|---|
| CONTEXT_MANAGEMENT_ENABLED | true | Master switch |
| CONTEXT_COMPRESSION_THRESHOLD | 0.8 | Trigger at 80% of model's context window |
| CONTEXT_KEEP_RECENT_MESSAGES | 10 | Recent turns kept verbatim |
Compression runs synchronously between LLM calls. The summary is stored as a system message; the original turns are dropped from the message array but kept in session history (Redis) for audit.
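Here is a minimal sketch of the compression trigger. It takes a caller-supplied token counter and summariser, both hypothetical; a naive word count stands in for a tokenizer. It mirrors the two defaults in the table: compress at 80% of the window and keep the last 10 messages verbatim. Keeping a leading system prompt in place is an assumption about the real behaviour.

```python
from typing import Callable

Message = dict[str, str]


def maybe_compress(
    messages: list[Message],
    count_tokens: Callable[[list[Message]], int],
    summarize: Callable[[list[Message]], str],
    context_window: int,
    threshold: float = 0.8,
    keep_recent: int = 10,
) -> list[Message]:
    """Replace older turns with one summary once usage reaches threshold * window."""
    if count_tokens(messages) < threshold * context_window:
        return messages
    head = messages[:1] if messages and messages[0]["role"] == "system" else []
    body = messages[len(head):]
    if len(body) <= keep_recent:
        return messages  # nothing old enough to compress
    older, recent = body[:-keep_recent], body[-keep_recent:]
    summary = {"role": "system", "content": "Summary of earlier turns: " + summarize(older)}
    return head + [summary] + recent


def word_count(msgs: list[Message]) -> int:  # naive stand-in for a tokenizer
    return sum(len(m["content"].split()) for m in msgs)


history = [{"role": "system", "content": "You are helpful."}]
history += [{"role": "user", "content": f"message {i}"} for i in range(20)]
compressed = maybe_compress(history, word_count, lambda old: f"{len(old)} messages",
                            context_window=50)
print(len(compressed), compressed[1]["content"])  # 12 Summary of earlier turns: 10 messages
```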
Instruction modifiers
Dynamic, code-driven prompt augmentation. A modifier is a callable (prompt, ctx) → prompt that runs after template variables are resolved but before the LLM call.
```python
def tier_aware(prompt: str, ctx: RunContext) -> str:
    tier = ctx.metadata.get("user_tier", "free")
    if tier == "enterprise":
        return prompt + "\n\nThis is an enterprise user. Prioritise SLA."
    return prompt

agent = BaseAgent(
    name="support",
    instructions="You are helping {user_name}.",
    template_vars={"user_name": "Alice"},
    instruction_modifiers=[tier_aware],
)
```
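The ordering described above can be sketched as a small pipeline: template variables first, then modifiers, then the LLM call. RunContext here is a hypothetical stand-in dataclass. Resolving {user_name}-style placeholders with str.format is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class RunContext:  # hypothetical stand-in for the runtime context object
    metadata: dict[str, Any] = field(default_factory=dict)


Modifier = Callable[[str, RunContext], str]


def build_instructions(template: str, template_vars: dict[str, str],
                       modifiers: list[Modifier], ctx: RunContext) -> str:
    prompt = template.format(**template_vars)  # 1. resolve {placeholders}
    for modify in modifiers:                  # 2. apply modifiers in list order
        prompt = modify(prompt, ctx)
    return prompt                             # 3. this string is sent to the LLM


def tier_aware(prompt: str, ctx: RunContext) -> str:
    if ctx.metadata.get("user_tier", "free") == "enterprise":
        return prompt + "\n\nThis is an enterprise user. Prioritise SLA."
    return prompt


text = build_instructions("You are helping {user_name}.", {"user_name": "Alice"},
                          [tier_aware], RunContext({"user_tier": "enterprise"}))
print(text)
```

Because modifiers run after substitution, a modifier sees the final names and can branch on them. It cannot, however, introduce new {placeholders} and expect them to be filled.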
Smart layer · model_tier routing
An alternative to Smart Inference's gateway-side router — Continuum can route inline via RouterAgent(routing_strategy="model_tier"). A tier classifier (small LLM or heuristic) reads each prompt and dispatches to one of several pre-defined model tiers.
When SMART_LAYER_ENABLED=true and the strategy is model_tier, the router:
- Runs the tier classifier on the prompt, which returns cheap, mid, or quality
- Looks up the tier's pinned model from RouterConfig.tier_models
- Falls back to FALLBACK_LLM_MODEL if the picked tier is unhealthy
When SMART_LAYER_ENABLED=false, the router falls back to standard LLM routing.
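The three steps condense to a lookup with a health-checked fallback. In this sketch, the classifier, the health check, and the tier-to-model mapping are hypothetical inputs supplied by the caller, not Continuum's RouterConfig API.

```python
from typing import Callable


def route_by_tier(prompt: str, classify: Callable[[str], str], tier_models: dict[str, str],
                  is_healthy: Callable[[str], bool], fallback_model: str) -> str:
    tier = classify(prompt)        # expected: "cheap", "mid" or "quality"
    model = tier_models.get(tier)  # the tier's pinned model
    if model is None or not is_healthy(model):
        return fallback_model      # e.g. the FALLBACK_LLM_MODEL setting
    return model


TIER_MODELS = {"cheap": "gpt-4o-mini", "mid": "gpt-4o", "quality": "claude-opus-4-5"}
UNHEALTHY = {"claude-opus-4-5"}  # pretend this model is currently failing


def toy_classify(prompt: str) -> str:  # stand-in for a real tier classifier
    return "quality" if "prove" in prompt.lower() else "cheap"


def healthy(model: str) -> bool:
    return model not in UNHEALTHY


print(route_by_tier("hi there", toy_classify, TIER_MODELS, healthy, "gemini/gemini-2.5-flash"))
print(route_by_tier("Prove it", toy_classify, TIER_MODELS, healthy, "gemini/gemini-2.5-flash"))
```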
Tier classifiers
Three classifier backends ship out of the box. Choose via LLM_ROUTE_TIER_CLASSIFIER.
| Classifier | Where it runs | Best for |
|---|---|---|
| light_only | regex + length heuristics | zero-latency baseline; never calls an LLM |
| qwen | HuggingFace Router API (Qwen3-4B-Instruct) | cloud routing without spinning a model |
| qwen_local | local OpenAI-compatible endpoint (MLX, vLLM) | air-gapped or zero-cost |
Set LLM_ROUTE_TIER_CLASSIFIER_HEURISTIC_SHORTCUT=false to skip the keyword shortcut and always run the classifier LLM.
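To illustrate what a light_only-style classifier can look like, here is a zero-latency heuristic built from regular expressions and word counts. The keywords and thresholds are invented for the example. The shipped classifier's rules are not documented here.

```python
import re

# Illustrative signals only; not the shipped light_only rules.
_REASONING = re.compile(r"\b(prove|derive|step by step|trade-?offs?|compare|analy[sz]e)\b", re.I)
_CODE = re.compile(r"\bdef\b|\bclass\b|\bimport\b|traceback|stack trace", re.I)


def light_tier(prompt: str) -> str:
    """Guess a tier from keywords and length without calling any model."""
    words = len(prompt.split())
    if _REASONING.search(prompt) or words > 300:
        return "quality"
    if _CODE.search(prompt) or words > 60:
        return "mid"
    return "cheap"


for p in ("show me dog toys", "Why does def load() raise here?", "Compare these two plans step by step"):
    print(light_tier(p), "<-", p)
```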
Handoff history transfer modes
When agent A hands off to agent B, the prior conversation is rewritten for B's context. Four modes control how much history travels:
| Mode | What B sees | Use when |
|---|---|---|
| FULL | Verbatim message array | Specialist needs all detail |
| SUMMARY | LLM-generated summary + open question | Long conversations, narrow specialist |
| RECENT_N | Last N turns only | Topic just changed; old context is noise |
| HYBRID | Summary of older turns + last N verbatim | Default: best balance for most flows |
Continuum also tracks handoff depth, detects cycles (A→B→A→B), and emits HANDOFF_RETURN events when control returns to a parent agent.
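Here is a hedged sketch of the four transfer modes and one simple cycle check. The summariser is supplied by the caller. The cycle rule is an assumption: it refuses a handoff that repeats a transition already in the chain, which catches A→B→A→B while still allowing a single return A→B→A. TransferMode, transfer_history, and creates_cycle are hypothetical names.

```python
from enum import Enum
from typing import Callable

Message = dict[str, str]


class TransferMode(Enum):
    FULL = "full"
    SUMMARY = "summary"
    RECENT_N = "recent_n"
    HYBRID = "hybrid"


def transfer_history(history: list[Message], mode: TransferMode, n: int,
                     summarize: Callable[[list[Message]], str]) -> list[Message]:
    """Rewrite A's conversation into what B receives on handoff."""
    if n < 1:
        raise ValueError("n must be at least 1")
    if mode is TransferMode.FULL:
        return list(history)
    if mode is TransferMode.RECENT_N:
        return history[-n:]
    if mode is TransferMode.SUMMARY:
        return [{"role": "system", "content": summarize(history)}]
    # HYBRID: summary of everything before the last n messages, then those verbatim.
    older, recent = history[:-n], history[-n:]
    return ([{"role": "system", "content": summarize(older)}] if older else []) + recent


def creates_cycle(chain: list[str], target: str) -> bool:
    """True if handing off from chain[-1] to target repeats an earlier transition."""
    seen = set(zip(chain, chain[1:]))
    return (chain[-1], target) in seen


turns = [{"role": "user", "content": str(i)} for i in range(5)]
print(transfer_history(turns, TransferMode.HYBRID, 2, lambda ms: f"{len(ms)} earlier turns"))
print(creates_cycle(["A", "B", "A"], "B"))  # True: A→B would repeat
print(creates_cycle(["A", "B"], "A"))       # False: a single return is allowed
```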
Memory isolation scopes
Long-term memory in Continuum has four orthogonal scopes. Each agent chooses its search scope and its store scope independently.
| Scope | Keyed by | Reach | Multi-tenant safe? |
|---|---|---|---|
| USER | user_id | Per-user across all agents | ✓ default |
| AGENT | agent_name | Per-agent across all users | One-way: shared by users |
| SHARED | — | Global knowledge base | No: explicitly shared |
| CONVERSATION | conversation_id | Ephemeral, single thread | ✓ |
IntelligentMemoryClient
A drop-in replacement for the standard MemoryClient that adds three behaviours:
- Adaptive extraction — uses a per-agent extraction prompt to filter what gets stored (e.g. "only pet preferences, never one-off cart actions").
- Relevance re-ranking — re-ranks recalled memories using a small LLM scorer before injecting them into the prompt.
- Deduplication — merges semantically equivalent memories to prevent drift.
PII filtering on memory writes
Before a memory is stored, Continuum can run it through a PII scrubber that redacts emails, phone numbers, SSNs, and credit cards. Configure via the memory client:
```python
from orchestrator.memory import MemoryClient, PIIPolicy

memory = MemoryClient(pii_policy=PIIPolicy.REDACT)
```
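For a feel of what a REDACT policy does, here is a regex-based scrubber for the four listed categories. The patterns are illustrative, US-centric assumptions, not Continuum's implementation. A production scrubber would also validate card numbers (for example with a Luhn check) and handle international formats.

```python
import re

# Order matters: specific patterns run before the broader phone pattern.
_PATTERNS = [
    ("EMAIL", re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")),
    ("SSN", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
    ("CARD", re.compile(r"\b(?:\d[ -]?){13,16}\b")),
    ("PHONE", re.compile(r"(?:\+?1[ .-]?)?\(?\d{3}\)?[ .-]?\d{3}[ .-]?\d{4}\b")),
]


def redact(text: str) -> str:
    """Replace each match with a [LABEL] placeholder before the text is stored."""
    for label, pattern in _PATTERNS:
        text = pattern.sub(f"[{label}]", text)
    return text


sample = "Email bob@example.com or call 555-123-4567. Card 4111 1111 1111 1111, SSN 123-45-6789."
print(redact(sample))
# Email [EMAIL] or call [PHONE]. Card [CARD], SSN [SSN].
```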
Run artifacts
MCP tools can return structuredContent — JSON payloads alongside their text output. Continuum captures these as run artifacts, exposing them on AgentResponse.artifacts for the application to consume (UI widgets, downstream pipelines, audit logs).
```python
response = await runner.run(agent, "checkout", user_id="alice")
for art in response.artifacts:
    print(art.tool_name, art.structured_content)
    # e.g. checkout {'order_id': 'ORD-92151', 'total_cents': 699}
```
Evaluation framework
Continuum ships with two opt-in eval stacks (pip install -e ".[eval]") plus a grading agent:
- DeepEval: criterion-based evaluation with customisable metrics (faithfulness, correctness, toxicity).
- RAGAS: RAG-specific metrics (context precision, recall, answer relevance).
- EvaluatorAgent: a specialised BaseAgent whose job is grading other agents' outputs.
Golden datasets from Langfuse
Build regression test sets directly from production traces. The orchestrator.evaluation.golden module pulls traces matching a filter (e.g. tagged good_response in the Langfuse UI) and materialises them as a pytest dataset.
```python
from orchestrator.evaluation import build_golden_dataset

dataset = await build_golden_dataset(
    project="shop-assistant",
    tags=["good_response"],
    since="2026-04-01",
)
# Use with pytest-asyncio + DeepEval as a CI gate.
```
Example · Pet Shop Assistant
An end-to-end walkthrough: an agent backed by an MCP shop server, routed through Smart Inference, with Langfuse tracing and Redis-backed sessions. Copy the snippets in order and you'll have a working chat UI in under 10 minutes.
The full source for this example lives in playground/gateway-local-shop/. This page is the annotated tour.
Architecture
```text
┌──────────────┐    HTTP    ┌────────────────┐   OpenAI   ┌──────────────────┐
│   Browser    │ ─────────► │   FastAPI UI   │            │ Smart Inference  │
│    /chat     │    POST    │ (web.py :8081) │            │   Gateway :8787  │
└──────────────┘            └───────┬────────┘            └────────┬─────────┘
                                    │                              │
                                    │ Continuum AgentRunner        │ pick model
                                    ▼                              ▼
                            ┌────────────────┐            ┌──────────────────┐
                            │   BaseAgent    │ ◄─ tools ──│ OpenAI /         │
                            │  (agent.py)    │            │ Anthropic /      │
                            └───────┬────────┘            │ Google           │
                                    │                     └──────────────────┘
                                    │ MCP StreamableHTTP
                                    ▼
                            ┌────────────────┐            ┌──────────────────┐
                            │  Shop server   │            │  Langfuse :3005  │
                            │  (server.py    │            │ ◄── traces ────  │
                            │     :8888)     │            └──────────────────┘
                            └────────────────┘

Session/memory: Redis (:6380) · Milvus (:19530)
```

1 · Prerequisites
- Python 3.13 with a venv
- Docker / Docker Compose
- Node 22 LTS for the gateway build
- At least one provider key (OPENAI_API_KEY, ANTHROPIC_API_KEY, or GEMINI_API_KEY)
2 · Spin up infra
- Continuum stack — Redis, Milvus, Langfuse, ClickHouse, Postgres, MinIO
```bash
cd continuum
docker compose up -d redis-sdk milvus milvus-etcd postgres clickhouse minio langfuse-web langfuse-worker
```
- Smart Inference gateway — build, then run from Docker (sharing Continuum's Langfuse Redis)
```bash
cd ../continuum-backend-smart-inference
nvm use 22 && npm install && npm run build
docker compose up -d gateway
```
3 · Configure env
Continuum (continuum/.env) — point at the gateway and the local Langfuse:
```bash
# LLM / routing
SMART_GATEWAY_URL=http://localhost:8787/v1
SMART_GATEWAY_API_KEY=ck-prod-2026-05-19
SMART_GATEWAY_DEFAULT_MODE=modest

# Observability
LANGFUSE_ENABLED=true
LANGFUSE_HOST=http://localhost:3005  # continuum's docker-compose maps Langfuse to 3005
LANGFUSE_PUBLIC_KEY=pk-lf-…
LANGFUSE_SECRET_KEY=sk-lf-…

# Sessions + memory
SESSION_REDIS_PORT=6380
VECTOR_STORE_PROVIDER=milvus
MILVUS_PORT=19530
```
Smart Inference (continuum-backend-smart-inference/.env) — real provider keys:
```bash
OPENAI_API_KEY=sk-…
ANTHROPIC_API_KEY=sk-ant-…
GEMINI_API_KEY=AIza…

# From inside docker, reach the host's Langfuse
LANGFUSE_PUBLIC_KEY=pk-lf-…
LANGFUSE_SECRET_KEY=sk-lf-…
LANGFUSE_BASE_URL=http://host.docker.internal:3005
```

4 · MCP shop server
A FastMCP server exposes 5 tools (search, get, add-to-cart, view-cart, checkout) and 3 resources. The agent talks to it over StreamableHTTP.
```python
# playground/gateway-local-shop/server.py (excerpt)
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("local-shop")

PRODUCTS = [
    {"id": "p1", "name": "Dog Food (Dry) 5kg", "price": 29.99, "animal": "dog"},
    # … more products …
]
_carts: dict[str, list] = {}


@mcp.tool()
def search_products(query: str = "", animal: str = "") -> list:
    """Filter products by query / animal."""
    return [
        p for p in PRODUCTS
        if (not animal or p["animal"] == animal)
        and (not query or query.lower() in p["name"].lower())
    ]


@mcp.tool()
def add_to_cart(session_id: str, product_id: str, quantity: int = 1) -> dict:
    cart = _carts.setdefault(session_id, [])
    cart.append({"product_id": product_id, "quantity": quantity})
    return {"cart_size": len(cart)}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(mcp.streamable_http_app(), host="0.0.0.0", port=8888)
```
```bash
python playground/gateway-local-shop/server.py  # MCP server on :8888
```

5 · Agent + config
The agent connects to the MCP server, defines memory + session policy, and uses Smart Inference auto-routing.
```python
# playground/gateway-local-shop/agent.py (excerpt)
from orchestrator import (
    AgentConfig,
    AgentMemoryConfig,
    AgentMemoryScope,
    AgentRunner,
    BaseAgent,
    MCPServerStreamableHttp,
    ToolExecutor,
)
from orchestrator.tools.tool_attention.config import ToolAttentionConfig
from orchestrator.tools.types import ToolContextConfig, ToolContextVariable

mcp_server = MCPServerStreamableHttp(
    params={"url": "http://localhost:8888/mcp"},
    context_config=ToolContextConfig(
        variables=[
            ToolContextVariable(
                name="session_id",
                inject_into=["add_to_cart", "view_cart", "checkout"],
            )
        ]
    ),
)
await mcp_server.connect()

executor = ToolExecutor({mcp_server: None})
await executor.initialize()

agent = BaseAgent(
    name="shop-assistant",
    instructions="You are a friendly pet shop assistant.",
    model="auto",       # gateway picks
    gateway_mode=None,  # falls back to SMART_GATEWAY_DEFAULT_MODE
    tools=executor.get_tool_definitions(),
    tool_executor=executor,
    memory_config=AgentMemoryConfig(
        search_memories=True,
        store_memories=True,
        search_scope=AgentMemoryScope.USER,
        store_scope=AgentMemoryScope.USER,
    ),
    config=AgentConfig(
        max_turns=3,
        log_to_session=True,
        tool_attention=ToolAttentionConfig(k=3, min_tools=3),
    ),
)
```
6 · CLI or Web UI
Two ways to drive it. The CLI is one file and stops at Ctrl-C; the Web UI is a FastAPI page with a ChatGPT-style chat.
CLI loop
```python
# cli.py
import asyncio

from orchestrator import AgentRunner

from agent import agent, executor


async def main():
    runner = AgentRunner(tool_executor=executor)
    runner.register_agent(agent)
    while True:
        msg = input("you ❭ ").strip()
        if not msg:
            break  # empty line (or Ctrl-C) ends the session
        r = await runner.run(agent, msg, user_id="alice", conversation_id="local")
        print("agent ❭", r.content)


asyncio.run(main())
```
FastAPI Web UI
```python
# web.py: POST /chat returns the agent's reply as JSON
@app.post("/chat")
async def chat(req: ChatRequest):
    response = await shop_agent.chat(
        req.message,
        user_id=req.user_id,
        conversation_id=req.conversation_id,
    )
    return {"response": response}
```
```bash
python playground/gateway-local-shop/web.py  # open http://localhost:8081
```

7 · Run a conversation
Type any of these into the chat — the agent will route to the right tool automatically.
| Prompt | What the agent does |
|---|---|
| show me dog toys | search_products(animal="dog", category="toys") → product list |
| add the tennis balls to my cart | add_to_cart(session_id, product_id="p5") |
| what's in my cart? | view_cart(session_id) → table with totals |
| checkout please | checkout(session_id) → order id + receipt |
Open Langfuse at http://localhost:3005 → your Continuum project. Every turn appears as a Langfuse trace with the user/assistant messages, every tool call as a span, and the picked model + latency on the generation.
Make it yours
- Swap the domain: replace the MCP server with one that exposes your tools (Slack, Jira, internal APIs). The agent code doesn't change.
- Tighten the tier: set gateway_mode="strict" to drop latency to ~2–4s/turn (cheap-tier models).
- Persist memories: keep memory_config on and the agent will remember per-user facts across sessions.
- Add a workflow: wrap the agent in a RouterAgent with a billing-specialist sibling and a triage agent.
Community
Continuum is built at Shyftlabs. Contributions, examples, and feedback are welcome.
Contributing
- Fork & clone: work in a feature branch
- Install dev dependencies:

```bash
pip install -e ".[dev,temporal,eval]"
```

- Write tests: unit + integration, no mocking the database

```bash
pytest tests/ -v
```

- Lint & type-check:

```bash
ruff check src/
mypy src/
```

- Submit a PR: describe what changed and why, and reference the relevant docs section

Run docker compose up -d before running the test suite.