Architecture¶
System-level reference for KubeIntellect: how the FastAPI app, LangGraph graph, tools, HITL gate, and streaming layer fit together. Read this first; then drill into per-subsystem deep dives.
Related docs
- Agent behaviors — five additive coordinator behaviors (kubectl error hints, snapshot bias, gather-then-conclude, playbook library, visible plan)
- Reflexion subsystem — cluster-scoped outcome learning, cooldown/decay/verification gates
- Configuration — env vars, feature flags, deployment values
- Security — auth, RBAC, namespace/resource blocks
Overview¶
KubeIntellect is a FastAPI + LangGraph multi-agent system for Kubernetes operations. It accepts natural-language queries via HTTP/SSE, runs LLM-orchestrated investigations against a live cluster, and streams typed events (status, tool calls, tokens) back to the client. Write operations are gated behind human-in-the-loop approval.
System Components¶
Browser / CLI (kq)
│ HTTP POST /v1/chat/completions
│ SSE GET /v1/chat/stream/{session_id}
▼
FastAPI app (app/main.py)
│
├── Auth middleware (API key → role)
├── Chat completions endpoint
├── SSE streaming endpoint
└── asyncio.create_task → run_session()
│
LangGraph graph
(workflow.py)
LangGraph Graph Topology¶
START
│
▼
memory_loader — load pinned cluster context from DB (or Redis)
│
▼
context_fetcher — parallel kubectl pods + warning events pre-fetch
│
▼
coordinator — LLM decision + tool calls (ReAct loop)
│
├─── TARGETED_sentinel ──► targeted_investigator ──► coordinator
│ (3 parallel reads: describe, events, deploys)
│
├─── RCA_REQUIRED ──────► subagent_executor x4 (parallel fan-out via Send)
│ pod | metrics | logs | events
│ │
│ coordinator (synthesis)
│
└─── direct answer ──────► END
Routing logic (route_coordinator)¶
| State condition | Route |
|---|---|
| targeted_investigation is set | targeted_investigator |
| rca_required = True | list[Send] → 4× subagent_executor |
| rca_result is not None | END (synthesis complete) |
| findings present, no rca_result | coordinator (synthesis mode) |
| otherwise | END (direct answer) |
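A minimal sketch of how the routing table could translate into a conditional-edge function. It assumes the conditions are evaluated in the order the rows are listed, which the table does not state explicitly. The `Send` dataclass and `END` string below are stand-ins for the LangGraph objects of the same names, and the `{"domain": ...}` payload is a simplified placeholder for SubagentInput.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

END = "__end__"  # stand-in for LangGraph's END constant
DOMAINS = ("pod", "metrics", "logs", "events")


@dataclass(frozen=True)
class Send:
    """Hypothetical stand-in for LangGraph's Send(node, payload)."""
    node: str
    payload: dict[str, Any]


def route_coordinator(state: dict[str, Any]) -> str | list[Send]:
    # Assumption: conditions are checked in the same order as the table rows.
    if state.get("targeted_investigation"):
        return "targeted_investigator"
    if state.get("rca_required"):
        # Fan out one payload per domain; the graph runs them in parallel.
        return [Send("subagent_executor", {"domain": d}) for d in DOMAINS]
    if state.get("rca_result") is not None:
        return END  # synthesis complete
    if state.get("findings"):
        return "coordinator"  # synthesis mode
    return END  # direct answer
```

Returning a list of `Send` objects from a routing function is what lets one coordinator decision become four parallel subagent runs.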
Node Reference¶
memory_loader¶
- Loads pinned cluster context from Postgres/SQLite (key-value store keyed by user_id).
- Resets findings to [] via _findings_reducer(None), which prevents stale RCA findings bleeding into the next turn.
- Emits StatusEvent(phase="loading").
context_fetcher¶
- Runs two kubectl commands in parallel via asyncio.to_thread:
  - kubectl get pods --all-namespaces
  - kubectl get events --all-namespaces --sort-by=.lastTimestamp --field-selector=type=Warning
- Caps each output at _SNAPSHOT_MAX_CHARS = 8_000 chars.
- Builds cluster_snapshot string injected into the coordinator's system prompt.
- Why this matters: Kubernetes events contain the exact error message (e.g., "couldn't find key DB_URL in Secret"). With the snapshot pre-loaded, the coordinator can diagnose without additional tool calls.
- Adds ~150ms to every query (2 parallel subprocess calls against local kubeconfig).
coordinator¶
Two modes, same node:
Decision mode (no findings yet):
- Builds system prompt from: _COORDINATOR_SYSTEM + optional memory_context + cluster_snapshot.
- Trims session history to last _MAX_SESSION_MESSAGES = 20 messages (prevents Azure 400 errors from context overflow).
- Calls create_react_agent (LangGraph prebuilt) with 3 tools: run_kubectl, query_prometheus, query_loki.
- Instructs LLM to emit parallel tool calls in one response for independent operations.
- Parses response for sentinels: RCA_REQUIRED or TARGETED: namespace=X, pod=Y, issue=Z.
- Applies _trim_tool_messages() to all new ToolMessage objects before writing to state (caps at 2,000 chars per message).
Synthesis mode (findings present):
- Formats all 4 subagent findings as XML.
- Single LLM call (no ReAct loop) to synthesize into structured RCAResult JSON.
- Returns AIMessage with human-readable RCA summary.
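The sentinel parsing done in decision mode can be pictured with a small regex-based sketch. The function name `parse_sentinel`, the exact regular expression, and the choice to check TARGETED before RCA_REQUIRED are assumptions for illustration; only the two sentinel formats come from the description above.

```python
import re

# Matches "TARGETED: namespace=X, pod=Y, issue=Z" anywhere in the LLM response.
TARGETED_RE = re.compile(
    r"TARGETED:\s*namespace=(?P<namespace>[^,\s]+),\s*"
    r"pod=(?P<pod>[^,\s]+),\s*issue=(?P<issue>.+)"
)


def parse_sentinel(text: str) -> dict:
    """Return the state update implied by a coordinator response (hypothetical helper)."""
    match = TARGETED_RE.search(text)
    if match:
        fields = {key: value.strip() for key, value in match.groupdict().items()}
        return {"targeted_investigation": fields}
    if "RCA_REQUIRED" in text:
        return {"rca_required": True}
    return {}  # no sentinel: the response is a direct answer
```

For example, `parse_sentinel("RCA_REQUIRED")` yields `{"rca_required": True}`, which is the flag the router uses to fan out to the subagents.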
targeted_investigator¶
- Triggered when coordinator emits TARGETED: namespace=X, pod=Y, issue=Z.
- Runs 3 parallel reads via asyncio.to_thread:
  - kubectl describe pod <pod> -n <ns>
  - kubectl get events -n <ns> --sort-by=.lastTimestamp
  - kubectl get deployments -n <ns>
- Appends findings to cluster_snapshot and clears targeted_investigation.
- Returns to coordinator with enriched snapshot for final answer.
- Design rationale: Avoids a full 4-subagent RCA fan-out for single-resource issues. Adds 1 extra LLM round-trip but uses 3 parallel reads instead of 4× subagent overhead.
- Note: In practice the LLM often makes parallel tool calls directly in the ReAct loop for single-pod issues (more efficient — no extra round-trip). The TARGETED path handles cases where the LLM decides pre-enrichment is needed before answering.
subagent_executor¶
- LangGraph node wrapping run_subagent().
- Receives a SubagentInput payload (domain, session_id, messages, evidence_bundle).
- Emits StatusEvent(phase="investigating").
- Returns {"findings": [AgentFinding]}, accumulated by _findings_reducer.
run_subagent (in nodes/subagent.py)¶
- Each of 4 domains (pod, metrics, logs, events) has a domain-specific system prompt.
- Receives evidence_bundle (the cluster snapshot) in system prompt, which avoids duplicate kubectl get pods calls.
- Uses gpt-4o-mini (Azure) or OPENAI_SUBAGENT_MODEL for cost efficiency.
- Returns structured AgentFinding (domain, signals, hypothesis, confidence 0-1, evidence, tool_calls_made).
State Schema (AgentState)¶
| Field | Type | Description |
|---|---|---|
| messages | Annotated[list[BaseMessage], add_messages] | Full conversation history. add_messages reducer appends (never overwrites). |
| memory_context | str | Pinned cluster context loaded by memory_loader. |
| cluster_snapshot | str | Pre-fetched by context_fetcher. Injected into coordinator system prompt. Reset each turn. |
| findings | Annotated[list[AgentFinding], _findings_reducer] | Subagent findings. Custom reducer: None resets, list appends. |
| rca_required | bool | Set by coordinator when LLM emits RCA_REQUIRED. |
| rca_result | dict \| None | Final synthesized RCA (stored as dict to avoid LangGraph msgpack warning). |
| targeted_investigation | dict[str, str] \| None | {namespace, pod, issue} set by coordinator when LLM emits TARGETED sentinel. Cleared by targeted_investigator. |
| pending_hitl | dict[str, Any] \| None | HITL state: {action_id, command, risk_level, human_summary}. |
| session_id | str | Per-conversation ID (maps to LangGraph thread_id). |
| user_id | str | API key-derived user identifier. |
| user_role | str | superadmin \| admin \| operator \| readonly |
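The reducer semantics for the findings field ("None resets, list appends") can be sketched in a few lines. This is an illustrative stand-in named `findings_reducer`, not the project's `_findings_reducer`; the handling of a missing current value is an assumption.

```python
from __future__ import annotations

from typing import Any


def findings_reducer(current: list[Any] | None, update: list[Any] | None) -> list[Any]:
    """Merge a node's 'findings' update into the existing state value."""
    if update is None:
        # An explicit None is the reset signal (e.g. at the start of a turn).
        return []
    # Any list is appended, so parallel subagents accumulate their findings.
    return (current or []) + list(update)
```

Because each of the four parallel subagents returns a one-element list, an appending reducer is what allows their results to coexist in state instead of overwriting each other.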
Tool Architecture¶
All tools live in app/tools/. Registered in app/tools/registry.py as ALL_TOOLS.
run_kubectl¶
- Runs kubectl as a subprocess with configurable timeout.
- Role-based access control:
  - readonly: read-only verbs only (get, describe, logs, top, explain, diff, version, api-resources, api-versions)
  - operator: read + medium-risk writes (create, apply, scale, set, rollout, exec, port-forward, cp, attach, label, annotate, taint, cordon, uncordon, drain)
  - admin: read + medium + high-risk (patch, delete, replace, edit, run, expose, autoscale, config, certificate, cluster-info, proxy)
  - superadmin: all verbs, bypasses namespace block (but still blocks secrets/serviceaccounts)
- Namespace block: KUBECTL_BLOCKED_NAMESPACES env var (default: kubeintellect, monitoring, kube-system, etc.)
- Resource block: KUBECTL_BLOCKED_RESOURCES env var (default: secret, serviceaccount)
- HITL gate: destructive commands trigger interrupt() → HITLRequired → pauses graph → resumes via Command(resume=bool)
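As a rough illustration of the role tiers above, the following sketch maps verbs to risk levels and roles to the highest risk they may run. The verb lists come from the bullets; the function names, the numeric levels, and the choice to treat unknown verbs as high risk are assumptions, and namespace/resource blocks are left out.

```python
READ_VERBS = {
    "get", "describe", "logs", "top", "explain", "diff",
    "version", "api-resources", "api-versions",
}
MEDIUM_VERBS = {
    "create", "apply", "scale", "set", "rollout", "exec", "port-forward", "cp",
    "attach", "label", "annotate", "taint", "cordon", "uncordon", "drain",
}

RISK_LEVEL = {"read": 0, "medium": 1, "high": 2}
# Highest risk level each role may execute (superadmin: all verbs).
ROLE_MAX_RISK = {"readonly": 0, "operator": 1, "admin": 2, "superadmin": 2}


def classify_risk(verb: str) -> str:
    if verb in READ_VERBS:
        return "read"
    if verb in MEDIUM_VERBS:
        return "medium"
    # Assumption: anything not explicitly listed is treated as high risk.
    return "high"


def is_allowed(role: str, verb: str) -> bool:
    return RISK_LEVEL[classify_risk(verb)] <= ROLE_MAX_RISK[role]


def needs_hitl(verb: str) -> bool:
    # Medium and high risk commands go through the approval gate.
    return classify_risk(verb) != "read"
```

Under this sketch, `is_allowed("operator", "delete")` is false while `is_allowed("admin", "delete")` is true, and both `delete` and `scale` would still pause for approval.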
query_prometheus¶
- PromQL queries against PROMETHEUS_URL.
- range_minutes=0 → instant query (current snapshot).
- range_minutes>0 → range query.
- Returns empty/error string when PROMETHEUS_URL is not configured.
query_loki¶
- LogQL queries against LOKI_URL.
- since parameter accepts duration strings (1h, 2d).
- Returns empty/error string when LOKI_URL is not configured.
HITL (Human-in-the-Loop) Flow¶
LLM decides to run a destructive kubectl command
│
▼
run_kubectl checks risk level via _classify_risk()
│
▼ (medium/high risk)
HITLRequired exception raised
│
▼
LangGraph interrupt() — pauses graph at checkpoint
│
▼
SSE stream emits HitlRequestEvent {risk_level, command, stdin_yaml}
│
▼
User sees approval prompt in UI/CLI
│
┌───── yes ──────┐
│ │
▼ ▼
Command(resume=True) Command(resume=False)
│ │
▼ ▼
kubectl runs Denied — no-op
The graph is checkpointed at the interrupt point. When the next user message arrives, the session is checked for an active interrupt (graph.aget_state()); if one exists, the message is routed to Command(resume=...) instead of _fresh_turn_state().
Streaming Architecture¶
Every query runs as asyncio.create_task(run_session(...)). The task writes to a per-session asyncio.Queue via emit(). The SSE endpoint reads from this queue and yields Server-Sent Events.
Typed events emitted¶
| Event | When |
|---|---|
| StatusEvent | Node entry (memory_loader, context_fetcher, coordinator, subagent_executor) |
| ToolCallEvent | on_tool_start from LangGraph astream_events |
| ToolResultEvent | on_tool_end from LangGraph astream_events (first 500 chars) |
| TokenEvent | on_chat_model_stream, one per LLM token chunk |
| PlanEvent | Coordinator emits INVESTIGATION_PLAN: block; parsed out and re-emitted as structured steps |
| HitlRequestEvent | HITL interrupt detected after stream ends |
| ErrorEvent | Any unhandled exception in run_session |
Important: ToolResultEvent uses the raw tool output from on_tool_end (fires BEFORE _trim_tool_messages). Users see full tool output; only the LLM re-ingestion is trimmed.
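The queue-based hand-off between the background task and the SSE endpoint might look roughly like the sketch below. The per-session asyncio.Queue and emit() come from the description above; the module-level registry, the `_DONE` end marker, the `close()` helper, and the JSON shape of the event are assumptions made for illustration.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from typing import AsyncIterator, Dict


@dataclass
class StatusEvent:
    phase: str
    type: str = "status"


_queues: Dict[str, asyncio.Queue] = {}
_DONE = object()  # hypothetical end-of-stream marker


def get_queue(session_id: str) -> asyncio.Queue:
    return _queues.setdefault(session_id, asyncio.Queue())


async def emit(session_id: str, event: object) -> None:
    """Called by graph nodes and the session task to publish a typed event."""
    await get_queue(session_id).put(event)


async def close(session_id: str) -> None:
    await get_queue(session_id).put(_DONE)


async def sse_stream(session_id: str) -> AsyncIterator[str]:
    """Drain the session queue and format each event as a Server-Sent Event."""
    queue = get_queue(session_id)
    while True:
        event = await queue.get()
        if event is _DONE:
            break
        yield f"data: {json.dumps(asdict(event))}\n\n"
```

Decoupling producer and consumer through a queue means the graph keeps running even if the client connects to the stream slightly after the POST returns.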
Structured Logging and Observability¶
All log output is JSON (LOG_FORMAT=json). Every line carries time, level, logger, request_id, and any extra={} fields the caller passes. The JSON formatter emits all extra fields — there is no fixed allowlist.
Three key production events are logged at INFO level and carry a session_id field, making them queryable by session in Loki:
| Logger event | Key fields | Source |
|---|---|---|
| context_fetcher: snapshot_complete | session_id, cluster_id, snapshot_has_issues, snapshot_has_warnings, snapshot_pod_count, matched_playbooks | context_fetcher.py |
| coordinator: routing_decision | session_id, routing_decision (direct\|TARGETED\|RCA), elapsed_ms | coordinator.py |
| run_kubectl: hitl_classification | session_id, hitl_verb, hitl_risk_level, hitl_cmd | kubectl_tool.py |
Querying by session in Loki:
The session_id is the same value set in the SSE stream and stored in Langfuse traces, so all three observability signals (logs, traces, metrics) are correlatable by a single ID.
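One way to build a per-session query is sketched below as a small helper that produces a LogQL string. The stream selector `{namespace="kubeintellect"}` is an assumption about how the logs are labelled; the `| json` stage relies on the log lines being JSON, and `session_id` is the field named in the table above.

```python
def session_logql(session_id: str, selector: str = '{namespace="kubeintellect"}') -> str:
    """Build a LogQL query returning all JSON log lines for one session.

    The default selector is a guess; replace it with the labels your
    Promtail/Loki setup actually attaches to the app's logs.
    """
    # Escape backslashes and quotes so the ID cannot break out of the string literal.
    safe_id = session_id.replace("\\", "\\\\").replace('"', '\\"')
    return f'{selector} | json | session_id="{safe_id}"'
```

The resulting string can be pasted into Grafana Explore or passed to a Loki query endpoint.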
Context Management¶
Session trimmer (_trim_session_messages)¶
Caps session history at _MAX_SESSION_MESSAGES = 20 before passing it to the LLM. After slicing, the window is advanced to its first HumanMessage, so it never starts with an orphaned ToolMessage that lacks its matching AIMessage(tool_calls), which Azure rejects with a 400 error.
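A simplified sketch of that trimming rule, using a plain dataclass in place of LangChain message classes. The `Msg` type, the function name, and the fallback when the window contains no human message are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass

MAX_SESSION_MESSAGES = 20


@dataclass
class Msg:
    role: str  # "human", "ai", or "tool"
    content: str


def trim_session_messages(messages: list[Msg], limit: int = MAX_SESSION_MESSAGES) -> list[Msg]:
    window = messages[-limit:]
    # Skip forward so the window never opens on a tool result (or a tool-calling
    # AI message) whose counterpart was cut off by the slice.
    for index, msg in enumerate(window):
        if msg.role == "human":
            return window[index:]
    # Assumption: with no human message in the window, return it unchanged.
    return window
```

The effect is that the trimmed history can be slightly shorter than the limit, which is the price of always starting on a complete exchange.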
Tool output trimmer (_trim_tool_messages)¶
Applied to all new ToolMessage objects before writing to state (AgentState.messages).
Strategy:
- kubectl table output (has NAME column): header + first 30 rows + any rows matching error|warning|failed|pending|oomkilled|crashloop|backoff|imagepull|containercreating
- logs / describe / prometheus / loki: first 60 lines
- Hard cap: 2,000 chars per message
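The strategy above can be approximated as follows. The constants mirror the numbers in the list; the function name and the way a table is detected (a NAME token in the first line) are assumptions about details the list does not spell out.

```python
import re

MAX_CHARS = 2_000
TABLE_ROWS = 30
TEXT_LINES = 60
PROBLEM_RE = re.compile(
    r"error|warning|failed|pending|oomkilled|crashloop|backoff|imagepull|containercreating",
    re.IGNORECASE,
)


def trim_tool_output(text: str) -> str:
    lines = text.splitlines()
    if lines and "NAME" in lines[0].split():
        # kubectl table: keep the header, the first rows, and any later
        # row that looks like a problem.
        header, rows = lines[0], lines[1:]
        problem_rows = [row for row in rows[TABLE_ROWS:] if PROBLEM_RE.search(row)]
        lines = [header] + rows[:TABLE_ROWS] + problem_rows
    else:
        # logs / describe / prometheus / loki: keep the first lines only.
        lines = lines[:TEXT_LINES]
    # The hard cap applies last, whatever the branch.
    return "\n".join(lines)[:MAX_CHARS]
```

Keeping problem rows beyond the first 30 means a failing pod at the bottom of a long listing still reaches the LLM.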
Auth & RBAC¶
Four-tier role model, enforced at API layer:
| Role | Capabilities | HITL |
|---|---|---|
| superadmin | All operations, bypass namespace block | Write ops still gated |
| admin | High + medium risk ops, infra namespace writes blocked | Required for medium/high risk |
| operator | Medium risk only (create, apply, scale, exec…) | Required for medium risk |
| readonly | Read-only verbs only | N/A (writes rejected before LLM) |
API key → role mapping via KUBEINTELLECT_*_KEYS env vars (comma-separated). Optional HMAC backend for demo keys (signed, expiry-checked, no list needed).
Checkpointing & Storage¶
LangGraph checkpointer persists full AgentState between turns:
- Production: AsyncPostgresSaver (Postgres via DATABASE_URL or POSTGRES_* vars)
- Local/dev: AsyncSqliteSaver (USE_SQLITE=true, path: ~/.kubeintellect/kubeintellect.db)
Every fresh turn (one that is not resuming a HITL interrupt) begins with _fresh_turn_state(), which resets the transient fields (cluster_snapshot, rca_required, rca_result, targeted_investigation, pending_hitl) while preserving the accumulated messages via the add_messages reducer.
Memory context (memory_loader) is loaded from a separate cluster_memory table, keyed by user_id. This is pinned, human-edited context (e.g., "default namespace is production") injected into every coordinator system prompt.
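The per-turn reset can be pictured as a function that returns a partial state update. The field names are those listed above; the reset values, the function signature, and the tuple used for the new message are assumptions rather than the project's actual code.

```python
from typing import Any, Dict

# Transient fields and the values a new turn starts with (assumed defaults).
TRANSIENT_DEFAULTS: Dict[str, Any] = {
    "cluster_snapshot": "",
    "rca_required": False,
    "rca_result": None,
    "targeted_investigation": None,
    "pending_hitl": None,
}


def fresh_turn_state(user_message: str, session_id: str, user_id: str, user_role: str) -> Dict[str, Any]:
    return {
        **TRANSIENT_DEFAULTS,
        # Only the new message is supplied; an appending reducer such as
        # add_messages would merge it into the checkpointed history.
        "messages": [("human", user_message)],
        "session_id": session_id,
        "user_id": user_id,
        "user_role": user_role,
    }
```

Passing only the new message keeps the checkpointed conversation intact while every investigation-specific field starts clean.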
LLM Configuration¶
| Setting | Default | Description |
|---|---|---|
| LLM_PROVIDER | azure | azure or openai |
| AZURE_COORDINATOR_DEPLOYMENT | gpt-4o | Coordinator model (Azure) |
| AZURE_SUBAGENT_DEPLOYMENT | gpt-4o-mini | Subagent model (Azure) |
| AZURE_OPENAI_API_VERSION | 2024-10-01-preview | Enables Azure prefix caching |
| OPENAI_COORDINATOR_MODEL | gpt-4o | Coordinator model (direct OpenAI) |
| OPENAI_SUBAGENT_MODEL | gpt-4o-mini | Subagent model (direct OpenAI) |
Azure prefix caching: With 2024-10-01-preview, the static _COORDINATOR_SYSTEM prefix (~800 tokens) is cached after the first call. Each subsequent call in the same deployment saves ~800 tokens of input cost.
Observability¶
- Langfuse: Optional. Set LANGFUSE_ENABLED=true + keys. Every LLM call, tool call, and subagent span is traced. Configure LANGFUSE_HOST (e.g., langfuse-web.monitoring.svc.cluster.local:3000).
- Prometheus: App exposes /metrics (Starlette Prometheus middleware). Scrape target for the monitoring stack.
- Loki: App logs are picked up by Promtail/Loki stack in the monitoring namespace.
All observability services install to the monitoring namespace. URLs wired via Helm values.yaml → ConfigMap → env vars.
Deployment¶
Helm (Kubernetes)¶
deploy/helm/kubeintellect/
├── Chart.yaml
├── values.yaml ← single source of truth for all config
├── templates/
│ ├── deployment.yaml
│ ├── service.yaml
│ ├── ingress.yaml
│ ├── configmap.yaml
│ └── ...
Key values:
- config.prometheusUrl, config.lokiUrl — injected as env vars via ConfigMap
- config.blockedNamespaces, config.blockedResources — RBAC guards
- hotReload.enabled — mounts app/ as a hostPath for dev (watchfiles restarts on change)
- ingress.hosts — list of hostnames served
Health probes¶
readinessProbe:
  path: /healthz
  initialDelaySeconds: 10
  periodSeconds: 10
  timeoutSeconds: 10  # raised from 3; event loop can be briefly loaded under parallel kubectl calls
  failureThreshold: 3
livenessProbe:
  path: /healthz
  initialDelaySeconds: 15
  periodSeconds: 30
  timeoutSeconds: 10
  failureThreshold: 3
Key Design Decisions¶
Why create_react_agent instead of a custom ReAct loop?¶
LangGraph's prebuilt create_react_agent natively supports parallel tool calls when the LLM returns multiple tool_calls in one AIMessage. The LLM handles execution ordering; we just instruct it to emit independent calls together. No custom loop needed.
Why pre-fetch cluster snapshot before LLM runs?¶
Kubernetes puts the exact error message in events: "couldn't find key DB_URL in Secret scenario-test/app-secrets". By pre-fetching events before any LLM call, the coordinator can diagnose without additional tool calls. On a healthy cluster, warning events are near-empty — the cost is minimal (~150ms for 2 parallel subprocess calls).
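A minimal sketch of such a pre-fetch, assuming a blocking subprocess helper wrapped in asyncio.to_thread. The two kubectl commands and the 8,000-character cap are taken from the context_fetcher description; the function names, the 15-second timeout, the section headings in the snapshot, and the injectable `runner` parameter (used here so the sketch can run without a cluster) are illustrative assumptions.

```python
import asyncio
import subprocess
from typing import Callable, List

SNAPSHOT_MAX_CHARS = 8_000

PODS_CMD = ["kubectl", "get", "pods", "--all-namespaces"]
EVENTS_CMD = [
    "kubectl", "get", "events", "--all-namespaces",
    "--sort-by=.lastTimestamp", "--field-selector=type=Warning",
]


def run_cmd(cmd: List[str]) -> str:
    """Blocking subprocess call; returns stdout, or stderr if the command failed."""
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    return proc.stdout if proc.returncode == 0 else proc.stderr


async def build_snapshot(runner: Callable[[List[str]], str] = run_cmd) -> str:
    # Both commands run in worker threads at the same time, so total latency
    # is roughly that of the slower one.
    pods, events = await asyncio.gather(
        asyncio.to_thread(runner, PODS_CMD),
        asyncio.to_thread(runner, EVENTS_CMD),
    )
    return (
        "## Pods\n" + pods[:SNAPSHOT_MAX_CHARS]
        + "\n## Warning events\n" + events[:SNAPSHOT_MAX_CHARS]
    )
```

Calling `asyncio.run(build_snapshot())` against a configured kubeconfig would return the combined text; capping each half separately keeps a large pod list from crowding out the events.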
Why trim tool output before storing in state?¶
add_messages accumulates every ToolMessage in the session. Without trimming, a single kubectl get pods --all-namespaces call can add 4,000+ chars to every future turn's prompt. The trimmer caps at 2,000 chars; the SSE stream still shows users the full output (fired by on_tool_end, before trimming).
Why store rca_result as dict instead of RCAResult?¶
LangGraph's msgpack checkpointer can't serialize unregistered Pydantic models — it logs Deserializing unregistered type warnings. Storing rca.model_dump() (plain dict) avoids this entirely.
Why separate coordinator and subagent_executor nodes?¶
coordinator runs as a standard node (full AgentState). Subagents run as Send payloads (SubagentInput) with isolated state — no session history, just the current query + evidence bundle. This prevents the 6,700-token session history from being passed to subagents, which caused them to respond in prose instead of JSON.
Phase A+B regression-fix highlights¶
Snapshot of measurable improvements after the Phase A+B refactors landed (2026-04-26). These are point-in-time before/after deltas for the regressions those phases targeted, not steady-state performance numbers — current numbers will differ as the agent and tooling continue to evolve.
| Metric | Before | After |
|---|---|---|
| Subagent confidence (RCA) | 0.00 (empty JSON) | 0.80–0.90 |
| Duplicate kubectl get pods per RCA | 4 | 0 |
| CreateContainerConfigError diagnosis | Fails (tries to read blocked secret) | Correct (reads events snapshot) |
| Multi-tool query (parallel) | 3 sequential loops | 1 AIMessage with 3 tool_calls |
| Session history growth per turn | +4,442 tokens (ISS-01) | ≤500 tokens |
| context_fetcher latency | N/A | ~150ms |
File Map¶
app/
├── agent/
│ ├── nodes/
│ │ ├── context_fetcher.py — pre-fetch pods + events, cluster_id, playbook trigger match
│ │ ├── coordinator.py — LLM decision + synthesis, tool trimmer, reflexion writes
│ │ ├── memory_loader.py — load pinned cluster context + failure-pattern hints
│ │ └── subagent.py — 4 specialist subagent runners
│ ├── playbooks/ — YAML playbook library + loader (10 playbooks)
│ ├── state.py — AgentState, SubagentInput, AgentFinding, RCAResult
│ ├── workflow.py — LangGraph graph, targeted_investigator, route_coordinator
│ └── hitl.py — HITL approval/denial detection
├── api/
│ ├── middleware.py — auth, logging
│ └── v1/endpoints/
│ ├── chat_completions.py — POST /v1/chat/completions
│ ├── stream.py — GET /v1/chat/stream/{session_id}
│ ├── health.py — GET /healthz
│ └── memory.py — CRUD for pinned context
├── core/
│ ├── config.py — Settings (pydantic-settings, ~/.kubeintellect/.env)
│ └── llm.py — LLM factory (Azure/OpenAI), Langfuse callbacks
├── streaming/
│ └── emitter.py — per-session queue, typed events, SSE serialization
├── tools/
│ ├── kubectl_tool.py — run_kubectl with RBAC + HITL
│ ├── kubectl_errors.py — error-pattern → one-line hint mapper (15 patterns)
│ ├── prometheus_tool.py — query_prometheus
│ ├── loki_tool.py — query_loki
│ └── registry.py — ALL_TOOLS list
├── utils/
│ └── redact.py — secret/PII scrubber for stored outcomes
├── cluster_id.py — stable cluster fingerprint (kube-apiserver + namespace count hash)
└── db/
├── audit.py — audit log (writes to Postgres)
├── memory_store.py — rca_outcomes / failure_patterns reads + writes (reflexion)
└── memory.py — pinned context CRUD