LangGraph Agents

Phase 2 agent-runtime • LangGraph 1.1.0 • HITL workflows

What is agent-runtime?

agent-runtime is a LangGraph-powered agent host running as a Docker container on port 9100. It provides stateful, interruptible agent graphs with persistent checkpoints, HITL approval workflows, and native MCP tool integration.

Unlike simple chatbot pipelines, LangGraph agents are directed graphs — each node is a Python function, each edge defines the flow of state. This means agents can branch, loop, pause for human input, and resume days later from an exact checkpoint.

| Property | Value |
|---|---|
| Framework | LangGraph 1.1.0 (MIT) |
| Port | 9100 |
| Checkpointing | PostgresSaver (same PostgreSQL as xbrain) |
| Observability | Langfuse 3.x via CallbackHandler |
| MCP integration | mcp_gateway_client.py (Phase 4) |

LangGraph Architecture

xbrain uses LangGraph agents for four workflows, detailed under Available Agents below:

  - Document ingestion: extract facts from PDFs and text, then upsert them to memory-api
  - Promotion workflow: WORKING → VALIDATED → CANONICAL, gated by admin approval
  - RAG conversation: search memory, enrich the prompt, respond
  - GitHub sync: ingest repo content when brain.yaml is pushed

Below is a complete example of a document ingestion agent graph. The graph extracts text, identifies facts, and routes to human review when confidence is low:

Python — agent graph construction
from langgraph.graph import StateGraph, START, END
from app.state import AgentState
from app.nodes import (
    extract_text_node,
    identify_facts_node,
    human_review_node,
    upsert_node,
    check_confidence,
)

# Example: document ingestion agent graph
workflow = StateGraph(AgentState)

workflow.add_node("extract_text", extract_text_node)
workflow.add_node("identify_facts", identify_facts_node)
workflow.add_node("human_review", human_review_node)  # HITL interrupt
workflow.add_node("upsert_to_memory", upsert_node)

workflow.add_edge(START, "extract_text")
workflow.add_edge("extract_text", "identify_facts")
workflow.add_conditional_edges("identify_facts", check_confidence,
    {"high": "upsert_to_memory", "low": "human_review"})
workflow.add_edge("human_review", "upsert_to_memory")
workflow.add_edge("upsert_to_memory", END)

# Compile with PostgresSaver for persistent checkpoints
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def get_graph(db_pool):
    checkpointer = AsyncPostgresSaver(db_pool)
    await checkpointer.setup()  # creates the checkpoint tables on first run; safe to re-run
    graph = workflow.compile(checkpointer=checkpointer)
    return graph

Human-In-The-Loop (HITL)

HITL is a core capability of xbrain's agent-runtime. When an agent reaches a low-confidence decision point, it pauses and waits for human approval before proceeding. The checkpoint is saved to PostgreSQL via PostgresSaver — the agent can resume hours or days later from exactly the same state.

The HITL flow works as follows:

  1. Agent encounters an interrupt() call inside a node function
  2. LangGraph saves the full agent state to PostgreSQL (via PostgresSaver)
  3. The agent surfaces a question to an admin in Open WebUI
  4. Admin reviews the context and approves or rejects
  5. Agent resumes from the exact checkpoint with the admin's decision injected into state
Python — HITL interrupt inside a node
from langgraph.types import interrupt

async def human_review_node(state: AgentState) -> AgentState:
    """This node pauses the agent and waits for human approval."""

    # Agent pauses here — state is saved to PostgreSQL via PostgresSaver.
    # interrupt() is synchronous: it raises internally to pause the graph, and
    # returns the resume value once the run is re-invoked with Command(resume=...)
    decision = interrupt({
        "question": "Confidence is 0.4 — should I save this as WORKING or discard?",
        "extracted_fact": state["extracted_fact"],
        "source": state["source_document"],
        "confidence": state["confidence"],
    })

    # Admin approves in Open WebUI → agent resumes from checkpoint
    # LangGraph resumes from exactly this point with the admin's decision
    state["approved_decision"] = decision
    return state

How checkpointing works

PostgresSaver writes each checkpoint to the same PostgreSQL instance as the rest of xbrain. The agent can resume days later: the full graph state (all node outputs and intermediate values) is preserved, and no in-memory state is lost between interrupt and resume.
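
A paused or finished run can also be inspected straight from the checkpointer. Below is a minimal sketch, assuming the compiled graph and the thread_id from the examples on this page, using LangGraph's aget_state:

Python — inspecting a saved checkpoint (illustrative sketch)
config = {"configurable": {"thread_id": "agent_thread_abc123"}}

# Returns a StateSnapshot backed by the latest PostgreSQL checkpoint
snapshot = await graph.aget_state(config)

print(snapshot.values)  # the full AgentState at the last checkpoint
print(snapshot.next)    # nodes that will run on resume, e.g. ("human_review",)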

Resuming an Agent

After a human has provided their decision in Open WebUI, the agent-runtime resumes the graph from the saved checkpoint using the thread_id:

Python — resume agent from PostgresSaver checkpoint
from langgraph.types import Command

# Resume with the admin's decision.
# The thread_id identifies the exact agent run in PostgreSQL.
# A plain dict here would start a new run; Command(resume=...) is what
# delivers the value back to the pending interrupt() call.
result = await graph.ainvoke(
    Command(resume={"decision": "approved", "target_level": "WORKING"}),
    config={"configurable": {"thread_id": "agent_thread_abc123"}},
)

# The graph resumes from the interrupt() point
# human_review_node receives the decision and returns updated state
# Then continues to upsert_to_memory → END

The thread_id is generated at graph start and stored alongside the human-review task. Open WebUI displays pending HITL tasks with their thread_id, allowing admins to approve and resume without additional context.
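
For the start side of this flow, a minimal sketch follows. It assumes the ingestion graph from earlier; the initial state fields and the return shape are illustrative, not part of the agent-runtime API:

Python — starting a run and catching the interrupt (illustrative sketch)
import uuid

async def start_ingestion(graph, source_document: str) -> dict:
    # Generate the thread_id that later identifies this run in PostgreSQL
    thread_id = f"agent_thread_{uuid.uuid4().hex[:12]}"
    config = {"configurable": {"thread_id": thread_id}}

    # Runs until END, or until interrupt() pauses the graph
    result = await graph.ainvoke({"source_document": source_document}, config=config)

    # Recent LangGraph versions surface pending interrupts in the result
    # under the "__interrupt__" key as a list of Interrupt objects
    if "__interrupt__" in result:
        payload = result["__interrupt__"][0].value  # the dict passed to interrupt()
        # Store (thread_id, payload) as a pending HITL task for Open WebUI
        return {"status": "awaiting_review", "thread_id": thread_id, "payload": payload}

    return {"status": "completed", "thread_id": thread_id}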

MCP Tools in Agents

Agents can invoke any registered MCP tool (scraper, calendar, Drive reader, Deck generator) via mcp_gateway_client.py. The client handles authentication, team_scope injection, and error handling transparently:

Python — calling MCP tools from an agent node
from app.mcp_gateway_client import MCPGatewayClient

async def use_mcp_tool(state: AgentState) -> AgentState:
    client = MCPGatewayClient(
        gateway_url="http://mcp-gateway:8080",
        team_scope=state["team_scope"],
        jwt=state["user_jwt"]
    )

    # Call the scraper MCP tool
    result = await client.call_tool(
        tool_name="scrape_url",
        params={"url": state["target_url"]}
    )

    state["scraped_content"] = result["content"]
    return state

async def enrich_with_calendar(state: AgentState) -> AgentState:
    client = MCPGatewayClient(
        gateway_url="http://mcp-gateway:8080",
        team_scope=state["team_scope"],
        jwt=state["user_jwt"]
    )

    # Fetch upcoming events to provide context
    events = await client.call_tool(
        tool_name="list_events",
        params={"days_ahead": 7}
    )

    state["calendar_context"] = events["items"]
    return state

MCP tool authorization

All MCP tool calls are routed through mcp-gateway (port 8080), which validates the JWT, enforces team_scope, and logs the call to the audit trail. Agents never call MCP servers directly — every call is mediated and audited.
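
Rejections should fail the node loudly rather than let the agent continue on partial data. A sketch of defensive handling, assuming MCPGatewayClient propagates gateway errors as httpx.HTTPStatusError (the exception type is an assumption about the client's internals):

Python — handling a gateway rejection (illustrative; error type is an assumption)
import httpx

async def scrape_with_guard(client, url: str) -> dict:
    try:
        return await client.call_tool(tool_name="scrape_url", params={"url": url})
    except httpx.HTTPStatusError as exc:
        # A 403 here means mcp-gateway rejected the JWT or team_scope;
        # the downstream MCP server was never reached
        if exc.response.status_code == 403:
            raise PermissionError(f"mcp-gateway denied scrape_url for {url}") from exc
        raise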

CANONICAL Facts in System Prompts

Before making an LLM call, agent-runtime injects team CANONICAL facts into the system prompt. These facts are retrieved from Qdrant, filtered by team_scope and truth_level=CANONICAL, and formatted as a compact knowledge block:

Python — injecting CANONICAL facts via GET /v1/system-prompt
import httpx

async def build_system_prompt(team_scope: str, jwt: str) -> str:
    """Fetch CANONICAL facts formatted for LLM system prompt."""

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://memory-api:8000/v1/system-prompt",
            headers={
                "Authorization": f"Bearer {jwt}",
                "X-Team-Scope": team_scope,
            }
        )
        facts_block = response.text
        # Returns: "Team facts:\n- Q2 target: 2M€\n- Tech lead: Alice..."

    base_prompt = (
        "You are an AI assistant for the {team} team. "
        "Use the team facts below to ground your answers."
    ).format(team=team_scope)

    return f"{base_prompt}\n\n{facts_block}"

# Used in an agent node:
async def llm_node(state: AgentState) -> AgentState:
    system_prompt = await build_system_prompt(
        team_scope=state["team_scope"],
        jwt=state["user_jwt"]
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": state["user_message"]}
    ]
    # ... call LLM with messages
    return state

Observability

All agent runs are tracked in Langfuse. Token costs, latency, node-level traces, and HITL interrupts are all visible in the Langfuse dashboard. The integration uses LangGraph's native callback system:

Python — Langfuse CallbackHandler for agent observability
import os
from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler

def get_langfuse_handler() -> LangfuseCallbackHandler:
    # The Langfuse 3.x SDK reads credentials from the environment
    # (LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_HOST) rather than
    # from constructor arguments as in 2.x
    os.environ.setdefault("LANGFUSE_HOST", "http://langfuse:3000")  # internal Docker DNS
    return LangfuseCallbackHandler()

# Pass the handler to every graph invocation
async def run_agent(graph, state: AgentState, thread_id: str):
    langfuse_handler = get_langfuse_handler()

    result = await graph.ainvoke(
        state,
        config={
            "configurable": {"thread_id": thread_id},
            "callbacks": [langfuse_handler],
            # Langfuse 3.x: session grouping is set via trace metadata
            "metadata": {"langfuse_session_id": "agent-runtime"},
        }
    )
    return result

View token costs, latency, and traces per agent run in the Langfuse dashboard at http://langfuse:3000 (internal) or via Nginx on your public domain. Each LangGraph node appears as a separate span — HITL interrupts show as a distinctive "paused" span with the interrupt payload visible.

Required environment variables

agent-runtime requires LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, DATABASE_URL (for PostgresSaver), and ANTHROPIC_API_KEY or OPENAI_API_KEY. See the Configuration reference for the full list.
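
Since a missing key would otherwise surface only mid-run, a fail-fast check at startup is cheap insurance. A minimal sketch using only the variables named above:

Python — fail-fast env check at startup (illustrative sketch)
import os

REQUIRED = ("LANGFUSE_SECRET_KEY", "LANGFUSE_PUBLIC_KEY", "DATABASE_URL")

def check_env() -> None:
    missing = [name for name in REQUIRED if not os.environ.get(name)]
    # At least one LLM provider key must be set; either satisfies the requirement
    if not (os.environ.get("ANTHROPIC_API_KEY") or os.environ.get("OPENAI_API_KEY")):
        missing.append("ANTHROPIC_API_KEY or OPENAI_API_KEY")
    if missing:
        raise RuntimeError(f"agent-runtime is missing env vars: {', '.join(missing)}")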

Available Agents

| Agent | Trigger | HITL? | Description |
|---|---|---|---|
| doc_ingestion | POST /agents/ingest | Yes (low confidence) | PDF/text → extract facts → upsert to memory-api |
| promotion_workflow | POST /agents/promote | Yes (always) | WORKING → VALIDATED → CANONICAL with admin approval |
| rag_conversation | POST /agents/chat | No | User message → search memory → enrich prompt → respond |
| github_sync | GitHub webhook | No | brain.yaml push → ingest repo content at specified truth level |
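
As a usage sketch, here is how a caller might trigger the doc_ingestion agent. Only the route and port come from this page; the auth headers mirror the memory-api example above, and the request body shape is hypothetical:

Python — triggering doc_ingestion over HTTP (body fields are assumptions)
import httpx

async def trigger_ingestion(jwt: str, team_scope: str, document_url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://agent-runtime:9100/agents/ingest",
            headers={
                "Authorization": f"Bearer {jwt}",
                "X-Team-Scope": team_scope,  # assumed, mirroring memory-api
            },
            json={"document_url": document_url},  # hypothetical payload shape
        )
        response.raise_for_status()
        return response.json()  # expected to include the run's thread_id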