diff --git a/VERSION b/VERSION index d77878cf..ce106c98 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.4.4.4 +0.4.4.5 diff --git a/docs/orchestrator-agentic-loop.md b/docs/orchestrator-agentic-loop.md new file mode 100644 index 00000000..d7e992f9 --- /dev/null +++ b/docs/orchestrator-agentic-loop.md @@ -0,0 +1,357 @@ +# Autonomous Orchestrator — the Agentic Loop + +## Overview + +The **autonomous orchestrator** (Approach D) is an agent that investigates an +incident for its *real* root cause by running an unbounded, read-only +**move-loop** — hypothesize, gather evidence, test, follow the cause across +service boundaries — until it either deterministically confirms a cause or a +safety guard stops it. + +It is a **new agent that WRAPS the fixed investigation DAG**, it does not replace +it. Where the bounded investigation (`InvestigationRunner` → the +planning/metrics/logs/infra/synthesis workflow) runs a fixed pipeline once, the +orchestrator decides its *next move* each turn and can spawn the fixed +investigation as a depth-1 **subagent** scoped to a sub-question. Deep mode +re-judges an existing report's ruled-out hypotheses; the orchestrator goes +further — it can pivot, follow dependencies, and assemble a cross-service causal +chain. + +It is **gated off** by default (`config.agent.orchestratorEnabled`, default +`false`) and is internal until validation (increment 7) completes. + +| | Bounded investigation | Deep mode | Autonomous orchestrator | +|---|---|---|---| +| Shape | Fixed DAG, one pass | Re-test ruled-out causes | Unbounded decide→act move-loop | +| Cross-service | No | No | Yes (follow-cause via the dependency graph) | +| Stop signal | End of pipeline | End of re-test set | Deterministic keystone **or** a guard | +| Cost | 1× | ~1× | 3–10× (guarded) | + +--- + +## The two decisions that define it + +**DECISION 1 — Hybrid stop (the keystone).** The agent may *propose* `conclude`, +but the loop only actually stops on a `conclude` when the leading hypothesis is +**deterministically confirmed** by the corroboration keystone +(`evaluatePrediction` → verdict `satisfied`). The LLM's self-reported confidence +is recorded in the trace but is **never** the gate. Self-confidence can *direct* +the search; it can never *end* it. + +**DECISION 2 — Safety harness.** Every other way the loop ends is a hard, +config-tunable guard (budget / depth / strikes / tool-cap / wall-clock / per-op +watchdog / abort), plus a stall detector and an absolute move backstop. Hitting +the **strike limit** is a first-class `operator-pause` outcome — the agent hands +an ambiguous call back to a human rather than guessing. + +--- + +## The move loop + +Each turn the agent's brain (`decideMove`, an LLM in prod, scripted in tests) +picks exactly one move from the read-only state. The loop checks the safety +guards **before** spending the move, dispatches the move, records a trace entry, +and repeats. + +```mermaid +flowchart TD + start([start]) --> guards{guards OK?} + guards -- aborted --> oAbort([aborted]) + guards -- tokens spent --> oBudget([budget-exhausted]) + guards -- tool cap --> oTool([tool-cap]) + guards -- wall-clock --> oWall([wall-clock]) + guards -- strikes >= max --> pause{onOperatorPause
wired?} + guards -- stalled --> oInc([inconclusive]) + guards -- ok --> decide[decideMove state] + + pause -- no, or continues spent --> oPause([operator-pause]) + pause -- continue --> reset[strikes = 0] --> decide + pause -- escalate / wait --> oPause + + decide -- null --> oExh([exhausted]) + decide --> move{move type} + + move -- hypothesize --> mH[track candidate cause
+ checkable prediction] + move -- query --> mQ[gather read-only evidence
watchdog-bounded] + move -- test --> mT[score vs evidence
via keystone] + move -- spawn-subagent --> mS[depth-1 sub-investigation
findings fold into evidence] + move -- follow-cause --> mF[sub-investigate a known
dependency neighbor] + move -- conclude --> gate{leading hypothesis
verdict == satisfied?} + + mH --> guards + mQ --> guards + mS --> guards + mF --> guards + mT -- satisfied --> reset2[standing = confirmed
strikes = 0] --> guards + mT -- contradicted / absent --> strike[standing = ruled-out
strikes++] --> guards + + gate -- no --> rej[record 'not confirmed'
keep looking] --> guards + gate -- yes --> xguard{blames an un-followed
dependency?} + xguard -- yes --> rej + xguard -- no --> oConf([confirmed]) +``` + +### The loop in pseudocode + +The same loop as `runOrchestrator` (`src/agents/orchestrator.ts`), condensed. +Note the order: **guards are checked at the top of every iteration, before the +next move is spent**, and `conclude` only ends the run when the deterministic +keystone agrees (DECISION 1) — self-reported confidence never does. + +```text +function runOrchestrator(deps): + state = { hypotheses: [], evidence: [], dependencies, followedServices: {} } + strikes = tokensSpent = toolCalls = moves = stall = operatorContinues = 0 + + while moves < MAX_MOVES: # absolute backstop (1000) + + # ── 1. Safety harness (DECISION 2) — checked BEFORE spending a move ── + if deps.signal.aborted: return finish("aborted") # operator disconnected + if tokensSpent >= maxTokens: return finish("budget-exhausted") + if toolCalls >= maxToolCalls:return finish("tool-cap") + if elapsed() >= wallClockMs: return finish("wall-clock") + if strikes >= maxStrikes: # N causes ruled out + if onOperatorPause and operatorContinues < MAX_OPERATOR_CONTINUES: + decision = await onOperatorPause(state) # increment 5 — BLOCKS on a human + if decision == "continue": + operatorContinues += 1 + strikes = 0 + continue # resume; other guards still bound it + return finish("operator-pause") # escalate / wait / timeout / no hook + if stall >= MAX_STALL: return finish("inconclusive") # no progress (8) + + # ── 2. The agent's brain picks ONE move ── + move = await deps.decideMove(state) # LLM in prod, scripted in tests + if move == null: return finish("exhausted") + moves += 1 + tokensSpent += estimateTokens(move) + + # ── 3. Act on the move ── + switch move.type: + case "hypothesize": + hypotheses.add(move.hypothesis) # + a checkable prediction + stall = 0 + + case "query": + obs = watchdog(gatherEvidence(h), opTimeoutMs) # read-only; bounded + evidence += obs; toolCalls += 1 + stall = obs.empty ? stall + 1 : 0 + + case "test": # the keystone — deterministic + verdict = evaluate(h.prediction, evidence) + if verdict == "satisfied": h.standing = "confirmed"; strikes = 0 + else: h.standing = "ruled-out"; strikes += 1 + + case "spawn-subagent" | "follow-cause": # follow-cause: known dep only + followedServices.add(move.service) + evidence += watchdog(spawnSubagent(move.service), opTimeoutMs) + + case "conclude": # DECISION 1 — hybrid stop + if lead.standing == "confirmed" and lead.lastVerdict == "satisfied": + if lead names a dependency NOT in followedServices: # inc-7 guard + record("not confirmed — investigate that dep first"); stall += 1 + else: + return finish("confirmed", lead) + else: + # self-reported confidence is recorded for the trace, never the gate + stall += 1 + + return finish("inconclusive") # hit the move backstop +``` + +### Moves + +| Move | Effect | +|---|---| +| `hypothesize` | Add a candidate root cause with a **structured, checkable** `prediction` (metric-threshold / log-pattern / infra-status / change-in-window). | +| `query` | Gather read-only evidence for a hypothesis's prediction (one MCP query, watchdog-bounded). | +| `test` | Score a hypothesis against gathered evidence via the deterministic keystone → `satisfied` resets strikes & marks `confirmed`; `contradicted`/`absent` marks `ruled-out` & increments strikes. | +| `conclude` | *Propose* done. Gated by DECISION 1 (keystone) + the cross-service guard. | +| `spawn-subagent` | Run a depth-1 scoped sub-investigation (the `quick` template) on a service; its conclusion folds back as one observation. | +| `follow-cause` | Like spawn-subagent, but **grounded**: the target MUST be a dependency-graph neighbor of the incident service, so the agent can't wander to arbitrary services. | + +The loop never throws on a bad move — unknown/out-of-range targets are traced and +skipped, so a confused LLM degrades to `inconclusive`/`exhausted` rather than +crashing. + +--- + +## Safety harness (DECISION 2) + +Guards are checked at the top of every iteration, before the next move is spent, +so a tripped limit never does "one more" expensive thing. + +| Guard | Default | Outcome on trip | +|---|---|---| +| Abort signal (operator disconnected) | — | `aborted` | +| Token budget (`maxTokens`) | 150,000 | `budget-exhausted` | +| Tool-call cap (`maxToolCalls`) | 40 | `tool-cap` | +| Wall-clock (`wallClockMs`) | 10 min | `wall-clock` | +| Consecutive rule-outs (`maxStrikes`) | 3 | `operator-pause` (interactive — see below) | +| Stall (no progress) | 8 moves | `inconclusive` | +| Subagent cap (`maxSubagents`) | 3 | move skipped | +| Per-operation watchdog (`opTimeoutMs`) | 150 s | gather/subagent abandoned (no findings), loop continues | +| Move backstop (`MAX_MOVES`) | 1000 | `inconclusive` | + +The **per-op watchdog** bounds a single hung MCP/LLM call so it can't strand the +loop *between* guard checks. The **abort signal** is checked each move and is +wired so a WebSocket disconnect stops the run instead of letting it run on +headless with no one watching. + +--- + +## Increment 5 — interactive operator-pause + +When the loop hits `maxStrikes` (N candidate causes tested and ruled out, nothing +discriminating found), it does not silently stop. It emits a WebSocket prompt and +**blocks on a human decision**: + +- **continue** → reset the strike counter and resume (the other guards still + bound the resumed run). Capped at `MAX_OPERATOR_CONTINUES` (3) so a hung or + perpetually-continuing operator can't spin the loop forever. +- **escalate to on-call** → stop with that disposition. +- **instrument & wait** → stop with that disposition. + +In v1, escalate / wait have no backend (no paging, no scheduler) — they record +the disposition in the banner. A 5-minute timeout (or a disconnect) defaults to +`escalate` so a closed tab never strands the loop. + +```mermaid +sequenceDiagram + participant Orch as Orchestrator loop + participant WS as ws-handler + participant UI as OrchestratorStream UI + participant Op as Operator + + Orch->>Orch: strikes == maxStrikes + Orch->>WS: onOperatorPause(state) + WS->>UI: orchestrator:operator_pause {strikes, hypothesesTried} + UI->>Op: render OperatorPauseCard (continue / escalate / instrument & wait) + Note over WS: pending-pause registry,
5-min timeout → escalate,
cleared on disconnect + Op->>UI: click "continue" + UI->>WS: orchestrator_decision {decision: "continue"} + WS-->>Orch: resolve → "continue" + Orch->>Orch: strikes = 0, resume +``` + +--- + +## Cross-service: follow-cause + the false-confirm guard + +The incident service's dependency-graph neighbors (resolved via +`inferDependencyGraph`, both directions) are threaded into the loop as the only +services `follow-cause` may enter. After a follow-cause returns findings, the +agent is prompted to turn them into a tested hypothesis rather than stop on the +bare lead. + +**False-confirm guard (increment 7).** Observing that a dependency is *unhealthy* +is correlational, not causal. So a `conclude` that names a dependency the agent +**never followed-cause'd into** is rejected ("blames X but never investigated +it") and the loop keeps looking. Mentions of the incident service's own behaviour +are fine. This is what stops a confident-but-wrong RCA like "checkout is down due +to a degraded payment-service" when payment-service was never actually examined. + +--- + +## Output: causal chain + trace + +On a finished run the orchestrator produces two headline artifacts beyond the +move log: + +- **Causal chain** — ordered cause→effect with **source attribution**: + `incident service → each followed dependency (+ the finding that pointed there) + → root cause (+ the prediction the keystone confirmed)`. Rendered as a vertical + stack; a service followed more than once is one link, not a repeated hop. +- **Trace summary** — a one-line run trace, e.g. + `12 moves · 5 queries · 2 subagents · confirmed at depth 1`. + +--- + +## Architecture & data flow + +The core is a **pure, fully-injected control loop** (deterministic given a +deterministic `decideMove`) — unit-testable without an LLM or MCP. The real LLM +brain, evidence gather, keystone, subagent dispatch, and WebSocket streaming are +layered around it. + +```mermaid +flowchart LR + subgraph UI["Web UI (React)"] + IP[InvestigationPane] --> OS[OrchestratorStream
stream · pause card · causal chain] + IP -- orchestrator_investigate --> APP[App ws.send] + APP -- orchestrator_decision --> IP + end + + subgraph SRV["Server"] + WS[ws-handler
runOrchestratorStreamed
+ pending-pause registry
+ concurrency guard / abort] + AD[agents.ts · orchestrate
spawnSubagent = quick investigation] + end + + subgraph CORE["Agent core"] + LLM[orchestrator-llm.ts
createLlmDecideMove
runAutonomousOrchestrator] + LOOP[orchestrator.ts
runOrchestrator — pure move-loop] + STREAM[orchestrator-stream.ts
trace→stream · causal chain · trace summary] + end + + subgraph SHARED["Reused building blocks"] + GATHER[createGatherEvidence
read-only] + KEY[evaluatePrediction
keystone] + DAG[runInvestigation
fixed DAG = subagent] + DEP[inferDependencyGraph] + end + + APP -. WebSocket .-> WS + WS --> AD --> LLM --> LOOP + LOOP --> STREAM --> WS --> OS + LLM --> GATHER + LOOP --> KEY + AD --> DAG + WS --> DEP +``` + +| Layer | File | Responsibility | +|---|---|---| +| Pure core | `src/agents/orchestrator.ts` | `runOrchestrator` — move-loop, guards, hybrid stop, operator-pause hook, watchdog, abort, cross-service guard. No LLM/MCP. | +| LLM brain + runner | `src/agents/orchestrator-llm.ts` | `createLlmDecideMove` (generateText + JSON parse — no tools/responseFormat, sidesteps the gpt-oss quirk) + `runAutonomousOrchestrator` wiring decide / gather / keystone / subagent / deps / signal. | +| Trace → UI | `src/agents/orchestrator-stream.ts` | `traceEntryToStreamEvent`, `assembleCausalChain`, `traceSummary`. | +| Adapter | `src/server/agents.ts` | `orchestrate` closure — reuses investigation providers + model; `spawnSubagent` runs the `quick` template read-only; `DEFAULT_ORCHESTRATOR_GUARDS`. | +| WebSocket | `src/server/ws-handler.ts` | `handleOrchestratorInvestigate` + `runOrchestratorStreamed`; pending-pause registry, per-connection concurrency guard, abort-on-disconnect; resolves dependency neighbors. | +| UI | `src/web/components/OrchestratorStream.tsx` (+ shared `AgentStream.tsx`) | Live move stream, working indicator, outcome banner, operator-pause card, causal-chain card, trace summary. | + +--- + +## Outcomes + +| Outcome | Meaning | +|---|---| +| `confirmed` | Hybrid stop — leading hypothesis deterministically `satisfied` (and not a guard-rejected cross-service claim). | +| `operator-pause` | Strike limit reached; handed to a human (or escalate/wait/timeout). | +| `budget-exhausted` / `tool-cap` / `wall-clock` | A resource guard tripped. | +| `exhausted` | The brain signalled no further moves. | +| `inconclusive` | Stalled (no progress) or hit the move backstop. | +| `aborted` | Caller aborted (e.g. the operator disconnected). | + +--- + +## Protocol + +**Client → server:** `orchestrator_investigate { investigationId }`, +`orchestrator_decision { investigationId, decision }`. + +**Server → client:** `orchestrator:started`, `orchestrator:step { event }`, +`orchestrator:operator_pause { strikes, hypothesesTried }`, +`orchestrator:complete { outcome, stats, causalChain, traceSummary }`, +`orchestrator:error`. + +--- + +## Configuration + +```yaml +agent: + orchestratorEnabled: false # master gate; default off (internal until inc-7 validation) +``` + +Guards default conservatively (`DEFAULT_ORCHESTRATOR_GUARDS` in +`src/server/agents.ts`) because an autonomous run costs 3–10× a normal +investigation; the budget guard is the cost backstop. diff --git a/package.json b/package.json index 4819d6bb..dce2d7d6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dops-assistant", - "version": "0.4.4.4", + "version": "0.4.4.5", "description": "Agentic infrastructure monitoring assistant — Grafana MCP + CLI", "type": "module", "main": "dist/index.js", diff --git a/src/agents/orchestrator-llm.test.ts b/src/agents/orchestrator-llm.test.ts new file mode 100644 index 00000000..2b5f7966 --- /dev/null +++ b/src/agents/orchestrator-llm.test.ts @@ -0,0 +1,173 @@ +import { describe, it, expect } from "vitest"; +import { parseMove, buildStatePrompt, createLlmDecideMove } from "./orchestrator-llm.js"; +import type { OrchestratorState, OrchestratorGuards } from "./orchestrator.js"; +import type { LanguageModel } from "ai"; +import { LlmUnavailableError } from "./shared/llm-errors.js"; + +const guards: OrchestratorGuards = { + maxTokens: 150_000, + maxDepth: 3, + maxSubagents: 3, + maxStrikes: 3, + maxToolCalls: 40, + wallClockMs: 600_000, +}; + +const emptyState: OrchestratorState = { + hypotheses: [], + evidence: [], + dependencies: [], + depth: 0, + subagents: 0, + strikes: 0, + tokensSpent: 0, + toolCalls: 0, + elapsedMs: 0, + trace: [], +}; + +const stubModel = {} as unknown as LanguageModel; + +describe("parseMove", () => { + it("parses a hypothesize move with a valid prediction", () => { + const m = parseMove('{"move":"hypothesize","hypothesis":"oom","prediction":{"kind":"infra-status","resource":"pod","status":"OOMKilled"}}'); + expect(m).toEqual({ + type: "hypothesize", + hypothesis: { hypothesis: "oom", prediction: { kind: "infra-status", resource: "pod", status: "OOMKilled" } }, + }); + }); + + it("parses query / test / spawn-subagent / follow-cause", () => { + expect(parseMove('{"move":"query","target":2}')).toEqual({ type: "query", target: 2 }); + expect(parseMove('{"move":"test","target":0}')).toEqual({ type: "test", target: 0 }); + expect(parseMove('{"move":"spawn-subagent","service":"payments","question":"why slow?"}')).toEqual({ + type: "spawn-subagent", + service: "payments", + question: "why slow?", + }); + expect(parseMove('{"move":"follow-cause","service":"db"}')).toEqual({ type: "follow-cause", service: "db" }); + }); + + it("applies defaults for conclude confidence/rationale", () => { + expect(parseMove('{"move":"conclude","leading":1}')).toEqual({ + type: "conclude", + leading: 1, + confidence: 0.5, + rationale: "", + }); + }); + + it("treats an explicit done as null (exhausted)", () => { + expect(parseMove('{"move":"done"}')).toBeNull(); + }); + + it("extracts JSON from ```json fences and surrounding prose", () => { + expect(parseMove('Here is my move:\n```json\n{"move":"query","target":0}\n```')).toEqual({ type: "query", target: 0 }); + expect(parseMove('I think we should query. {"move":"query","target":3} done.')).toEqual({ type: "query", target: 3 }); + }); + + it("returns null for unparseable / schema-invalid output (graceful, no throw)", () => { + expect(parseMove("not json at all")).toBeNull(); + expect(parseMove("{ broken json")).toBeNull(); + expect(parseMove('{"move":"hypothesize","hypothesis":"x"}')).toBeNull(); // missing prediction + expect(parseMove('{"move":"hypothesize","hypothesis":"x","prediction":{"kind":"bogus"}}')).toBeNull(); // bad kind + expect(parseMove('{"move":"frobnicate"}')).toBeNull(); // unknown move + expect(parseMove('{"move":"query","target":-1}')).toBeNull(); // negative index + }); +}); + +describe("buildStatePrompt", () => { + it("renders budget, hypotheses with verdicts, and evidence", () => { + const state: OrchestratorState = { + ...emptyState, + tokensSpent: 5000, + toolCalls: 3, + strikes: 1, + hypotheses: [ + { hypothesis: { hypothesis: "memory exhaustion", prediction: { kind: "metric-threshold", metric: "mem", op: ">", value: 90 } }, standing: "confirmed", lastVerdict: "satisfied" }, + { hypothesis: { hypothesis: "disk pressure", prediction: { kind: "infra-status", status: "DiskPressure" } }, standing: "ruled-out", lastVerdict: "absent" }, + ], + evidence: [{ phase: "metrics", subject: "mem", value: 99 }], + }; + const prompt = buildStatePrompt("checkout-api 5xx spike", state, guards); + expect(prompt).toContain("checkout-api 5xx spike"); + expect(prompt).toContain("strikes 1/3"); + expect(prompt).toContain("[0] memory exhaustion — standing: confirmed, verdict: satisfied"); + expect(prompt).toContain("[1] disk pressure — standing: ruled-out, verdict: absent"); + expect(prompt).toContain("metrics mem = 99"); + // budget left = 150000 - 5000 + expect(prompt).toContain("145000"); + }); + + it("guides the agent when there are no hypotheses yet", () => { + const prompt = buildStatePrompt("incident", emptyState, guards); + expect(prompt).toContain("(none — start by hypothesizing"); + expect(prompt).toContain("(none yet)"); + }); + + it("lists follow-cause dependencies when present (and omits the line when empty)", () => { + const withDeps = buildStatePrompt("incident", { ...emptyState, dependencies: ["payments", "db"] }, guards); + expect(withDeps).toContain("follow-cause into: payments, db"); + const noDeps = buildStatePrompt("incident", emptyState, guards); + expect(noDeps).not.toContain("follow-cause into:"); + }); +}); + +describe("createLlmDecideMove", () => { + it("returns the parsed move from the model text (via injected callModel)", async () => { + const decide = createLlmDecideMove({ + model: stubModel, + focus: "incident", + guards, + callModel: async () => '{"move":"hypothesize","hypothesis":"oom","prediction":{"kind":"metric-threshold","metric":"mem","op":">","value":90}}', + }); + const move = await decide(emptyState); + expect(move).toEqual({ + type: "hypothesize", + hypothesis: { hypothesis: "oom", prediction: { kind: "metric-threshold", metric: "mem", op: ">", value: 90 } }, + }); + }); + + it("feeds the rendered state (focus + hypotheses) into the model prompt", async () => { + let seenPrompt = ""; + const decide = createLlmDecideMove({ + model: stubModel, + focus: "payments latency", + guards, + callModel: async (_system, prompt) => { + seenPrompt = prompt; + return '{"move":"done"}'; + }, + }); + await decide({ + ...emptyState, + hypotheses: [{ hypothesis: { hypothesis: "pool starvation", prediction: { kind: "log-pattern", pattern: "timeout" } }, standing: "open" }], + }); + expect(seenPrompt).toContain("payments latency"); + expect(seenPrompt).toContain("pool starvation"); + }); + + it("propagates LlmUnavailableError so the runner can fail cleanly", async () => { + const decide = createLlmDecideMove({ + model: stubModel, + focus: "x", + guards, + callModel: async () => { + throw new LlmUnavailableError("upstream down"); + }, + }); + await expect(decide(emptyState)).rejects.toBeInstanceOf(LlmUnavailableError); + }); + + it("degrades a non-LLM-unavailable error to null (one bad turn doesn't crash the loop)", async () => { + const decide = createLlmDecideMove({ + model: stubModel, + focus: "x", + guards, + callModel: async () => { + throw new Error("transient parse weirdness"); + }, + }); + await expect(decide(emptyState)).resolves.toBeNull(); + }); +}); diff --git a/src/agents/orchestrator-llm.ts b/src/agents/orchestrator-llm.ts new file mode 100644 index 00000000..75b0a14e --- /dev/null +++ b/src/agents/orchestrator-llm.ts @@ -0,0 +1,341 @@ +/** + * Orchestrator increment 2 — the real LLM decide-fn + headless runner. + * + * `createLlmDecideMove` is the agent's brain: given the orchestrator's + * read-only state, an LLM picks the next move. It follows the project's + * structured-output convention (generateText + JSON parse, NO tools / NO + * responseFormat — which sidesteps the gpt-oss `<|constrain|>json` quirk; see + * CLAUDE.md) and is robust to messy output (fenced / prose-wrapped JSON, schema + * drift → graceful null rather than a throw). + * + * `runAutonomousOrchestrator` wires the three injected deps of the pure core + * (orchestrator.ts) to their real implementations: decideMove → this LLM, + * gatherEvidence → createGatherEvidence (read-only by construction), evaluate → + * the evaluatePrediction keystone. Token usage from both the decide calls and + * the evidence queries feeds the budget guard. + */ +import { generateText, type LanguageModel } from "ai"; +import { z } from "zod"; +import { + runOrchestrator, + type OrchestratorMove, + type OrchestratorState, + type OrchestratorGuards, + type OrchestratorResult, + type TraceEntry, +} from "./orchestrator.js"; +import { HypothesisPredictionSchema } from "../workflows/schemas.js"; +import { createGatherEvidence } from "../workflows/steps/hypothesis-requery.js"; +import { evaluatePrediction, type CorroborationContext, type HypothesisPrediction, type NormalizedObservation } from "../workflows/steps/corroboration.js"; +import { withLlmRetry, type LlmRetryConfig } from "./shared/llm-retry.js"; +import { LlmUnavailableError } from "./shared/llm-errors.js"; +import type { MastraProvider } from "../mcp/provider.js"; + +/** The move shape the LLM emits (`move` discriminant), validated before mapping + * to the core's `OrchestratorMove` (`type` discriminant). `done` → null. */ +const LlmMoveSchema = z.discriminatedUnion("move", [ + z.object({ move: z.literal("hypothesize"), hypothesis: z.string().min(1), prediction: HypothesisPredictionSchema }), + z.object({ move: z.literal("query"), target: z.number().int().nonnegative() }), + z.object({ move: z.literal("test"), target: z.number().int().nonnegative() }), + z.object({ + move: z.literal("conclude"), + leading: z.number().int().nonnegative(), + confidence: z.number().min(0).max(1).default(0.5), + rationale: z.string().default(""), + }), + z.object({ move: z.literal("spawn-subagent"), service: z.string().min(1), question: z.string().min(1) }), + z.object({ move: z.literal("follow-cause"), service: z.string().min(1) }), + z.object({ move: z.literal("done") }), +]); + +/** Pull the first balanced JSON object out of an LLM response (tolerates code + * fences and surrounding prose). Returns the raw substring, or null. */ +function extractJsonObject(text: string): string | null { + const trimmed = text.trim(); + // Strip ```json ... ``` fences if present. + const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i); + const body = fenced ? fenced[1].trim() : trimmed; + const start = body.indexOf("{"); + if (start === -1) return null; + // Walk to the matching closing brace (string-aware) so trailing prose is ignored. + let depth = 0; + let inStr = false; + let esc = false; + for (let i = start; i < body.length; i++) { + const ch = body[i]; + if (inStr) { + if (esc) esc = false; + else if (ch === "\\") esc = true; + else if (ch === '"') inStr = false; + continue; + } + if (ch === '"') inStr = true; + else if (ch === "{") depth++; + else if (ch === "}") { + depth--; + if (depth === 0) return body.slice(start, i + 1); + } + } + return null; +} + +/** + * Parse an LLM response into an OrchestratorMove. Returns null for an explicit + * `done`, or for any unparseable / schema-invalid output (the caller treats + * null as "no move" → the loop exhausts gracefully rather than crashing). + */ +export function parseMove(text: string): OrchestratorMove | null { + const json = extractJsonObject(text); + if (!json) return null; + let raw: unknown; + try { + raw = JSON.parse(json); + } catch { + return null; + } + const parsed = LlmMoveSchema.safeParse(raw); + if (!parsed.success) return null; + const m = parsed.data; + switch (m.move) { + case "hypothesize": + return { type: "hypothesize", hypothesis: { hypothesis: m.hypothesis, prediction: m.prediction } }; + case "query": + return { type: "query", target: m.target }; + case "test": + return { type: "test", target: m.target }; + case "conclude": + return { type: "conclude", leading: m.leading, confidence: m.confidence, rationale: m.rationale }; + case "spawn-subagent": + return { type: "spawn-subagent", service: m.service, question: m.question }; + case "follow-cause": + return { type: "follow-cause", service: m.service }; + case "done": + return null; + } +} + +const SYSTEM_PROMPT = `You are an autonomous incident investigator. Each turn you choose ONE next move to find the ROOT CAUSE of an incident using read-only evidence. Reason briefly, then emit your move. + +Moves — emit EXACTLY ONE as a single JSON object (no prose, no code fence): +- {"move":"hypothesize","hypothesis":"","prediction":} + add a candidate cause with a CHECKABLE prediction. PREDICTION is one of: + {"kind":"metric-threshold","metric":"","op":">"|"<"|">="|"<=","value":} + {"kind":"log-pattern","pattern":"","present":true|false} + {"kind":"infra-status","resource":"","status":""} + {"kind":"change-in-window","withinMinutesBefore":} +- {"move":"query","target":} gather read-only evidence for that hypothesis's prediction. +- {"move":"test","target":} score that hypothesis against gathered evidence. +- {"move":"conclude","leading":,"confidence":<0..1>,"rationale":""} + propose the leading hypothesis as the root cause. +- {"move":"spawn-subagent","service":"","question":""} + run a scoped sub-investigation on a RELATED service and fold its findings + into the evidence. +- {"move":"follow-cause","service":""} + follow the incident into one of the listed dependency services (a scoped + sub-investigation there). Only valid for services in the dependencies list. +- {"move":"done"} nothing left to try. + +Rules: +- A "conclude" ONLY ends the investigation if that hypothesis was already TESTED and its evidence came back satisfied. Confidence alone never ends it. So: hypothesize → query → test BEFORE you conclude. +- If a test fails (contradicted/absent), hypothesize a different cause; don't keep retesting the same one. +- IMPORTANT: after just ONE or TWO local hypotheses fail AND a dependencies list is shown, follow-cause into a dependency instead of trying more local guesses — the fault is often in a connected service. Don't burn all your strikes locally. +- After a follow-cause or subagent returns findings, those findings are your BEST lead. Immediately hypothesize the specific cause they point to (with a checkable prediction) and test it — never stop right after following without turning the finding into a tested hypothesis. +- CROSS-SERVICE CAUSES NEED A FOLLOW-CAUSE: observing that a dependency is unhealthy is only CORRELATIONAL. To conclude that a dependency caused this incident you MUST follow-cause into it first and establish the failure there — you cannot confirm "caused by " from the incident service's metrics alone. +- Be decisive — your budget is limited. Prefer the most likely cause first. +Output ONLY the JSON object for your chosen move.`; + +/** Render the read-only state into the per-turn user prompt. */ +export function buildStatePrompt(focus: string, state: OrchestratorState, guards: OrchestratorGuards): string { + const lines: string[] = []; + lines.push(`Incident under investigation: ${focus}`); + lines.push(""); + const tokensLeft = Math.max(0, guards.maxTokens - state.tokensSpent); + const queriesLeft = Math.max(0, guards.maxToolCalls - state.toolCalls); + lines.push( + `Budget: ~${tokensLeft} output tokens, ${queriesLeft} queries left; strikes ${state.strikes}/${guards.maxStrikes} (consecutive failed tests).`, + ); + lines.push(""); + + if (state.dependencies.length > 0) { + lines.push(`Dependencies you can follow-cause into: ${state.dependencies.join(", ")}`); + lines.push(""); + } + + if (state.hypotheses.length === 0) { + lines.push("Hypotheses so far: (none — start by hypothesizing the most likely cause)"); + } else { + lines.push("Hypotheses so far:"); + state.hypotheses.forEach((h, i) => { + const v = h.lastVerdict ? `, verdict: ${h.lastVerdict}` : ", untested"; + lines.push(` [${i}] ${h.hypothesis.hypothesis} — standing: ${h.standing}${v}`); + }); + } + lines.push(""); + + if (state.evidence.length === 0) { + lines.push("Evidence gathered: (none yet)"); + } else { + lines.push(`Evidence gathered (${state.evidence.length} observations):`); + for (const o of state.evidence.slice(-12)) { + const val = o.value !== undefined ? ` = ${o.value}` : ""; + const txt = o.text ? ` (${o.text.slice(0, 60)})` : ""; + lines.push(` - ${o.phase} ${o.subject}${val}${txt}`); + } + } + lines.push(""); + + const recent = state.trace.slice(-6); + if (recent.length > 0) { + lines.push("Recent moves:"); + for (const t of recent) { + lines.push(` - ${t.move}: ${t.detail}${t.verdict ? ` [${t.verdict}]` : ""}`); + } + lines.push(""); + } + + lines.push("Pick the next move (single JSON object)."); + return lines.join("\n"); +} + +export interface UsageEvent { + outputTokens?: number; + totalTokens?: number; +} + +export interface CreateLlmDecideMoveOptions { + model: LanguageModel; + /** One-line incident description shown to the agent each turn. */ + focus: string; + /** Guards, so the prompt can show the agent its remaining budget. */ + guards: OrchestratorGuards; + llmRetry?: LlmRetryConfig; + /** Per-call idle timeout (ms); generateText has none of its own. */ + llmCallMs?: number; + /** Best-effort token accounting sink (feeds the budget guard). */ + onUsage?: (usage: UsageEvent) => void; + /** + * Test seam: return the raw model text for (system, prompt). Defaults to the + * real generateText path. Injected in unit tests so move selection can be + * verified without a live model. + */ + callModel?: (system: string, prompt: string) => Promise; +} + +/** Build the LLM-backed decide-fn for runOrchestrator. */ +export function createLlmDecideMove( + opts: CreateLlmDecideMoveOptions, +): (state: OrchestratorState) => Promise { + const retry: LlmRetryConfig = opts.llmRetry ?? { maxAttempts: 1 }; + const call = + opts.callModel ?? + (async (system: string, prompt: string): Promise => { + const { text, usage } = await withLlmRetry(() => { + const abortSignal = + opts.llmCallMs && opts.llmCallMs > 0 ? AbortSignal.timeout(opts.llmCallMs) : undefined; + return generateText({ model: opts.model, system, prompt, temperature: 0, abortSignal }); + }, retry); + if (usage) { + opts.onUsage?.({ + outputTokens: (usage as { outputTokens?: number }).outputTokens, + totalTokens: (usage as { totalTokens?: number }).totalTokens, + }); + } + return text; + }); + + return async (state) => { + const prompt = buildStatePrompt(opts.focus, state, opts.guards); + let text: string; + try { + text = await call(SYSTEM_PROMPT, prompt); + } catch (err) { + // LLM truly unavailable → propagate so the runner can fail cleanly. + // Any other error degrades to "no move" so a single bad turn doesn't crash. + if (err instanceof LlmUnavailableError) throw err; + return null; + } + return parseMove(text); + }; +} + +export interface RunAutonomousOrchestratorOptions { + /** One-line incident description (e.g. "checkout-api 5xx spike at 13:58"). */ + focus: string; + model: LanguageModel; + providers: MastraProvider[]; + guards: OrchestratorGuards; + timeRange?: { from: string; to: string }; + ctx?: CorroborationContext; + llmRetry?: LlmRetryConfig; + llmCallMs?: number; + onStep?: (entry: TraceEntry) => void; + /** Depth-1 subagent dispatch (scoped sub-investigation → observations). Wired + * by the orchestrate adapter; absent → spawn-subagent gracefully skips. */ + spawnSubagent?: (args: { service: string; question: string }) => Promise; + /** Dependency-graph neighbors of the incident service the agent may + * follow-cause into. Empty → follow-cause disabled. */ + dependencies?: string[]; + /** The incident service itself (for the cross-service confirm guard). */ + incidentService?: string; + /** Interactive strike-limit hook (increment 5). Absent → the strike limit + * stops directly. Wired by the orchestrate adapter to the WS pause card. */ + onOperatorPause?: (state: OrchestratorState) => Promise<"continue" | "escalate" | "wait">; + /** Cooperative abort (e.g. the operator disconnected) → the loop stops. */ + signal?: AbortSignal; +} + +/** + * Headless entry point: wire the LLM decide-fn + read-only evidence gather + + * keystone into the pure orchestrator loop and run it. Token usage from decide + * calls and evidence queries both feed the budget guard. + */ +export async function runAutonomousOrchestrator( + opts: RunAutonomousOrchestratorOptions, +): Promise { + let pendingTokens = 0; + const addTokens = (u: UsageEvent): void => { + pendingTokens += u.outputTokens ?? u.totalTokens ?? 0; + }; + + const gather = createGatherEvidence({ + providers: opts.providers, + model: opts.model, + timeRange: opts.timeRange, + useQuirkHandling: true, + llmRetry: opts.llmRetry, + ctx: opts.ctx, + onTokenUsage: (u: { outputTokens?: number; totalTokens?: number }) => addTokens(u), + }); + + const decideMove = createLlmDecideMove({ + model: opts.model, + focus: opts.focus, + guards: opts.guards, + llmRetry: opts.llmRetry, + llmCallMs: opts.llmCallMs, + onUsage: addTokens, + }); + + return runOrchestrator({ + decideMove, + // The core's RankedHypothesis carries prediction as Record + // (rca-types), but every prediction in play was validated against + // HypothesisPredictionSchema when the LLM emitted the hypothesize move, so + // it is a real HypothesisPrediction at runtime. Coerce at this boundary. + gatherEvidence: (h) => gather({ hypothesis: h.hypothesis, prediction: h.prediction as HypothesisPrediction }, 1), + evaluate: (prediction: HypothesisPrediction, evidence) => evaluatePrediction(prediction, evidence, opts.ctx ?? {}), + spawnSubagent: opts.spawnSubagent, + dependencies: opts.dependencies, + incidentService: opts.incidentService, + onOperatorPause: opts.onOperatorPause, + signal: opts.signal, + guards: opts.guards, + onStep: opts.onStep, + // Drain tokens accrued (decide + query) since the previous move. + estimateTokens: () => { + const t = pendingTokens; + pendingTokens = 0; + return t; + }, + }); +} diff --git a/src/agents/orchestrator-stream.test.ts b/src/agents/orchestrator-stream.test.ts new file mode 100644 index 00000000..14eb34fe --- /dev/null +++ b/src/agents/orchestrator-stream.test.ts @@ -0,0 +1,133 @@ +import { describe, it, expect } from "vitest"; +import { traceEntryToStreamEvent, assembleCausalChain, traceSummary } from "./orchestrator-stream.js"; +import type { TraceEntry } from "./orchestrator.js"; +import type { NormalizedObservation } from "../workflows/steps/corroboration.js"; + +describe("traceEntryToStreamEvent", () => { + it("maps hypothesize → a running 'proposed a cause' row", () => { + expect(traceEntryToStreamEvent({ move: "hypothesize", detail: "memory exhaustion" })).toEqual({ + verb: "proposed a cause:", + target: "memory exhaustion", + status: "running", + }); + }); + + it("maps query → a done 'gathered evidence' row", () => { + const ev = traceEntryToStreamEvent({ move: "query", detail: "memory exhaustion → +3 observations" }); + expect(ev.verb).toBe("gathered evidence"); + expect(ev.status).toBe("done"); + expect(ev.detail).toContain("+3 observations"); + }); + + it("maps a satisfied test → strong 'evidence backs'", () => { + expect(traceEntryToStreamEvent({ move: "test", detail: "memory exhaustion", verdict: "satisfied" })).toEqual({ + verb: "evidence backs", + target: "memory exhaustion", + status: "strong", + }); + }); + + it("maps a failed test → rejected 'ruled out' with a plain reason", () => { + const absent = traceEntryToStreamEvent({ move: "test", detail: "disk pressure", verdict: "absent" }); + expect(absent.verb).toBe("ruled out"); + expect(absent.status).toBe("rejected"); + expect(absent.detail).toContain("no supporting evidence"); + + const contradicted = traceEntryToStreamEvent({ move: "test", detail: "leak", verdict: "contradicted" }); + expect(contradicted.detail).toContain("contradicts"); + }); + + it("maps an accepted conclude → strong 'root cause'", () => { + expect(traceEntryToStreamEvent({ move: "conclude", detail: "confirmed: memory exhaustion" })).toEqual({ + verb: "root cause:", + target: "memory exhaustion", + status: "strong", + }); + }); + + it("maps a rejected conclude → running 'kept looking'", () => { + const ev = traceEntryToStreamEvent({ + move: "conclude", + detail: "rejected — self-confidence 0.9 not backed by the keystone; continuing", + }); + expect(ev.verb).toBe("not confirmed yet — kept looking"); + expect(ev.status).toBe("running"); + }); + + it("assembles a causal chain: incident → followed deps → root cause", () => { + const trace: TraceEntry[] = [ + { move: "hypothesize", detail: "local OOM" }, + { move: "test", detail: "local OOM", verdict: "absent" }, + { move: "follow-cause", detail: "impala-catalog → +1 findings" }, + { move: "hypothesize", detail: "catalog pool starvation" }, + { move: "test", detail: "catalog pool starvation", verdict: "satisfied" }, + ]; + const chain = assembleCausalChain(trace, { hypothesis: "catalog pool starvation", prediction: {} }, "impala"); + expect(chain).toEqual([ + { label: "impala", kind: "incident" }, + { label: "impala-catalog", kind: "followed", evidence: undefined }, + { label: "root cause: catalog pool starvation", kind: "root-cause", evidence: undefined }, + ]); + }); + + it("attributes each link to its supporting finding/prediction (source attribution)", () => { + const trace: TraceEntry[] = [ + { move: "follow-cause", detail: "impala-catalog → +1 findings" }, + { move: "hypothesize", detail: "catalog pool starvation" }, + { move: "test", detail: "catalog pool starvation", verdict: "satisfied" }, + ]; + const evidence: NormalizedObservation[] = [ + { phase: "infra", subject: "impala-catalog", text: "subagent: connection pool saturated — clients blocked" }, + ]; + const chain = assembleCausalChain( + trace, + { hypothesis: "catalog pool starvation", prediction: { kind: "metric-threshold", metric: "pool_used", op: ">", value: 95 } }, + "impala", + evidence, + ); + expect(chain[1]).toEqual({ + label: "impala-catalog", + kind: "followed", + evidence: "connection pool saturated — clients blocked", + }); + expect(chain[2]).toEqual({ + label: "root cause: catalog pool starvation", + kind: "root-cause", + evidence: "confirmed by pool_used > 95", + }); + }); + + it("dedupes a service followed more than once into a single chain link", () => { + const trace: TraceEntry[] = [ + { move: "follow-cause", detail: "statestore → +1 findings" }, + { move: "follow-cause", detail: "catalog → +1 findings" }, + { move: "follow-cause", detail: "statestore → +1 findings" }, // re-followed + ]; + const chain = assembleCausalChain(trace, undefined, "impala"); + expect(chain.map((l) => l.label)).toEqual(["impala", "statestore", "catalog"]); + }); + + it("causal chain is just the incident when nothing was followed or confirmed", () => { + const chain = assembleCausalChain([{ move: "test", detail: "x", verdict: "absent" }], undefined, "impala"); + expect(chain).toEqual([{ label: "impala", kind: "incident" }]); + }); + + it("traceSummary reads as a one-line run trace", () => { + expect(traceSummary({ moves: 12, toolCalls: 5, tokensSpent: 0, strikes: 0, depth: 1, subagents: 2, elapsedMs: 0 }, "confirmed")) + .toBe("12 moves · 5 queries · 2 subagents · confirmed at depth 1"); + expect(traceSummary({ moves: 1, toolCalls: 1, tokensSpent: 0, strikes: 0, depth: 0, subagents: 0, elapsedMs: 0 }, "operator-pause")) + .toBe("1 move · 1 query · operator-pause at depth 0"); + }); + + it("maps subagent + follow-cause completions to done rows", () => { + expect(traceEntryToStreamEvent({ move: "spawn-subagent", detail: "payments: why slow? → +2 findings" })).toMatchObject({ + verb: "spun up a subagent", + status: "done", + indent: 1, + }); + expect(traceEntryToStreamEvent({ move: "follow-cause", detail: "payments → +1 findings" })).toMatchObject({ + verb: "followed the trail to", + status: "done", + }); + }); +}); diff --git a/src/agents/orchestrator-stream.ts b/src/agents/orchestrator-stream.ts new file mode 100644 index 00000000..2eba7183 --- /dev/null +++ b/src/agents/orchestrator-stream.ts @@ -0,0 +1,136 @@ +/** + * Orchestrator increment 2b — map the core's lossy TraceEntry into the shared + * AgentStreamEvent the UI renders (the same event type the deep-mode stream + * uses, so one AgentStream component serves both). + * + * Wording is plain and lead-with-takeaway (matching the deep-mode copy): the + * operator should be able to read the move log without decoding jargon. + */ +import type { TraceEntry, OrchestratorResult } from "./orchestrator.js"; +import type { AgentStreamEvent, CausalChainLink } from "../types/ws-types.js"; +import type { RankedHypothesis } from "../types/rca-types.js"; +import type { NormalizedObservation } from "../workflows/steps/corroboration.js"; + +/** Trim a finding/observation string to a compact, one-line attribution. */ +function attributionFromText(text: string): string { + const stripped = text.replace(/^subagent:\s*/i, "").trim(); + return stripped.length > 90 ? `${stripped.slice(0, 87)}…` : stripped; +} + +/** One-line, human-readable summary of the prediction a confirmed hypothesis + * was checked against — the evidence standard the keystone held it to. */ +function predictionSummary(prediction: Record | undefined): string | undefined { + if (!prediction || typeof prediction !== "object") return undefined; + const p = prediction as Record; + switch (p.kind) { + case "metric-threshold": + return `confirmed by ${p.metric} ${p.op} ${p.value}`; + case "log-pattern": + return `confirmed by log ${p.present === false ? "absence of" : "pattern"} "${p.pattern}"`; + case "infra-status": + return `confirmed by ${p.resource} ${p.status}`; + case "change-in-window": + return `confirmed by a change within ${p.withinMinutesBefore}m`; + default: + return undefined; + } +} + +/** + * Assemble the causal chain from a finished run: the incident service, each + * dependency the agent followed into (in order), and the confirmed root cause. + * Ordered cause→effect with SOURCE ATTRIBUTION (increment 6) — each followed + * link carries the finding that pointed there, and the root cause carries the + * prediction the keystone confirmed it against. A chain of length 1 (just the + * incident) means nothing was followed or confirmed. + */ +export function assembleCausalChain( + trace: TraceEntry[], + confirmed: RankedHypothesis | undefined, + incidentService: string, + evidence: NormalizedObservation[] = [], +): CausalChainLink[] { + const chain: CausalChainLink[] = []; + const seen = new Set(); + if (incidentService) { chain.push({ label: incidentService, kind: "incident" }); seen.add(incidentService); } + for (const t of trace) { + if (t.move === "follow-cause" && / → \+\d+ findings$/.test(t.detail)) { + const service = t.detail.replace(/ → \+\d+ findings$/, "").trim(); + // A service followed more than once is one link, not a repeated hop — the + // chain is the distinct cause path, not the move log. + if (seen.has(service)) continue; + seen.add(service); + // The follow-cause subagent folds its conclusion back as a `subagent:`- + // prefixed observation keyed by the followed service; prefer it for the + // attribution, falling back to any other observation on that service. + const finding = + evidence.find((o) => o.subject === service && typeof o.text === "string" && /^subagent:/i.test(o.text)) ?? + evidence.find((o) => o.subject === service && !!o.text); + chain.push({ + label: service, + kind: "followed", + evidence: finding?.text ? attributionFromText(finding.text) : undefined, + }); + } + } + if (confirmed) { + chain.push({ + label: `root cause: ${confirmed.hypothesis}`, + kind: "root-cause", + evidence: predictionSummary(confirmed.prediction), + }); + } + return chain; +} + +/** + * One-line run trace for the footer/report (increment 6, spec §8): e.g. + * "12 moves · 5 queries · 2 subagents · confirmed at depth 1". Lead with the + * work done, end with how it stopped. + */ +export function traceSummary(stats: OrchestratorResult["stats"], outcome: OrchestratorResult["outcome"]): string { + const plural = (n: number, one: string, many: string): string => `${n} ${n === 1 ? one : many}`; + const parts = [plural(stats.moves, "move", "moves"), plural(stats.toolCalls, "query", "queries")]; + if (stats.subagents > 0) parts.push(plural(stats.subagents, "subagent", "subagents")); + const ending = outcome === "confirmed" ? `confirmed at depth ${stats.depth}` : `${outcome} at depth ${stats.depth}`; + return `${parts.join(" · ")} · ${ending}`; +} + +/** Pure, presentation-only mapping. The orchestrator core stays UI-agnostic. */ +export function traceEntryToStreamEvent(e: TraceEntry): Omit { + switch (e.move) { + case "hypothesize": + return { verb: "proposed a cause:", target: e.detail, status: "running" }; + + case "query": + // detail is " → +N observations" + return { verb: "gathered evidence", detail: e.detail, status: "done" }; + + case "test": { + if (e.verdict === "satisfied") { + return { verb: "evidence backs", target: e.detail, status: "strong" }; + } + return { + verb: "ruled out", + target: e.detail, + detail: e.verdict ? `(${e.verdict === "absent" ? "no supporting evidence" : "evidence contradicts it"})` : undefined, + status: "rejected", + }; + } + + case "conclude": { + // Core sets detail to "confirmed: " on accept, or a + // "rejected — …" explanation when self-confidence wasn't keystone-backed. + if (e.detail.startsWith("confirmed:")) { + return { verb: "root cause:", target: e.detail.replace(/^confirmed:\s*/, ""), status: "strong" }; + } + return { verb: "not confirmed yet — kept looking", detail: e.detail, status: "running" }; + } + + case "spawn-subagent": + return { verb: "spun up a subagent", target: e.detail, status: "done", indent: 1 }; + + case "follow-cause": + return { verb: "followed the trail to", target: e.detail, status: "done" }; + } +} diff --git a/src/agents/orchestrator.test.ts b/src/agents/orchestrator.test.ts new file mode 100644 index 00000000..0183cf7c --- /dev/null +++ b/src/agents/orchestrator.test.ts @@ -0,0 +1,545 @@ +import { describe, it, expect } from "vitest"; +import { runOrchestrator, MAX_OPERATOR_CONTINUES } from "./orchestrator.js"; +import type { OrchestratorMove, OrchestratorDeps, OrchestratorState } from "./orchestrator.js"; +import type { RankedHypothesis } from "../types/rca-types.js"; +import type { NormalizedObservation, Verdict } from "../workflows/steps/corroboration.js"; + +const h = (name: string): RankedHypothesis => ({ + hypothesis: name, + prediction: { kind: "metric-threshold", metric: "mem", op: ">", value: 90 }, +}); + +const obs: NormalizedObservation = { phase: "metrics", subject: "mem", value: 99 }; + +const generousGuards = { + maxTokens: 1e9, + maxDepth: 3, + maxSubagents: 3, + maxStrikes: 3, + maxToolCalls: 100, + wallClockMs: 1e9, +}; + +/** Scripted decide-fn: replay a fixed move sequence, then signal exhausted. */ +function scripted(moves: Array): OrchestratorDeps["decideMove"] { + let i = 0; + return async () => (i < moves.length ? moves[i++] : null); +} + +/** Build deps with sensible test defaults; override per case. */ +function makeDeps(over: Partial & Pick): OrchestratorDeps { + return { + gatherEvidence: async () => [obs], + evaluate: () => "satisfied", + guards: generousGuards, + ...over, + }; +} + +describe("runOrchestrator — happy path", () => { + it("hypothesize → query → test(satisfied) → conclude → confirmed", async () => { + const result = await runOrchestrator( + makeDeps({ + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("memory exhaustion") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.9, rationale: "mem > 90" }, + ]), + evaluate: () => "satisfied", + }), + ); + expect(result.outcome).toBe("confirmed"); + expect(result.confirmed?.hypothesis).toBe("memory exhaustion"); + expect(result.hypotheses[0].standing).toBe("confirmed"); + expect(result.stats.toolCalls).toBe(1); + expect(result.evidence).toHaveLength(1); + }); +}); + +describe("runOrchestrator — DECISION 1: hybrid stop never trusts self-confidence", () => { + it("rejects conclude when the leading hypothesis was never keystone-confirmed", async () => { + const result = await runOrchestrator( + makeDeps({ + // Propose conclude at confidence 0.99 on an UNTESTED hypothesis, then stop. + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("a guess") }, + { type: "conclude", leading: 0, confidence: 0.99, rationale: "I'm sure" }, + null, + ]), + }), + ); + expect(result.outcome).toBe("exhausted"); + expect(result.confirmed).toBeUndefined(); + expect(result.trace.some((t) => t.move === "conclude" && t.detail.includes("rejected"))).toBe(true); + }); + + it("rejects conclude when the keystone verdict is 'contradicted' despite high confidence", async () => { + const result = await runOrchestrator( + makeDeps({ + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("wrong cause") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.95, rationale: "looks right" }, + null, + ]), + evaluate: () => "contradicted", + }), + ); + expect(result.outcome).toBe("exhausted"); + expect(result.confirmed).toBeUndefined(); + expect(result.hypotheses[0].standing).toBe("ruled-out"); + }); +}); + +describe("runOrchestrator — DECISION 2: safety harness", () => { + it("strikes limit → operator-pause (not a silent stop)", async () => { + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxStrikes: 3 }, + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("c1") }, + { type: "test", target: 0 }, + { type: "hypothesize", hypothesis: h("c2") }, + { type: "test", target: 1 }, + { type: "hypothesize", hypothesis: h("c3") }, + { type: "test", target: 2 }, + ]), + evaluate: () => "absent", // every test fails → strikes accumulate + }), + ); + expect(result.outcome).toBe("operator-pause"); + expect(result.stats.strikes).toBe(3); + }); + + it("a satisfied test resets the strike counter", async () => { + const verdicts: Verdict[] = ["absent", "absent", "satisfied"]; + let i = 0; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxStrikes: 3 }, + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("c1") }, + { type: "test", target: 0 }, + { type: "hypothesize", hypothesis: h("c2") }, + { type: "test", target: 1 }, + { type: "hypothesize", hypothesis: h("c3") }, + { type: "test", target: 2 }, + { type: "conclude", leading: 2, confidence: 0.8, rationale: "" }, + ]), + evaluate: () => verdicts[i++] ?? "absent", + }), + ); + // 2 strikes then a satisfied (resets to 0) then confirmed conclude. + expect(result.outcome).toBe("confirmed"); + expect(result.stats.strikes).toBe(0); + }); + + it("token budget exhaustion → budget-exhausted", async () => { + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxTokens: 10 }, + estimateTokens: () => 4, + decideMove: async () => ({ type: "hypothesize", hypothesis: h("loop") }), + }), + ); + expect(result.outcome).toBe("budget-exhausted"); + expect(result.stats.tokensSpent).toBeGreaterThanOrEqual(10); + }); + + it("tool-call cap → tool-cap", async () => { + let first = true; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxToolCalls: 2 }, + decideMove: async (): Promise => { + if (first) { + first = false; + return { type: "hypothesize", hypothesis: h("x") }; + } + return { type: "query", target: 0 }; + }, + }), + ); + expect(result.outcome).toBe("tool-cap"); + expect(result.stats.toolCalls).toBe(2); + }); + + it("wall-clock budget → wall-clock", async () => { + let clock = 0; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, wallClockMs: 1000 }, + now: () => clock, + decideMove: async () => { + clock += 500; // each move advances the injected clock + return { type: "hypothesize", hypothesis: h("tick") }; + }, + }), + ); + expect(result.outcome).toBe("wall-clock"); + }); +}); + +describe("runOrchestrator — robustness", () => { + it("decideMove returning null immediately → exhausted", async () => { + const result = await runOrchestrator(makeDeps({ decideMove: async () => null })); + expect(result.outcome).toBe("exhausted"); + expect(result.hypotheses).toHaveLength(0); + }); + + it("out-of-range move target is traced and skipped, never throws", async () => { + const result = await runOrchestrator( + makeDeps({ + decideMove: scripted([ + { type: "query", target: 5 }, + { type: "test", target: 9 }, + null, + ]), + }), + ); + expect(result.outcome).toBe("exhausted"); + expect(result.trace.filter((t) => t.detail.includes("no hypothesis"))).toHaveLength(2); + }); + + it("a decide-fn that only spins on rejected conclude bails to inconclusive (no infinite loop)", async () => { + const result = await runOrchestrator( + makeDeps({ + decideMove: async () => ({ type: "conclude", leading: 0, confidence: 1, rationale: "spin" }), + }), + ); + expect(result.outcome).toBe("inconclusive"); + expect(result.stats.moves).toBeLessThan(50); // stalled out well before the hard backstop + }); + + it("spawn-subagent folds findings into evidence and counts the subagent", async () => { + const finding: NormalizedObservation = { phase: "metrics", subject: "payments_p99", value: 8 }; + const result = await runOrchestrator( + makeDeps({ + spawnSubagent: async () => [finding], + decideMove: scripted([ + { type: "spawn-subagent", service: "payments", question: "why slow?" }, + null, + ]), + }), + ); + expect(result.outcome).toBe("exhausted"); + expect(result.stats.subagents).toBe(1); + expect(result.evidence).toContainEqual(finding); + expect(result.trace[0].detail).toContain("+1 findings"); + }); + + it("spawn-subagent skips gracefully when no subagent dep is wired", async () => { + const result = await runOrchestrator( + makeDeps({ + decideMove: scripted([{ type: "spawn-subagent", service: "x", question: "q" }, null]), + }), + ); + expect(result.stats.subagents).toBe(0); + expect(result.trace[0].detail).toContain("unavailable"); + }); + + it("enforces the maxSubagents limit", async () => { + let spawns = 0; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxSubagents: 2 }, + spawnSubagent: async () => { + spawns++; + return [{ phase: "infra", subject: "x", text: "y" }]; + }, + decideMove: scripted([ + { type: "spawn-subagent", service: "a", question: "q" }, + { type: "spawn-subagent", service: "b", question: "q" }, + { type: "spawn-subagent", service: "c", question: "q" }, + null, + ]), + }), + ); + expect(spawns).toBe(2); // third refused + expect(result.stats.subagents).toBe(2); + expect(result.trace[2].detail).toContain("limit"); + }); + + it("follow-cause investigates a known dependency and folds findings in", async () => { + const finding: NormalizedObservation = { phase: "infra", subject: "payments", text: "pg pool saturated" }; + const result = await runOrchestrator( + makeDeps({ + dependencies: ["payments", "db"], + spawnSubagent: async () => [finding], + decideMove: scripted([{ type: "follow-cause", service: "payments" }, null]), + }), + ); + expect(result.stats.subagents).toBe(1); + expect(result.evidence).toContainEqual(finding); + expect(result.trace[0].detail).toContain("payments → +1 findings"); + }); + + it("follow-cause rejects a service that is not a known dependency", async () => { + const result = await runOrchestrator( + makeDeps({ + dependencies: ["payments"], + spawnSubagent: async () => [{ phase: "infra", subject: "x", text: "y" }], + decideMove: scripted([{ type: "follow-cause", service: "unrelated" }, null]), + }), + ); + expect(result.stats.subagents).toBe(0); + expect(result.trace[0].detail).toContain("not a known dependency"); + }); + + it("follow-cause is disabled when there is no dependency graph", async () => { + const result = await runOrchestrator( + makeDeps({ + dependencies: [], + spawnSubagent: async () => [{ phase: "infra", subject: "x", text: "y" }], + decideMove: scripted([{ type: "follow-cause", service: "payments" }, null]), + }), + ); + expect(result.stats.subagents).toBe(0); + expect(result.trace[0].detail).toContain("no dependency graph"); + }); + + it("onStep receives every recorded trace entry", async () => { + const seen: string[] = []; + await runOrchestrator( + makeDeps({ + onStep: (e) => seen.push(e.move), + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("x") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.7, rationale: "" }, + ]), + }), + ); + expect(seen).toEqual(["hypothesize", "query", "test", "conclude"]); + }); +}); + +describe("runOrchestrator — interactive operator-pause hook", () => { + /** A decide-fn that never stops failing: hypothesize, then test the newest + * hypothesis, forever. With `evaluate: absent` every test is a strike, so + * strikes accumulate until a guard (or the operator hook) ends the run. */ + function endlessFailing(): OrchestratorDeps["decideMove"] { + let n = 0; + return async (state: OrchestratorState) => + n++ % 2 === 0 + ? { type: "hypothesize", hypothesis: h(`c${n}`) } + : { type: "test", target: state.hypotheses.length - 1 }; + } + + it("continue resets strikes and resumes; a later escalate/wait stops with operator-pause", async () => { + const decisions: Array<"continue" | "escalate" | "wait"> = ["continue", "wait"]; + let calls = 0; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxStrikes: 2 }, + evaluate: () => "absent", + decideMove: endlessFailing(), + onOperatorPause: async () => decisions[calls++] ?? "wait", + }), + ); + expect(result.outcome).toBe("operator-pause"); + // First pause → continue (resumed), second pause → wait (stopped). + expect(calls).toBe(2); + }); + + it("escalate stops immediately at the first strike limit", async () => { + let calls = 0; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxStrikes: 2 }, + evaluate: () => "absent", + decideMove: endlessFailing(), + onOperatorPause: async () => { calls++; return "escalate"; }, + }), + ); + expect(result.outcome).toBe("operator-pause"); + expect(calls).toBe(1); // consulted once, then stopped + }); + + it("no hook → strike limit stops directly (unchanged behavior)", async () => { + let paused = false; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxStrikes: 2 }, + evaluate: () => "absent", + decideMove: endlessFailing(), + // onOperatorPause intentionally omitted + }), + ); + expect(paused).toBe(false); + expect(result.outcome).toBe("operator-pause"); + }); + + it("caps operator continues so a perpetually-continuing operator can't spin forever", async () => { + let calls = 0; + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, maxStrikes: 2 }, + evaluate: () => "absent", + decideMove: endlessFailing(), + onOperatorPause: async () => { calls++; return "continue"; }, + }), + ); + expect(result.outcome).toBe("operator-pause"); + // Consulted exactly MAX_OPERATOR_CONTINUES times, then stops without asking again. + expect(calls).toBe(MAX_OPERATOR_CONTINUES); + }); +}); + +describe("runOrchestrator — cross-service confirm guard", () => { + it("rejects a confirm that blames an un-followed dependency (correlational, not established)", async () => { + const result = await runOrchestrator( + makeDeps({ + dependencies: ["payments"], + incidentService: "checkout", + evaluate: () => "satisfied", + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("checkout failing due to degraded payments service") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.9, rationale: "payments slow" }, + null, + ]), + }), + ); + expect(result.outcome).toBe("exhausted"); // confirm was blocked → ran to null + expect(result.confirmed).toBeUndefined(); + expect(result.trace.some((t) => t.move === "conclude" && /never followed-cause/.test(t.detail))).toBe(true); + }); + + it("allows the confirm once the implicated dependency was followed", async () => { + const result = await runOrchestrator( + makeDeps({ + dependencies: ["payments"], + incidentService: "checkout", + spawnSubagent: async () => [{ phase: "infra", subject: "payments", text: "pool saturated" }], + evaluate: () => "satisfied", + decideMove: scripted([ + { type: "follow-cause", service: "payments" }, + { type: "hypothesize", hypothesis: h("checkout failing due to degraded payments service") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.9, rationale: "payments pool saturated" }, + ]), + }), + ); + expect(result.outcome).toBe("confirmed"); + expect(result.confirmed?.hypothesis).toContain("payments"); + }); + + it("does not block a cause about the incident service's own behavior", async () => { + const result = await runOrchestrator( + makeDeps({ + dependencies: ["payments"], + incidentService: "checkout", + evaluate: () => "satisfied", + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("checkout pod OOMKilled under load") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.9, rationale: "checkout OOM" }, + ]), + }), + ); + expect(result.outcome).toBe("confirmed"); + }); + + it("is inert when there are no dependencies", async () => { + const result = await runOrchestrator( + makeDeps({ + evaluate: () => "satisfied", + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("anything at all") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.9, rationale: "" }, + ]), + }), + ); + expect(result.outcome).toBe("confirmed"); + }); +}); + +describe("runOrchestrator — abort signal + per-op watchdog", () => { + it("returns 'aborted' immediately when the signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + const result = await runOrchestrator( + makeDeps({ + signal: ac.signal, + decideMove: scripted([{ type: "hypothesize", hypothesis: h("x") }]), + }), + ); + expect(result.outcome).toBe("aborted"); + expect(result.stats.moves).toBe(0); // bailed before spending a move + }); + + it("aborts cooperatively when the signal fires mid-run", async () => { + const ac = new AbortController(); + let n = 0; + const result = await runOrchestrator( + makeDeps({ + signal: ac.signal, + decideMove: async () => { + if (n++ === 1) ac.abort(); // fire after the 2nd decision + return { type: "hypothesize", hypothesis: h(`c${n}`) }; + }, + }), + ); + expect(result.outcome).toBe("aborted"); + }); + + it("per-op watchdog abandons a hung gather and keeps the loop alive", async () => { + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, opTimeoutMs: 20 }, + gatherEvidence: () => new Promise(() => {}), // never resolves + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("x") }, + { type: "query", target: 0 }, + null, + ]), + }), + ); + expect(result.outcome).toBe("exhausted"); // didn't hang — ran to the null + expect(result.stats.toolCalls).toBe(1); // the attempt was counted + expect(result.trace.find((t) => t.move === "query")?.detail).toContain("timed out"); + }); + + it("per-op watchdog bounds a hung subagent too", async () => { + const result = await runOrchestrator( + makeDeps({ + guards: { ...generousGuards, opTimeoutMs: 20 }, + spawnSubagent: () => new Promise(() => {}), // never resolves + decideMove: scripted([{ type: "spawn-subagent", service: "x", question: "q" }, null]), + }), + ); + expect(result.outcome).toBe("exhausted"); + expect(result.stats.subagents).toBe(1); + expect(result.trace[0].detail).toContain("timed out"); + }); +}); + +describe("runOrchestrator — integration with the real keystone", () => { + it("uses evaluatePrediction verdicts to drive the confirm gate", async () => { + // Wire the REAL keystone so the loop's confirm decision is deterministic + // against actual observations, not a fake verdict. + const { evaluatePrediction } = await import("../workflows/steps/corroboration.js"); + const result = await runOrchestrator( + makeDeps({ + evaluate: (prediction, evidence) => evaluatePrediction(prediction, evidence), + gatherEvidence: async () => [{ phase: "metrics", subject: "mem", value: 99 }], + decideMove: scripted([ + { type: "hypothesize", hypothesis: h("memory exhaustion") }, + { type: "query", target: 0 }, + { type: "test", target: 0 }, + { type: "conclude", leading: 0, confidence: 0.6, rationale: "mem 99 > 90" }, + ]), + }), + ); + expect(result.outcome).toBe("confirmed"); + }); +}); diff --git a/src/agents/orchestrator.ts b/src/agents/orchestrator.ts new file mode 100644 index 00000000..6d0c79fb --- /dev/null +++ b/src/agents/orchestrator.ts @@ -0,0 +1,472 @@ +/** + * Autonomous investigation orchestrator (Approach D) — core control loop. + * + * This is the move-loop + safety harness + hybrid stop signal. It is a NEW + * agent that WRAPS the fixed investigation DAG (it does not replace it): + * `spawn-subagent` runs `runInvestigation` scoped to a sub-question, and the + * Step-2 corroboration keystone (`evaluatePrediction`) is the stop gate. + * + * Increment 1 (this file): pure, fully-injected control flow so it is + * unit-testable without an LLM or MCP. Moves implemented: hypothesize, query, + * test, conclude. `spawn-subagent` / `follow-cause` are recognized but deferred + * to later increments (they no-op with a trace entry). The real LLM decide-fn, + * evidence gather, and keystone are wired in a later increment via OrchestratorDeps. + * + * DECISION 1 (hybrid stop) — the crux: the agent may PROPOSE `conclude`, but the + * loop only actually stops on a `conclude` when the leading hypothesis is + * DETERMINISTICALLY confirmed by the keystone (its latest verdict is + * "satisfied"). The LLM's self-reported confidence is recorded but is NEVER the + * gate. Self-confidence can DIRECT the search; it can never END it. The only + * other ways the loop ends are guard trips (Decision 2). + * + * DECISION 2 (safety harness): budget (tokens), depth (subagent nesting), + * strikes (consecutive rule-outs → operator pause), tool-cap, wall-clock. All + * hard limits, all config-tunable. Strikes hitting the limit is a first-class + * `operator-pause` outcome, not a silent stop. + */ +import type { RankedHypothesis } from "../types/rca-types.js"; +import type { + NormalizedObservation, + HypothesisPrediction, + Verdict, +} from "../workflows/steps/corroboration.js"; + +/** The moves the orchestrator LLM can pick at each step. */ +export type OrchestratorMove = + /** Add a candidate root cause (with a structured, checkable prediction). */ + | { type: "hypothesize"; hypothesis: RankedHypothesis } + /** Gather read-only evidence for hypotheses[target]'s prediction. */ + | { type: "query"; target: number } + /** Score hypotheses[target] against gathered evidence via the keystone. */ + | { type: "test"; target: number } + /** Propose done. Gated — see DECISION 1. `confidence` is advisory only. */ + | { type: "conclude"; leading: number; confidence: number; rationale: string } + /** Scoped sub-investigation on a service (increment 3 — deferred). */ + | { type: "spawn-subagent"; service: string; question: string } + /** Follow the cause into a dependent service (increment 4 — deferred). */ + | { type: "follow-cause"; service: string }; + +export type HypothesisStanding = "open" | "confirmed" | "ruled-out"; + +export interface TrackedHypothesis { + hypothesis: RankedHypothesis; + standing: HypothesisStanding; + /** Most recent deterministic keystone verdict, if tested. */ + lastVerdict?: Verdict; +} + +export interface TraceEntry { + move: OrchestratorMove["type"]; + detail: string; + verdict?: Verdict; +} + +/** Read-only view handed to the decide-fn each step. */ +export interface OrchestratorState { + readonly hypotheses: ReadonlyArray; + readonly evidence: ReadonlyArray; + /** The incident service's dependency-graph neighbors — the only services the + * agent may follow-cause into. Empty when no dependency data is available. */ + readonly dependencies: ReadonlyArray; + readonly depth: number; + /** Subagents (scoped sub-investigations) spawned so far. */ + readonly subagents: number; + /** Consecutive ruled-out tests since the last confirmation. */ + readonly strikes: number; + readonly tokensSpent: number; + readonly toolCalls: number; + readonly elapsedMs: number; + readonly trace: ReadonlyArray; +} + +export type OrchestratorOutcome = + | "confirmed" // hybrid stop: leading hypothesis deterministically satisfied + | "operator-pause" // strikes limit → hand back to a human + | "budget-exhausted" + | "tool-cap" + | "wall-clock" + | "exhausted" // decide-fn signalled no further moves + | "inconclusive" // stalled (no progress) or hit the move backstop + | "aborted"; // caller aborted (e.g. the operator disconnected) + +export interface OrchestratorGuards { + /** Output-token budget. */ + maxTokens: number; + /** Subagent / follow-cause nesting depth (for future recursion; v1 subagents are depth-1). */ + maxDepth: number; + /** Max scoped sub-investigations (depth-1) the orchestrator may spawn. */ + maxSubagents: number; + /** Consecutive rule-outs before pausing for an operator. */ + maxStrikes: number; + /** Total read-only queries. */ + maxToolCalls: number; + /** Wall-clock budget in ms. */ + wallClockMs: number; + /** + * Per-operation watchdog (ms): a single evidence gather or subagent run that + * exceeds this is abandoned (treated as no findings) so one hung MCP/LLM call + * can't strand the whole loop between guard checks. Absent / ≤0 → no per-op + * bound (the wall-clock guard is the only backstop). + */ + opTimeoutMs?: number; +} + +export interface OrchestratorDeps { + /** + * The agent's brain: pick the next move from the current state. In prod this + * is an LLM; in tests it's a scripted sequence. Return `null` to signal "no + * further moves" (→ `exhausted`). + */ + decideMove: (state: OrchestratorState) => Promise; + /** Read-only evidence gather for a hypothesis's prediction (createGatherEvidence in prod). */ + gatherEvidence: (hypothesis: RankedHypothesis) => Promise; + /** Deterministic keystone (evaluatePrediction in prod). */ + evaluate: (prediction: HypothesisPrediction, evidence: NormalizedObservation[]) => Verdict; + /** + * Run a scoped sub-investigation (depth-1) on a service and fold its findings + * back as observations. In prod this dispatches runInvestigation; absent in + * tests / when subagents aren't wired (then spawn-subagent gracefully skips). + */ + spawnSubagent?: (args: { service: string; question: string }) => Promise; + /** The incident service's dependency neighbors the agent may follow-cause into + * (resolved from the dependency graph). Empty → follow-cause is disabled. */ + dependencies?: string[]; + /** The incident service itself. Used by the cross-service confirm guard so a + * cause about the incident service's own behavior isn't treated as needing a + * follow-cause. Mentions of OTHER (dependency) services do. */ + incidentService?: string; + /** + * Strikes-limit hook: instead of silently stopping at the strike limit, ask a + * human. "continue" resets the strike counter and resumes the loop (the other + * guards still bound it); "escalate"/"wait" stop with that disposition. Absent + * → the strike limit stops directly (operator-pause), as before. + */ + onOperatorPause?: (state: OrchestratorState) => Promise<"continue" | "escalate" | "wait">; + guards: OrchestratorGuards; + /** + * Abort the run cooperatively (checked at the top of each move). The WS layer + * wires this to the connection: if the operator disconnects, the loop stops + * (`aborted`) instead of running on headless with no one watching. + */ + signal?: AbortSignal; + /** Injected clock so wall-clock is testable. Defaults to Date.now. */ + now?: () => number; + /** Output-token estimate per move, for budget accounting. Defaults to 0. */ + estimateTokens?: (move: OrchestratorMove) => number; + /** Live progress sink (the agent-stream UX wires this). */ + onStep?: (entry: TraceEntry) => void; +} + +export interface OrchestratorResult { + outcome: OrchestratorOutcome; + /** Set only on `confirmed`. */ + confirmed?: RankedHypothesis; + hypotheses: TrackedHypothesis[]; + evidence: NormalizedObservation[]; + trace: TraceEntry[]; + stats: { + moves: number; + toolCalls: number; + tokensSpent: number; + strikes: number; + depth: number; + subagents: number; + elapsedMs: number; + }; +} + +/** Case-insensitive whole-token-ish mention of a service name in free text. */ +function mentionsService(text: string, service: string): boolean { + return text.toLowerCase().includes(service.toLowerCase()); +} + +/** Absolute backstop on move count — far above any real run; catches a runaway decide-fn. */ +const MAX_MOVES = 1000; +/** Consecutive non-productive moves (no new evidence / hypotheses, rejected conclude) → inconclusive. */ +const MAX_STALL = 8; +/** + * Hard cap on operator "continue" decisions. Each continue resets the strike + * counter and resumes; without a ceiling, a hung or looping operator prompt + * (held open by a generous wall-clock) could resume forever. After this many + * continues the loop stops with `operator-pause` without asking again — the + * other guards still bound each resumed leg, this just bounds the legs. + */ +export const MAX_OPERATOR_CONTINUES = 3; + +/** + * Bound an async operation by a watchdog timeout. On timeout, resolves to + * `{ timedOut: true, value: fallback }` (the in-flight promise is abandoned, not + * cancelled — acceptable for the read-only gather/subagent ops). `ms ≤ 0` / + * undefined → no bound (await the promise as-is). Rejections propagate. + */ +async function raceOp(op: Promise, ms: number | undefined, fallback: T): Promise<{ timedOut: boolean; value: T }> { + if (!ms || ms <= 0) return { timedOut: false, value: await op }; + let timer: ReturnType; + const timeout = new Promise<{ timedOut: true; value: T }>((resolve) => { + timer = setTimeout(() => resolve({ timedOut: true, value: fallback }), ms); + }); + try { + return await Promise.race([op.then((value) => ({ timedOut: false as const, value })), timeout]); + } finally { + clearTimeout(timer!); + } +} + +/** + * Run the orchestrator loop. Pure control flow over injected dependencies: + * deterministic given a deterministic `decideMove`. Never throws on a bad move + * (unknown / out-of-range targets are traced and skipped), so a confused LLM + * degrades to `inconclusive`/`exhausted` rather than crashing. + */ +export async function runOrchestrator(deps: OrchestratorDeps): Promise { + const now = deps.now ?? Date.now; + const estimate = deps.estimateTokens ?? (() => 0); + const start = now(); + + const hypotheses: TrackedHypothesis[] = []; + const evidence: NormalizedObservation[] = []; + const dependencies = deps.dependencies ?? []; + // Services the agent actually ran a sub-investigation into (spawn / follow). + // The cross-service confirm guard requires a service to be in here before a + // cause implicating it can be confirmed. + const followedServices = new Set(); + const trace: TraceEntry[] = []; + let depth = 0; + let subagents = 0; + let strikes = 0; + let tokensSpent = 0; + let toolCalls = 0; + let moves = 0; + let stall = 0; + let operatorContinues = 0; + + const record = (entry: TraceEntry): void => { + trace.push(entry); + deps.onStep?.(entry); + }; + + const elapsed = (): number => now() - start; + + const finish = (outcome: OrchestratorOutcome, confirmed?: RankedHypothesis): OrchestratorResult => ({ + outcome, + confirmed, + hypotheses, + evidence, + trace, + stats: { moves, toolCalls, tokensSpent, strikes, depth, subagents, elapsedMs: elapsed() }, + }); + + while (moves < MAX_MOVES) { + // Guards are checked BEFORE spending the next move so a tripped limit never + // does "one more" expensive thing. + if (deps.signal?.aborted) return finish("aborted"); + if (tokensSpent >= deps.guards.maxTokens) return finish("budget-exhausted"); + if (toolCalls >= deps.guards.maxToolCalls) return finish("tool-cap"); + if (elapsed() >= deps.guards.wallClockMs) return finish("wall-clock"); + // strikes → operator pause: the design's headline safety feature. The signal + // is ambiguous (N hypotheses failed, nothing discriminating emerged); rather + // than guess, hand the call to a human (if wired) — who can resume the run + // ("continue", resetting strikes) or stop it. Other guards still bound a + // resumed run, so "continue" can't run away. + if (strikes >= deps.guards.maxStrikes) { + // Only consult the operator while there are continues left in the budget; + // once spent, stop without re-prompting so a stuck operator can't keep the + // loop alive indefinitely. + if (deps.onOperatorPause && operatorContinues < MAX_OPERATOR_CONTINUES) { + const pauseState: OrchestratorState = { + hypotheses, evidence, dependencies, depth, subagents, strikes, tokensSpent, toolCalls, elapsedMs: elapsed(), trace, + }; + const decision = await deps.onOperatorPause(pauseState); + if (decision === "continue") { + operatorContinues++; + strikes = 0; + continue; + } + } + return finish("operator-pause"); + } + if (stall >= MAX_STALL) return finish("inconclusive"); + + const state: OrchestratorState = { + hypotheses, + evidence, + dependencies, + depth, + subagents, + strikes, + tokensSpent, + toolCalls, + elapsedMs: elapsed(), + trace, + }; + + const move = await deps.decideMove(state); + if (move === null) return finish("exhausted"); + moves++; + tokensSpent += Math.max(0, estimate(move)); + + switch (move.type) { + case "hypothesize": { + hypotheses.push({ hypothesis: move.hypothesis, standing: "open" }); + record({ move: "hypothesize", detail: move.hypothesis.hypothesis }); + stall = 0; + break; + } + case "query": { + const h = hypotheses[move.target]; + if (!h) { + record({ move: "query", detail: `no hypothesis at index ${move.target} — skipped` }); + stall++; + break; + } + const before = evidence.length; + const { timedOut, value: obs } = await raceOp(deps.gatherEvidence(h.hypothesis), deps.guards.opTimeoutMs, []); + evidence.push(...obs); + toolCalls++; + record({ + move: "query", + detail: timedOut + ? `${h.hypothesis.hypothesis} → timed out (no observations)` + : `${h.hypothesis.hypothesis} → +${obs.length} observations`, + }); + stall = evidence.length > before ? 0 : stall + 1; + break; + } + case "test": { + const h = hypotheses[move.target]; + if (!h) { + record({ move: "test", detail: `no hypothesis at index ${move.target} — skipped` }); + stall++; + break; + } + const verdict = deps.evaluate(h.hypothesis.prediction as HypothesisPrediction, evidence); + h.lastVerdict = verdict; + if (verdict === "satisfied") { + h.standing = "confirmed"; + strikes = 0; + } else { + h.standing = "ruled-out"; + strikes++; + } + record({ move: "test", detail: h.hypothesis.hypothesis, verdict }); + stall = 0; + break; + } + case "conclude": { + const lead = hypotheses[move.leading]; + // HYBRID STOP: stop only on deterministic confirmation. Self-reported + // confidence is recorded for the trace but is never the gate. + if (lead && lead.standing === "confirmed" && lead.lastVerdict === "satisfied") { + // CROSS-SERVICE GUARD: a cause that blames a dependency the agent + // never investigated is correlational, not established — observing + // that a neighbor is unhealthy doesn't prove it caused this incident. + // Require a follow-cause into that service before naming it the cause. + // (Mentions of the incident service's own behavior are fine.) + const unfollowedDep = dependencies.find( + (dep) => + dep !== deps.incidentService && + !followedServices.has(dep) && + mentionsService(lead.hypothesis.hypothesis, dep), + ); + if (unfollowedDep) { + record({ + move: "conclude", + detail: `not confirmed — blames ${unfollowedDep} but never followed-cause into it; investigate it before concluding`, + }); + stall++; + break; + } + record({ move: "conclude", detail: `confirmed: ${lead.hypothesis.hypothesis}` }); + return finish("confirmed", lead.hypothesis); + } + record({ + move: "conclude", + detail: `rejected — self-confidence ${move.confidence} not backed by the keystone; continuing`, + }); + stall++; + break; + } + case "spawn-subagent": { + if (!deps.spawnSubagent) { + record({ move: "spawn-subagent", detail: `${move.service}: ${move.question} — subagents unavailable` }); + stall++; + break; + } + if (subagents >= deps.guards.maxSubagents) { + record({ move: "spawn-subagent", detail: `${move.service}: subagent limit (${deps.guards.maxSubagents}) reached — skipped` }); + stall++; + break; + } + subagents++; + followedServices.add(move.service); + const before = evidence.length; + // Depth-1 scoped sub-investigation; its findings fold back as evidence + // the orchestrator's subsequent test moves can score against. Watchdog- + // bounded so a hung sub-investigation can't strand the loop. + const { timedOut, value: findings } = await raceOp( + deps.spawnSubagent({ service: move.service, question: move.question }), + deps.guards.opTimeoutMs, + [], + ); + evidence.push(...findings); + record({ + move: "spawn-subagent", + detail: timedOut + ? `${move.service}: ${move.question} → timed out (no findings)` + : `${move.service}: ${move.question} → +${findings.length} findings`, + }); + stall = evidence.length > before ? 0 : stall + 1; + break; + } + case "follow-cause": { + // Follow the incident into a dependency: a scoped sub-investigation on a + // neighbor from the dependency graph. Reuses the subagent machinery + + // budget, but is grounded — the target MUST be a known dependency, so + // the agent can't wander to arbitrary services. + if (!deps.spawnSubagent || dependencies.length === 0) { + record({ + move: "follow-cause", + detail: + dependencies.length === 0 + ? `${move.service} — no dependency graph available for this incident` + : `${move.service} — subagents unavailable`, + }); + stall++; + break; + } + if (!dependencies.includes(move.service)) { + record({ move: "follow-cause", detail: `${move.service} is not a known dependency — skipped` }); + stall++; + break; + } + if (subagents >= deps.guards.maxSubagents) { + record({ move: "follow-cause", detail: `${move.service}: subagent limit (${deps.guards.maxSubagents}) reached — skipped` }); + stall++; + break; + } + subagents++; + followedServices.add(move.service); + const followedBefore = evidence.length; + const { timedOut, value: followFindings } = await raceOp( + deps.spawnSubagent({ + service: move.service, + question: `Following the dependency from the incident service: is ${move.service} the cause?`, + }), + deps.guards.opTimeoutMs, + [], + ); + evidence.push(...followFindings); + record({ + move: "follow-cause", + detail: timedOut + ? `${move.service} → timed out (no findings)` + : `${move.service} → +${followFindings.length} findings`, + }); + stall = evidence.length > followedBefore ? 0 : stall + 1; + break; + } + } + } + + return finish("inconclusive"); +} diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index e53d6730..29186323 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -57,6 +57,19 @@ describe("ConfigSchema – defaults", () => { } }); + it("defaults the autonomous orchestrator to OFF", () => { + const result = ConfigSchema.safeParse({ + llm: { apiKey: "sk-test", model: "gpt-4" }, + providers: [grafanaProvider], + }); + expect(result.success).toBe(true); + if (result.success) { + // The orchestrator is the heaviest mode (3-10x a normal run) and is built + // incrementally — it must never ship on by default. + expect(result.data.agent.orchestratorEnabled).toBe(false); + } + }); + it("accepts serviceAliases config", () => { const result = ConfigSchema.safeParse({ llm, diff --git a/src/config/schema.ts b/src/config/schema.ts index 73d7e60f..d775aa0c 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -147,6 +147,17 @@ const AgentSchema = z.object({ * for internal testing (e.g. dev/config.yaml). */ deepModeEnabled: z.boolean().default(false), + /** + * Autonomous orchestrator (Approach D). When true, exposes the unbounded, + * read-only move-loop that investigates for the real cause (not just + * re-judges the existing one like deep mode). Hard-guarded by the safety + * harness (budget / depth / strikes→operator-pause / tool-cap / wall-clock) + * and a hybrid stop (deterministic keystone confirmation, never the LLM's + * self-confidence). Default OFF — it's the heaviest, opt-in mode and is built + * incrementally; do not expose to users until validated. See + * src/agents/orchestrator.ts. + */ + orchestratorEnabled: z.boolean().default(false), // @deprecated: use top-level `memory` config instead; will be removed in a future version conversationMemory: ConversationMemorySchema.optional().default({}), investigationTriggerPhrases: z.array(z.string()).optional().default([ diff --git a/src/server/agents.ts b/src/server/agents.ts index d8334306..2944bed9 100644 --- a/src/server/agents.ts +++ b/src/server/agents.ts @@ -46,6 +46,26 @@ import type { ServiceRegistryStore } from "../services/registry.js"; import type { LanguageModel } from "ai"; import type { LlmRetryConfig } from "../agents/shared/llm-retry.js"; import { LlmUnavailableError } from "../agents/shared/llm-errors.js"; +import { runAutonomousOrchestrator } from "../agents/orchestrator-llm.js"; +import { traceEntryToStreamEvent } from "../agents/orchestrator-stream.js"; +import type { OrchestratorGuards, OrchestratorResult, OrchestratorState } from "../agents/orchestrator.js"; +import type { CorroborationContext, NormalizedObservation } from "../workflows/steps/corroboration.js"; + +/** Conservative default safety harness for the autonomous orchestrator. The + * budget guard is the cost backstop; defaults stay low because an autonomous + * run is 3-10x a normal investigation. Config-tunable knobs come later. */ +export const DEFAULT_ORCHESTRATOR_GUARDS: OrchestratorGuards = { + maxTokens: 150_000, + maxDepth: 3, + maxSubagents: 3, + maxStrikes: 3, + maxToolCalls: 40, + wallClockMs: 10 * 60_000, + // Per-op watchdog: a single gather/subagent gets ~2.5 min before it's + // abandoned. A quick subagent investigation normally finishes well under + // that; the bound just stops one hung MCP/LLM call from stranding the loop. + opTimeoutMs: 150_000, +}; type MastraChatAgent = ReturnType; type MastraStreamInput = Parameters[0]; @@ -706,5 +726,77 @@ export async function createMastraAdapters(deps: MastraAdapterDeps) { }) : undefined; - return { chatAgent, investigationAgent, discoverAgent, deepModeReexamine }; + /** + * Autonomous orchestrator (Approach D): run the unbounded read-only move-loop + * for `focus`, reusing the investigation providers + model wired above. The + * core's TraceEntry stream is mapped to AgentStreamEvent for the UI. Read-only + * throughout (gather forces read-only tools); guarded by the safety harness. + */ + async function orchestrate( + focus: string, + opts?: { + timeRange?: { from: string; to: string }; + ctx?: CorroborationContext; + onStep?: (ev: Omit) => void; + guards?: Partial; + /** Incident service's dependency neighbors (resolved by the caller). */ + dependencies?: string[]; + /** The incident service itself (for the cross-service confirm guard). */ + incidentService?: string; + /** Interactive operator-pause hook (increment 5). Wired by the WS layer to + * the pause card; absent → the strike limit stops directly. */ + onOperatorPause?: (state: OrchestratorState) => Promise<"continue" | "escalate" | "wait">; + /** Cooperative abort — the WS layer aborts on operator disconnect. */ + signal?: AbortSignal; + }, + ): Promise { + const guards: OrchestratorGuards = { ...DEFAULT_ORCHESTRATOR_GUARDS, ...opts?.guards }; + const onStep = opts?.onStep; + // Depth-1 subagent: a scoped, read-only sub-investigation on a related + // service. Its conclusion folds back as one observation the orchestrator can + // test against. Read-only (readOnlyTools=true); failures degrade to no + // findings so a bad subagent never aborts the parent run. Subagent token + // usage is bounded by maxSubagents + wall-clock (not the token budget) in v1. + const spawnSubagent = async ( + args: { service: string; question: string }, + ): Promise => { + const svc: ServiceConfig = + config.services.find((s) => s.name === args.service) ?? { name: args.service, metrics: [], logLabels: {}, probeRules: [] }; + try { + const report = await investigationAgent.investigate( + // "quick" (metrics-only) keeps a subagent ~1 min instead of the 2-3 min + // a "standard" run costs — an autonomous run can spawn several, so the + // cheaper template matters. Revisit if subagents miss log-based causes. + svc, null, undefined, undefined, args.question, + undefined, undefined, undefined, undefined, "quick", true, + ); + const rc = + report.rootCause && !/^under investigation$|^unable to determine/i.test(report.rootCause.trim()) + ? report.rootCause + : ""; + const text = [rc, report.summary].filter(Boolean).join(" — ").slice(0, 300); + return text ? [{ phase: "infra", subject: args.service, text: `subagent: ${text}` }] : []; + } catch { + return []; + } + }; + return runAutonomousOrchestrator({ + focus, + model: investigationModel, + providers, + guards, + timeRange: opts?.timeRange, + ctx: opts?.ctx, + llmRetry: config.llm.retry, + llmCallMs: config.timeouts?.llmCallMs, + onStep: onStep ? (entry) => onStep(traceEntryToStreamEvent(entry)) : undefined, + spawnSubagent, + dependencies: opts?.dependencies, + incidentService: opts?.incidentService, + onOperatorPause: opts?.onOperatorPause, + signal: opts?.signal, + }); + } + + return { chatAgent, investigationAgent, discoverAgent, deepModeReexamine, orchestrate }; } diff --git a/src/server/index.ts b/src/server/index.ts index 2a61706a..19eee9e0 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -660,10 +660,13 @@ async function main() { // Deep mode (Step 3) is hidden from users until the Autonomous Orchestrator // ships; the web bundle reads this to suppress the "Deep investigate" button. const deepModeEnabled = config.agent?.deepModeEnabled === true; + // Autonomous orchestrator (Approach D) — likewise hidden until validated; the + // web bundle reads this to suppress the "Investigate autonomously" trigger. + const orchestratorEnabled = config.agent?.orchestratorEnabled === true; function buildIndexHtml(): string { const raw = readFileSync(path.resolve(staticDir, "index.html"), "utf-8"); - if (appBasePath === "/" && !demoModeActive && !deepModeEnabled) return raw; + if (appBasePath === "/" && !demoModeActive && !deepModeEnabled && !orchestratorEnabled) return raw; // Rewrite any absolute /assets/... reference to ${base}assets/... when a // sub-path is configured. @@ -680,6 +683,7 @@ async function main() { if (appBasePath !== "/") globals.push(`window.__APP_BASE__=${basePathForScript}`); if (demoModeActive) globals.push(`window.__DEMO_MODE__=true`); if (deepModeEnabled) globals.push(`window.__DEEP_MODE_ENABLED__=true`); + if (orchestratorEnabled) globals.push(`window.__ORCHESTRATOR_ENABLED__=true`); if (globals.length === 0) return afterAssets; const inlineScript = ``; diff --git a/src/server/rate-limit.test.ts b/src/server/rate-limit.test.ts index b99cc43f..daf97aa3 100644 --- a/src/server/rate-limit.test.ts +++ b/src/server/rate-limit.test.ts @@ -286,6 +286,14 @@ describe("classifyWsMessage", () => { expect(classifyWsMessage("deep_investigate")).toBe("investigation"); }); + it("classifies deep_mode_investigate as investigation", () => { + expect(classifyWsMessage("deep_mode_investigate")).toBe("investigation"); + }); + + it("classifies orchestrator_investigate as investigation", () => { + expect(classifyWsMessage("orchestrator_investigate")).toBe("investigation"); + }); + it("classifies other message types as general", () => { expect(classifyWsMessage("new_session")).toBe("general"); expect(classifyWsMessage("discover")).toBe("general"); diff --git a/src/server/rate-limit.ts b/src/server/rate-limit.ts index f8d45070..db78c9f8 100644 --- a/src/server/rate-limit.ts +++ b/src/server/rate-limit.ts @@ -142,14 +142,23 @@ export class WsRateLimiter { /** * Classify a WebSocket message type into a rate limiting category. * - * Investigation-triggering messages: + * Investigation-triggering messages (heavy, LLM-spending, autonomous): * - "chat" messages (which may trigger investigations via intent routing) * - "deep_investigate" messages + * - "deep_mode_investigate" messages (re-examines a completed report) + * - "orchestrator_investigate" messages (autonomous read-only move-loop) * + * All of these route through the stricter `investigation` bucket so a direct + * WS client can't bypass the investigation cap via the looser general limit. * All other messages are "general". */ export function classifyWsMessage(msgType: string): WsMessageCategory { - if (msgType === "chat" || msgType === "deep_investigate") { + if ( + msgType === "chat" || + msgType === "deep_investigate" || + msgType === "deep_mode_investigate" || + msgType === "orchestrator_investigate" + ) { return "investigation"; } return "general"; diff --git a/src/server/ws-handler.test.ts b/src/server/ws-handler.test.ts index 7dd4747c..adf495c4 100644 --- a/src/server/ws-handler.test.ts +++ b/src/server/ws-handler.test.ts @@ -578,6 +578,87 @@ describe("handleClientMessage — deep_investigate", () => { }); }); +describe("handleClientMessage — orchestrator_investigate", () => { + it("rejects when the orchestrator gate is disabled", async () => { + const deps = mockDeps(); + const ctx = mockCtx(); + // Default mockDeps config has no agent.orchestratorEnabled flag. + const sent: ServerMessage[] = []; + const send = (m: ServerMessage) => sent.push(m); + + await callHandler( + { type: "orchestrator_investigate", investigationId: "inv_1" }, + send, deps, ctx, + ); + + const err = sent.find((m) => m.type === "orchestrator:error"); + expect(err).toBeDefined(); + expect((err as any).message).toContain("not enabled"); + expect(deps.db.getInvestigation).not.toHaveBeenCalled(); + }); + + it("returns error for a non-existent investigation when gated on", async () => { + const deps = mockDeps(); + (deps.config as any).agent.orchestratorEnabled = true; + const ctx = mockCtx(); + (deps.db.getInvestigation as ReturnType).mockReturnValue(undefined); + + const sent: ServerMessage[] = []; + const send = (m: ServerMessage) => sent.push(m); + + await callHandler( + { type: "orchestrator_investigate", investigationId: "inv_missing" }, + send, deps, ctx, + ); + + const err = sent.find((m) => m.type === "orchestrator:error"); + expect(err).toBeDefined(); + expect((err as any).message).toContain("not found"); + }); + + it("rejects a still-running investigation — no autonomous run without a completed report", async () => { + const deps = mockDeps(); + (deps.config as any).agent.orchestratorEnabled = true; + const ctx = mockCtx(); + (deps.db.getInvestigation as ReturnType).mockReturnValue({ + id: "inv_running", service: "payments-api", query: "orig", status: "running", report: null, + }); + + const sent: ServerMessage[] = []; + const send = (m: ServerMessage) => sent.push(m); + + await callHandler( + { type: "orchestrator_investigate", investigationId: "inv_running" }, + send, deps, ctx, + ); + + const err = sent.find((m) => m.type === "orchestrator:error"); + expect(err).toBeDefined(); + expect((err as any).message).toContain("completed investigation"); + }); + + it("rejects a completed-but-report-less investigation", async () => { + const deps = mockDeps(); + (deps.config as any).agent.orchestratorEnabled = true; + const ctx = mockCtx(); + (deps.db.getInvestigation as ReturnType).mockReturnValue({ + id: "inv_noreport", service: "payments-api", query: "orig", status: "complete", report: null, + }); + + const sent: ServerMessage[] = []; + const send = (m: ServerMessage) => sent.push(m); + + await callHandler( + { type: "orchestrator_investigate", investigationId: "inv_noreport" }, + send, deps, ctx, + ); + + const err = sent.find((m) => m.type === "orchestrator:error"); + expect(err).toBeDefined(); + expect((err as any).message).toContain("completed investigation"); + }); +}); + describe("handleClientMessage — rerun", () => { it("emits investigation:started with parentInvestigationId so the client can navigate", async () => { const deps = mockDeps(); diff --git a/src/server/ws-handler.ts b/src/server/ws-handler.ts index bbeab1b6..09542742 100644 --- a/src/server/ws-handler.ts +++ b/src/server/ws-handler.ts @@ -9,6 +9,9 @@ import { resolveServiceFromHistory } from "../agents/intent.js"; import type { ServiceConfig, DiscoveryConfig, Config } from "../config/schema.js"; import type { ClientMessage, ServerMessage, ChartSeries } from "../types/ws-types.js"; import { DEFAULT_STACK_SLUG } from "../types/stack-types.js"; +import { inferDependencyGraph } from "./dependency-graph.js"; +import { assembleCausalChain, traceSummary } from "../agents/orchestrator-stream.js"; +import type { OrchestratorState } from "../agents/orchestrator.js"; import type { ValidatedServiceConfig } from "../types/discovery-types.js"; import type { SkillStore } from "../skills/store.js"; import { LlmUnavailableError } from "../agents/shared/llm-errors.js"; @@ -30,6 +33,16 @@ const logger = createLogger(); const MAX_CHART_SERIES = 4; +/** A paused orchestrator run, awaiting an operator decision. Resolving the + * promise (via `orchestrator_decision`, timeout, or WS close) resumes the loop. */ +type OperatorDecision = "continue" | "escalate" | "wait"; +type PendingPause = { resolve: (decision: OperatorDecision) => void; timer: ReturnType }; + +/** How long an operator-pause prompt waits for a decision before defaulting to + * `escalate` (stop). A disconnected/idle operator must not strand the loop — + * the wall-clock guard would eventually trip, but this is the explicit cap. */ +const OPERATOR_PAUSE_TIMEOUT_MS = 5 * 60_000; + /** Return true when a series is a flat constant (no variation worth charting) */ function isFlatSeries(values: [string, number][]): boolean { if (values.length < 2) return true; @@ -272,6 +285,19 @@ export function setupWebSocket(server: Server, deps: WsDeps): void { // removed — late cancels are silently ignored. const pendingDispatches: Map = new Map(); + // Per-connection paused orchestrator runs: a run that hit the strike limit + // and is awaiting an operator decision. Keyed by investigationId; the + // `orchestrator_decision` handler resolves the matching entry, and WS close + // resolves any survivors with `escalate` so a disconnect never strands the + // loop (which would otherwise sit blocked until its wall-clock guard trips). + const pendingPauses: Map = new Map(); + + // Per-connection active orchestrator runs, keyed by investigationId. Gives a + // concurrency guard (one run per investigation per connection — no pile-on + // from a double-click) and an abort handle: WS close aborts every in-flight + // run so the loop stops cooperatively instead of running on headless. + const activeOrchestrations: Map = new Map(); + const send = (m: ServerMessage) => { if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(m)); @@ -400,6 +426,8 @@ export function setupWebSocket(server: Server, deps: WsDeps): void { () => { pendingDiscovery = null; }, pendingDispatches, activeDiscovery, + pendingPauses, + activeOrchestrations, ); } catch (err) { if (err instanceof LlmUnavailableError) { @@ -422,6 +450,19 @@ export function setupWebSocket(server: Server, deps: WsDeps): void { controller.abort(); } pendingDispatches.clear(); + // Resolve any paused orchestrator runs so the loop unblocks and stops + // cleanly (escalate) instead of sitting until its wall-clock guard trips. + for (const pause of pendingPauses.values()) { + clearTimeout(pause.timer); + pause.resolve("escalate"); + } + pendingPauses.clear(); + // Abort in-flight orchestrator runs so the loop stops at its next move + // instead of running on headless after the operator has gone. + for (const controller of activeOrchestrations.values()) { + controller.abort(); + } + activeOrchestrations.clear(); activeDiscovery.current?.abort(new Error("WebSocket disconnected")); activeDiscovery.current = null; wsRateLimiter.destroy(threadId); @@ -474,6 +515,164 @@ async function handleDeepModeInvestigate( await runDeepModeStreamed(msg.investigationId, report, agents.deepModeReexamine, db, send); } +/** + * Autonomous orchestrator (Approach D): run the unbounded read-only move-loop + * seeded from a completed investigation's context, streaming each move to the + * agent-stream UI. Gated behind config.agent.orchestratorEnabled — the trigger + * is hidden client-side; this rejects any direct message when disabled. + */ +async function handleOrchestratorInvestigate( + msg: { type: "orchestrator_investigate"; investigationId: string }, + send: (m: ServerMessage) => void, + deps: WsDeps, + stackId: string, + ctx: StackContext, + pendingPauses: Map, + activeOrchestrations: Map, +): Promise { + const { db } = deps; + if (!deps.config.agent?.orchestratorEnabled) { + send({ type: "orchestrator:error", investigationId: msg.investigationId, message: "Autonomous orchestrator is not enabled." }); + return; + } + // Concurrency guard: one orchestrator run per investigation per connection. + // A double-clicked trigger shouldn't spawn a second parallel run (each spawns + // its own subagents — real LLM/MCP load). + if (activeOrchestrations.has(msg.investigationId)) { + send({ type: "orchestrator:error", investigationId: msg.investigationId, message: "An autonomous investigation is already running for this report." }); + return; + } + const investigation = db.getInvestigation(stackId, msg.investigationId); + if (!investigation) { + send({ type: "orchestrator:error", investigationId: msg.investigationId, message: "Investigation not found." }); + return; + } + // The orchestrator is seeded from a *completed* investigation's context + // (focus + report time window). The UI only surfaces the trigger after + // completion, but a direct WS message could otherwise launch a costly + // autonomous run against a still-running, failed, or report-less row. + // Reject those here, mirroring the deep-mode handler. + if (investigation.status !== "complete" || !investigation.report) { + send({ type: "orchestrator:error", investigationId: msg.investigationId, message: "The orchestrator needs a completed investigation with a report." }); + return; + } + // Seed the orchestrator from the investigation's context: the original ask + // as the focus, and the report's time window so re-queries stay in range. + let report: RcaReport | undefined; + try { + report = investigation.report ? (JSON.parse(investigation.report) as RcaReport) : undefined; + } catch { + report = undefined; + } + const focus = investigation.query?.trim() || report?.summary || `investigate ${investigation.service}`; + const timeRange = report?.timeRange; + + // Resolve the incident service's dependency-graph neighbors (both directions) + // so the agent can follow-cause into them. Empty when there's no usable graph + // (follow-cause then disables gracefully). Mirrors GET /api/dependencies/:service. + const allServices = ctx.slug === DEFAULT_STACK_SLUG + ? [...deps.config.services, ...ctx.serviceRegistry.load().filter((s) => !deps.config.services.some((c) => c.name === s.name))] + : ctx.serviceRegistry.load(); + const neighbors = new Set(); + try { + for (const edge of inferDependencyGraph(allServices).edges) { + if (edge.source === investigation.service) neighbors.add(edge.target); + if (edge.target === investigation.service) neighbors.add(edge.source); + } + } catch { /* no graph → empty neighbors → follow-cause disabled */ } + neighbors.delete(investigation.service); + const dependencies = [...neighbors]; + + const agents = await getOrCreateAgents(stackId, ctx, deps.config, deps.db); + const abort = new AbortController(); + activeOrchestrations.set(msg.investigationId, abort); + try { + await runOrchestratorStreamed( + msg.investigationId, + focus, + { timeRange, ctx: { incidentTime: timeRange?.from }, dependencies, incidentService: investigation.service, signal: abort.signal }, + agents.orchestrate, + send, + pendingPauses, + ); + } finally { + activeOrchestrations.delete(msg.investigationId); + } +} + +async function runOrchestratorStreamed( + investigationId: string, + focus: string, + opts: { timeRange?: { from: string; to: string }; ctx?: { incidentTime?: string }; dependencies?: string[]; incidentService?: string; signal?: AbortSignal }, + orchestrate: StackAgents["orchestrate"], + send: (m: ServerMessage) => void, + pendingPauses: Map, +): Promise { + send({ type: "orchestrator:started", investigationId }); + const startMs = Date.now(); + let seq = 0; + try { + const result = await orchestrate(focus, { + timeRange: opts.timeRange, + ctx: opts.ctx, + dependencies: opts.dependencies, + incidentService: opts.incidentService, + signal: opts.signal, + onStep: (ev) => send({ type: "orchestrator:step", investigationId, event: { ...ev, seq: seq++ } }), + // Interactive strike-limit pause (increment 5): emit the prompt and block + // the loop on the operator's reply. Resolved by the `orchestrator_decision` + // handler, the timeout below, or WS close (all via `pendingPauses`). + onOperatorPause: (state: OrchestratorState) => { + send({ + type: "orchestrator:operator_pause", + investigationId, + strikes: state.strikes, + hypothesesTried: state.hypotheses.map((hyp) => hyp.hypothesis.hypothesis), + }); + return new Promise((resolve) => { + const timer = setTimeout(() => { + pendingPauses.delete(investigationId); + resolve("escalate"); + }, OPERATOR_PAUSE_TIMEOUT_MS); + pendingPauses.set(investigationId, { resolve, timer }); + }); + }, + }); + send({ + type: "orchestrator:complete", + investigationId, + outcome: result.outcome, + causalChain: assembleCausalChain(result.trace, result.confirmed, opts.incidentService ?? "", result.evidence), + traceSummary: traceSummary(result.stats, result.outcome), + stats: { + moves: result.stats.moves, + toolCalls: result.stats.toolCalls, + subagents: result.stats.subagents, + tokensSpent: result.stats.tokensSpent, + strikes: result.stats.strikes, + depth: result.stats.depth, + durationMs: Date.now() - startMs, + }, + }); + } catch (err) { + const message = + err instanceof LlmUnavailableError + ? "The model is unavailable right now — try again shortly." + : "The orchestrator hit an error."; + logger.error({ err, investigationId }, "Orchestrator run failed"); + send({ type: "orchestrator:error", investigationId, message }); + } finally { + // Defensive: a finished run must never leave a pause entry behind (e.g. if + // a future code path threw mid-pause), or a stale `orchestrator_decision` + // could resolve a dead promise. + const stale = pendingPauses.get(investigationId); + if (stale) { + clearTimeout(stale.timer); + pendingPauses.delete(investigationId); + } + } +} + /** * Run deep mode for an already-loaded report, streaming progress to the Console * and persisting the result. Shared by the on-demand trigger (above) and the @@ -796,6 +995,8 @@ export async function handleClientMessage( clearPendingDiscovery: () => void, pendingDispatches: Map = new Map(), activeDiscovery: { current: AbortController | null } = { current: null }, + pendingPauses: Map = new Map(), + activeOrchestrations: Map = new Map(), ): Promise { const memory = ctx.conversationMemory; @@ -828,6 +1029,24 @@ export async function handleClientMessage( return; } + if (msg.type === "orchestrator_investigate") { + await handleOrchestratorInvestigate(msg, send, deps, stackId, ctx, pendingPauses, activeOrchestrations); + return; + } + + // Operator's reply to an `orchestrator:operator_pause`. Resolve the matching + // paused run; unknown ids are silently ignored (already resumed, timed out, + // or never paused — a stale client can't wedge anything). + if (msg.type === "orchestrator_decision") { + const pause = pendingPauses.get(msg.investigationId); + if (pause) { + clearTimeout(pause.timer); + pendingPauses.delete(msg.investigationId); + pause.resolve(msg.decision); + } + return; + } + if (msg.type === "rerun") { await handleRerun(msg, send, deps, threadId, stackId, ctx); return; diff --git a/src/types/ws-types.ts b/src/types/ws-types.ts index 643bb6c3..965b9c42 100644 --- a/src/types/ws-types.ts +++ b/src/types/ws-types.ts @@ -14,6 +14,14 @@ export type ClientMessage = // hypotheses with deeper read-only re-queries. Distinct from `deep_investigate` // above, which is free-text follow-up chat about an investigation. | { type: "deep_mode_investigate"; investigationId: string } + // Autonomous orchestrator (Approach D): run the unbounded read-only move-loop + // seeded from a completed investigation's context. Heavier opt-in than deep + // mode; gated behind config.agent.orchestratorEnabled. + | { type: "orchestrator_investigate"; investigationId: string } + // Operator's reply to an `orchestrator:operator_pause` prompt (increment 5): + // "continue" resets strikes and resumes the move-loop; "escalate"/"wait" stop + // it with that disposition. Matched to the paused run by investigationId. + | { type: "orchestrator_decision"; investigationId: string; decision: "continue" | "escalate" | "wait" } | { type: "rerun"; investigationId: string; template?: "quick" | "standard" | "full" } | { type: "new_session" } | { type: "discover" } @@ -74,6 +82,29 @@ export type AgentStreamStats = { durationMs: number; }; +/** Footer stats for a completed autonomous-orchestrator run. */ +export type OrchestratorStreamStats = { + moves: number; + toolCalls: number; + subagents: number; + tokensSpent: number; + strikes: number; + depth: number; + durationMs: number; +}; + +/** One link in a completed run's causal chain (increment 6, source attribution). + * Ordered cause→effect: the incident service, each dependency the agent followed + * into, then the confirmed root cause. `evidence` is the observation/finding that + * supports this link (absent for the incident anchor). */ +export type CausalChainLink = { + /** Display label: a service name, or "root cause: ". */ + label: string; + kind: "incident" | "followed" | "root-cause"; + /** Short attribution — the finding/observation this link rests on. */ + evidence?: string; +}; + export type ServerMessage = | { type: "chat"; role: "user" | "assistant" | "system"; content: string; investigationId?: string; report?: unknown; chartData?: ChartSeries[] } | { type: "chat:tool_call"; tool: string; status: "calling" | "complete" } @@ -99,6 +130,14 @@ export type ServerMessage = | { type: "deep_mode:step"; investigationId: string; event: AgentStreamEvent } | { type: "deep_mode:complete"; investigationId: string; report: unknown; stats?: AgentStreamStats } | { type: "deep_mode:error"; investigationId: string; message: string } + | { type: "orchestrator:started"; investigationId: string } + | { type: "orchestrator:step"; investigationId: string; event: AgentStreamEvent } + | { type: "orchestrator:complete"; investigationId: string; outcome: string; stats?: OrchestratorStreamStats; causalChain?: CausalChainLink[]; traceSummary?: string } + // Strike limit reached (increment 5): the loop is awaiting an operator + // decision (continue / escalate / instrument-&-wait). The client renders the + // pause card and replies with `orchestrator_decision`. + | { type: "orchestrator:operator_pause"; investigationId: string; strikes: number; hypothesesTried?: string[] } + | { type: "orchestrator:error"; investigationId: string; message: string } | { type: "session_cleared" } | { type: "context_switch"; previousService: string; newService: string } | { type: "services:health"; data: unknown[] } diff --git a/src/web/App.tsx b/src/web/App.tsx index 9701b0a2..0765d242 100644 --- a/src/web/App.tsx +++ b/src/web/App.tsx @@ -599,6 +599,12 @@ export function App() { onDeepMode={(invId) => { ws.send({ type: "deep_mode_investigate", investigationId: invId }); }} + onOrchestrate={(invId) => { + ws.send({ type: "orchestrator_investigate", investigationId: invId }); + }} + onOrchestratorDecision={(invId, decision) => { + ws.send({ type: "orchestrator_decision", investigationId: invId, decision }); + }} onWrongStack={handleWrongStack} /> ) : ( diff --git a/src/web/components/AgentStream.tsx b/src/web/components/AgentStream.tsx new file mode 100644 index 00000000..f554e532 --- /dev/null +++ b/src/web/components/AgentStream.tsx @@ -0,0 +1,147 @@ +import { useEffect, useState } from "react"; +import type { AgentStreamEvent } from "../../types/ws-types.js"; + +/** Ticking "Ns" since the run went live — proves the agent is alive between + * steps (decideMove thinking, a long query, a running subagent), so the stream + * never looks hung. Resets each time `running` flips true. */ +function useElapsedSeconds(running: boolean): number { + const [elapsed, setElapsed] = useState(0); + useEffect(() => { + if (!running) { + setElapsed(0); + return; + } + const start = Date.now(); + setElapsed(0); + const id = setInterval(() => setElapsed(Math.floor((Date.now() - start) / 1000)), 1000); + return () => clearInterval(id); + }, [running]); + return elapsed; +} + +/** + * Shared structured agent stream — colored verbs, query targets in info-blue, + * status icons, indented sub-steps with a left rail, and a generic footer. + * Always expanded (unlike the chat "thinking" block). Used by both the + * deep-mode stream and the autonomous orchestrator; each supplies its own + * header label and footer items. + */ + +export type AgentStreamFooterItem = { + label: string; + value: string | number; + tone?: "default" | "warn" | "good"; +}; + +/** Terminal outcome callout shown once the run finishes (above the footer). */ +export type AgentStreamBanner = { + text: string; + tone: "good" | "warn" | "muted"; +}; + +const STATUS_ICON: Record = { + running: "◉", + done: "✓", + rejected: "✗", + strong: "✓", +}; + +// Icon color by status; verbs reuse the same intent so the eye groups by outcome. +function statusClass(status: AgentStreamEvent["status"]): string { + switch (status) { + case "running": return "text-primary"; + case "strong": return "text-success"; + case "rejected": return "text-destructive"; + default: return "text-muted-foreground"; + } +} +function verbClass(status: AgentStreamEvent["status"]): string { + switch (status) { + case "strong": return "text-success"; + case "rejected": return "text-destructive"; + default: return "text-accent/90"; // coral for actions, matching the design + } +} +function toneClass(tone: AgentStreamFooterItem["tone"]): string { + switch (tone) { + case "warn": return "text-warning"; + case "good": return "text-success"; + default: return "text-foreground/80"; + } +} + +const BANNER_CLASS: Record = { + good: "bg-success/8 border-success/20 text-success/90", + warn: "bg-warning/8 border-warning/25 text-warning/90", + muted: "bg-muted/40 border-border/60 text-muted-foreground", +}; + +export function AgentStream({ + label, + events, + footer, + banner, + running, +}: { + label: string; + events: AgentStreamEvent[]; + footer?: AgentStreamFooterItem[]; + banner?: AgentStreamBanner; + running: boolean; +}) { + const elapsed = useElapsedSeconds(running); + if (events.length === 0 && !running) return null; + return ( +
+
+ {running && } + + {label}{running ? ` · live · ${elapsed}s` : ""} + +
+ +
+ {events.map((e) => ( +
+ + {STATUS_ICON[e.status]} + + + {e.verb} + {e.target && ( + {e.target} + )} + {e.detail && {e.detail}} + +
+ ))} +
+ + {running && ( +
+ + working… {elapsed}s +
+ )} + + {banner && !running && ( +
+ {banner.text} +
+ )} + + {footer && footer.length > 0 && ( +
+ {footer.map((f) => ( + + {f.label} {f.value} + + ))} +
+ )} +
+ ); +} diff --git a/src/web/components/DeepModeStream.tsx b/src/web/components/DeepModeStream.tsx index 91d3d17b..697ad1ba 100644 --- a/src/web/components/DeepModeStream.tsx +++ b/src/web/components/DeepModeStream.tsx @@ -1,36 +1,11 @@ +import { AgentStream, type AgentStreamFooterItem } from "./AgentStream.js"; import type { AgentStreamEvent, AgentStreamStats } from "../../types/ws-types.js"; /** - * Dedicated, structured agent stream for deep mode (Step 3) — colored verbs, - * query targets in info-blue, status icons, indented sub-steps with a left rail, - * and a footer of run stats. Always expanded (unlike the chat "thinking" block). - * Matches the "LIVE · AGENT STREAM" design. + * Deep mode (Step 3) stream — a thin wrapper over the shared AgentStream that + * supplies the deep-mode label and footer. Rendering lives in AgentStream so the + * orchestrator reuses the exact same look. */ - -const STATUS_ICON: Record = { - running: "◉", - done: "✓", - rejected: "✗", - strong: "✓", -}; - -// Icon color by status; verbs reuse the same intent so the eye groups by outcome. -function statusClass(status: AgentStreamEvent["status"]): string { - switch (status) { - case "running": return "text-primary"; - case "strong": return "text-success"; - case "rejected": return "text-destructive"; - default: return "text-muted-foreground"; - } -} -function verbClass(status: AgentStreamEvent["status"]): string { - switch (status) { - case "strong": return "text-success"; - case "rejected": return "text-destructive"; - default: return "text-accent/90"; // coral for actions, matching the design - } -} - export function DeepModeStream({ events, stats, @@ -40,45 +15,14 @@ export function DeepModeStream({ stats?: AgentStreamStats; running: boolean; }) { - if (events.length === 0 && !running) return null; - return ( -
-
- {running && } - - Deep Mode · second look{running ? " · live" : ""} - -
- -
- {events.map((e) => ( -
- - {STATUS_ICON[e.status]} - - - {e.verb} - {e.target && ( - {e.target} - )} - {e.detail && {e.detail}} - -
- ))} -
- - {stats && ( -
- re-checked {stats.examined} - checks {stats.toolCalls} - {stats.resurrected > 0 && brought back {stats.resurrected}} - {stats.shaken > 0 && weakened {stats.shaken}} - took {(stats.durationMs / 1000).toFixed(1)}s -
- )} -
- ); + const footer: AgentStreamFooterItem[] | undefined = stats + ? [ + { label: "re-checked", value: stats.examined }, + { label: "checks", value: stats.toolCalls }, + ...(stats.resurrected > 0 ? [{ label: "brought back", value: stats.resurrected, tone: "warn" as const }] : []), + ...(stats.shaken > 0 ? [{ label: "weakened", value: stats.shaken, tone: "warn" as const }] : []), + { label: "took", value: `${(stats.durationMs / 1000).toFixed(1)}s` }, + ] + : undefined; + return ; } diff --git a/src/web/components/InvestigationPane.tsx b/src/web/components/InvestigationPane.tsx index 2e099865..8701acc3 100644 --- a/src/web/components/InvestigationPane.tsx +++ b/src/web/components/InvestigationPane.tsx @@ -8,17 +8,18 @@ import { DropdownMenuSeparator, DropdownMenuTrigger, } from "@/components/ui/dropdown-menu"; -import { ArrowLeft, FilePlus, RotateCw, ChevronDown, Download, Link2, FileText, Image as ImageIcon, ClipboardCopy, Check, Telescope } from "lucide-react"; +import { ArrowLeft, FilePlus, RotateCw, ChevronDown, Download, Link2, FileText, Image as ImageIcon, ClipboardCopy, Check, Telescope, Compass } from "lucide-react"; import { PhaseStepper, type PhaseState } from "./PhaseStepper"; import { EvidenceTimeline } from "./EvidenceTimeline"; import { RcaReport } from "./RcaReport"; import { DeepModeStream } from "./DeepModeStream"; +import { OrchestratorStream, type OrchestratorPause, type OrchestratorDisposition } from "./OrchestratorStream"; import { InvestigationFeedback } from "./InvestigationFeedback"; import { useStackContext } from "../contexts/StackContext"; import { useUnreadInvestigations } from "../hooks/useUnreadInvestigations"; import type { TimelineEvent } from "./ActivityTimeline"; import type { TimeSeriesData } from "./MetricChart"; -import type { ServerMessage, AgentStreamEvent, AgentStreamStats } from "../../types/ws-types.js"; +import type { ServerMessage, AgentStreamEvent, AgentStreamStats, OrchestratorStreamStats, CausalChainLink } from "../../types/ws-types.js"; import type { RcaReport as RcaReportType } from "../../types/rca-types.js"; import { formatTokens } from "../lib/formatTokens.js"; import { buildPhaseActions } from "../lib/grafana-links.js"; @@ -140,6 +141,8 @@ export function InvestigationPane({ onNavigateSkills, onRerun, onDeepMode, + onOrchestrate, + onOrchestratorDecision, onWrongStack, }: { investigationId: string; @@ -150,6 +153,13 @@ export function InvestigationPane({ /** Trigger deep mode (Step 3): skeptical re-examination of the loop's * ruled-out causes. Parent wires it to the deep_mode_investigate WS message. */ onDeepMode?: (investigationId: string) => void; + /** Trigger the autonomous orchestrator (Approach D): the unbounded read-only + * move-loop that investigates for the real cause. Parent wires it to the + * orchestrator_investigate WS message. */ + onOrchestrate?: (investigationId: string) => void; + /** Send the operator's strike-limit decision (increment 5) back over the WS. + * Parent wires it to the orchestrator_decision message. */ + onOrchestratorDecision?: (investigationId: string, decision: "continue" | "escalate" | "wait") => void; /** Called when the investigation 404s in the active stack but the locate * endpoint reports it lives in a different stack. The parent should * switchStack + navigate to the correct stack-scoped URL — keeps @@ -166,6 +176,15 @@ export function InvestigationPane({ const [deepModeError, setDeepModeError] = useState(null); const [deepSteps, setDeepSteps] = useState([]); const [deepStats, setDeepStats] = useState(undefined); + const [orchRunning, setOrchRunning] = useState(false); + const [orchError, setOrchError] = useState(null); + const [orchSteps, setOrchSteps] = useState([]); + const [orchStats, setOrchStats] = useState(undefined); + const [orchOutcome, setOrchOutcome] = useState(undefined); + const [orchChain, setOrchChain] = useState(undefined); + const [orchTraceSummary, setOrchTraceSummary] = useState(undefined); + const [orchPause, setOrchPause] = useState(null); + const [orchDisposition, setOrchDisposition] = useState(undefined); const [service, setService] = useState(""); const [query, setQuery] = useState(""); /** Set when the REST fetch comes back 404. Visiting an investigation URL @@ -491,6 +510,39 @@ export function InvestigationPane({ setDeepModeRunning(false); if (typeof msg.message === "string") setDeepModeError(msg.message); } + // Autonomous orchestrator (Approach D) — the unbounded move-loop. + if (msg.type === "orchestrator:started" && msg.investigationId === investigationId) { + setOrchRunning(true); + setOrchError(null); + setOrchSteps([]); + setOrchStats(undefined); + setOrchOutcome(undefined); + setOrchChain(undefined); + setOrchTraceSummary(undefined); + setOrchPause(null); + setOrchDisposition(undefined); + } + if (msg.type === "orchestrator:step" && msg.investigationId === investigationId) { + setOrchSteps((prev) => [...prev, msg.event]); + // A new move means the loop resumed past any pause — clear the card. + setOrchPause(null); + } + if (msg.type === "orchestrator:operator_pause" && msg.investigationId === investigationId) { + setOrchPause({ strikes: msg.strikes, hypothesesTried: msg.hypothesesTried }); + } + if (msg.type === "orchestrator:complete" && msg.investigationId === investigationId) { + setOrchRunning(false); + setOrchPause(null); + setOrchStats(msg.stats); + setOrchOutcome(msg.outcome); + setOrchChain(msg.causalChain); + setOrchTraceSummary(msg.traceSummary); + } + if (msg.type === "orchestrator:error" && msg.investigationId === investigationId) { + setOrchRunning(false); + setOrchPause(null); + if (typeof msg.message === "string") setOrchError(msg.message); + } } }, [wsMessages, investigationId]); @@ -660,11 +712,33 @@ export function InvestigationPane({ ); })()} + {/* Autonomous orchestrator (Approach D): hidden until validated. Gated + behind config.agent.orchestratorEnabled (server injects + window.__ORCHESTRATOR_ENABLED__). Unlike deep mode it investigates + for the real cause, so it doesn't require a prior loop outcome. */} + {isComplete && onOrchestrate && (() => { + if (typeof window !== "undefined" && !window.__ORCHESTRATOR_ENABLED__) return null; + return ( + + ); + })()} {deepModeError && (
{deepModeError}
)} + {orchError && ( +
{orchError}
+ )} {/* Progress bar — visible while running */} {isRunning && ( @@ -760,6 +834,21 @@ export function InvestigationPane({ {/* Deep mode (Step 3) — dedicated structured agent stream (live + final). */} + { + if (decision === "escalate" || decision === "wait") setOrchDisposition(decision); + setOrchPause(null); + onOrchestratorDecision?.(investigationId, decision); + }} + /> {investigationStatus === "failed" && !report ? (
diff --git a/src/web/components/OrchestratorStream.tsx b/src/web/components/OrchestratorStream.tsx new file mode 100644 index 00000000..ae23dde0 --- /dev/null +++ b/src/web/components/OrchestratorStream.tsx @@ -0,0 +1,161 @@ +import { AgentStream, type AgentStreamFooterItem, type AgentStreamBanner } from "./AgentStream.js"; +import type { AgentStreamEvent, OrchestratorStreamStats, CausalChainLink } from "../../types/ws-types.js"; + +/** + * Autonomous orchestrator (Approach D) stream — the same AgentStream rendering + * as deep mode, with the orchestrator's footer (moves / queries / subagents / + * depth / strikes / tokens / elapsed), a terminal outcome banner, the causal + * chain (with source attribution), a one-line trace summary, and the + * interactive operator-pause card (continue / escalate / instrument-&-wait) + * shown when the loop hits its strike limit and is awaiting a human call. + */ + +/** Operator's pending decision at a strike-limit pause (increment 5). */ +export type OrchestratorPause = { strikes: number; hypothesesTried?: string[] }; +/** The disposition the operator chose at a pause, once stopped (escalate/wait). */ +export type OrchestratorDisposition = "escalate" | "wait"; + +function outcomeBanner(outcome: string | undefined, disposition?: OrchestratorDisposition): AgentStreamBanner | undefined { + // An explicit operator decision overrides the generic pause copy so the + // banner reflects what the human actually chose. Neither escalate nor wait + // has a backend in v1 (no on-call page / scheduler) — they record intent. + if (outcome === "operator-pause" && disposition === "escalate") { + return { text: "Escalated to on-call. (Recorded only — no paging integration yet.)", tone: "warn" }; + } + if (outcome === "operator-pause" && disposition === "wait") { + return { text: "Marked to instrument & revisit. (Recorded only — no scheduler yet.)", tone: "muted" }; + } + switch (outcome) { + case "confirmed": + return { text: "Confirmed a root cause — see the conclusion above.", tone: "good" }; + case "operator-pause": + return { + text: "Paused — ruled out every hypothesis it tried without finding the cause. The signal is ambiguous; this one needs a human call.", + tone: "warn", + }; + case "budget-exhausted": + return { text: "Stopped — hit the token budget before confirming a cause.", tone: "warn" }; + case "tool-cap": + return { text: "Stopped — hit the query limit before confirming a cause.", tone: "warn" }; + case "wall-clock": + return { text: "Stopped — hit the time limit before confirming a cause.", tone: "warn" }; + case "exhausted": + return { text: "Stopped — the agent ran out of moves without confirming a cause.", tone: "muted" }; + case "inconclusive": + return { text: "Stopped — no further progress; inconclusive.", tone: "muted" }; + case "aborted": + return { text: "Stopped — the run was cancelled.", tone: "muted" }; + default: + return undefined; + } +} + +/** The strike-limit pause card: three explicit dispositions, no silent guess. */ +function OperatorPauseCard({ pause, onDecision }: { pause: OrchestratorPause; onDecision: (d: "continue" | OrchestratorDisposition) => void }) { + return ( +
+
+ + Paused — {pause.strikes} {pause.strikes === 1 ? "hypothesis" : "hypotheses"} failed +
+

+ The signal is ambiguous: every candidate cause tested was ruled out, and no discriminating evidence emerged. + Rather than guess, the orchestrator stops and asks you. +

+
+ + + +
+
+ ); +} + +export function OrchestratorStream({ + events, + stats, + outcome, + causalChain, + traceSummary, + running, + pause, + disposition, + onDecision, +}: { + events: AgentStreamEvent[]; + stats?: OrchestratorStreamStats; + outcome?: string; + causalChain?: CausalChainLink[]; + traceSummary?: string; + running: boolean; + /** Set while a run is blocked at the strike limit awaiting an operator call. */ + pause?: OrchestratorPause | null; + /** The disposition chosen at the last pause (escalate/wait), for the banner. */ + disposition?: OrchestratorDisposition; + onDecision?: (decision: "continue" | OrchestratorDisposition) => void; +}) { + const footer: AgentStreamFooterItem[] | undefined = stats + ? [ + { label: "moves", value: stats.moves }, + { label: "queries", value: stats.toolCalls }, + ...(stats.subagents > 0 ? [{ label: "subagents", value: stats.subagents }] : []), + { label: "depth", value: stats.depth }, + { label: "strikes", value: stats.strikes, tone: stats.strikes > 0 ? "warn" : "default" }, + { label: "tokens", value: stats.tokensSpent }, + { label: "took", value: `${(stats.durationMs / 1000).toFixed(1)}s` }, + ] + : undefined; + return ( + <> + + {running && pause && onDecision && } + {!running && causalChain && causalChain.length > 1 && ( +
+
Causal chain
+ {/* Vertical cause→effect stack: each link on its own row with a subtle + connector, evidence indented beneath. Reads cleanly even when every + link carries a (multi-word) attribution line — unlike inline arrows, + which go ragged once the evidence sublines wrap. */} +
+ {causalChain.map((link, i) => ( +
+ {i > 0 && } + {link.label} + {link.evidence && ( + {link.evidence} + )} +
+ ))} +
+
+ )} + {!running && traceSummary && ( +
{traceSummary}
+ )} + + ); +} diff --git a/src/web/lib/createStackFetch.ts b/src/web/lib/createStackFetch.ts index 5f8fabcf..4abb7992 100644 --- a/src/web/lib/createStackFetch.ts +++ b/src/web/lib/createStackFetch.ts @@ -8,6 +8,9 @@ declare global { * Gates the "Deep investigate" button — hidden from users until the * Autonomous Orchestrator ships. */ __DEEP_MODE_ENABLED__?: boolean; + /** Server-injected (index.ts) when config.agent.orchestratorEnabled is true. + * Gates the "Investigate autonomously" trigger — hidden until validated. */ + __ORCHESTRATOR_ENABLED__?: boolean; } } diff --git a/src/workflows/investigation.test.ts b/src/workflows/investigation.test.ts index d335bd5d..dac0c64c 100644 --- a/src/workflows/investigation.test.ts +++ b/src/workflows/investigation.test.ts @@ -454,3 +454,53 @@ describe("synthesis step degradation and defaults", () => { expect(retryResult.confidenceScore).toBe(0.95); }); }); + +// ── Quick-template wiring regression ────────────────────────────────────────── +// The synthesis step's input schema requires a `metrics-evidence` KEY (the shape +// `.parallel([...])` produces, keyed by step id). The quick template used to chain +// `.then(metricsStep).then(synthesisStep)`, which hands synthesis the metrics +// step's RAW output (no `metrics-evidence` key) → "Step input validation failed: +// metrics-evidence: Required" → every quick run degraded to an empty report. +// This run-to-success test fails on the old wiring and passes once quick feeds +// synthesis the keyed shape (caught the bug that silently degraded every +// orchestrator subagent, which uses the quick template). +describe("quick template wiring (regression)", () => { + it("quick workflow runs to success — synthesis receives the metrics-evidence key", async () => { + const { createSynthesisAgent } = await import("../agents/synthesis.js"); + vi.mocked(createSynthesisAgent).mockReturnValue({ + generate: vi.fn().mockResolvedValue({ + text: JSON.stringify({ + severity: "high", + summary: "quick synthesis ran", + rootCause: "quick-template-root-cause", + trigger: "t", + confidence: "high", + confidenceScore: 0.8, + }), + }), + } as any); + + // Throwing model + no providers → prefetch/anomaly/planning/metrics all + // degrade gracefully (covered above); synthesis uses the mocked agent. So the + // ONLY thing that can fail the run is the metrics→synthesis input wiring. + const throwingModel = { + doGenerate: vi.fn().mockRejectedValue(new Error("LLM unavailable")), + doStream: vi.fn().mockRejectedValue(new Error("LLM unavailable")), + specificationVersion: "v1" as const, + provider: "test", + modelId: "test-model", + } as unknown as LanguageModel; + + const workflow = createInvestigationWorkflow( + { model: throwingModel, providers: [], services: [noopService] }, + "quick", + ); + const run = await workflow.createRun(); + const runResult = await run.start({ + inputData: { userMessage: "investigate test-svc", serviceName: "test-svc" }, + }); + + expect(runResult.status).toBe("success"); + expect((runResult.result as any)?.rootCause).toBe("quick-template-root-cause"); + }); +}); diff --git a/src/workflows/investigation.ts b/src/workflows/investigation.ts index 3ec32385..43b1e5bd 100644 --- a/src/workflows/investigation.ts +++ b/src/workflows/investigation.ts @@ -105,7 +105,13 @@ export function createInvestigationWorkflow(workflowConfig: WorkflowConfig, temp .then(prefetchStep) .then(anomalyStep) .then(planningStep) - .then(metricsStep) as any) + // `.parallel([metricsStep])` (not `.then`) so synthesis receives the + // step-id-keyed `{ "metrics-evidence": … }` shape its input schema + // requires — same contract standard/full produce via their parallel + // evidence block. A plain `.then(metricsStep)` hands synthesis the raw + // EvidenceOutput (no `metrics-evidence` key) → input validation fails and + // every quick run degrades to an empty report. + .parallel([metricsStep]) as any) .then(synthesisStep) .then(postSynthesisStep) .commit(); diff --git a/src/workflows/schemas.ts b/src/workflows/schemas.ts index 881f9b25..bf6fb994 100644 --- a/src/workflows/schemas.ts +++ b/src/workflows/schemas.ts @@ -85,7 +85,7 @@ export const ParallelEvidenceSchema = z.object({ // ── Hypothesis loop (Step 2) — additive, optional. Populated only when the // synthesis loop runs (N>1); the default single-pass path leaves these unset. -const HypothesisPredictionSchema = z.discriminatedUnion("kind", [ +export const HypothesisPredictionSchema = z.discriminatedUnion("kind", [ z.object({ kind: z.literal("metric-threshold"), metric: z.string(), op: z.enum([">", "<", ">=", "<="]), value: z.number() }), z.object({ kind: z.literal("log-pattern"), pattern: z.string(), present: z.boolean().optional() }), z.object({ kind: z.literal("infra-status"), resource: z.string().optional(), status: z.string() }),