The agent runtime
for builders who ship.

Production-grade reasoning, durable workflows, cost-aware routing, and full observability — out of the box. Continuum is the bridge from notebook to enterprise.

Python 3.13 · 250+ models via Smart Inference · MCP-native tools · Temporal: durable workflows · Langfuse: auto-traced · 9 multi-agent patterns

What you get

Smart Inference routing

One OpenAI-compatible endpoint, 250+ models. A classifier scores every prompt; the router picks the cheapest model that clears the quality bar. Per-agent strict / modest / quality tiers.

Two-tier persistent memory

Long-term semantic recall via mem0 + Milvus/Qdrant; short-term session in Redis. Four isolation scopes (USER, AGENT, SHARED, CONVERSATION) for multi-tenant safety.

9 composable workflow patterns

Sequential · Parallel · Loop · Reflection · Router · Planner · Debate · Scatter · SupervisedSequential. Mix and nest freely — every step is independently traceable.

MCP-native tooling

Stdio, SSE, or StreamableHTTP — connect any MCP server with zero adapters. Top-k Tool Attention promotes only the relevant tools each turn, cutting prompt tokens 30–60%.

Durable workflows · Temporal

Long-running multi-agent jobs that survive crashes, restarts, and deploys. Human-in-the-loop approval gates with timeout escalation, signals, and full audit trails.

Observability is non-negotiable

Every LLM call, tool invocation, handoff, and memory op auto-traced in Langfuse. Custom spans via @observe. Build golden eval sets from production traces.

Safe by default

Input/output PII redaction. Configurable scrubbers on memory writes. Cycle detection on agent handoffs. Graceful shutdown with in-flight trace flush. Bearer-scoped budgets at the gateway.

Production primitives

Dependency-injection container, health checks for every dependency, lifecycle hooks, FastAPI helpers, fakeredis for tests. Async-native, protocol-based, code-first.

Quickstart

  1. Install
    # Python 3.13 required
    python3.13 -m venv .venv    
    source .venv/bin/activate
    pip install shyftlabs-continuum 
  2. Setup environment and spin up infrastructure
    cp .env.template .env   

    Next, configure your .env file:

    # ── LLM Provider Keys ────────────────────────────────────────────────────────
    OPENAI_API_KEY=your-openai-api-key        # required (also used for embeddings)
    GEMINI_API_KEY=your-gemini-api-key        # if using Gemini
    # ANTHROPIC_API_KEY=your-anthropic-api-key  # if using Claude
    
    # ── Default LLM ──────────────────────────────────────────────────────────────
    DEFAULT_LLM_MODEL=gemini/gemini-2.5-flash
    FALLBACK_LLM_MODEL=gpt-4o-mini
    DEFAULT_LLM_TEMPERATURE=0.7
    DEFAULT_LLM_MAX_TOKENS=4096
    LLM_REQUEST_TIMEOUT=60
    LLM_MAX_RETRIES=3
    LLM_ENABLE_FALLBACK=true
    
    # ── Embeddings ────────────────────────────────────────────────────────────────
    EMBEDDER_PROVIDER=openai
    EMBEDDER_MODEL=text-embedding-3-small
    EMBEDDING_DIMS=1536
    
    # ── Memory (mem0 + Milvus) ───────────────────────────────────────────────────
    MEMORY_ENABLED=true
    VECTOR_STORE_PROVIDER=milvus
    MILVUS_HOST=localhost
    MILVUS_PORT=19530
    MILVUS_TOKEN=
    MILVUS_COLLECTION=orchestrator_memories
    MEMORY_LLM_MODEL=gemini/gemini-2.5-flash
    MEMORY_LLM_TEMPERATURE=0.1
    MEMORY_ISOLATION=user
    MEMORY_SEARCH_LIMIT=5
    MEMORY_HISTORY_DB_PATH=~/.orchestrator/memory_history.db
    
    # ── Session (Redis) ───────────────────────────────────────────────────────────
    SESSION_ENABLED=true
    SESSION_REDIS_HOST=localhost
    SESSION_REDIS_PORT=6380
    SESSION_REDIS_PASSWORD=sdk123456789
    SESSION_REDIS_DB=0
    SESSION_REDIS_SSL=false
    SESSION_TTL_SECONDS=172800
    SESSION_MAX_MESSAGES=1000
    SESSION_KEY_PREFIX=orchestrator:session
    
    # ── Langfuse Observability ────────────────────────────────────────────────────
    LANGFUSE_ENABLED=true
    LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
    LANGFUSE_SECRET_KEY=your-langfuse-secret-key
    LANGFUSE_HOST=http://localhost:3000
    LANGFUSE_BASE_URL=http://localhost:3000
    NEXTAUTH_SECRET=your-nextauth-secret
    LANGFUSE_SAMPLE_RATE=1.0
    LANGFUSE_FLUSH_INTERVAL=1
    LANGFUSE_FLUSH_AT=15
    LANGFUSE_DEBUG=false
    
    # ── Temporal (durable workflows) ─────────────────────────────────────────────
    TEMPORAL_ENABLED=true
    TEMPORAL_HOST=localhost:7233
    TEMPORAL_NAMESPACE=default
    TEMPORAL_TASK_QUEUE=orchestrator-agents
    TEMPORAL_ENABLE_HUMAN_IN_LOOP=true
    TEMPORAL_APPROVAL_TIMEOUT_SECONDS=86400
    TEMPORAL_WORKFLOW_EXECUTION_TIMEOUT=604800
    TEMPORAL_ACTIVITY_START_TO_CLOSE_TIMEOUT=300
    TEMPORAL_ACTIVITY_RETRY_MAX_ATTEMPTS=3
    
    # ── Misc ──────────────────────────────────────────────────────────────────────
    ENVIRONMENT=development
    LOG_LEVEL=INFO
    SHARED_SERVICES_ENABLED=true
    MEM0_TELEMETRY=false
    ANONYMIZED_TELEMETRY=false
    TOKENIZERS_PARALLELISM=false

    Finally, spin up the infrastructure:

    docker compose up -d       # Redis (:6380) + Milvus (:19530)
  3. Create and run your first agent
    import asyncio
    from orchestrator.agent import BaseAgent
    from orchestrator.agent.runner import AgentRunner
    
    agent = BaseAgent(
        name="assistant",
        instructions="You are a helpful assistant.",
        model="gpt-4o-mini",
    )
    
    async def main():
        runner = AgentRunner()
        response = await runner.run(agent, "Hello!", user_id="u-001")
        print(response.content)
    
    asyncio.run(main())

Architecture

Continuum is structured as layered modules, each independently usable:

Module         Import path                 Purpose
agent          orchestrator.agent          BaseAgent, AgentRunner, 9 workflow patterns, handoffs
llm            orchestrator.llm            LLMClient, provider routing, structured output, context compression
memory         orchestrator.memory         mem0 + Qdrant/Milvus long-term memory
session        orchestrator.session        Redis-backed conversation history
tools          orchestrator.tools          MCP servers, ToolExecutor, run artifacts
observability  orchestrator.observability  Langfuse tracing, metrics, error reporting
temporal       orchestrator.temporal       Durable workflows, approval gates
core           orchestrator.core           DI Container, lifecycle, health checks

Full Documentation

Each module has a detailed markdown reference in the repository:

Doc                                   Covers
agent.md                              BaseAgent fields, AgentRunner, 9 workflow patterns, handoffs
llm.md                                LLMClient, LLMConfig, provider routing, structured output, context compression
memory.md & memory-issue-analysis.md  Memory settings, memory mechanism, and issues
session.md                            SessionClient, Redis conversation history
tools.md                              MCP servers, ToolExecutor, run artifacts
observability.md                      Langfuse tracing, @observe decorator, metrics
temporal/                             Durable workflows, approval gates, human-in-the-loop
core.md                               Container, OrchestratorLifecycle, health checks, protocols
installation.md                       Full env var reference, troubleshooting
features.md                           Complete feature inventory organized by layer
GUIDE.md                              Developer guide: config defaults, session/memory behaviour, practical patterns

Build Agents

From a minimal single agent to complex multi-agent workflows — everything starts with BaseAgent.

Single-agent example: playground/local-shop — one agent with MCP tools over HTTP.
Multi-agent example: playground/multi-agent-shop.

Base Agent

from orchestrator.agent import BaseAgent

agent = BaseAgent(
    name="support-bot",
    instructions="You are a support agent for Acme Corp. Be concise.",
    model="gpt-4o-mini",
    temperature=0.3,
)

BaseAgent is a dataclass. Every field has a sensible default — only name and instructions are required in practice.

Parameter              Type                    Description
name                   str                     Unique identifier (alphanumeric, hyphens, underscores)
instructions           str                     System prompt. Supports {slot} placeholders.
model                  str                     LLM model string; defaults to the DEFAULT_LLM_MODEL env var
temperature            float                   Default 0.7
max_tokens             int | None              Response token cap
tools                  list[ToolDefinition]    Static tool definitions for the LLM
mcp_servers            list[MCPServer]         MCP servers whose tools are dynamically loaded
handoffs               list[Handoff]           Agents this agent can delegate to
memory_config          AgentMemoryConfig       Long-term memory search/store behavior
config                 AgentConfig             Execution config: turns, timeouts, retries, react_mode
output_schema          type[BaseModel] | None  Pydantic model for structured output
input_schema           type[BaseModel] | None  Validates input before execution
template_vars          dict                    Static values injected into {slot} placeholders
examples               list[dict]              Few-shot examples; each needs input and output keys
instruction_modifiers  list[Callable]          Dynamic prompt modifiers applied at runtime
metadata               dict                    Arbitrary key-value metadata
tags                   list[str]               Categorization tags for filtering / routing

Stateless vs Stateful

An agent can be stateless or stateful, depending on your needs. For a stateless agent, disable both long-term memory (Memory) and short-term memory (Session). A stateful agent can use long-term memory, short-term memory, or both. You control this through the parameters of AgentConfig and AgentMemoryConfig.

# Stateless — no Redis, no mem0 calls (fastest for prototypes)
agent = BaseAgent(
    name="stateless",
    instructions="...",
    memory_config=AgentMemoryConfig(search_memories=False, store_memories=False),
    config=AgentConfig(log_to_session=False, session_history_turns=0),
)

# Stateful — loads last 20 turns + long-term memories (default)
agent = BaseAgent(
    name="stateful",
    instructions="...",
    memory_config=AgentMemoryConfig(search_memories=True, store_memories=True),
    config=AgentConfig(log_to_session=True, session_history_turns=None),
)

Memory (default):

  • search_memories=True — looks up long-term memories before responding
  • store_memories=True — saves long-term memories after responding

Session (default):

  • log_to_session=True — saves to session history (short-term memory)
  • session_history_turns=None — loads last 20 turns of history (short-term memory)

session_history_turns behaviour

Value           Behaviour
None (default)  Load last 20 turns from Redis
0               Skip Redis entirely; load nothing
5               Load last 5 turns from Redis
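
The table above reads as a small resolution rule. The sketch below is illustrative only: resolve_history_turns and DEFAULT_HISTORY_TURNS are hypothetical names, not part of Continuum's API, and the rejection of negative values is an assumption.

```python
from typing import Optional

DEFAULT_HISTORY_TURNS = 20  # default taken from the table above


def resolve_history_turns(session_history_turns: Optional[int]) -> int:
    """Return how many turns to load; 0 means Redis is not queried at all."""
    if session_history_turns is None:
        return DEFAULT_HISTORY_TURNS
    if session_history_turns < 0:
        # Assumption: negative values are treated as a configuration error
        raise ValueError("session_history_turns must be None or a non-negative int")
    return session_history_turns


for value in (None, 0, 5):
    print(value, "->", resolve_history_turns(value))
```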

Lifecycle Hooks

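Hooks are callables attached directly to BaseAgent. Each hook receives the agent instance and the run context dict; on_tool_call, on_error, and on_handoff also receive the tool name, the exception, or the handoff target as their second argument.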

async def on_start(agent, context):
    print(f"Starting {agent.name}")

async def on_tool(agent, tool_name, context):
    print(f"Calling tool: {tool_name}")

agent = BaseAgent(
    name="my-agent",
    instructions="...",
    on_start=on_start,
    on_end=lambda agent, ctx: print("Done"),
    on_error=lambda agent, exc, ctx: print(f"Error: {exc}"),
    on_tool_call=on_tool,
    on_handoff=lambda agent, target, ctx: print(f"Handing off to {target}"),
)

Prompt Engineering

Template Variables

agent = BaseAgent(
    name="regional-agent",
    instructions="You serve customers in {region}. Currency: {currency}.",
    template_vars={"region": "North America", "currency": "USD"},
)
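
To make the substitution concrete, here is a minimal sketch of how {slot} placeholders could be filled from template_vars. It assumes plain str.format-style substitution; render_instructions is a hypothetical helper, and how Continuum handles missing or extra slots is not covered here.

```python
def render_instructions(template: str, variables: dict) -> str:
    """Fill {slot} placeholders with static values (str.format semantics)."""
    return template.format(**variables)


instructions = "You serve customers in {region}. Currency: {currency}."
template_vars = {"region": "North America", "currency": "USD"}

rendered = render_instructions(instructions, template_vars)
print(rendered)
```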

Few-shot Examples

agent = BaseAgent(
    name="classifier",
    instructions="Classify the sentiment of the input.",
    examples=[
        {"input": "I love this product!", "output": "positive"},
        {"input": "This is terrible.",  "output": "negative"},
    ],
)

Instruction Modifiers

Modifiers are called at runtime and receive (instructions: str, context: dict) → str.

def add_date(instructions, context):
    return instructions + f"\n\nToday is {context.get('date', 'unknown')}."

agent = BaseAgent(
    name="date-aware",
    instructions="You are an assistant.",
    instruction_modifiers=[add_date],
)
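
With more than one modifier, the natural reading is that each one receives the output of the previous one. The sketch below shows that chaining under the assumption that modifiers run in list order; apply_modifiers and add_channel are hypothetical names used only for illustration.

```python
from functools import reduce
from typing import Callable, List

Modifier = Callable[[str, dict], str]


def add_date(instructions: str, context: dict) -> str:
    return instructions + f"\n\nToday is {context.get('date', 'unknown')}."


def add_channel(instructions: str, context: dict) -> str:
    return instructions + f"\nChannel: {context.get('channel', 'unknown')}."


def apply_modifiers(instructions: str, modifiers: List[Modifier], context: dict) -> str:
    """Feed the prompt through each modifier in turn (assumed order: list order)."""
    return reduce(lambda text, modifier: modifier(text, context), modifiers, instructions)


prompt = apply_modifiers(
    "You are an assistant.",
    [add_date, add_channel],
    {"date": "2025-01-01", "channel": "web"},
)
print(prompt)
```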

Structured Output

from pydantic import BaseModel

class Review(BaseModel):
    sentiment: str
    score: float
    summary: str

agent = BaseAgent(
    name="reviewer",
    instructions="Analyze the review and return structured output.",
    output_schema=Review,
)

response = await runner.run(agent, "The hotel was fantastic but expensive.")
review: Review = response.structured_output
print(review.score)  # e.g. 0.75

Multi-Agent Workflows

Continuum ships 9 built-in workflow patterns. Each has a class-based form and a factory shorthand. All patterns are composable — you can nest them or combine them with handoffs.

Sequential
Pipeline: each agent's output feeds the next
Parallel
All agents run concurrently on the same input
Loop
Iterates until a termination condition is met
Reflection
Agent runs, critic evaluates, agent retries if needed
Router
Dispatches to the most relevant specialist agent
Planner
Decomposes a goal into steps, executes each in turn
Debate
Pro/con agents argue; a judge synthesizes a verdict
Scatter
LLM splits input into slices; agents process in parallel
SupervisedSequential
Like Sequential, but an LLM quality gate checks each step's output before proceeding

Custom Workflow

We strongly recommend building custom workflows using BaseAgent directly for anything closely tied to your project's business logic. A custom agent gives you full control over the flow, session saving, and memory behaviour — you decide which sub-agents are stateless and which are stateful.

Example: ParallelCoordinatorAgent in playground/multi-agent-shop/workflows.py

Instead of using ParallelAgent directly, the playground defines a custom BaseAgent subclass that orchestrates parallel search and synthesis manually:

class ParallelCoordinatorAgent(BaseAgent):
    synthesiser: BaseAgent | None = None
    parallel: ParallelAgent | None = None

    async def execute(self, input_text, runner, context, llm_client=None) -> AgentResponse:
        context.suppress_session_log = True

        # Step 1: parallel workers with a fresh stateless context (no history, no save)
        parallel_ctx = create_run_context(
            user_id=context.user_id,
            conversation_id=context.conversation_id,
        )
        parallel_result = await self.parallel.execute(input_text, runner, parallel_ctx)

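        # Step 2: synthesiser uses session history + memory; suppress_session_log
        # blocks auto-save but context carries session_id so history loads normally.
        # synthesis_input is the synthesiser prompt assembled from parallel_result
        # (its construction is omitted in this excerpt).
        final = await runner.run(
            agent=self.synthesiser,
            input=synthesis_input,
            context=context,
        )

        # Step 3: save exactly one clean turn
        await runner.save_turn(
            session_id=context.session_id,
            user_message=input_text,
            assistant_message=final.content,
        )

        # Return the synthesiser's response as the result of the whole workflow
        return final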

This gives the parallel workers a clean stateless context (no Redis calls) while the synthesiser still loads session history and memory — something the built-in ParallelAgent cannot do out of the box.

Built-in Workflows

Sequential Workflow

Executes agents as a pipeline — each agent receives the previous agent's output.

from orchestrator.agent.workflow import SequentialAgent

pipeline = SequentialAgent(
    name="research-pipeline",
    instructions="Research pipeline coordinator.",
    agents=[researcher, summarizer, formatter],
)

Or use the factory shorthand:

from orchestrator.agent import BaseAgent, AgentRunner, create_sequential_agent

researcher = BaseAgent(name="researcher", instructions="Research the topic. Output key facts.", model="gpt-4o-mini")
writer     = BaseAgent(name="writer",     instructions="Write a short report from the facts.",  model="gpt-4o-mini")
editor     = BaseAgent(name="editor",     instructions="Polish the report for clarity.",         model="gpt-4o-mini")

pipeline = create_sequential_agent(
    name="research-pipeline",
    agents=[researcher, writer, editor],
)
response = await AgentRunner().run(pipeline, "AI in healthcare")
print(response.content)  # editor's final output

Parallel Workflow

All agents receive the same input and run concurrently. Results are merged by strategy.

from orchestrator.agent.workflow import ParallelAgent
from orchestrator.agent.config import ParallelConfig, MergeStrategy

fan_out = ParallelAgent(
    name="multi-analyst",
    instructions="Run multiple analyses in parallel.",
    agents=[sentiment_agent, topic_agent, entity_agent],
    parallel_config=ParallelConfig(
        merge_strategy=MergeStrategy.LLM_SUMMARIZE,
    ),
)

MergeStrategy    Behavior
CONCATENATE      All outputs joined with newlines
LLM_SUMMARIZE    A secondary LLM call synthesizes all outputs
STRUCTURED_DICT  Returns a dict keyed by agent name
FIRST_SUCCESS    Returns the first successful agent result

Factory shorthand:

from orchestrator.agent import create_parallel_agent
from orchestrator.agent.types import MergeStrategy

parallel = create_parallel_agent(
    name="parallel-analysts",
    agents=[analyst_a, analyst_b, analyst_c],
    merge_strategy=MergeStrategy.CONCATENATE,
)
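
The fan-out and merge steps can be pictured with plain asyncio. This is a sketch of the idea, not Continuum's implementation: run_worker stands in for an agent call, and fan_out and concatenate are hypothetical helpers that mimic the STRUCTURED_DICT and CONCATENATE strategies described above.

```python
import asyncio
from typing import Dict, List, Tuple


async def run_worker(name: str, text: str) -> Tuple[str, str]:
    await asyncio.sleep(0)  # stand-in for an LLM call
    return name, f"{name} saw: {text}"


async def fan_out(names: List[str], text: str) -> Dict[str, str]:
    """Run every worker on the same input; return outputs keyed by agent name."""
    pairs = await asyncio.gather(*(run_worker(name, text) for name in names))
    return dict(pairs)  # STRUCTURED_DICT-style result


def concatenate(outputs: Dict[str, str]) -> str:
    """CONCATENATE-style merge: join all outputs with newlines."""
    return "\n".join(outputs.values())


outputs = asyncio.run(fan_out(["sentiment", "topic"], "great phone"))
print(concatenate(outputs))
```

asyncio.gather returns results in the order the coroutines were passed, so the merged text is stable even though the workers run concurrently.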

Loop Workflow

Iterates an agent until a termination condition is satisfied.

from orchestrator.agent.workflow import LoopAgent
from orchestrator.agent.config import TerminationConfig, TerminationType

loop = LoopAgent(
    name="refinement-loop",
    instructions="Iteratively refine output.",
    agent=writer_agent,
    termination=TerminationConfig(
        type=TerminationType.LLM_DECISION,  # LLM decides when done
        max_iterations=5,
    ),
)

TerminationType  When it stops
LLM_DECISION     The inner agent's LLM signals completion
TOOL_CALL        A specific tool name is called
OUTPUT_MATCH     Output matches a regex pattern
MAX_ITERATIONS   Always run exactly N times
CUSTOM           User-supplied callable returns True

Factory shorthand with OUTPUT_MATCH:

from orchestrator.agent import create_loop_agent
from orchestrator.agent.types import TerminationType

loop = create_loop_agent(
    name="refinement-loop",
    agent=refiner,
    termination_type=TerminationType.OUTPUT_MATCH,
    termination_pattern=r"\bDONE\b",
    max_iterations=3,
)
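
For intuition, OUTPUT_MATCH termination with an iteration cap can be sketched as an ordinary loop. The helper below is hypothetical (run_until_match and fake_refiner are not Continuum names); it assumes each iteration feeds the previous output back into the agent.

```python
import re
from typing import Callable, Tuple


def run_until_match(
    step: Callable[[str], str], initial: str, pattern: str, max_iterations: int
) -> Tuple[str, int]:
    """Call step repeatedly until its output matches pattern or the cap is hit."""
    compiled = re.compile(pattern)
    output = initial
    for iteration in range(1, max_iterations + 1):
        output = step(output)
        if compiled.search(output):
            return output, iteration
    return output, max_iterations


def fake_refiner(text: str) -> str:
    # Stand-in for an agent: adds one revision mark, declares DONE after two
    revised = text + "+"
    return revised + " DONE" if revised.count("+") >= 2 else revised


final_output, iterations = run_until_match(fake_refiner, "draft", r"\bDONE\b", 3)
print(final_output, iterations)
```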

Reflection Workflow

The agent runs, a critic evaluates the output, and the agent retries if the critique is NEEDS IMPROVEMENT.

from orchestrator.agent.workflow import ReflectionAgent
from orchestrator.agent.config import ReflectionConfig

reflective = ReflectionAgent(
    name="quality-writer",
    instructions="Improve output quality via self-critique.",
    agent=writer_agent,
    reflection_config=ReflectionConfig(
        max_reflections=3,
        critique_prompt="Is this response accurate and complete? Reply APPROVED or NEEDS IMPROVEMENT.",
    ),
)

Router Workflow

Routes requests to specialist agents based on content. Three strategies: LLM, rule-based, or hybrid.

from orchestrator.agent.workflow import RouterAgent, Route

router = RouterAgent(
    name="triage",
    instructions="Route customer requests to the right specialist.",
    routes=[
        Route(agent_name="billing-agent",  description="Billing, payments, invoices"),
        Route(agent_name="tech-support",  description="Technical issues and bugs"),
        Route(agent_name="sales-agent",   description="Pricing and upgrades"),
    ],
    fallback_agent_name="general-agent",
)

Or use the factory with tuple-based routes (recommended — the old Route(target=...) API is removed):

from orchestrator.agent import BaseAgent, AgentRunner, create_router_agent

billing   = BaseAgent(name="billing-agent",   instructions="Handle billing questions.")
technical = BaseAgent(name="technical-agent", instructions="Handle technical support.")
general   = BaseAgent(name="general-agent",   instructions="Handle general questions.")

router = create_router_agent(
    name="triage",
    routes=[
        ("billing-agent",   "billing, invoice, payment, subscription, refund"),
        ("technical-agent", "bug, error, crash, not working, how to"),
    ],
    fallback="general-agent",
    strategy="hybrid",
)

runner = AgentRunner(agent_registry={
    "billing-agent": billing, "technical-agent": technical, "general-agent": general,
})
response = await runner.run(router, "My payment failed twice this week")
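
The rule-based half of routing can be pictured as keyword matching with a fallback. The sketch below is an assumption about what such a rule stage might look like, not Continuum's routing code: route_by_keywords is a hypothetical helper, and how the hybrid strategy combines rules with an LLM is not shown.

```python
from typing import List, Tuple


def route_by_keywords(message: str, routes: List[Tuple[str, str]], fallback: str) -> str:
    """Pick the route whose comma-separated keywords match the message most often."""
    text = message.lower()
    best_name, best_hits = fallback, 0
    for agent_name, keywords in routes:
        hits = sum(
            1 for keyword in keywords.split(",")
            if keyword.strip() and keyword.strip().lower() in text
        )
        if hits > best_hits:  # ties keep the earlier route
            best_name, best_hits = agent_name, hits
    return best_name


ROUTES = [
    ("billing-agent", "billing, invoice, payment, subscription, refund"),
    ("technical-agent", "bug, error, crash, not working, how to"),
]

print(route_by_keywords("My payment failed twice this week", ROUTES, "general-agent"))
```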

Planner Workflow

Decomposes a goal into sub-tasks and executes them — either with a single agent or by routing each step to a specialist from a pool.

from orchestrator.agent.workflow import PlannerAgent
from orchestrator.agent.config import PlanningConfig

# Agent-pool mode: LLM routes each step to a specialist
planner = PlannerAgent(
    name="research-planner",
    instructions="Decompose research goals into steps.",
    agents=[web_researcher, analyst, writer],
    planning_config=PlanningConfig(
        max_steps=8,
        enable_replanning=True,
    ),
)

Debate, Scatter & SupervisedSequential

DebateAgent
Two agents argue pro/con, a judge synthesizes a final verdict. Good for nuanced decisions.
ScatterAgent
LLM splits input into N slices; each agent processes its own slice in parallel; results merged.
SupervisedSequential
Like Sequential but an LLM quality gate checks each step's output before proceeding.
DAGAgent
Dependency-aware parallel execution. Steps with dependencies wait; independent steps run together.
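
Dependency-aware execution of the kind described for DAGAgent can be illustrated with the standard library alone. This is a sketch under stated assumptions, not the DAGAgent implementation: run_dag and the four step functions are hypothetical, each step is an async callable that reads earlier results, and independent steps run together in waves.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_dag(steps: dict, deps: dict) -> dict:
    """Run async steps in dependency order; steps that are ready run concurrently."""
    sorter = TopologicalSorter()
    for name in steps:
        sorter.add(name, *deps.get(name, ()))
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results: dict = {}
    while sorter.is_active():
        ready = sorter.get_ready()  # every step whose dependencies are done
        outputs = await asyncio.gather(*(steps[name](results) for name in ready))
        for name, output in zip(ready, outputs):
            results[name] = output
            sorter.done(name)
    return results


async def fetch(results): return "data"
async def clean(results): return results["fetch"].upper()
async def count(results): return len(results["fetch"])
async def report(results): return f"{results['clean']}:{results['count']}"

STEPS = {"fetch": fetch, "clean": clean, "count": count, "report": report}
DEPS = {"clean": {"fetch"}, "count": {"fetch"}, "report": {"clean", "count"}}

dag_results = asyncio.run(run_dag(STEPS, DEPS))
print(dag_results["report"])
```

Here clean and count both depend only on fetch, so they run in the same wave; report waits for both.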

Handoffs

An agent can delegate to another agent mid-conversation. Handoffs appear to the LLM as callable tools.

from orchestrator.agent.handoff import Handoff

triage = BaseAgent(
    name="triage",
    instructions="Triage requests and hand off to specialists.",
    handoffs=[
        Handoff(
            target_agent="billing",
            description="Transfer to billing for payment questions.",
            return_to_parent=True,
        ),
        Handoff(
            target_agent="tech-support",
            description="Transfer for technical issues.",
        ),
    ],
)

Handoff History Modes

Control how much context is passed when handing off between agents.

Mode      What's passed                                    Best for
FULL      Complete conversation history                    Short conversations, full context needed
SUMMARY   LLM-generated abstract of the conversation       Long conversations, context window limits
RECENT_N  Last N turns only                                When only recent context matters
HYBRID    Summary of older messages + full recent N turns  Best of both; the recommended default

from orchestrator.agent.handoff import Handoff, HistorySummarizationMode

Handoff(
    target_agent="specialist",
    description="Escalate complex issues.",
    summarization_mode=HistorySummarizationMode.HYBRID,
    recent_turns=4,
)
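
The four modes differ only in which slice of the history is forwarded. The sketch below illustrates that selection; HistoryMode, select_history, and placeholder_summary are hypothetical stand-ins (the real summary would come from an LLM), and each message is treated as one turn for simplicity.

```python
from enum import Enum
from typing import Callable, List


class HistoryMode(Enum):  # hypothetical stand-in for HistorySummarizationMode
    FULL = "full"
    SUMMARY = "summary"
    RECENT_N = "recent_n"
    HYBRID = "hybrid"


def placeholder_summary(messages: List[str]) -> str:
    # Stand-in for an LLM-generated summary
    return f"[summary of {len(messages)} earlier messages]"


def select_history(
    messages: List[str],
    mode: HistoryMode,
    recent_turns: int = 4,
    summarize: Callable[[List[str]], str] = placeholder_summary,
) -> List[str]:
    """Return the slice of history that would be handed to the target agent."""
    split = max(len(messages) - recent_turns, 0)
    older, recent = messages[:split], messages[split:]
    if mode is HistoryMode.FULL:
        return list(messages)
    if mode is HistoryMode.SUMMARY:
        return [summarize(messages)] if messages else []
    if mode is HistoryMode.RECENT_N:
        return recent
    # HYBRID: summary of the older part, followed by the recent turns verbatim
    return ([summarize(older)] if older else []) + recent


history = [f"m{i}" for i in range(6)]
print(select_history(history, HistoryMode.HYBRID, recent_turns=2))
```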

Run Agents

AgentRunner is the execution engine — it orchestrates LLM calls, tool invocations, memory retrieval, and handoffs.

Single-agent example: playground/local-shop — one agent with MCP tools over HTTP.
Multi-agent example: playground/multi-agent-shop

AgentRunner

Create one AgentRunner instance per application. It manages internal service clients and can be shared across concurrent runs.

from orchestrator.agent.runner import AgentRunner
from orchestrator.agent.config import RunnerConfig

runner = AgentRunner(
    config=RunnerConfig(
        default_max_turns=20,
        parallel_tool_calls=True,
        max_parallel_tools=5,
        circuit_breaker_threshold=5,
    )
)

runner.run()

response = await runner.run(
    agent,
    "What is my account balance?",
    user_id="user-123",
    session_id="sess-456",  # optional — loads Redis history
    conversation_id="conv-789",
    max_turns=15,
    metadata={"channel": "web"},
    tags=["prod"],
)

print(response.content)         # main text output
print(response.status)          # SUCCESS, ERROR, MAX_TURNS_REACHED …
print(response.usage.total_tokens)
print(response.latency_ms)
print(response.trace_id)        # Langfuse trace link

Streaming

Use run_stream() to yield tokens and events as they occur. Ideal for WebSocket or SSE endpoints.

from orchestrator.agent.types import EventType

async for event in runner.run_stream(agent, "Explain quantum entanglement.", user_id="u-1"):
    if event.type == EventType.CONTENT_DELTA:
        print(event.data["content"], end="", flush=True)

    elif event.type == EventType.TOOL_CALL_START:
        print(f"\n[calling tool: {event.data['tool_name']}]")

    elif event.type == EventType.RUN_END:
        print(f"\nDone — {event.data['usage']['total_tokens']} tokens")

EventType Reference

EventType         data keys
RUN_START         run_id, agent_name
CONTENT_DELTA     content (text chunk)
CONTENT_COMPLETE  content (full response)
TOOL_CALL_START   tool_name, arguments
TOOL_CALL_END     tool_name, result
TOOL_CALL_ERROR   tool_name, error
HANDOFF_START     from_agent, to_agent, reason
HANDOFF_END       to_agent, result
MEMORY_RETRIEVAL  query, results
RUN_END           status, usage, latency_ms
RUN_ERROR         error

Session History

Pass a session_id to automatically load and save conversation history from Redis.

# Turn 1 — user's first message
response1 = await runner.run(agent, "My name is Alice.", session_id="sess-1", user_id="u-1")

# Turn 2 — agent remembers context from Redis
response2 = await runner.run(agent, "What's my name?", session_id="sess-1", user_id="u-1")
# → "Your name is Alice."

Control how many turns are loaded with AgentConfig.session_history_turns. Set log_to_session=False in AgentConfig to disable session writes for intermediate pipeline agents.

Creating sessions with get_or_create_session()

If you want session history to persist across requests, create a session before calling runner.run(). Passing a session_id that was never created will silently fail to save or load history.

# Step 1: create or retrieve the session
session_id = await session_client.get_or_create_session(
    session_id=session_id,          # pass existing ID to resume
    user_id="user-123",
    conversation_id="conv-456",   # optional — see below
)

# Step 2: run with that session_id
response = await runner.run(
    agent=agent,
    input="Hello!",
    session_id=session_id,
    user_id="user-123",
)

How session_id is computed

get_or_create_session() derives a deterministic key from the arguments you pass:

Arguments passed           Computed session_id
explicit session_id        used as-is
conversation_id + user_id  c:{conversation_id}:u:{user_id}
user_id only               u:{user_id}
neither                    random UUID
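
The derivation in the table can be written out as a short function. This is a sketch of the rule as documented, not the library's code: compute_session_id is a hypothetical name, and the case of a conversation_id without a user_id is not covered by the table, so the sketch falls back to a random UUID there as an assumption.

```python
import uuid
from typing import Optional


def compute_session_id(
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
) -> str:
    """Mirror the documented precedence for deriving a session key."""
    if session_id:
        return session_id  # explicit ID is used as-is
    if conversation_id and user_id:
        return f"c:{conversation_id}:u:{user_id}"
    if user_id:
        return f"u:{user_id}"
    # Neither given (or conversation_id alone: an assumption, see lead-in)
    return str(uuid.uuid4())


print(compute_session_id(user_id="user-123", conversation_id="conv-456"))
print(compute_session_id(user_id="user-123"))
```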

When to use conversation_id

Use conversation_id when a single user can have multiple independent chat windows. Without it, all conversations for a user share one session (u:{user_id}). With it, each window gets its own isolated session (c:{conversation_id}:u:{user_id}).

  • Chat UI projects: generate a conversation_id on the backend when the user opens a new chat window; pass it back with each request.
  • Task-based or webhook projects: use your natural entity ID (ticket ID, invoice ID, job ID) as conversation_id. Never reuse IDs across unrelated tasks.

Multi-agent session saving pattern

Every workflow agent calls runner.run() one or more times internally. Without intervention, each sub-agent call auto-saves a turn, which fills the session with intermediate history the user never saw. Prevent this with suppress_session_log + save_turn():

async def execute(self, input_text, runner, context) -> AgentResponse:
    context.suppress_session_log = True  # blocks auto-save for ALL sub-agent runs

    response = await runner.run(
        agent=sub_agent,
        input=current_input,
        context=context,   # same context object passed every time
    )
    # ... more sub-agent calls ...

    # Save exactly one clean turn at the end
    await runner.save_turn(
        session_id=context.session_id,
        user_message=input_text,        # what user originally sent
        assistant_message=final_output, # what user actually sees
    )

Custom workflow agents: If you build a custom workflow by subclassing BaseAgent, you must follow this pattern yourself. Forgetting suppress_session_log = True will save every sub-agent turn to session history.

Deciding which agent's output is the final response

In a multi-agent workflow, you must explicitly decide which agent's output is the final response, because that is what you pass to save_turn(). Here are two examples:

Sequential: agents run one after another, each passing output to the next. The last agent may produce the final response:

await runner.save_turn(session_id, user_input, last_agent_response.content)

Handoff / Router: sub-agents do work and their results are injected back into the top-level agent's message list. The top-level agent then synthesizes its own final response — so the top-level agent's output is what to save, not the sub-agents' intermediate results:

await runner.save_turn(session_id, user_input, top_level_agent_response.content)

Saving the wrong output: If you save an intermediate agent's output by mistake, session history will contain turns the user never saw, and future turns will load them as prior context.

Context Management

When a conversation approaches the model's context window, Continuum automatically compresses older messages.

from orchestrator import AgentConfig, ContextManagementConfig, CompressionStrategy

agent = BaseAgent(
    name="long-conv",
    instructions="...",
    config=AgentConfig(
        context_management=ContextManagementConfig(
            compression_strategy=CompressionStrategy.SMART,  # SMART / SUMMARIZE_OLD / TRUNCATE_OLDEST
        )
    ),
)

Tip: Set CONTEXT_COMPRESSION_THRESHOLD=0.8 (default) to trigger compression at 80% of the model's context window. CONTEXT_KEEP_RECENT_MESSAGES=10 ensures the last 10 messages are never truncated.
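
The interaction between the threshold and the protected recent messages can be sketched as follows. This illustrates a TRUNCATE_OLDEST-style policy only: truncate_oldest is a hypothetical function, token counts are supplied by the caller, and Continuum's actual compression logic may differ.

```python
from typing import List, Tuple

Message = Tuple[str, int]  # (text, token count)


def truncate_oldest(
    messages: List[Message],
    context_window: int,
    threshold: float = 0.8,
    keep_recent: int = 10,
) -> List[Message]:
    """Drop the oldest messages once usage reaches the threshold, sparing the recent ones."""
    limit = threshold * context_window
    total = sum(tokens for _, tokens in messages)
    if total < limit:
        return messages  # below the trigger point: nothing to do
    protected_from = max(len(messages) - keep_recent, 0)
    start = 0
    while start < protected_from and total >= limit:
        total -= messages[start][1]
        start += 1
    return messages[start:]


conversation = [(f"m{i}", 90) for i in range(12)]  # 1080 tokens in total
trimmed = truncate_oldest(conversation, context_window=1000, keep_recent=3)
print(len(trimmed), trimmed[0][0])
```

Note that when the protected messages alone exceed the limit, this sketch stops truncating rather than touching them, which matches the "never truncated" wording of the tip.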

App Lifecycle

Use OrchestratorLifecycle to initialise and cleanly shut down all shared services (Redis, Langfuse, vector store). Use Container to inject custom clients — useful in tests and multi-tenant setups.

from orchestrator.core import OrchestratorLifecycle, Container

lifecycle = OrchestratorLifecycle()
await lifecycle.startup()   # connects Redis, Langfuse, vector store

# Health checks
health = await lifecycle.health_check()
# → {"redis": "ok", "qdrant": "ok", "langfuse": "ok", "llm": "ok"}

await lifecycle.shutdown()  # flushes Langfuse, closes Redis connections

# Inject custom clients (e.g. in tests or multi-tenant setups)
from orchestrator.agent.runner import AgentRunner
from orchestrator.core import Container

container = Container()
container.set_llm_client(my_llm_client)
container.set_memory_client(my_memory_client)
container.set_session_client(my_session_client)

runner = AgentRunner(container=container)

FastAPI Server

from fastapi import FastAPI
from orchestrator.agent import BaseAgent
from orchestrator.agent.runner import AgentRunner

app = FastAPI()
runner = AgentRunner()
agent = BaseAgent(name="api-agent", instructions="You are an API assistant.")

@app.post("/chat")
async def chat(body: dict):
    response = await runner.run(
        agent,
        body["message"],
        user_id=body["user_id"],
        session_id=body.get("session_id"),
    )
    return {"reply": response.content, "session_id": body.get("session_id")}

Temporal Workers

from orchestrator.temporal import WorkerManager, AgentRegistry

registry = AgentRegistry()
registry.register(my_agent)

worker_manager = WorkerManager(agent_registry=registry)
await worker_manager.start_worker()  # connects to TEMPORAL_HOST

Input / Output Scanning

Attach scanner callables to AgentConfig to detect prompt injection, PII, or unsafe content before/after the LLM call.

import re

def pii_scanner(text: str) -> str:
    # Replace anything that looks like an email address with [REDACTED]
    return re.sub(r'\S+@\S+', '[REDACTED]', text)

agent = BaseAgent(
    name="safe-agent",
    instructions="...",
    config=AgentConfig(
        input_scanners=[pii_scanner],
        output_scanners=[pii_scanner],
        injection_detection=True,
    ),
)
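
When several scanners are attached, a reasonable mental model is a pipeline in which each scanner receives the text produced by the previous one. The sketch below assumes scanners run in list order and return the transformed text; run_scanners and redact_card_numbers are hypothetical names, and the card-number pattern is deliberately simplistic.

```python
import re
from typing import Callable, List

Scanner = Callable[[str], str]


def redact_emails(text: str) -> str:
    return re.sub(r"\S+@\S+", "[REDACTED]", text)


def redact_card_numbers(text: str) -> str:
    # Simplistic illustration: any standalone run of 13 to 16 digits
    return re.sub(r"\b\d{13,16}\b", "[CARD]", text)


def run_scanners(text: str, scanners: List[Scanner]) -> str:
    """Apply each scanner in turn (assumed order: list order)."""
    for scanner in scanners:
        text = scanner(text)
    return text


cleaned = run_scanners(
    "Mail alice@example.com about card 4111111111111111",
    [redact_emails, redact_card_numbers],
)
print(cleaned)
```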

Testing Guide

Continuum favors integration tests over mocks. The [dev] extra ships fakeredis, respx, and pytest-asyncio.

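# conftest.py
import fakeredis.aioredis as fakeredis
import pytest_asyncio
from orchestrator.core import Container
from orchestrator.session import SessionClient  # assumed import path; see session.md

# Async fixtures need pytest-asyncio's own decorator when it runs in strict mode
@pytest_asyncio.fixture
async def container():
    c = Container()
    # Back the session client with an in-memory fake instead of a real Redis
    c.set_session_client(SessionClient(redis=fakeredis.FakeRedis()))
    return c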
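
# test_agent.py
import pytest
from orchestrator.agent import BaseAgent
from orchestrator.agent.runner import AgentRunner

agent = BaseAgent(name="test-agent", instructions="You are a helpful assistant.")

@pytest.mark.asyncio
async def test_basic_response(container, real_llm_client):
    # real_llm_client is a fixture you define yourself (not shown here)
    container.set_llm_client(real_llm_client)
    runner = AgentRunner(container=container)
    response = await runner.run(agent, "Say hello.", user_id="test-user")
    assert response.status.value == "success"
    assert len(response.content) > 0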

Components

Continuum's building blocks — tools, memory, sessions, durable workflows.

MCP Servers

Every tool is exposed via the Model Context Protocol. Three transport types are supported:

from orchestrator.tools import MCPServerStdio, MCPServerSse, MCPServerStreamableHttp

# Spawn a subprocess (local Python script or shell command)
fs_server = MCPServerStdio(command="python", args=["-m", "mcp_filesystem"])

# Server-Sent Events (remote server)
sse_server = MCPServerSse(url="https://tools.example.com/mcp/sse")

# StreamableHTTP (recommended for production)
http_server = MCPServerStreamableHttp(url="https://tools.example.com/mcp")

agent = BaseAgent(
    name="tool-agent",
    instructions="You have access to the filesystem.",
    mcp_servers=[fs_server],
)

Passing tools explicitly with MCPUtil

If you need the tool definitions as Python objects (e.g. to inspect, filter, or pass them manually), use MCPUtil.get_function_tools():

from orchestrator.tools import MCPUtil, ToolExecutor

# Get tool definitions from a connected server
tool_defs = await MCPUtil.get_function_tools(server)
tools = [t.model_dump() for t in tool_defs]

executor = ToolExecutor({server: None})   # None = expose all tools
await executor.initialize()

agent = BaseAgent(
    name="agent",
    instructions="...",
    tools=tools,
    tool_executor=executor,
)

Tool Filtering

When you have many MCP tools, semantic tool filtering sends only the relevant subset to the LLM each turn — reducing token cost and noise.

from orchestrator.agent.config import AgentConfig, ToolAttentionConfig

agent = BaseAgent(
    name="commerce-agent",
    instructions="...",
    mcp_servers=[shop_server],    # 50+ tools
    config=AgentConfig(
        tool_attention=ToolAttentionConfig(
            enabled=True,
            max_tools=10,        # send at most 10 tools per turn
        )
    ),
)

Tool Context Injection

Some tools return a value (e.g. session_id, cart_token) that subsequent tool calls need as input. ToolContextState captures these values automatically and injects them into later calls — no agent prompt changes required.

from orchestrator.tools import MCPServerStreamableHttp

# Capture session_id from the login tool result, inject into every subsequent call
shop_server = MCPServerStreamableHttp(
    url="https://shop.example.com/mcp",
    tool_context={
        "capture": {"login": "session_id"},   # tool name → result field to capture
        "inject":  {"session_id": "session_id"}, # param name → captured key
    },
)
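
To make the capture/inject mapping concrete, here is a minimal standalone sketch of the idea. `ToolContext`, `after_call`, and `before_call` are hypothetical names, not Continuum's API, and the rule that explicit arguments win over injected ones is an assumption.

```python
from typing import Any

class ToolContext:
    """Hypothetical stand-in for the capture/inject behaviour described above."""

    def __init__(self, capture: dict[str, str], inject: dict[str, str]) -> None:
        self.capture = capture            # tool name -> result field to capture
        self.inject = inject              # param name -> captured key
        self.values: dict[str, Any] = {}  # captured values for this conversation

    def after_call(self, tool: str, result: dict[str, Any]) -> None:
        # Remember the configured field when the named tool returns it.
        field = self.capture.get(tool)
        if field is not None and field in result:
            self.values[field] = result[field]

    def before_call(self, args: dict[str, Any]) -> dict[str, Any]:
        # Fill in captured values; arguments passed explicitly are left alone.
        merged = dict(args)
        for param, key in self.inject.items():
            if key in self.values and param not in merged:
                merged[param] = self.values[key]
        return merged

ctx = ToolContext(capture={"login": "session_id"}, inject={"session_id": "session_id"})
ctx.after_call("login", {"session_id": "s-42", "ok": True})
call_args = ctx.before_call({"product_id": "p1"})
print(call_args)
```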

Run Artifacts

MCP tool responses can contain rich structured data (widgets, tables, charts). Access them via response.run_artifacts.

response = await runner.run(agent, "Show me the product catalog.")

if response.run_artifacts:
    for artifact_id, artifact in response.run_artifacts.items():
        widget_meta = artifact.get("meta")       # widget template
        structured  = artifact.get("structured_content")
        text        = artifact.get("text_content")

Long-term Memory

Continuum uses mem0 with Milvus (default) or Qdrant for persistent semantic memory. Facts are automatically extracted from conversations and stored as embeddings.

from orchestrator.agent.config import AgentMemoryConfig
from orchestrator.memory.scopes import MemoryScope

agent = BaseAgent(
    name="memory-agent",
    instructions="Remember user preferences.",
    memory_config=AgentMemoryConfig(
        search_memories=True,
        search_scope=MemoryScope.USER,
        search_limit=5,
        store_memories=True,
        store_scope=MemoryScope.USER,
        broadcast_learnings=True,  # share useful facts with other agents
    ),
)

Controlling what gets stored

Use extraction_prompt to tell mem0 exactly which facts to extract. Use pre_store_filter to remove PII or irrelevant facts after they are stored.

# extraction_prompt — override mem0's default extraction logic
AgentMemoryConfig(
    store_memories=True,
    extraction_prompt=(
        "Only extract long-term facts about the user's pets, animal preferences, "
        "and dietary needs. Do NOT store transient actions like adding to cart or searches."
    ),
)

# pre_store_filter — runs after storage; facts not returned are deleted
def remove_pii(facts: list[str]) -> list[str]:
    return [f for f in facts if "credit card" not in f]

AgentMemoryConfig(store_memories=True, pre_store_filter=remove_pii)

Memory management API

from orchestrator.core.container import get_container

memory_client = get_container().memory_client

# View all memories for a user
memories = await memory_client.get_all(user_id="user-123")
for m in memories:
    print(m.memory, m.id)

# Search by query
results = await memory_client.search("pet preferences", user_id="user-123")

# Delete a specific memory
await memory_client.delete(memory_id="abc-123")

# Delete all memories for a user (GDPR right-to-forget)
await memory_client.delete_all(user_id="user-123")

Privacy tip: Consider exposing memory management in your frontend so users can view and delete what the AI remembers about them. This supports GDPR compliance and builds user trust.

Memory Scopes

Scope | Isolation | Use case
USER | Per user_id | User preferences, personal context
AGENT | Per agent name | Agent-specific domain knowledge
SHARED | Cross-user, cross-agent | Global facts, product knowledge
CONVERSATION | Per conversation_id | Ephemeral conversation context
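
One way to picture the isolation column is as a partition key derived from the scope. The sketch below is illustrative only: `Scope` and `partition_key` are hypothetical names, and the key format is an assumption, not how Continuum stores memories.

```python
from enum import Enum
from typing import Optional

class Scope(str, Enum):
    # Local stand-in mirroring the four scopes in the table above.
    USER = "user"
    AGENT = "agent"
    SHARED = "shared"
    CONVERSATION = "conversation"

def partition_key(
    scope: Scope,
    *,
    user_id: Optional[str] = None,
    agent_name: Optional[str] = None,
    conversation_id: Optional[str] = None,
) -> str:
    """Return a hypothetical storage partition for a memory in the given scope."""
    if scope is Scope.SHARED:
        return "shared"  # one global partition, visible to every user and agent
    ident = {
        Scope.USER: user_id,
        Scope.AGENT: agent_name,
        Scope.CONVERSATION: conversation_id,
    }[scope]
    if not ident:
        # Refuse to fall back to a wider scope when the identifier is missing.
        raise ValueError(f"{scope.name} scope requires its identifier")
    return f"{scope.value}:{ident}"

print(partition_key(Scope.USER, user_id="user-123"))
```

Failing loudly on a missing identifier, rather than defaulting to the shared partition, is the design choice that matters for multi-tenant safety.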

IntelligentMemoryClient

A drop-in replacement for MemoryClient that adds importance scoring, time-based decay, entity extraction, and user profiles. Low-relevance or stale memories are down-weighted before being injected into the prompt.

from orchestrator.memory import IntelligentMemoryClient, IntelligenceConfig

memory = IntelligentMemoryClient(
    intelligence_config=IntelligenceConfig(
        enable_scoring=True,       # LLM scores each memory at store time
        enable_decay=True,         # recent memories get a relevance boost
        prune_threshold=0.15,      # delete memories below this score
    )
)

# Accepts strings, list of strings, or message dicts
await memory.add(
    [{"role": "user", "content": "I prefer dark mode."}],
    user_id="user-123",
)

results = await memory.search("user preferences", user_id="user-123")

add() accepts three forms: a plain string, a list of strings, or a list of {"role", "content"} message dicts. The messages-style form is recommended because it gives mem0 more context for fact extraction.
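
The interplay of scoring, decay, and prune_threshold can be sketched with a simple exponential decay. This is an assumption about the general technique, not IntelligentMemoryClient's actual formula; `decayed_score`, `prune`, the 30-day half-life, and the sample memories are all hypothetical.

```python
def decayed_score(importance: float, age_days: float, half_life_days: float = 30.0) -> float:
    # Exponential decay: the score halves every half_life_days (assumed value).
    return importance * 0.5 ** (age_days / half_life_days)

def prune(memories: list[dict], prune_threshold: float = 0.15) -> list[dict]:
    # Keep only memories whose decayed score is still at or above the threshold.
    return [
        m for m in memories
        if decayed_score(m["importance"], m["age_days"]) >= prune_threshold
    ]

memories = [
    {"text": "prefers dark mode", "importance": 0.9, "age_days": 0},
    {"text": "asked about shipping once", "importance": 0.2, "age_days": 30},
    {"text": "has a dog named Rex", "importance": 0.6, "age_days": 45},
]
kept = [m["text"] for m in prune(memories)]
print(kept)
```

A low-importance memory drops out after one half-life, while an important one survives for much longer.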

Sessions (Redis)

Short-term conversation history stored in Redis. AgentRunner handles loading and saving automatically — you only use SessionClient directly when you need to manage sessions outside a run (e.g. building a chat history UI, clearing history, debugging).

from orchestrator.session import SessionClient

session = SessionClient()

# Create or resume a session
session_id = await session.get_or_create_session(user_id="user-123")

# Read history (e.g. to display in a chat UI)
messages = await session.get_conversation_history(session_id)

# Clear messages but keep the session
await session.clear_session(session_id)

# Delete the session entirely
await session.delete_session(session_id)

Temporal Integration

Build durable workflows that survive process restarts, with automatic retries and audit trails.

from orchestrator.temporal import TemporalClient
from orchestrator.temporal.workflows import AgentWorkflow
from orchestrator.temporal.types import AgentStep, ApprovalStep, ParallelStep

steps = [
    AgentStep(agent_name="researcher", input="Analyze market trends"),
    ApprovalStep(
        description="Review analysis before proceeding",
        approvers=["manager@acme.com"],
        timeout=86400,  # 24 hours
    ),
    AgentStep(agent_name="writer"),  # receives researcher output
]

client = TemporalClient()
handle = await client.execute_workflow(AgentWorkflow, {"steps": steps})

Step Types

Step | Purpose
AgentStep | Run a registered agent
ApprovalStep | Pause for human approval (email notification)
ParallelStep | Run multiple agents concurrently
ConditionalStep | Branch based on a condition agent's output
WaitStep | Delay execution (1 second to 7 days)

Loop Workflow (Temporal)

For iterative agentic work that must survive restarts, use LoopAgentWorkflow. The loop runs on Temporal and persists state between iterations.

from orchestrator.temporal.workflows import LoopAgentWorkflow

handle = await client.execute_workflow(
    LoopAgentWorkflow,
    {
        "agent_name": "refinement-agent",
        "initial_input": "Draft a product description.",
        "max_iterations": 5,
        "termination_condition": "output_match",
        "termination_pattern": "APPROVED",
    },
)

Human-in-the-Loop

Workflows pause at ApprovalStep and send a notification to approvers. The workflow resumes only when approved — or auto-approves after timeout.

ApprovalStep(
    description="Approve the generated email before sending to customers",
    approvers=["alice@company.com", "bob@company.com"],
    timeout=3600,           # 1 hour, then auto-expires
    auto_approve_if="low_risk",  # skip approval if condition agent returns this
)

Integrations

Continuum calls LLM providers directly via their official SDKs.

Provider Routing

Continuum routes to providers automatically based on the model string prefix — no configuration needed.

Model name prefix | Provider | Examples
claude-… or anthropic/… | Anthropic | claude-sonnet-4-5, claude-opus-4-5
gemini/… | Google Gemini | gemini/gemini-2.0-flash, gemini/gemini-1.5-pro
anything else | OpenAI | gpt-4o, gpt-4o-mini, gpt-5, o3-mini
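
The prefix rules in the table reduce to a few string checks. The function below is a sketch of that rule only; `resolve_provider` is a hypothetical name, not a Continuum function.

```python
def resolve_provider(model: str) -> str:
    """Map a model string to a provider using the prefix rules in the table above."""
    if model.startswith("claude-") or model.startswith("anthropic/"):
        return "anthropic"
    if model.startswith("gemini/"):
        return "gemini"
    return "openai"  # anything else falls through to OpenAI

for name in ("claude-sonnet-4-5", "gemini/gemini-2.0-flash", "gpt-4o", "o3-mini"):
    print(name, "->", resolve_provider(name))
```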

OpenAI

Default provider. Any model string without a claude-, anthropic/, or gemini/ prefix routes here, for example gpt-* or o3-mini.

agent = BaseAgent(name="gpt-agent", instructions="...", model="gpt-4o")

Model | Context | Notes
gpt-5 | - | Latest, highest capability
gpt-4o | 128k | Strong overall quality
gpt-4o-mini | 128k | Default model, fast & cheap
o3-mini | - | Reasoning model

Required env: OPENAI_API_KEY — also used by mem0's default embedder.

Anthropic

Route by using any claude-* or anthropic/... prefixed model string.

agent = BaseAgent(name="claude-agent", instructions="...", model="claude-sonnet-4-5")

Model | Notes
claude-sonnet-4-5 | Recommended; best balance
claude-opus-4-5 | Highest capability
claude-haiku-4-5 | Fastest & cheapest

Required env: ANTHROPIC_API_KEY

Google Gemini

Route by using a gemini/-prefixed model string.

agent = BaseAgent(name="gemini-agent", instructions="...", model="gemini/gemini-2.5-flash")

Model | Notes
gemini/gemini-2.5-flash | Fast, cost-effective
gemini/gemini-2.0-pro | Best Gemini quality
gemini/gemini-1.5-pro | 1M context window

Required env: GEMINI_API_KEY

Azure OpenAI

from orchestrator.agent import BaseAgent
from orchestrator.llm import LLMConfig
import os

config = LLMConfig(
    model="azure/gpt-4o",
    api_key=os.environ["AZURE_API_KEY"],
    api_base=os.environ["AZURE_API_BASE"],
    api_version=os.environ["AZURE_API_VERSION"],
)
agent = BaseAgent(name="azure-agent", instructions="...", llm_config=config)

The azure/ prefix routes to OpenAI's SDK with your Azure endpoint. Required env: AZURE_API_KEY, AZURE_API_BASE, AZURE_API_VERSION.

Automatic Fallback

When LLM_ENABLE_FALLBACK=true (default), any provider error transparently retries on FALLBACK_LLM_MODEL. Set it to a different provider to get cross-provider resilience:

# .env — primary OpenAI, fallback to Gemini
DEFAULT_LLM_MODEL=gpt-4o
FALLBACK_LLM_MODEL=gemini/gemini-1.5-flash
LLM_ENABLE_FALLBACK=true
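
The fallback behaviour amounts to "try the primary, and on a provider error retry once on the fallback model". The sketch below shows that control flow with a fake provider call. `ProviderError`, `complete_with_fallback`, and `fake_call` are hypothetical names; Continuum's real error types and retry policy may differ.

```python
import asyncio
from typing import Awaitable, Callable, Optional

class ProviderError(Exception):
    """Hypothetical error type standing in for any provider failure."""

async def complete_with_fallback(
    call: Callable[[str, str], Awaitable[str]],
    prompt: str,
    primary: str,
    fallback: Optional[str],
    enable_fallback: bool = True,
) -> str:
    try:
        return await call(primary, prompt)
    except ProviderError:
        # Re-raise when fallback is disabled or not configured.
        if not enable_fallback or fallback is None:
            raise
        return await call(fallback, prompt)

async def fake_call(model: str, prompt: str) -> str:
    # Simulated outage on the primary model only.
    if model == "gpt-4o":
        raise ProviderError("simulated outage")
    return f"{model}: ok"

result = asyncio.run(
    complete_with_fallback(fake_call, "hi", "gpt-4o", "gemini/gemini-1.5-flash")
)
print(result)
```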

Langfuse Tracing

All agent runs, LLM calls, tool invocations, and memory operations are automatically traced to Langfuse. No code changes required.

# docker-compose.yml ships Langfuse at http://localhost:3000
# .env
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=http://localhost:3000

Each response.trace_id identifies the run's Langfuse trace, so you can look it up directly when debugging.

Tracing Decorators

Three decorators create Langfuse spans for custom functions — all imported from orchestrator.observability.

@observe — generic span

Add custom spans to any async function:

from orchestrator.observability import observe

@observe("preprocess_input")
async def preprocess(text: str) -> str:
    return text.strip().lower()

@trace_tool — for tool / API functions

from orchestrator.observability import trace_tool

@trace_tool("search_products")
async def search_products(query: str):
    # creates a tool-type span with input/output captured
    return await db.search(query)

@trace_agent — for custom agent wrappers

from orchestrator.observability import trace_agent

@trace_agent("my-specialist")
async def run_specialist(input: str):
    # span tagged as agent-type in Langfuse
    ...

Decorator | Langfuse span type | Best for
@observe | span | Generic business logic
@trace_tool | tool | Tool / external API call functions
@trace_agent | agent | Custom agent wrapper functions

Qdrant

# .env
VECTOR_STORE_PROVIDER=qdrant
QDRANT_HOST=localhost
QDRANT_PORT=6333
QDRANT_COLLECTION=orchestrator_memories

For Qdrant Cloud, also set QDRANT_API_KEY.

Milvus (default)

# .env (default — no change needed if using docker compose)
VECTOR_STORE_PROVIDER=milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530

For Zilliz Cloud, set MILVUS_TOKEN and point MILVUS_HOST to your cloud endpoint.

Smart Inference

A cost-aware, classifier-driven routing layer that picks the optimal model per prompt. Continuum agents call one OpenAI-compatible endpoint; Smart Inference dispatches across 250+ models on 45+ providers with per-1M-token pricing, budget ledger, and dynamic output caps baked into the request path.

OpenAI-compatible | Sub-millisecond overhead | Classifier · Router · Budget | 250+ models

TL;DR. Point Continuum at SMART_GATEWAY_URL, set model="auto", and the gateway picks the cheapest model that meets the prompt's quality threshold. Switch tiers per-agent with gateway_mode="strict" | "modest" | "quality".

How it works

Every POST /v1/chat/completions request flows through a fixed middleware pipeline. Each stage is independently observable and skipped cleanly when its feature flag is off.

# Request lifecycle (Continuum agent → Smart Inference gateway → provider)

inbound request
  ─► requireValidKey            # bearer → virtualKey lookup, fail-closed 401
  ─► requestValidator           # content-type, custom-host checks
  ─► hooks (pre)                # plugin pre-hooks (PII, prompt-shield…)
  ─► memoryCache                # semantic cache (Redis, opt-in)
  ─► classifier                 # stamp complexity + domain
  ─► budget                     # pre-flight cost + reservation
  ─► router                     # model namespace → candidate list → top pick
  ─► provider handler           # native API call (OpenAI / Anthropic / Google / …)
  ─► response transform         # normalise to OpenAI shape
  ─► hooks (post)               # plugin post-hooks
  ─► observability              # flush trace to Langfuse
client ◄┘

The classifier and the router consume two signals: the model namespace from the request body, and the metadata block (session_id, trace_id, optional complexity / domain overrides).

Routing modes

Three preset modes resolve to three quality tiers. The mode is picked per-agent via gateway_mode and per-request by the model field (auto/cheap, auto/mid, auto/quality).

Mode | Tier | Optimises for | Typical pick
strict | cheap | lowest cost | smallest model that clears the capability gate (gpt-4o-mini / gemini-flash / haiku class)
modest | mid | quality / cost balance | mid-tier with the best q/cost (claude-sonnet, gpt-4o)
quality | quality | highest quality | top-tier candidates available in the registry

Capped by the registry. The router can only pick from src/services/router/registry.json. Models not in the registry are unreachable via auto; pin them explicitly with <provider>/<model_id> if needed.
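
The three modes can be read as three ranking rules over the same candidate pool. The sketch below is a toy model of that idea, not the gateway's router: the candidate names, quality scores, prices, and per-tier quality gates are all made up for illustration.

```python
from dataclasses import dataclass

MODE_TO_TIER = {"strict": "cheap", "modest": "mid", "quality": "quality"}
TIER_GATE = {"cheap": 0.6, "mid": 0.8, "quality": 0.0}  # hypothetical quality gates

@dataclass(frozen=True)
class Candidate:
    name: str
    quality: float       # hypothetical 0..1 quality score
    usd_per_mtok: float  # hypothetical price per 1M tokens

def pick(candidates: list[Candidate], mode: str) -> Candidate:
    tier = MODE_TO_TIER[mode]
    eligible = [c for c in candidates if c.quality >= TIER_GATE[tier]]
    if not eligible:
        raise LookupError(f"no candidate clears the {tier} gate")
    if tier == "cheap":
        return min(eligible, key=lambda c: c.usd_per_mtok)        # lowest cost
    if tier == "quality":
        return max(eligible, key=lambda c: c.quality)             # highest quality
    return max(eligible, key=lambda c: c.quality / c.usd_per_mtok)  # best q/cost

CANDIDATES = [
    Candidate("small-model", 0.65, 0.5),
    Candidate("mid-model", 0.85, 3.0),
    Candidate("large-model", 0.95, 15.0),
]
for mode in ("strict", "modest", "quality"):
    print(mode, "->", pick(CANDIDATES, mode).name)
```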

Wire Continuum to the Gateway

Continuum routes all LLM calls through Smart Inference when SMART_GATEWAY_URL is set. There is nothing else to import or subclass — GatewayProvider automatically replaces the per-provider clients.

Environment variables

# Continuum side — .env
SMART_GATEWAY_URL=http://localhost:8787/v1     # gateway base URL
SMART_GATEWAY_API_KEY=ck-prod-2026-05-19       # bearer (matches a virtual key)
SMART_GATEWAY_DEFAULT_MODE=modest              # strict | modest | quality

Virtual key (bearer)

The gateway authenticates the client by bearer and looks up the upstream provider key, budget, and allowed-models from conf.integrations[].

// Smart Inference side — conf.json (excerpt)
{
  "integrations": [
    {
      "provider": "anthropic",
      "slug": "dev_team_anthropic",
      "bearer_token": "ck-prod-2026-05-19",
      "credentials": { "api_key_env": "ANTHROPIC_API_KEY" },
      "budget_usd": 100,
      "allowed_models": [
        "auto", "auto/cheap", "auto/mid", "auto/quality",
        "openai/gpt-4o-mini",
        "anthropic/claude-haiku-4-5-20251001",
        "anthropic/claude-opus-4-7",
        "google/gemini-2.0-flash"
      ]
    }
  ]
}

Process env vs .env file. npm run start:node does not dotenv-load the gateway. Either set -a; source .env; set +a before starting, or pin secrets in docker-compose.yaml's environment: block. A virtual key whose api_key_env resolves to undefined is silently dropped from the index, so every request to it returns 401.

Model namespace

The model field in the request body is parsed by modelResolver.ts. Five grammars are supported:

Form | Meaning
auto | gateway-wide auto-routing, default tier
auto/<tier> | gateway-wide, specific tier (cheap/mid/quality)
<provider>/auto | provider-scoped auto-routing, default tier
<provider>/auto/<tier> | provider-scoped, specific tier
<provider>/<model_id> | explicit pin; bypasses model selection
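
The five forms are easy to tell apart by splitting on "/". The parser below is a Python sketch of the grammar in the table, written for illustration; it is not a port of modelResolver.ts, and `ModelRef` and `parse_model` are hypothetical names. Treating a bare model name as an error is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

TIERS = {"cheap", "mid", "quality"}

@dataclass(frozen=True)
class ModelRef:
    provider: Optional[str]  # None means gateway-wide
    auto: bool               # True for any auto-routing form
    tier: Optional[str]      # None means the default tier
    model_id: Optional[str]  # set only for explicit pins

def parse_model(value: str) -> ModelRef:
    parts = value.split("/")
    if parts[0] == "auto":
        if len(parts) == 1:
            return ModelRef(None, True, None, None)        # auto
        if len(parts) == 2 and parts[1] in TIERS:
            return ModelRef(None, True, parts[1], None)    # auto/<tier>
        raise ValueError(f"unknown auto form: {value!r}")
    if len(parts) >= 2 and parts[0] and parts[1] == "auto":
        if len(parts) == 2:
            return ModelRef(parts[0], True, None, None)      # <provider>/auto
        if len(parts) == 3 and parts[2] in TIERS:
            return ModelRef(parts[0], True, parts[2], None)  # <provider>/auto/<tier>
        raise ValueError(f"unknown provider auto form: {value!r}")
    if len(parts) >= 2 and parts[0] and parts[1]:
        # <provider>/<model_id>: an explicit pin, no model selection.
        return ModelRef(parts[0], False, None, "/".join(parts[1:]))
    raise ValueError(f"not a recognised model namespace: {value!r}")

print(parse_model("google/auto/quality"))
```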

Agent code

From the agent's perspective, nothing changes. Set model to a routing intent and optionally pick a gateway_mode.

from orchestrator.agent import BaseAgent, AgentRunner

# Auto-routing — gateway picks the model per turn.
agent = BaseAgent(
    name="shop-assistant",
    instructions="You are a friendly pet shop assistant.",
    model="auto",
    gateway_mode="modest",    # "strict" | "modest" | "quality"
)

response = await AgentRunner().run(agent, "Show me dog leashes", user_id="alice")
print(response.content)

For multi-agent flows, mix and match modes per role:

# Triage routes fast; specialist reasons carefully.
triage    = BaseAgent(name="triage", model="auto", gateway_mode="strict")
specialist = BaseAgent(name="specialist", model="auto", gateway_mode="quality")

Classifier output

When smart_inference.classifier.enabled = true (or by default in conf.defaults.json), every prompt is tagged with complexity and domain. The router uses these to filter candidates before mode/tier ranking.

Field | Values | Source
complexity | simple · medium · complex | classifier LLM (gpt-4o-mini by default) with rule fallback
domain | general · code · health · math · analysis · reasoning · finance | classifier LLM

You can override either via metadata:

{
  "model": "auto",
  "messages": [{"role": "user", "content": "…"}],
  "metadata": {
    "session_id": "s-001",
    "complexity": "complex",
    "domain": "code"
  }
}

Response headers

The gateway echoes its routing decision on every response. Useful for tracing and dashboards.

Header | Type | Example
x-portkey-router-mode | string | modest
x-portkey-router-complexity | string | medium
x-portkey-router-domain | string | general
x-portkey-router-picker | string | category · pareto
x-portkey-router-pool-size-before | number | 20
x-portkey-router-pool-size-after | number | 14
x-portkey-router-attempts | list | claude-sonnet-4-6@anthropic:200
x-portkey-router-handover | string | none · injected
x-portkey-budget-state | string | ok · warn
x-portkey-cache-status | string | HIT · MISS · DISABLED
x-portkey-trace-id | uuid | -

Tips & gotchas

  • Handover — when the router picks a different model between turns in the same session_id, the gateway injects a one-line system note so the new model keeps prior context. Always pass metadata.session_id for multi-turn agents.
  • Cooldown — 429/503 from a candidate temporarily removes it from the pool (default 15s, honoring upstream Retry-After up to 5 min).
  • Semantic cache — cosine ≥ 0.95 hits on near-duplicate prompts. Streams and tool calls bypass cache by design. Enable via conf.cache.semantic.enabled = true.
  • Streaming tool calls — current versions of Smart Inference forward provider chunks 1:1. If you stream and use tools, aggregate tool_call argument fragments client-side until finish_reason="tool_calls".
  • Budgets — gated by budgets.enabled. When on, requests are pre-checked against the bearer's budget_usd and the integration's session/project caps.
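
For the streaming tool-call tip, client-side aggregation can look like the sketch below. It assumes chunks arrive as plain dicts in the OpenAI chat-completions streaming shape, where each tool_calls delta carries an index and a fragment of the JSON arguments string. `collect_tool_calls` is a hypothetical helper, and the sample chunks are made up.

```python
import json

def collect_tool_calls(chunks: list[dict]) -> list[dict]:
    """Merge streamed tool_call fragments until finish_reason == "tool_calls"."""
    calls: dict[int, dict] = {}
    for chunk in chunks:
        choice = chunk["choices"][0]
        for frag in choice.get("delta", {}).get("tool_calls") or []:
            slot = calls.setdefault(frag["index"], {"id": None, "name": None, "arguments": ""})
            if frag.get("id"):
                slot["id"] = frag["id"]
            fn = frag.get("function") or {}
            if fn.get("name"):
                slot["name"] = fn["name"]
            slot["arguments"] += fn.get("arguments") or ""  # append the raw JSON fragment
        if choice.get("finish_reason") == "tool_calls":
            break
    # Parse the arguments only once every fragment has been joined.
    return [
        {"id": c["id"], "name": c["name"], "arguments": json.loads(c["arguments"] or "{}")}
        for _, c in sorted(calls.items())
    ]

chunks = [
    {"choices": [{"delta": {"tool_calls": [{"index": 0, "id": "call_1",
        "function": {"name": "search_products", "arguments": ""}}]}, "finish_reason": None}]},
    {"choices": [{"delta": {"tool_calls": [{"index": 0,
        "function": {"arguments": '{"query": "dog'}}]}, "finish_reason": None}]},
    {"choices": [{"delta": {"tool_calls": [{"index": 0,
        "function": {"arguments": ' leash"}'}}]}, "finish_reason": None}]},
    {"choices": [{"delta": {}, "finish_reason": "tool_calls"}]},
]
tool_calls = collect_tool_calls(chunks)
print(tool_calls)
```

Parsing JSON before the final chunk would fail on partial strings such as `{"query": "dog`, which is why the fragments are only concatenated during the stream.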

Continuum Research

The mechanisms behind reliable agents at scale — what makes Continuum more than a thin LLM wrapper. Each topic links a runtime concern to the module that implements it, with citations to the source files.

Internals | Design notes | Production lessons

Design philosophy

Continuum is a runtime, not a framework. Four principles guide every module:

  1. Code-first, no YAML. Agents are Python dataclasses. The compiler is your friend.
  2. Async-native. Every I/O hop is non-blocking — LLM, MCP tools, Redis, vector store, Langfuse.
  3. Protocol-based abstractions. Replace any layer (LLM, memory, session, observability) by swapping the protocol implementation. No deep inheritance trees.
  4. Trace everything. Auto-tracing isn't a feature — it's a constraint. If a behaviour can't be traced, it shouldn't ship.

Tool Attention

An agent with 50+ MCP tools dilutes the LLM's function-calling accuracy and burns tokens on schemas it never uses. Tool Attention is a top-k semantic promotion mechanism: every turn, only the most relevant tools are sent to the LLM.

Field | Type | Default | Effect
k | int | 3 | How many tools to promote per turn
min_tools | int | 5 | Skip attention when the agent has fewer tools than this
NEED_TOOL fallback | - | auto | If the LLM signals a missing tool, expand the candidate set and retry

from orchestrator.agent import BaseAgent
from orchestrator.agent.config import AgentConfig
from orchestrator.tools.tool_attention.config import ToolAttentionConfig

agent = BaseAgent(
    name="ops",
    instructions="…",
    tools=large_tool_set,        # 30+ tools
    config=AgentConfig(
        tool_attention=ToolAttentionConfig(k=5, min_tools=10),
    ),
)

Side effect. Tool Attention reduces prompt tokens 30–60% on tool-heavy agents, often paying for itself in latency before quality even enters the equation.
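
To show the shape of top-k promotion, the sketch below ranks tools by similarity between the user message and each tool description. It deliberately swaps in a bag-of-words cosine for the semantic embeddings a real implementation would use; `promote_tools` and the tool descriptions are hypothetical.

```python
import math
import re
from collections import Counter

def _vec(text: str) -> Counter:
    # Bag-of-words vector; a stand-in for a real embedding.
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def _cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def promote_tools(query: str, tools: dict[str, str], k: int = 3, min_tools: int = 5) -> list[str]:
    # Below min_tools, attention is skipped and every tool is sent.
    if len(tools) < min_tools:
        return list(tools)
    q = _vec(query)
    ranked = sorted(tools, key=lambda name: _cosine(q, _vec(tools[name])), reverse=True)
    return ranked[:k]

TOOLS = {
    "search_products": "search product catalog by keyword",
    "add_to_cart": "add product to shopping cart",
    "view_cart": "show current shopping cart",
    "checkout": "pay and place order",
    "track_order": "track order shipping status",
    "refund_order": "refund completed order",
}
picked = promote_tools("search the catalog for a product", TOOLS, k=2)
print(picked)
```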

Context compression

When the running message array approaches the model's context window, Continuum compresses older turns into a summary while preserving the most recent N exchanges verbatim.

Env | Default | Meaning
CONTEXT_MANAGEMENT_ENABLED | true | Master switch
CONTEXT_COMPRESSION_THRESHOLD | 0.8 | Trigger at 80% of model's context window
CONTEXT_KEEP_RECENT_MESSAGES | 10 | Recent turns kept verbatim

Compression runs synchronously between LLM calls. The summary is stored as a system message; the original turns are dropped from the message array but kept in session history (Redis) for audit.
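
The threshold and keep-recent settings combine roughly as in the sketch below. It is a simplified model under stated assumptions: token counts are estimated at about four characters per token, the summarizer is a stub passed in by the caller, and `estimate_tokens` and `maybe_compress` are hypothetical names.

```python
from typing import Callable

Message = dict[str, str]

def estimate_tokens(messages: list[Message]) -> int:
    # Crude assumption: about 4 characters per token, plus 1 per message.
    return sum(len(m["content"]) // 4 + 1 for m in messages)

def maybe_compress(
    messages: list[Message],
    context_window: int,
    summarize: Callable[[list[Message]], str],
    threshold: float = 0.8,
    keep_recent: int = 10,
) -> list[Message]:
    # Leave short conversations untouched.
    if estimate_tokens(messages) < threshold * context_window or len(messages) <= keep_recent:
        return messages
    older, recent = messages[:-keep_recent], messages[-keep_recent:]
    summary = {"role": "system", "content": summarize(older)}
    return [summary] + recent  # summary first, recent turns verbatim

history = [{"role": "user", "content": "x" * 40} for _ in range(20)]
compressed = maybe_compress(
    history,
    context_window=250,
    summarize=lambda older: f"Summary of {len(older)} earlier messages",
)
print(len(history), "->", len(compressed))
```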

Instruction modifiers

Dynamic, code-driven prompt augmentation. A modifier is a callable (prompt, ctx) → prompt that runs after template variables are resolved but before the LLM call.

def tier_aware(prompt: str, ctx: RunContext) -> str:
    tier = ctx.metadata.get("user_tier", "free")
    if tier == "enterprise":
        return prompt + "\n\nThis is an enterprise user. Prioritise SLA."
    return prompt

agent = BaseAgent(
    name="support",
    instructions="You are helping {user_name}.",
    template_vars={"user_name": "Alice"},
    instruction_modifiers=[tier_aware],
)
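
The ordering described above, template variables first and modifiers second, can be sketched in a few lines. `RunContext` here is a minimal local stand-in and `build_prompt` is a hypothetical helper; neither is Continuum's implementation.

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class RunContext:
    # Minimal stand-in carrying only the metadata the modifier reads.
    metadata: dict = field(default_factory=dict)

Modifier = Callable[[str, RunContext], str]

def tier_aware(prompt: str, ctx: RunContext) -> str:
    if ctx.metadata.get("user_tier", "free") == "enterprise":
        return prompt + "\n\nThis is an enterprise user. Prioritise SLA."
    return prompt

def build_prompt(instructions: str, template_vars: dict, modifiers: list[Modifier], ctx: RunContext) -> str:
    prompt = instructions.format(**template_vars)  # 1. resolve template variables
    for modify in modifiers:                       # 2. apply modifiers in order
        prompt = modify(prompt, ctx)
    return prompt

final_prompt = build_prompt(
    "You are helping {user_name}.",
    {"user_name": "Alice"},
    [tier_aware],
    RunContext(metadata={"user_tier": "enterprise"}),
)
print(final_prompt)
```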

Smart layer · model_tier routing

An alternative to Smart Inference's gateway-side router — Continuum can route inline via RouterAgent(routing_strategy="model_tier"). A tier classifier (small LLM or heuristic) reads each prompt and dispatches to one of several pre-defined model tiers.

When SMART_LAYER_ENABLED=true and the strategy is model_tier, the router:

  1. Runs the tier classifier on the prompt → returns cheap / mid / quality
  2. Looks up the tier's pinned model from RouterConfig.tier_models
  3. Falls back to FALLBACK_LLM_MODEL if the picked tier is unhealthy

When SMART_LAYER_ENABLED=false, the router falls back to standard llm routing.
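
The three steps can be sketched as a small function. Everything concrete here is an assumption made for illustration: the keyword list and length thresholds in `light_classifier`, the tier-to-model mapping, the health set, and the `route` helper itself.

```python
import re

# Hypothetical tier mapping; in Continuum this would come from RouterConfig.tier_models.
TIER_MODELS = {"cheap": "gpt-4o-mini", "mid": "gpt-4o", "quality": "claude-opus-4-5"}
FALLBACK_LLM_MODEL = "gemini/gemini-1.5-flash"

def light_classifier(prompt: str) -> str:
    # Regex + length heuristics only; never calls an LLM. Thresholds are made up.
    if re.search(r"\b(prove|refactor|derive)\b", prompt, re.IGNORECASE) or len(prompt) > 800:
        return "quality"
    if len(prompt) > 200:
        return "mid"
    return "cheap"

def route(prompt: str, healthy: set[str], smart_layer_enabled: bool = True,
          default_model: str = "gpt-4o") -> str:
    if not smart_layer_enabled:
        return default_model                   # smart layer off: no tier routing
    tier = light_classifier(prompt)            # 1. classify the prompt
    model = TIER_MODELS[tier]                  # 2. look up the tier's pinned model
    return model if model in healthy else FALLBACK_LLM_MODEL  # 3. health fallback

print(route("hi", {"gpt-4o-mini", "gpt-4o"}))
print(route("Refactor this module for testability", {"gpt-4o-mini"}))
```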

Tier classifiers

Three classifier backends ship out of the box. Choose via LLM_ROUTE_TIER_CLASSIFIER.

Classifier | Where it runs | Best for
light_only | regex + length heuristics | zero-latency baseline; never calls an LLM
qwen | HuggingFace Router API (Qwen3-4B-Instruct) | cloud routing without spinning up a model
qwen_local | local OpenAI-compatible endpoint (MLX, vLLM) | air-gapped or zero-cost

Set LLM_ROUTE_TIER_CLASSIFIER_HEURISTIC_SHORTCUT=false to skip the keyword shortcut and always run the classifier LLM.

Handoff history transfer modes

When agent A hands off to agent B, the prior conversation is rewritten for B's context. Four modes control how much history travels:

Mode | What B sees | Use when
FULL | Verbatim message array | Specialist needs all detail
SUMMARY | LLM-generated summary + open question | Long conversations, narrow specialist
RECENT_N | Last N turns only | Topic just changed; old context is noise
HYBRID | Summary of older turns + last N verbatim | Default; best balance for most flows

Continuum also tracks handoff depth, detects cycles (A→B→A→B), and emits HANDOFF_RETURN events when control returns to a parent agent.
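
The four transfer modes differ only in how the message list is sliced and summarised. The sketch below shows that slicing with a stub summarizer. `HistoryMode` and `transfer_history` are hypothetical names, and the real implementation presumably calls an LLM where the stub is used here.

```python
from enum import Enum
from typing import Callable

Message = dict[str, str]

class HistoryMode(Enum):
    FULL = "full"
    SUMMARY = "summary"
    RECENT_N = "recent_n"
    HYBRID = "hybrid"

def transfer_history(
    messages: list[Message],
    mode: HistoryMode,
    summarize: Callable[[list[Message]], str],
    n: int = 4,  # number of recent turns kept verbatim; assumed to be >= 1
) -> list[Message]:
    if mode is HistoryMode.FULL:
        return list(messages)
    if mode is HistoryMode.SUMMARY:
        return [{"role": "system", "content": summarize(messages)}]
    recent = list(messages[-n:])
    if mode is HistoryMode.RECENT_N:
        return recent
    older = messages[:-n]
    if not older:  # nothing old enough to summarise
        return recent
    return [{"role": "system", "content": summarize(older)}] + recent

msgs = [{"role": "user", "content": f"m{i}"} for i in range(6)]
stub = lambda ms: f"{len(ms)} earlier messages"
hybrid = transfer_history(msgs, HistoryMode.HYBRID, stub, n=2)
print(hybrid)
```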

Memory isolation scopes

Long-term memory in Continuum has four orthogonal scopes. Pick per-agent for search and store separately.

Scope | Keyed by | Reach | Multi-tenant safe?
USER | user_id | Per-user across all agents | ✓ default
AGENT | agent_name | Per-agent across all users | One-way (shared by users)
SHARED | - | Global knowledge base | No (explicitly shared)
CONVERSATION | conversation_id | Ephemeral, single thread | -

IntelligentMemoryClient

A drop-in replacement for the standard MemoryClient that adds three behaviours:

  • Adaptive extraction — uses a per-agent extraction prompt to filter what gets stored (e.g. "only pet preferences, never one-off cart actions").
  • Relevance re-ranking — re-ranks recalled memories using a small LLM scorer before injecting them into the prompt.
  • Deduplication — merges semantically equivalent memories to prevent drift.

PII filtering on memory writes

Before a memory is stored, Continuum can run it through a PII scrubber that redacts emails, phone numbers, SSNs, and credit cards. Configure via the memory client:

from orchestrator.memory import MemoryClient, PIIPolicy

memory = MemoryClient(pii_policy=PIIPolicy.REDACT)

Run artifacts

MCP tools can return structuredContent — JSON payloads alongside their text output. Continuum captures these as run artifacts, exposing them on AgentResponse.artifacts for the application to consume (UI widgets, downstream pipelines, audit logs).

response = await runner.run(agent, "checkout", user_id="alice")
for art in response.artifacts:
    print(art.tool_name, art.structured_content)
    # e.g. ('checkout', {'order_id': 'ORD-92151', 'total_cents': 699})

Evaluation framework

Continuum ships with two opt-in eval stacks (pip install -e ".[eval]"), plus an EvaluatorAgent for agent-graded checks:

  • DeepEval — criterion-based evaluation with customisable metrics (faithfulness, correctness, toxicity).
  • RAGAS — RAG-specific metrics (context precision, recall, answer relevance).
  • EvaluatorAgent — a specialised BaseAgent whose job is grading other agents' outputs.

Golden datasets from Langfuse

Build regression test sets directly from production traces. The orchestrator.evaluation.golden module pulls traces matching a filter (e.g. tagged good_response in the Langfuse UI) and materialises them as a pytest dataset.

from orchestrator.evaluation import build_golden_dataset

dataset = await build_golden_dataset(
    project="shop-assistant",
    tags=["good_response"],
    since="2026-04-01",
)
# Use with pytest-asyncio + DeepEval as a CI gate.

Example · Pet Shop Assistant

An end-to-end walkthrough: an agent backed by an MCP shop server, routed through Smart Inference, with Langfuse tracing and Redis-backed sessions. Copy the snippets in order and you'll have a working chat UI in under 10 minutes.

BaseAgent MCP tools Smart Inference FastAPI Web UI

Source. The complete code lives at playground/gateway-local-shop/. This page is the annotated tour.

Architecture

┌──────────────┐    HTTP    ┌────────────────┐    OpenAI    ┌──────────────────┐
│  Browser     │ ─────────► │  FastAPI UI    │              │  Smart Inference │
│  /chat       │   POST     │  (web.py :8081)│              │  Gateway :8787   │
└──────────────┘            └────────┬───────┘              └────────┬─────────┘
                                     │                               │
                                     │ Continuum AgentRunner         │ pick model
                                     ▼                               ▼
                            ┌────────────────┐              ┌──────────────────┐
                            │  BaseAgent     │  ◄── tools ──│  OpenAI /        │
                            │  (agent.py)    │              │  Anthropic /     │
                            └────────┬───────┘              │  Google          │
                                     │                      └──────────────────┘
                                     │ MCP StreamableHTTP
                                     ▼
                            ┌────────────────┐              ┌──────────────────┐
                            │  Shop server   │              │  Langfuse :3005  │
                            │  (server.py    │              │  ◄── traces ──── │
                            │   :8888)       │              └──────────────────┘
                            └────────────────┘

Session/memory:  Redis (:6380)  ·  Milvus (:19530)

1 · Prerequisites

  • Python 3.13 with a venv
  • Docker / Docker Compose
  • Node 22 LTS for the gateway build
  • At least one provider key (OPENAI_API_KEY, ANTHROPIC_API_KEY, or GEMINI_API_KEY)

2 · Spin up infra

  1. Continuum stack — Redis, Milvus, Langfuse, ClickHouse, Postgres, MinIO
    cd continuum
    docker compose up -d redis-sdk milvus milvus-etcd postgres clickhouse minio langfuse-web langfuse-worker
  2. Smart Inference gateway — build, then run from Docker (sharing Continuum's Langfuse Redis)
    cd ../continuum-backend-smart-inference
    nvm use 22 && npm install && npm run build
    docker compose up -d gateway

3 · Configure env

Continuum (continuum/.env) — point at the gateway and the local Langfuse:

# LLM / routing
SMART_GATEWAY_URL=http://localhost:8787/v1
SMART_GATEWAY_API_KEY=ck-prod-2026-05-19
SMART_GATEWAY_DEFAULT_MODE=modest

# Observability
LANGFUSE_ENABLED=true
LANGFUSE_HOST=http://localhost:3005           # continuum's docker-compose maps Langfuse to 3005
LANGFUSE_PUBLIC_KEY=pk-lf-…
LANGFUSE_SECRET_KEY=sk-lf-…

# Sessions + memory
SESSION_REDIS_PORT=6380
VECTOR_STORE_PROVIDER=milvus
MILVUS_PORT=19530

Smart Inference (continuum-backend-smart-inference/.env) — real provider keys:

OPENAI_API_KEY=sk-…
ANTHROPIC_API_KEY=sk-ant-…
GEMINI_API_KEY=AIza…

# From inside docker, reach the host's Langfuse
LANGFUSE_PUBLIC_KEY=pk-lf-…
LANGFUSE_SECRET_KEY=sk-lf-…
LANGFUSE_BASE_URL=http://host.docker.internal:3005

4 · MCP shop server

A FastMCP server exposes 5 tools (search, get, add-to-cart, view-cart, checkout) and 3 resources. The agent talks to it over StreamableHTTP.

# playground/gateway-local-shop/server.py (excerpt)
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("local-shop")

PRODUCTS = [
    {"id": "p1", "name": "Dog Food (Dry) 5kg", "price": 29.99, "animal": "dog"},
    # … more products …
]
_carts: dict[str, list] = {}

@mcp.tool()
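def search_products(query: str = "", animal: str = "") -> list:
    """Filter products by query / animal."""
    q = query.lower()
    return [
        p for p in PRODUCTS
        if (not animal or p["animal"] == animal)
        and (not q or q in p["name"].lower())   # match the query against the product name
    ]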

@mcp.tool()
def add_to_cart(session_id: str, product_id: str, quantity: int = 1) -> dict:
    cart = _carts.setdefault(session_id, [])
    cart.append({"product_id": product_id, "quantity": quantity})
    return {"cart_size": len(cart)}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(mcp.streamable_http_app(), host="0.0.0.0", port=8888)

python playground/gateway-local-shop/server.py     # MCP server on :8888

5 · Agent + config

The agent connects to the MCP server, defines memory + session policy, and uses Smart Inference auto-routing.

# playground/gateway-local-shop/agent.py (excerpt)
from orchestrator import AgentConfig, AgentMemoryConfig, AgentMemoryScope, \
    AgentRunner, BaseAgent, MCPServerStreamableHttp, ToolExecutor
from orchestrator.tools.tool_attention.config import ToolAttentionConfig
from orchestrator.tools.types import ToolContextConfig, ToolContextVariable

mcp_server = MCPServerStreamableHttp(
    params={"url": "http://localhost:8888/mcp"},
    context_config=ToolContextConfig(
        variables=[ToolContextVariable(name="session_id",
                  inject_into=["add_to_cart", "view_cart", "checkout"])]
    ),
)
# `await` needs an async context: run this setup inside an async function (or an asyncio REPL).
await mcp_server.connect()

executor = ToolExecutor({mcp_server: None})
await executor.initialize()

agent = BaseAgent(
    name="shop-assistant",
    instructions="You are a friendly pet shop assistant.",
    model="auto",                        # gateway picks
    gateway_mode=None,                    # falls back to SMART_GATEWAY_DEFAULT_MODE
    tools=executor.get_tool_definitions(),
    tool_executor=executor,
    memory_config=AgentMemoryConfig(
        search_memories=True, store_memories=True,
        search_scope=AgentMemoryScope.USER,
        store_scope=AgentMemoryScope.USER,
    ),
    config=AgentConfig(
        max_turns=3,
        log_to_session=True,
        tool_attention=ToolAttentionConfig(k=3, min_tools=3),
    ),
)
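
The `ToolContextConfig` above lists `session_id` as a variable injected into three tools, so the model never has to supply it. As a conceptual sketch only, the idea can be written in plain Python. `ContextVariable` and `inject_context` are hypothetical stand-ins, and letting the runtime value override a model-supplied one is an assumption about the behaviour, not a statement of how Continuum implements it.

```python
from dataclasses import dataclass, field


@dataclass
class ContextVariable:
    """Hypothetical stand-in for ToolContextVariable: a name plus its target tools."""
    name: str
    inject_into: list[str] = field(default_factory=list)


def inject_context(
    tool_name: str,
    arguments: dict,
    variables: list[ContextVariable],
    context: dict,
) -> dict:
    """Return a copy of `arguments` with matching context values merged in."""
    merged = dict(arguments)
    for var in variables:
        if tool_name in var.inject_into and var.name in context:
            # Assumption: the runtime value wins over anything the model supplied.
            merged[var.name] = context[var.name]
    return merged


variables = [ContextVariable("session_id", ["add_to_cart", "view_cart", "checkout"])]
context = {"session_id": "local"}

print(inject_context("add_to_cart", {"product_id": "p5"}, variables, context))
print(inject_context("search_products", {"animal": "dog"}, variables, context))
```

Tools outside `inject_into`, such as `search_products`, receive their arguments unchanged.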

6 · CLI or Web UI

Two ways to drive it. The CLI is a single file that exits on an empty line or Ctrl-C; the Web UI is a FastAPI page with a ChatGPT-style chat.

CLI loop

# cli.py
import asyncio

from orchestrator import AgentRunner

from agent import agent, executor

async def main():
    runner = AgentRunner(tool_executor=executor)
    runner.register_agent(agent)
    while True:
        msg = input("you ❭ ").strip()
        if not msg: break
        r = await runner.run(agent, msg, user_id="alice", conversation_id="local")
        print("agent ❭", r.content)

asyncio.run(main())
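
A variant of the loop above that can be exercised without a terminal is sketched here. The reader, writer, and reply function are passed in, and end-of-input (Ctrl-D or a finished pipe) ends the session cleanly. `echo_agent` is a hypothetical stand-in for the `runner.run(...)` call, and `chat_loop` is an illustrative name.

```python
import asyncio
from typing import Awaitable, Callable


async def echo_agent(message: str) -> str:
    """Hypothetical stand-in for the real agent call; returns a canned reply."""
    return f"you said: {message}"


async def chat_loop(
    reply: Callable[[str], Awaitable[str]],
    read_line: Callable[[str], str] = input,
    write: Callable[[str], None] = print,
) -> int:
    """Read prompts until a blank line or end-of-input; return the number of turns."""
    turns = 0
    while True:
        try:
            msg = read_line("you ❭ ").strip()
        except EOFError:
            break  # Ctrl-D or piped input ran out
        if not msg:
            break
        write(f"agent ❭ {await reply(msg)}")
        turns += 1
    return turns


# Scripted demo: feed two prompts, then a blank line to stop.
scripted = iter(["show me dog toys", "checkout please", ""])
count = asyncio.run(chat_loop(echo_agent, read_line=lambda prompt: next(scripted)))
print(count, "turns")
```

For interactive use, call `asyncio.run(chat_loop(my_reply))` with the defaults, where `my_reply` wraps the real runner.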

FastAPI Web UI

# web.py — POST /chat returns the agent's reply as JSON
@app.post("/chat")
async def chat(req: ChatRequest):
    response = await shop_agent.chat(
        req.message,
        user_id=req.user_id,
        conversation_id=req.conversation_id,
    )
    return {"response": response}

python playground/gateway-local-shop/web.py    # open http://localhost:8081
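
To call the endpoint from a script rather than the browser, a standard-library client along these lines should work. It assumes `ChatRequest` is a JSON body with exactly the three fields the handler reads (`message`, `user_id`, `conversation_id`) and that the route lives at `/chat` on port 8081. `build_chat_request` and `send_chat` are illustrative names.

```python
import json
import urllib.request

# Port taken from the run command above; the /chat path from the route decorator.
CHAT_URL = "http://localhost:8081/chat"


def build_chat_request(
    message: str,
    user_id: str,
    conversation_id: str,
    url: str = CHAT_URL,
) -> urllib.request.Request:
    """Build (but do not send) a JSON POST carrying the fields the handler reads."""
    body = json.dumps({
        "message": message,
        "user_id": user_id,
        "conversation_id": conversation_id,
    }).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_chat(request: urllib.request.Request, timeout: float = 30.0):
    """Send the request and return the "response" field; needs web.py to be running."""
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.load(resp)["response"]


req = build_chat_request("show me dog toys", user_id="alice", conversation_id="local")
print(req.get_method(), req.full_url)
```

With the server up, `print(send_chat(req))` prints the agent's reply.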

7 · Run a conversation

Type any of these into the chat — the agent will route to the right tool automatically.

Prompt                             What the agent does
show me dog toys                   search_products(query="toys", animal="dog") → product list
add the tennis balls to my cart    add_to_cart(session_id, product_id="p5")
what's in my cart?                 view_cart(session_id) → table with totals
checkout please                    checkout(session_id) → order id + receipt

Observe. Open http://localhost:3005 → your Continuum project. Every turn appears as a Langfuse trace with the user/assistant messages, every tool call as a span, and the selected model and latency on the generation.

Make it yours

  • Swap the domain — replace the MCP server with one that exposes your tools (Slack, Jira, internal APIs). The agent code doesn't change.
  • Tighten the tier — set gateway_mode="strict" to drop latency to ~2–4s/turn (cheap-tier models).
  • Persist memories — keep memory_config on and the agent will remember per-user facts across sessions.
  • Add a workflow — wrap the agent in a RouterAgent with a billing-specialist sibling and a triage agent.
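
The last bullet describes a routing pattern: a triage step decides which specialist handles each message. The toy sketch below shows only the control flow in plain Python. It does not use Continuum's RouterAgent API, and the keyword-based `triage` is a stand-in for what would normally be an LLM-backed triage agent. All names here are hypothetical.

```python
from typing import Callable

Handler = Callable[[str], str]


def shop_assistant(message: str) -> str:
    """Stand-in for the shop agent from this walkthrough."""
    return f"[shop] {message}"


def billing_specialist(message: str) -> str:
    """Stand-in for a hypothetical billing-specialist sibling."""
    return f"[billing] {message}"


# Toy triage vocabulary; a real triage agent would classify with a model.
BILLING_KEYWORDS = {"invoice", "refund", "charge", "billing"}

ROUTES: dict[str, Handler] = {
    "shop": shop_assistant,
    "billing": billing_specialist,
}


def triage(message: str) -> str:
    """Return the route name for a message based on keyword overlap."""
    words = {w.strip(".,!?").lower() for w in message.split()}
    return "billing" if words & BILLING_KEYWORDS else "shop"


def route(message: str) -> str:
    """Dispatch the message to whichever handler triage selects."""
    return ROUTES[triage(message)](message)


print(route("show me dog toys"))
print(route("I need a refund for my last order"))
```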

Community

Continuum is built at Shyftlabs. Contributions, examples, and feedback are welcome.

Contributing

  1. Fork & clone — work in a feature branch
  2. Install dev dependencies
    pip install -e ".[dev,temporal,eval]"
  3. Write tests — unit + integration, no mocking the database
    pytest tests/ -v
  4. Lint & type-check
    ruff check src/
    mypy src/
  5. Submit a PR — describe what changed and why, reference the relevant docs section

Integration tests over mocks: Continuum integration tests hit real services (Redis, Qdrant/Milvus). Run docker compose up -d before running the test suite.
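
Since the suite depends on live services, a quick reachability check before running pytest can save a confusing wall of connection errors. This is a sketch using only the standard library. The ports come from the .env listing earlier on this page, while the `localhost` hosts and the helper names are assumptions.

```python
import socket

# Ports from the .env listing (SESSION_REDIS_PORT, MILVUS_PORT); hosts are assumed.
SERVICES = {
    "redis": ("localhost", 6380),
    "milvus": ("localhost", 19530),
}


def is_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def unreachable_services(services: dict[str, tuple[str, int]] = SERVICES) -> list[str]:
    """Return the names of services that refuse or time out, sorted."""
    return sorted(
        name for name, (host, port) in services.items()
        if not is_reachable(host, port)
    )


if __name__ == "__main__":
    down = unreachable_services()
    print("all services reachable" if not down else f"not reachable: {', '.join(down)}")
```

One option is to call `unreachable_services()` from a session-scoped fixture in `conftest.py` and abort early with a message that points at `docker compose up -d`.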