LangGraph Agents
What is agent-runtime?
agent-runtime is a LangGraph-powered agent host running as a Docker container on port 9100. It provides stateful, interruptible agent graphs with persistent checkpoints, HITL approval workflows, and native MCP tool integration.
Unlike simple chatbot pipelines, LangGraph agents are directed graphs — each node is a Python function, each edge defines the flow of state. This means agents can branch, loop, pause for human input, and resume days later from an exact checkpoint.
| Property | Value |
|---|---|
| Framework | LangGraph 1.1.0 (MIT) |
| Port | 9100 |
| Checkpointing | PostgresSaver (same PostgreSQL as xbrain) |
| Observability | Langfuse 3.x via CallbackHandler |
| MCP integration | mcp_gateway_client.py (Phase 4) |
LangGraph Architecture
LangGraph builds agent workflows as directed graphs where each node is a function and edges define the flow. xbrain uses LangGraph agents for:
- Document ingestion agents (PDF → structured facts)
- Truth-level promotion workflows (proposal → approval)
- RAG-enriched conversation agents (search → enrich → respond)
- Multi-step task agents with HITL checkpoints
Below is a complete example of a document ingestion agent graph. The graph extracts text, identifies facts, and routes to human review when confidence is low:
Python — agent graph construction

```python
from langgraph.graph import StateGraph, START, END
from app.state import AgentState
from app.nodes import (
    extract_text_node,
    identify_facts_node,
    human_review_node,
    upsert_node,
    check_confidence,
)

# Example: document ingestion agent graph
workflow = StateGraph(AgentState)
workflow.add_node("extract_text", extract_text_node)
workflow.add_node("identify_facts", identify_facts_node)
workflow.add_node("human_review", human_review_node)  # HITL interrupt
workflow.add_node("upsert_to_memory", upsert_node)

workflow.add_edge(START, "extract_text")
workflow.add_edge("extract_text", "identify_facts")
workflow.add_conditional_edges(
    "identify_facts",
    check_confidence,
    {"high": "upsert_to_memory", "low": "human_review"},
)
workflow.add_edge("human_review", "upsert_to_memory")
workflow.add_edge("upsert_to_memory", END)

# Compile with PostgresSaver for persistent checkpoints
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def get_graph(db_pool):
    checkpointer = AsyncPostgresSaver(db_pool)
    graph = workflow.compile(checkpointer=checkpointer)
    return graph
```
Human-In-The-Loop (HITL)
HITL is a core capability of xbrain's agent-runtime. When an agent reaches a low-confidence decision point, it pauses and waits for human approval before proceeding. The checkpoint is saved to PostgreSQL via PostgresSaver — the agent can resume hours or days later from exactly the same state.
The HITL flow works as follows:
1. Agent encounters an `interrupt()` call inside a node function
2. LangGraph saves the full agent state to PostgreSQL (via PostgresSaver)
3. The agent surfaces a question to an admin in Open WebUI
4. The admin reviews the context and approves or rejects
5. The agent resumes from the exact checkpoint with the admin's decision injected into state
Python — HITL interrupt inside a node

```python
from langgraph.types import interrupt

async def human_review_node(state: AgentState) -> AgentState:
    """This node pauses the agent and waits for human approval."""
    # Agent pauses here — state is saved to PostgreSQL via PostgresSaver.
    # Note: interrupt() is synchronous; it suspends the graph by raising,
    # so it is not awaited.
    decision = interrupt({
        "question": "Confidence is 0.4 — should I save this as WORKING or discard?",
        "extracted_fact": state["extracted_fact"],
        "source": state["source_document"],
        "confidence": state["confidence"],
    })
    # Admin approves in Open WebUI → agent resumes from this checkpoint,
    # with the admin's decision as the return value of interrupt()
    state["approved_decision"] = decision
    return state
```
How checkpointing works
Checkpoints are stored via PostgresSaver in the same PostgreSQL instance as the rest of xbrain. The agent can resume days later: the full graph state (all node outputs and intermediate values) is preserved, and no in-memory state is lost between interrupt and resume.
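A saved checkpoint can be verified through LangGraph's state-inspection API. The sketch below is illustrative (the `inspect_checkpoint` helper is not part of agent-runtime):

```python
async def inspect_checkpoint(graph, thread_id: str) -> None:
    # Note: AsyncPostgresSaver needs its tables created once at deploy
    # time, via: await checkpointer.setup()
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await graph.aget_state(config)
    print(snapshot.next)    # nodes waiting to run, e.g. ('human_review',)
    print(snapshot.values)  # the full AgentState captured at the interrupt
```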
Resuming an Agent
After a human has provided their decision in Open WebUI, the agent-runtime resumes the
graph from the saved checkpoint using the thread_id:
Python — resume agent from PostgresSaver checkpoint

```python
from langgraph.types import Command

# Resume with the admin's decision.
# The thread_id identifies the exact agent run in PostgreSQL.
result = await graph.ainvoke(
    Command(resume={"decision": "approved", "target_level": "WORKING"}),
    config={"configurable": {"thread_id": "agent_thread_abc123"}},
)
# The graph resumes from the interrupt() point:
# human_review_node receives the decision as interrupt()'s return value,
# then continues to upsert_to_memory → END
```

Note that resuming an `interrupt()` requires `Command(resume=...)`; passing a plain dict as input would be treated as a new run rather than a resume.
The thread_id is generated at graph start and stored alongside the
human-review task. Open WebUI displays pending HITL tasks with their thread_id,
allowing admins to approve and resume without additional context.
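A minimal sketch of that lifecycle (the `start_agent_run` helper and the thread_id format are illustrative, not part of agent-runtime):

```python
import uuid

async def start_agent_run(graph, initial_state: dict) -> str:
    # Generate the thread_id up front so it can be stored alongside the
    # human-review task and later used to resume the run.
    thread_id = f"agent_thread_{uuid.uuid4().hex[:12]}"
    config = {"configurable": {"thread_id": thread_id}}
    await graph.ainvoke(initial_state, config=config)  # may stop at interrupt()
    return thread_id
```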
MCP Tools in Agents
Agents can invoke any registered MCP tool (scraper, calendar, Drive reader, Deck generator)
via mcp_gateway_client.py. The client handles authentication, team_scope
injection, and error handling transparently:
Python — calling MCP tools from an agent node

```python
from app.mcp_gateway_client import MCPGatewayClient

async def use_mcp_tool(state: AgentState) -> AgentState:
    client = MCPGatewayClient(
        gateway_url="http://mcp-gateway:8080",
        team_scope=state["team_scope"],
        jwt=state["user_jwt"],
    )
    # Call the scraper MCP tool
    result = await client.call_tool(
        tool_name="scrape_url",
        params={"url": state["target_url"]},
    )
    state["scraped_content"] = result["content"]
    return state

async def enrich_with_calendar(state: AgentState) -> AgentState:
    client = MCPGatewayClient(
        gateway_url="http://mcp-gateway:8080",
        team_scope=state["team_scope"],
        jwt=state["user_jwt"],
    )
    # Fetch upcoming events to provide context
    events = await client.call_tool(
        tool_name="list_events",
        params={"days_ahead": 7},
    )
    state["calendar_context"] = events["items"]
    return state
```
MCP tool authorization
All MCP tool calls are routed through mcp-gateway (port 8080), which validates the JWT, enforces team_scope, and logs the call to the audit trail. Agents never call MCP servers directly — every call is mediated and audited.
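One practical consequence is that a tool call can be rejected at the gateway. The sketch below assumes the client surfaces a 4xx as `httpx.HTTPStatusError`; adapt it to MCPGatewayClient's actual error surface:

```python
import httpx

from app.mcp_gateway_client import MCPGatewayClient
from app.state import AgentState

async def scrape_with_guard(state: AgentState) -> AgentState:
    client = MCPGatewayClient(
        gateway_url="http://mcp-gateway:8080",
        team_scope=state["team_scope"],
        jwt=state["user_jwt"],
    )
    try:
        result = await client.call_tool(
            tool_name="scrape_url",
            params={"url": state["target_url"]},
        )
        state["scraped_content"] = result["content"]
    except httpx.HTTPStatusError as exc:
        # e.g. 403 when the tool is not allowed for this team_scope;
        # record the rejection in state rather than crashing the graph
        state["tool_error"] = f"gateway rejected call: {exc.response.status_code}"
    return state
```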
CANONICAL Facts in System Prompts
Before making an LLM call, agent-runtime injects team CANONICAL facts into the system
prompt. These facts are retrieved from Qdrant, filtered by team_scope and
truth_level=CANONICAL, and formatted as a compact knowledge block:
Python — injecting CANONICAL facts via GET /v1/system-prompt

```python
import httpx

async def build_system_prompt(team_scope: str, jwt: str) -> str:
    """Fetch CANONICAL facts formatted for LLM system prompt."""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://memory-api:8000/v1/system-prompt",
            headers={
                "Authorization": f"Bearer {jwt}",
                "X-Team-Scope": team_scope,
            },
        )
        facts_block = response.text
        # Returns: "Team facts:\n- Q2 target: 2M€\n- Tech lead: Alice..."
    base_prompt = (
        "You are an AI assistant for the {team} team. "
        "Use the team facts below to ground your answers."
    ).format(team=team_scope)
    return f"{base_prompt}\n\n{facts_block}"

# Used in an agent node:
async def llm_node(state: AgentState) -> AgentState:
    system_prompt = await build_system_prompt(
        team_scope=state["team_scope"],
        jwt=state["user_jwt"],
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": state["user_message"]},
    ]
    # ... call LLM with messages
    return state
```
Observability
All agent runs are tracked in Langfuse. Token costs, latency, node-level traces, and HITL interrupts are all visible in the Langfuse dashboard. The integration uses LangGraph's native callback system:
Python — Langfuse CallbackHandler for agent observability

```python
from langfuse.langchain import CallbackHandler

# Langfuse 3.x configures itself from the environment: LANGFUSE_PUBLIC_KEY,
# LANGFUSE_SECRET_KEY, LANGFUSE_HOST (http://langfuse:3000 via internal
# Docker DNS), and optionally LANGFUSE_RELEASE.
def get_langfuse_handler() -> CallbackHandler:
    return CallbackHandler()

# Pass the handler to every graph invocation
async def run_agent(graph, state: AgentState, thread_id: str):
    langfuse_handler = get_langfuse_handler()
    result = await graph.ainvoke(
        state,
        config={
            "configurable": {"thread_id": thread_id},
            "callbacks": [langfuse_handler],
            # Replaces the session_id constructor argument from the 2.x SDK
            "metadata": {"langfuse_session_id": "agent-runtime"},
        },
    )
    return result
```

Note: the 2.x SDK's `from langfuse.callback import CallbackHandler` with `secret_key`/`public_key`/`host` constructor arguments was replaced in Langfuse 3.x by the environment-driven handler shown above.
View token costs, latency, and traces per agent run in the Langfuse dashboard at
http://langfuse:3000 (internal) or via Nginx on your public domain.
Each LangGraph node appears as a separate span — HITL interrupts show as a distinctive
"paused" span with the interrupt payload visible.
Required environment variables
agent-runtime requires LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY,
DATABASE_URL (for PostgresSaver), and ANTHROPIC_API_KEY or
OPENAI_API_KEY. See the
Configuration reference for the full list.
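A fail-fast startup check along these lines can catch missing configuration before the first agent run (a sketch; `check_environment` is not part of agent-runtime):

```python
import os

REQUIRED = ("LANGFUSE_SECRET_KEY", "LANGFUSE_PUBLIC_KEY", "DATABASE_URL")

def check_environment() -> None:
    # Verify required variables at container start instead of failing
    # mid-run inside a node.
    missing = [name for name in REQUIRED if name not in os.environ]
    if not ("ANTHROPIC_API_KEY" in os.environ or "OPENAI_API_KEY" in os.environ):
        missing.append("ANTHROPIC_API_KEY or OPENAI_API_KEY")
    if missing:
        raise RuntimeError(f"agent-runtime: missing env vars: {', '.join(missing)}")
```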
Available Agents
| Agent | Trigger | HITL? | Description |
|---|---|---|---|
| `doc_ingestion` | `POST /agents/ingest` | Yes (low confidence) | PDF/text → extract facts → upsert to memory-api |
| `promotion_workflow` | `POST /agents/promote` | Yes (always) | WORKING → VALIDATED → CANONICAL with admin approval |
| `rag_conversation` | `POST /agents/chat` | No | User message → search memory → enrich prompt → respond |
| `github_sync` | GitHub webhook | No | brain.yaml push → ingest repo content at specified truth level |
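The HTTP triggers can be called directly. The sketch below hits the `doc_ingestion` trigger with the same header conventions as the memory-api example above; the request body fields are assumptions, so check the agent-runtime API for the actual schema:

```python
import httpx

async def trigger_ingestion(jwt: str, team_scope: str, document_url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://agent-runtime:9100/agents/ingest",
            headers={
                "Authorization": f"Bearer {jwt}",
                "X-Team-Scope": team_scope,
            },
            json={"document_url": document_url},  # assumed field name
        )
        response.raise_for_status()
        return response.json()  # presumably includes the run's thread_id
```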