diff --git a/frontend/src/lib/api/types/core.ts b/frontend/src/lib/api/types/core.ts index 2cb1cc13..3f7853b9 100644 --- a/frontend/src/lib/api/types/core.ts +++ b/frontend/src/lib/api/types/core.ts @@ -41,6 +41,19 @@ export interface ProjectInfo { session_count: number; } +/** Matches Go ToolResultEvent struct in internal/db/messages.go */ +export interface ToolResultEvent { + tool_use_id?: string; + agent_id?: string; + subagent_session_id?: string; + source: string; + status: string; + content: string; + content_length: number; + timestamp?: string; + event_index: number; +} + /** Matches Go ToolCall struct in internal/db/messages.go */ export interface ToolCall { tool_name: string; @@ -51,6 +64,7 @@ export interface ToolCall { result_content_length?: number; result_content?: string; subagent_session_id?: string; + result_events?: ToolResultEvent[]; } /** Matches Go Message struct in internal/db/messages.go */ diff --git a/frontend/src/lib/components/content/ToolBlock.svelte b/frontend/src/lib/components/content/ToolBlock.svelte index f5c7b048..877816de 100644 --- a/frontend/src/lib/components/content/ToolBlock.svelte +++ b/frontend/src/lib/components/content/ToolBlock.svelte @@ -20,10 +20,13 @@ let { content, label, toolCall, highlightQuery = "", isCurrentHighlight = false }: Props = $props(); let userCollapsed: boolean = $state(true); let userOutputCollapsed: boolean = $state(true); + let userHistoryCollapsed: boolean = $state(true); let userOverride: boolean = $state(false); let userOutputOverride: boolean = $state(false); + let userHistoryOverride: boolean = $state(false); let searchExpandedInput: boolean = $state(false); let searchExpandedOutput: boolean = $state(false); + let searchExpandedHistory: boolean = $state(false); let prevQuery: string = ""; // Auto-expand when a search match exists in input or output @@ -41,14 +44,19 @@ const inputText = ( taskPrompt ?? content ?? fallbackContent ?? "" ).toLowerCase(); + const historyText = ( + toolCall?.result_events?.map((event) => event.content).join("\n\n") ?? "" + ).toLowerCase(); const outputText = ( - toolCall?.result_content ?? "" + [toolCall?.result_content ?? "", historyText].filter(Boolean).join("\n\n") ).toLowerCase(); searchExpandedInput = inputText.includes(q); searchExpandedOutput = outputText.includes(q); + searchExpandedHistory = historyText.includes(q); if (hq !== prevQuery) { userOverride = false; userOutputOverride = false; + userHistoryOverride = false; prevQuery = hq; } }); @@ -63,6 +71,11 @@ : searchExpandedOutput ? false : userOutputCollapsed, ); + let historyCollapsed = $derived( + userHistoryOverride ? userHistoryCollapsed + : searchExpandedHistory ? false + : userHistoryCollapsed, + ); let outputPreviewLine = $derived.by(() => { const rc = toolCall?.result_content; @@ -71,6 +84,14 @@ return (nl === -1 ? rc : rc.slice(0, nl)).slice(0, 100); }); + let resultEvents = $derived(toolCall?.result_events ?? []); + + let historyPreviewLine = $derived.by(() => { + const last = resultEvents[resultEvents.length - 1]; + if (!last) return ""; + return `${last.status}: ${last.content.split("\n")[0]}`.slice(0, 100); + }); + /** Parsed input parameters from structured tool call data */ let inputParams = $derived.by(() => { if (!toolCall?.input_json) return null; @@ -247,6 +268,51 @@
{@html escapeHTML(toolCall.result_content)}
{/if} {/if} + {#if resultEvents.length > 0} + + {#if !historyCollapsed} +
+ {#each resultEvents as event (event.event_index)} +
+
+ + status: + {event.status} + + + source: + {event.source} + + {#if event.agent_id} + + agent: + {event.agent_id} + + {/if} +
+
{@html escapeHTML(event.content)}
+
+ {/each} +
+ {/if} + {/if} {/if} {#if subagentSessionId} @@ -364,6 +430,26 @@ color: var(--text-primary); } + .history-header { + display: flex; + align-items: center; + gap: 6px; + padding: 5px 10px; + width: 100%; + text-align: left; + font-size: 12px; + color: var(--text-secondary); + min-width: 0; + border-top: 1px solid var(--border-muted); + transition: background 0.1s; + user-select: text; + } + + .history-header:hover { + background: var(--bg-surface-hover); + color: var(--text-primary); + } + .output-label { font-family: var(--font-mono); font-weight: 500; @@ -377,4 +463,24 @@ max-height: 300px; overflow-y: auto; } + + .result-history { + border-top: 1px solid var(--border-muted); + } + + .result-event + .result-event { + border-top: 1px solid var(--border-muted); + } + + .result-event-meta { + display: flex; + flex-wrap: wrap; + gap: 6px; + padding: 6px 14px 0; + } + + .history-content { + border-top: 0; + margin-top: 0; + } diff --git a/frontend/src/lib/components/content/ToolBlock.test.ts b/frontend/src/lib/components/content/ToolBlock.test.ts index 6c677209..c9a1ee84 100644 --- a/frontend/src/lib/components/content/ToolBlock.test.ts +++ b/frontend/src/lib/components/content/ToolBlock.test.ts @@ -133,6 +133,77 @@ describe("ToolBlock output section", () => { expect(preview).not.toBeNull(); expect(preview!.textContent).toBe("first line"); }); + + it("renders history after expanding the tool block when result_events are set", async () => { + const toolCall: ToolCall = { + tool_name: "wait", + category: "Other", + result_content: "latest summary", + result_events: [ + { + source: "wait_output", + status: "completed", + content: "Finished successfully", + content_length: 21, + agent_id: "agent-1", + event_index: 0, + }, + ], + }; + component = mount(ToolBlock, { + target: document.body, + props: { content: "some input", toolCall }, + }); + await tick(); + + expect(document.querySelector(".history-header")).toBeNull(); + + document.querySelector(".tool-header")!.click(); + await tick(); + + expect(document.querySelector(".history-header")).not.toBeNull(); + }); + + it("expands event history and shows chronological event content", async () => { + const toolCall: ToolCall = { + tool_name: "wait", + category: "Other", + result_content: "agent-a:\nFirst finished\n\nagent-b:\nSecond finished", + result_events: [ + { + source: "wait_output", + status: "completed", + content: "First finished", + content_length: 14, + agent_id: "agent-a", + event_index: 0, + }, + { + source: "subagent_notification", + status: "completed", + content: "Second finished", + content_length: 15, + agent_id: "agent-b", + event_index: 1, + }, + ], + }; + component = mount(ToolBlock, { + target: document.body, + props: { content: "some input", toolCall }, + }); + await tick(); + + document.querySelector(".tool-header")!.click(); + await tick(); + document.querySelector(".history-header")!.click(); + await tick(); + + const historyEntries = Array.from(document.querySelectorAll(".history-content")); + expect(historyEntries).toHaveLength(2); + expect(historyEntries[0].textContent).toBe("First finished"); + expect(historyEntries[1].textContent).toBe("Second finished"); + }); }); describe("ToolBlock fallback content", () => { diff --git a/internal/db/db.go b/internal/db/db.go index 189e4d60..0b07ddb8 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -23,7 +23,7 @@ import ( // formatting changes). Old databases with a lower user_version // trigger a non-destructive re-sync (mtime reset + skip cache // clear) so existing session data is preserved. -const dataVersion = 6 +const dataVersion = 7 //go:embed schema.sql var schemaSQL string diff --git a/internal/db/db_test.go b/internal/db/db_test.go index bbf3b02f..77999329 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -504,6 +504,123 @@ func TestMigration_ResultContentColumn(t *testing.T) { } } +func TestMigration_ToolResultEventsTable(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.db") + + d, err := Open(path) + requireNoError(t, err, "initial open") + insertSession(t, d, "s1", "proj") + d.Close() + + conn, err := sql.Open("sqlite3", path) + requireNoError(t, err, "raw open") + legacyVersion := dataVersion - 1 + _, err = conn.Exec(fmt.Sprintf(` + DROP TABLE tool_result_events; + PRAGMA user_version = %d; + `, legacyVersion)) + requireNoError(t, err, "drop tool_result_events") + + var count int + err = conn.QueryRow( + `SELECT count(*) FROM sqlite_master + WHERE type = 'table' AND name = 'tool_result_events'`, + ).Scan(&count) + requireNoError(t, err, "verify table removed") + if count != 0 { + t.Fatal("expected tool_result_events table to be absent") + } + requireNoError(t, conn.Close(), "close legacy db") + + d2, err := Open(path) + requireNoError(t, err, "reopen after migration") + defer d2.Close() + + requireSessionExists(t, d2, "s1") + if !d2.NeedsResync() { + t.Fatal("expected NeedsResync()=true after data version bump") + } + + err = d2.getReader().QueryRow( + `SELECT count(*) FROM sqlite_master + WHERE type = 'table' AND name = 'tool_result_events'`, + ).Scan(&count) + requireNoError(t, err, "verify table exists") + if count != 1 { + t.Fatal("expected tool_result_events table after reopen") + } +} + +func TestInsertMessages_PreservesToolResultEvents(t *testing.T) { + d := testDB(t) + insertSession(t, d, "s-events", "proj") + + err := d.InsertMessages([]Message{ + { + SessionID: "s-events", + Ordinal: 0, + Role: "assistant", + Content: "tool use response", + HasToolUse: true, + ToolCalls: []ToolCall{ + { + SessionID: "s-events", + ToolName: "wait", + Category: "Task", + ToolUseID: "call_wait", + ResultContentLength: 9, + ResultContent: "latest one", + ResultEvents: []ToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: "agent-1", + SubagentSessionID: "codex:agent-1", + Source: "wait_output", + Status: "completed", + Content: "first result", + ContentLength: len("first result"), + Timestamp: "2026-03-27T10:00:00Z", + EventIndex: 0, + }, + { + ToolUseID: "call_wait", + AgentID: "agent-2", + SubagentSessionID: "codex:agent-2", + Source: "subagent_notification", + Status: "errored", + Content: "second result", + ContentLength: len("second result"), + Timestamp: "2026-03-27T10:01:00Z", + EventIndex: 1, + }, + }, + }, + }, + }, + }) + requireNoError(t, err, "InsertMessages") + + msgs, err := d.GetMessages(context.Background(), "s-events", 0, 100, true) + requireNoError(t, err, "GetMessages") + if len(msgs) != 1 { + t.Fatalf("got %d messages, want 1", len(msgs)) + } + if len(msgs[0].ToolCalls) != 1 { + t.Fatalf("got %d tool_calls, want 1", len(msgs[0].ToolCalls)) + } + tc := msgs[0].ToolCalls[0] + if len(tc.ResultEvents) != 2 { + t.Fatalf("got %d result events, want 2", len(tc.ResultEvents)) + } + if tc.ResultEvents[0].AgentID != "agent-1" { + t.Errorf("result event 0 agent_id = %q, want %q", tc.ResultEvents[0].AgentID, "agent-1") + } + if tc.ResultEvents[1].Source != "subagent_notification" { + t.Errorf("result event 1 source = %q, want %q", tc.ResultEvents[1].Source, "subagent_notification") + } +} + func TestOpenPreservesDataAtCurrentVersion(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "test.db") @@ -2467,6 +2584,85 @@ func TestReplaceSessionMessagesReplacesToolCalls(t *testing.T) { } } +func TestReplaceSessionMessagesReplacesToolResultEvents(t *testing.T) { + d := testDB(t) + + insertSession(t, d, "s1", "p") + + m1 := asstMsg("s1", 0, "[Wait]") + m1.HasToolUse = true + m1.ToolCalls = []ToolCall{{ + SessionID: "s1", + ToolName: "wait", + Category: "Other", + ToolUseID: "call_wait", + ResultContent: "old result", + ResultContentLength: len("old result"), + ResultEvents: []ToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: "agent-1", + Source: "wait_output", + Status: "completed", + Content: "old result", + ContentLength: len("old result"), + EventIndex: 0, + }}, + }} + insertMessages(t, d, m1) + + m2 := asstMsg("s1", 0, "[Wait]") + m2.HasToolUse = true + m2.ToolCalls = []ToolCall{{ + SessionID: "s1", + ToolName: "wait", + Category: "Other", + ToolUseID: "call_wait", + ResultContent: "new result", + ResultContentLength: len("new result"), + ResultEvents: []ToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: "agent-1", + Source: "wait_output", + Status: "completed", + Content: "new result", + ContentLength: len("new result"), + EventIndex: 0, + }}, + }} + if err := d.ReplaceSessionMessages("s1", []Message{m2}); err != nil { + t.Fatalf("ReplaceSessionMessages: %v", err) + } + + msgs, err := d.GetAllMessages(context.Background(), "s1") + requireNoError(t, err, "GetAllMessages") + if len(msgs) != 1 { + t.Fatalf("messages len = %d, want 1", len(msgs)) + } + if len(msgs[0].ToolCalls) != 1 { + t.Fatalf("tool calls len = %d, want 1", len(msgs[0].ToolCalls)) + } + tc := msgs[0].ToolCalls[0] + if tc.ResultContent != "new result" { + t.Fatalf("result_content = %q, want %q", tc.ResultContent, "new result") + } + if len(tc.ResultEvents) != 1 { + t.Fatalf("result events len = %d, want 1", len(tc.ResultEvents)) + } + if tc.ResultEvents[0].Content != "new result" { + t.Fatalf("event content = %q, want %q", tc.ResultEvents[0].Content, "new result") + } + + var count int + err = d.Reader().QueryRow( + "SELECT COUNT(*) FROM tool_result_events WHERE session_id = ?", + "s1", + ).Scan(&count) + requireNoError(t, err, "count tool_result_events") + if count != 1 { + t.Fatalf("tool_result_events count = %d, want 1", count) + } +} + func TestToolCallsNoToolCalls(t *testing.T) { d := testDB(t) @@ -3434,6 +3630,78 @@ func TestCopyOrphanedDataFrom_WithToolCalls(t *testing.T) { } } +func TestCopyOrphanedDataFrom_WithToolResultEvents(t *testing.T) { + dir := t.TempDir() + + srcPath := filepath.Join(dir, "old.db") + srcDB, err := Open(srcPath) + requireNoError(t, err, "Open src") + insertSession(t, srcDB, "s1", "proj") + insertMessages(t, srcDB, + userMsg("s1", 0, "hello"), + asstMsg("s1", 1, "waited on child"), + ) + _, err = srcDB.getWriter().Exec(` + INSERT INTO tool_calls + (message_id, session_id, tool_name, category, + tool_use_id, result_content_length, result_content) + SELECT id, session_id, 'wait', 'Other', + 'call_wait', 23, 'Finished successfully' + FROM messages + WHERE session_id = 's1' AND ordinal = 1`, + ) + requireNoError(t, err, "insert tool_call") + _, err = srcDB.getWriter().Exec(` + INSERT INTO tool_result_events + (session_id, tool_call_message_ordinal, call_index, + tool_use_id, agent_id, subagent_session_id, + source, status, content, content_length, + timestamp, event_index) + VALUES + ('s1', 1, 0, 'call_wait', 'agent-1', 'codex:agent-1', + 'wait_output', 'completed', 'Finished successfully', + 23, '2026-03-27T18:00:00Z', 0)`, + ) + requireNoError(t, err, "insert tool_result_event") + srcDB.Close() + + dstPath := filepath.Join(dir, "new.db") + dstDB, err := Open(dstPath) + requireNoError(t, err, "Open dst") + defer dstDB.Close() + + count, err := dstDB.CopyOrphanedDataFrom(srcPath) + requireNoError(t, err, "CopyOrphanedDataFrom") + if count != 1 { + t.Fatalf("expected 1 orphaned, got %d", count) + } + + msgs, err := dstDB.GetAllMessages(context.Background(), "s1") + requireNoError(t, err, "GetAllMessages") + if len(msgs) != 2 { + t.Fatalf("messages len = %d, want 2", len(msgs)) + } + if len(msgs[1].ToolCalls) != 1 { + t.Fatalf("tool calls len = %d, want 1", len(msgs[1].ToolCalls)) + } + tc := msgs[1].ToolCalls[0] + if tc.ResultContent != "Finished successfully" { + t.Fatalf("result_content = %q, want %q", tc.ResultContent, "Finished successfully") + } + if len(tc.ResultEvents) != 1 { + t.Fatalf("result events len = %d, want 1", len(tc.ResultEvents)) + } + if tc.ResultEvents[0].Source != "wait_output" { + t.Fatalf("event source = %q, want %q", tc.ResultEvents[0].Source, "wait_output") + } + if tc.ResultEvents[0].SubagentSessionID != "codex:agent-1" { + t.Fatalf( + "subagent_session_id = %q, want %q", + tc.ResultEvents[0].SubagentSessionID, "codex:agent-1", + ) + } +} + func TestCopyOrphanedDataFrom_AtomicOnFailure(t *testing.T) { dir := t.TempDir() diff --git a/internal/db/messages.go b/internal/db/messages.go index 283159cb..880d5433 100644 --- a/internal/db/messages.go +++ b/internal/db/messages.go @@ -34,16 +34,17 @@ const ( // ToolCall represents a single tool invocation stored in // the tool_calls table. type ToolCall struct { - MessageID int64 `json:"-"` - SessionID string `json:"-"` - ToolName string `json:"tool_name"` - Category string `json:"category"` - ToolUseID string `json:"tool_use_id,omitempty"` - InputJSON string `json:"input_json,omitempty"` - SkillName string `json:"skill_name,omitempty"` - ResultContentLength int `json:"result_content_length,omitempty"` - ResultContent string `json:"result_content,omitempty"` - SubagentSessionID string `json:"subagent_session_id,omitempty"` + MessageID int64 `json:"-"` + SessionID string `json:"-"` + ToolName string `json:"tool_name"` + Category string `json:"category"` + ToolUseID string `json:"tool_use_id,omitempty"` + InputJSON string `json:"input_json,omitempty"` + SkillName string `json:"skill_name,omitempty"` + ResultContentLength int `json:"result_content_length,omitempty"` + ResultContent string `json:"result_content,omitempty"` + SubagentSessionID string `json:"subagent_session_id,omitempty"` + ResultEvents []ToolResultEvent `json:"result_events,omitempty"` } // ToolResult holds a tool_result content block for pairing. @@ -53,6 +54,19 @@ type ToolResult struct { ContentRaw string // raw JSON of the content field; decode lazily } +// ToolResultEvent represents a canonical chronological result update. +type ToolResultEvent struct { + ToolUseID string `json:"tool_use_id,omitempty"` + AgentID string `json:"agent_id,omitempty"` + SubagentSessionID string `json:"subagent_session_id,omitempty"` + Source string `json:"source"` + Status string `json:"status"` + Content string `json:"content"` + ContentLength int `json:"content_length"` + Timestamp string `json:"timestamp,omitempty"` + EventIndex int `json:"event_index"` +} + // Message represents a row in the messages table. type Message struct { ID int64 `json:"id"` @@ -297,6 +311,45 @@ func insertToolCallsTx( return nil } +func insertToolResultEventsTx( + tx *sql.Tx, rows []toolResultEventRow, +) error { + if len(rows) == 0 { + return nil + } + stmt, err := tx.Prepare(` + INSERT INTO tool_result_events + (session_id, tool_call_message_ordinal, call_index, + tool_use_id, agent_id, subagent_session_id, + source, status, content, content_length, + timestamp, event_index) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return fmt.Errorf("preparing tool_result_events insert: %w", err) + } + defer stmt.Close() + + for _, r := range rows { + if _, err := stmt.Exec( + r.SessionID, r.MessageOrdinal, r.CallIndex, + nilIfEmpty(r.Event.ToolUseID), + nilIfEmpty(r.Event.AgentID), + nilIfEmpty(r.Event.SubagentSessionID), + r.Event.Source, r.Event.Status, + r.Event.Content, + r.Event.ContentLength, + nilIfEmpty(r.Event.Timestamp), + r.Event.EventIndex, + ); err != nil { + return fmt.Errorf( + "inserting tool_result_event %q/%q: %w", + r.Event.Source, r.Event.Status, err, + ) + } + } + return nil +} + const slowOpThreshold = 100 * time.Millisecond // InsertMessages batch-inserts messages for a session. @@ -332,6 +385,10 @@ func (db *DB) InsertMessages(msgs []Message) error { if err := insertToolCallsTx(tx, toolCalls); err != nil { return err } + events := resolveToolResultEvents(msgs) + if err := insertToolResultEventsTx(tx, events); err != nil { + return err + } return tx.Commit() } @@ -381,6 +438,14 @@ func (db *DB) ReplaceSessionMessages( ); err != nil { return fmt.Errorf("deleting old tool_calls: %w", err) } + if _, err := tx.Exec( + "DELETE FROM tool_result_events WHERE session_id = ?", + sessionID, + ); err != nil { + return fmt.Errorf( + "deleting old tool_result_events: %w", err, + ) + } if _, err := tx.Exec( "DELETE FROM messages WHERE session_id = ?", sessionID, @@ -397,6 +462,10 @@ func (db *DB) ReplaceSessionMessages( if err := insertToolCallsTx(tx, toolCalls); err != nil { return err } + events := resolveToolResultEvents(msgs) + if err := insertToolResultEventsTx(tx, events); err != nil { + return err + } } return tx.Commit() @@ -426,6 +495,9 @@ func (db *DB) attachToolCalls( return err } } + if err := db.attachToolResultEvents(ctx, msgs); err != nil { + return err + } return nil } @@ -502,6 +574,110 @@ func (db *DB) attachToolCallsBatch( return rows.Err() } +func (db *DB) attachToolResultEvents( + ctx context.Context, msgs []Message, +) error { + if len(msgs) == 0 { + return nil + } + + sessionID := msgs[0].SessionID + ordToIdx := make(map[int]int, len(msgs)) + ordinals := make([]int, 0, len(msgs)) + for i, m := range msgs { + ordToIdx[m.Ordinal] = i + ordinals = append(ordinals, m.Ordinal) + } + for i := 0; i < len(ordinals); i += attachToolCallBatchSize { + end := min(i+attachToolCallBatchSize, len(ordinals)) + if err := db.attachToolResultEventsBatch( + ctx, msgs, ordToIdx, sessionID, ordinals[i:end], + ); err != nil { + return err + } + } + return nil +} + +func (db *DB) attachToolResultEventsBatch( + ctx context.Context, + msgs []Message, + ordToIdx map[int]int, + sessionID string, + ordinals []int, +) error { + if len(ordinals) == 0 { + return nil + } + + args := []any{sessionID} + placeholders := make([]string, len(ordinals)) + for i, ord := range ordinals { + args = append(args, ord) + placeholders[i] = "?" + } + + query := fmt.Sprintf(` + SELECT tool_call_message_ordinal, call_index, + tool_use_id, agent_id, subagent_session_id, + source, status, content, content_length, + timestamp, event_index + FROM tool_result_events + WHERE session_id = ? AND tool_call_message_ordinal IN (%s) + ORDER BY tool_call_message_ordinal, call_index, event_index`, + strings.Join(placeholders, ",")) + + rows, err := db.getReader().QueryContext(ctx, query, args...) + if err != nil { + return fmt.Errorf("querying tool_result_events: %w", err) + } + defer rows.Close() + + for rows.Next() { + var ( + msgOrdinal int + callIndex int + ev ToolResultEvent + toolUseID sql.NullString + agentID sql.NullString + subID sql.NullString + timestamp sql.NullString + ) + if err := rows.Scan( + &msgOrdinal, &callIndex, + &toolUseID, &agentID, &subID, + &ev.Source, &ev.Status, &ev.Content, + &ev.ContentLength, ×tamp, &ev.EventIndex, + ); err != nil { + return fmt.Errorf("scanning tool_result_event: %w", err) + } + if toolUseID.Valid { + ev.ToolUseID = toolUseID.String + } + if agentID.Valid { + ev.AgentID = agentID.String + } + if subID.Valid { + ev.SubagentSessionID = subID.String + } + if timestamp.Valid { + ev.Timestamp = timestamp.String + } + idx, ok := ordToIdx[msgOrdinal] + if !ok { + continue + } + if callIndex < 0 || callIndex >= len(msgs[idx].ToolCalls) { + continue + } + msgs[idx].ToolCalls[callIndex].ResultEvents = append( + msgs[idx].ToolCalls[callIndex].ResultEvents, + ev, + ) + } + return rows.Err() +} + func scanMessages(rows *sql.Rows) ([]Message, error) { var msgs []Message for rows.Next() { @@ -654,3 +830,37 @@ func resolveToolCalls( } return calls } + +type toolResultEventRow struct { + SessionID string + MessageOrdinal int + CallIndex int + Event ToolResultEvent +} + +func resolveToolResultEvents(msgs []Message) []toolResultEventRow { + var rows []toolResultEventRow + for _, m := range msgs { + for callIndex, tc := range m.ToolCalls { + for eventIndex, ev := range tc.ResultEvents { + ev.EventIndex = eventIndex + if ev.ContentLength == 0 { + ev.ContentLength = len(ev.Content) + } + if ev.ToolUseID == "" { + ev.ToolUseID = tc.ToolUseID + } + if ev.SubagentSessionID == "" { + ev.SubagentSessionID = tc.SubagentSessionID + } + rows = append(rows, toolResultEventRow{ + SessionID: m.SessionID, + MessageOrdinal: m.Ordinal, + CallIndex: callIndex, + Event: ev, + }) + } + } + } + return rows +} diff --git a/internal/db/orphaned.go b/internal/db/orphaned.go index 7ecccac2..2429e64b 100644 --- a/internal/db/orphaned.go +++ b/internal/db/orphaned.go @@ -132,16 +132,27 @@ func (d *DB) CopyOrphanedDataFrom( // Copy tool_calls. Map old message_id to new // message_id via the (session_id, ordinal) natural key. + toolCallCols := []string{ + "message_id", "session_id", "tool_name", "category", + "tool_use_id", "input_json", "skill_name", + "result_content_length", + } + toolCallSelect := []string{ + "new_m.id", "otc.session_id", "otc.tool_name", + "otc.category", "otc.tool_use_id", "otc.input_json", + "otc.skill_name", "otc.result_content_length", + } + if oldDBHasColumn(ctx, tx, "tool_calls", "result_content") { + toolCallCols = append(toolCallCols, "result_content") + toolCallSelect = append(toolCallSelect, "otc.result_content") + } + toolCallCols = append(toolCallCols, "subagent_session_id") + toolCallSelect = append(toolCallSelect, "otc.subagent_session_id") if _, err := tx.ExecContext(ctx, ` INSERT INTO tool_calls - (message_id, session_id, tool_name, category, - tool_use_id, input_json, skill_name, - result_content_length, subagent_session_id) + (`+strings.Join(toolCallCols, ", ")+`) SELECT - new_m.id, otc.session_id, otc.tool_name, - otc.category, otc.tool_use_id, otc.input_json, - otc.skill_name, otc.result_content_length, - otc.subagent_session_id + `+strings.Join(toolCallSelect, ", ")+` FROM old_db.tool_calls otc JOIN old_db.messages old_m ON old_m.id = otc.message_id @@ -157,6 +168,31 @@ func (d *DB) CopyOrphanedDataFrom( ) } + if oldDBHasTable(ctx, tx, "tool_result_events") { + if _, err := tx.ExecContext(ctx, ` + INSERT INTO tool_result_events + (session_id, tool_call_message_ordinal, + call_index, tool_use_id, agent_id, + subagent_session_id, source, status, + content, content_length, timestamp, + event_index) + SELECT + session_id, tool_call_message_ordinal, + call_index, tool_use_id, agent_id, + subagent_session_id, source, status, + content, content_length, timestamp, + event_index + FROM old_db.tool_result_events + WHERE session_id IN ( + SELECT id FROM _orphaned_ids + )`, + ); err != nil { + return 0, fmt.Errorf( + "copying orphaned tool_result_events: %w", err, + ) + } + } + if err := tx.Commit(); err != nil { return 0, fmt.Errorf( "committing orphaned data: %w", err, diff --git a/internal/db/schema.sql b/internal/db/schema.sql index 0b434593..408e575f 100644 --- a/internal/db/schema.sql +++ b/internal/db/schema.sql @@ -126,6 +126,34 @@ CREATE INDEX IF NOT EXISTS idx_tool_calls_subagent ON tool_calls(subagent_session_id) WHERE subagent_session_id IS NOT NULL; +-- Tool result events table: canonical chronological tool outputs. +CREATE TABLE IF NOT EXISTS tool_result_events ( + id INTEGER PRIMARY KEY, + session_id TEXT NOT NULL + REFERENCES sessions(id) ON DELETE CASCADE, + tool_call_message_ordinal INTEGER NOT NULL, + call_index INTEGER NOT NULL DEFAULT 0, + tool_use_id TEXT, + agent_id TEXT, + subagent_session_id TEXT, + source TEXT NOT NULL, + status TEXT NOT NULL, + content TEXT NOT NULL, + content_length INTEGER NOT NULL DEFAULT 0, + timestamp TEXT, + event_index INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_tool_result_events_session + ON tool_result_events(session_id); +CREATE INDEX IF NOT EXISTS idx_tool_result_events_call + ON tool_result_events( + session_id, + tool_call_message_ordinal, + call_index, + event_index + ); + -- Insights table for AI-generated activity insights CREATE TABLE IF NOT EXISTS insights ( id INTEGER PRIMARY KEY, diff --git a/internal/parser/codex.go b/internal/parser/codex.go index 84ed0599..70513b80 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -1,9 +1,11 @@ package parser import ( + "errors" "fmt" "os" "path/filepath" + "sort" "strconv" "strings" "time" @@ -19,26 +21,56 @@ const ( codexOriginatorExec = "codex_exec" ) +var errCodexIncrementalNeedsFullParse = errors.New( + "codex subagent event requires full parse", +) + // codexSessionBuilder accumulates state while scanning a Codex // JSONL session file line by line. type codexSessionBuilder struct { - messages []ParsedMessage - firstMessage string - startedAt time.Time - endedAt time.Time - sessionID string - project string - ordinal int - includeExec bool - currentModel string + messages []ParsedMessage + firstMessage string + startedAt time.Time + endedAt time.Time + sessionID string + project string + ordinal int + includeExec bool + currentModel string + callNames map[string]string + callRefs map[string]codexToolCallRef + agentSpawnCalls map[string]string + agentWaitCalls map[string]string + pendingAgentEvents map[string][]codexPendingEvent + orphanNotificationIx map[string]int +} + +type codexToolCallRef struct { + messageIndex int + callIndex int +} + +type codexPendingEvent struct { + agentID string + source string + status string + text string + timestamp time.Time + ordinal int } func newCodexSessionBuilder( includeExec bool, ) *codexSessionBuilder { return &codexSessionBuilder{ - project: "unknown", - includeExec: includeExec, + project: "unknown", + includeExec: includeExec, + callNames: make(map[string]string), + callRefs: make(map[string]codexToolCallRef), + agentSpawnCalls: make(map[string]string), + agentWaitCalls: make(map[string]string), + pendingAgentEvents: make(map[string][]codexPendingEvent), + orphanNotificationIx: make(map[string]int), } } @@ -98,9 +130,13 @@ func (b *codexSessionBuilder) handleSessionMeta( func (b *codexSessionBuilder) handleResponseItem( payload gjson.Result, ts time.Time, ) { - if payload.Get("type").Str == "function_call" { + switch payload.Get("type").Str { + case "function_call": b.handleFunctionCall(payload, ts) return + case "function_call_output": + b.handleFunctionCallOutput(payload, ts) + return } role := payload.Get("role").Str @@ -113,6 +149,10 @@ func (b *codexSessionBuilder) handleResponseItem( return } + if role == "user" && b.handleSubagentNotification(content, ts) { + return + } + if role == "user" && isCodexSystemMessage(content) { return } @@ -141,9 +181,18 @@ func (b *codexSessionBuilder) handleFunctionCall( if name == "" { return } + callID := payload.Get("call_id").Str + if callID != "" { + b.callNames[callID] = name + } content := formatCodexFunctionCall(name, payload) inputJSON := extractCodexInputJSON(payload) + waitAgentIDs := []string(nil) + if name == "wait" && callID != "" { + args, _ := parseCodexFunctionArgs(payload) + waitAgentIDs = codexWaitAgentIDs(args) + } b.messages = append(b.messages, ParsedMessage{ Ordinal: b.ordinal, @@ -154,12 +203,244 @@ func (b *codexSessionBuilder) handleFunctionCall( ContentLength: len(content), Model: b.currentModel, ToolCalls: []ParsedToolCall{{ + ToolUseID: callID, ToolName: name, Category: NormalizeToolCategory(name), InputJSON: inputJSON, }}, }) + if callID != "" { + b.callRefs[callID] = codexToolCallRef{ + messageIndex: len(b.messages) - 1, + callIndex: 0, + } + } + b.ordinal++ + + if name == "wait" && callID != "" { + for _, agentID := range waitAgentIDs { + b.agentWaitCalls[agentID] = callID + b.claimPendingAgentEvents(callID, agentID) + } + } +} + +func (b *codexSessionBuilder) handleFunctionCallOutput( + payload gjson.Result, ts time.Time, +) { + callID := payload.Get("call_id").Str + if callID == "" { + return + } + + output, _ := parseCodexFunctionOutput(payload) + if !output.Exists() { + return + } + + switch b.callNames[callID] { + case "spawn_agent": + agentID := strings.TrimSpace(output.Get("agent_id").Str) + if agentID == "" { + return + } + b.agentSpawnCalls[agentID] = callID + case "wait": + status := output.Get("status") + if !status.Exists() || !status.IsObject() { + return + } + status.ForEach(func(key, entry gjson.Result) bool { + agentID := key.Str + statusName, text := codexTerminalSubagentEvent(entry) + if text == "" { + return true + } + b.appendCallResultEvent(callID, ParsedToolResultEvent{ + ToolUseID: callID, + AgentID: agentID, + SubagentSessionID: codexSubagentSessionID(agentID), + Source: "wait_output", + Status: statusName, + Content: text, + Timestamp: ts, + }) + return true + }) + } +} + +func (b *codexSessionBuilder) handleSubagentNotification( + content string, ts time.Time, +) bool { + agentID, statusName, text := parseCodexSubagentNotification(content) + if agentID == "" || text == "" { + return false + } + if callID := b.agentWaitCalls[agentID]; callID != "" { + b.appendCallResultEvent(callID, ParsedToolResultEvent{ + AgentID: agentID, + SubagentSessionID: codexSubagentSessionID(agentID), + Source: "subagent_notification", + Status: statusName, + Content: text, + Timestamp: ts, + }) + return true + } + + b.pendingAgentEvents[agentID] = append( + b.pendingAgentEvents[agentID], codexPendingEvent{ + agentID: agentID, + source: "subagent_notification", + status: statusName, + text: text, + timestamp: ts, + ordinal: b.ordinal, + }, + ) b.ordinal++ + return true +} + +func (b *codexSessionBuilder) appendCallResultEvent( + callID string, ev ParsedToolResultEvent, +) { + if callID == "" { + return + } + ref, ok := b.callRefs[callID] + if !ok || ref.messageIndex < 0 || ref.messageIndex >= len(b.messages) { + return + } + if ref.callIndex < 0 || ref.callIndex >= len(b.messages[ref.messageIndex].ToolCalls) { + return + } + tc := &b.messages[ref.messageIndex].ToolCalls[ref.callIndex] + if ev.ToolUseID == "" { + ev.ToolUseID = tc.ToolUseID + } + if ev.SubagentSessionID == "" && ev.AgentID != "" { + ev.SubagentSessionID = codexSubagentSessionID(ev.AgentID) + } + if b.hasEquivalentCallResultEvent(tc.ResultEvents, ev) { + return + } + tc.ResultEvents = append(tc.ResultEvents, ev) +} + +func (b *codexSessionBuilder) hasEquivalentCallResultEvent( + events []ParsedToolResultEvent, candidate ParsedToolResultEvent, +) bool { + for _, existing := range events { + if existing.AgentID == candidate.AgentID && + existing.Status == candidate.Status && + existing.Content == candidate.Content { + return true + } + } + return false +} + +func (b *codexSessionBuilder) claimPendingAgentEvents( + callID, agentID string, +) { + pending := b.pendingAgentEvents[agentID] + if len(pending) == 0 { + return + } + for _, ev := range pending { + b.appendCallResultEvent(callID, ParsedToolResultEvent{ + AgentID: ev.agentID, + SubagentSessionID: codexSubagentSessionID(ev.agentID), + Source: ev.source, + Status: ev.status, + Content: ev.text, + Timestamp: ev.timestamp, + }) + } + delete(b.pendingAgentEvents, agentID) +} + +func (b *codexSessionBuilder) flushPendingAgentResults() { + if len(b.pendingAgentEvents) == 0 { + return + } + agentIDs := make([]string, 0, len(b.pendingAgentEvents)) + for agentID := range b.pendingAgentEvents { + agentIDs = append(agentIDs, agentID) + } + sort.Strings(agentIDs) + for _, agentID := range agentIDs { + pending := b.pendingAgentEvents[agentID] + switch { + case b.agentWaitCalls[agentID] != "": + b.claimPendingAgentEvents(b.agentWaitCalls[agentID], agentID) + case b.agentSpawnCalls[agentID] != "": + b.claimPendingAgentEvents(b.agentSpawnCalls[agentID], agentID) + default: + for _, ev := range pending { + key := agentID + "\x00" + ev.status + "\x00" + ev.text + if _, ok := b.orphanNotificationIx[key]; ok { + continue + } + idx := b.insertMessage(ParsedMessage{ + Ordinal: ev.ordinal, + Role: RoleUser, + Content: ev.text, + Timestamp: ev.timestamp, + Model: b.currentModel, + ContentLength: len(ev.text), + }) + b.orphanNotificationIx[key] = idx + } + delete(b.pendingAgentEvents, agentID) + } + } +} + +func codexSubagentSessionID(agentID string) string { + agentID = strings.TrimSpace(agentID) + if agentID == "" { + return "" + } + return "codex:" + agentID +} + +func (b *codexSessionBuilder) normalizeOrdinals() { + sort.SliceStable(b.messages, func(i, j int) bool { + if b.messages[i].Ordinal == b.messages[j].Ordinal { + return i < j + } + return b.messages[i].Ordinal < b.messages[j].Ordinal + }) + for i := range b.messages { + b.messages[i].Ordinal = i + } +} + +func (b *codexSessionBuilder) insertMessage(msg ParsedMessage) int { + idx := len(b.messages) + for i, existing := range b.messages { + if existing.Ordinal > msg.Ordinal || + (existing.Ordinal == msg.Ordinal && + !msg.Timestamp.IsZero() && + (existing.Timestamp.IsZero() || + msg.Timestamp.Before(existing.Timestamp))) { + idx = i + break + } + } + b.messages = append(b.messages, ParsedMessage{}) + copy(b.messages[idx+1:], b.messages[idx:]) + b.messages[idx] = msg + for callID, ref := range b.callRefs { + if ref.messageIndex >= idx { + ref.messageIndex++ + b.callRefs[callID] = ref + } + } + return idx } func formatCodexFunctionCall( @@ -175,6 +456,8 @@ func formatCodexFunctionCall( return formatCodexWriteStdinCall(summary, args, rawArgs) case "apply_patch": return formatCodexApplyPatchCall(summary, args, rawArgs) + case "spawn_agent": + return formatCodexSpawnAgentCall(summary, args, rawArgs) } category := NormalizeToolCategory(name) @@ -358,6 +641,33 @@ func formatCodexApplyPatchCall( return header } +func formatCodexSpawnAgentCall( + summary string, args gjson.Result, rawArgs string, +) string { + if summary == "" { + summary = firstNonEmpty( + codexArgValue(args, "agent_type"), + codexArgValue(args, "subagent_type"), + "spawn_agent", + ) + } + + header := formatToolHeader("Task", summary) + prompt := firstNonEmpty( + codexArgValue(args, "description"), + codexArgValue(args, "message"), + codexArgValue(args, "prompt"), + ) + if prompt != "" { + firstLine, _, _ := strings.Cut(prompt, "\n") + return header + "\n" + truncate(firstLine, 220) + } + if preview := codexArgPreview(args, rawArgs); preview != "" { + return header + "\n" + preview + } + return header +} + func extractPatchedFiles(patch string) []string { if patch == "" { return nil @@ -515,6 +825,125 @@ func firstNonEmpty(vals ...string) string { return "" } +func parseCodexFunctionOutput( + payload gjson.Result, +) (gjson.Result, string) { + out := payload.Get("output") + if !out.Exists() { + return gjson.Result{}, "" + } + + switch out.Type { + case gjson.String: + s := strings.TrimSpace(out.Str) + if s == "" { + return gjson.Result{}, "" + } + if gjson.Valid(s) { + return gjson.Parse(s), s + } + return gjson.Result{}, s + default: + raw := strings.TrimSpace(out.Raw) + if raw == "" { + return gjson.Result{}, "" + } + if gjson.Valid(raw) { + return gjson.Parse(raw), raw + } + return gjson.Result{}, raw + } +} + +func codexWaitAgentIDs(args gjson.Result) []string { + if !args.Exists() { + return nil + } + ids := args.Get("ids") + if !ids.Exists() || !ids.IsArray() { + return nil + } + + var out []string + for _, item := range ids.Array() { + id := strings.TrimSpace(item.Str) + if id == "" { + continue + } + out = append(out, id) + } + return out +} + +func parseCodexSubagentNotification( + content string, +) (agentID, statusName, text string) { + if !isCodexSubagentNotification(content) { + return "", "", "" + } + body := strings.TrimSpace(content) + body = strings.TrimPrefix(body, "") + body = strings.TrimSuffix(body, "") + body = strings.TrimSpace(body) + if !gjson.Valid(body) { + return "", "", "" + } + parsed := gjson.Parse(body) + agentID = strings.TrimSpace(parsed.Get("agent_id").Str) + status := parsed.Get("status") + statusName, text = codexTerminalSubagentEvent(status) + return agentID, statusName, text +} + +func codexTerminalSubagentEvent(status gjson.Result) (string, string) { + if text := strings.TrimSpace(status.Get("completed").Str); text != "" { + return "completed", text + } + if text := strings.TrimSpace(status.Get("errored").Str); text != "" { + return "errored", text + } + if text := strings.TrimSpace(status.Get("running").Str); text != "" { + return "running", text + } + return "", "" +} + +func codexTerminalSubagentStatus(status gjson.Result) string { + _, text := codexTerminalSubagentEvent(status) + return text +} + +func isCodexSubagentFunctionOutput(output gjson.Result) bool { + if !output.Exists() { + return false + } + if strings.TrimSpace(output.Get("agent_id").Str) != "" { + return true + } + + status := output.Get("status") + if !status.Exists() || !status.IsObject() { + return false + } + entries := status.Map() + if len(entries) == 0 { + return false + } + for agentID, entry := range entries { + if strings.TrimSpace(agentID) == "" || !entry.IsObject() { + return false + } + if codexTerminalSubagentStatus(entry) != "" { + continue + } + if strings.TrimSpace(entry.Get("running").Str) != "" { + continue + } + return false + } + return true +} + // extractCodexContent joins all text blocks from a Codex // response item's content array. func extractCodexContent(payload gjson.Result) string { @@ -571,6 +1000,9 @@ func ParseCodexSession( fmt.Errorf("reading codex %s: %w", path, err) } + b.flushPendingAgentResults() + b.normalizeOrdinals() + sessionID := b.sessionID if sessionID == "" { sessionID = strings.TrimSuffix( @@ -619,15 +1051,23 @@ func ParseCodexSessionFrom( ) ([]ParsedMessage, time.Time, int64, error) { b := newCodexSessionBuilder(includeExec) b.ordinal = startOrdinal + var fallbackErr error consumed, err := readJSONLFrom( path, offset, func(line string) { + if fallbackErr != nil { + return + } // Skip session_meta — already processed in // the initial full parse. if gjson.Get(line, "type").Str == codexTypeSessionMeta { return } + if codexIncrementalNeedsFullParse(line) { + fallbackErr = errCodexIncrementalNeedsFullParse + return + } b.processLine(line) }, ) @@ -637,12 +1077,55 @@ func ParseCodexSessionFrom( path, offset, err, ) } + if fallbackErr != nil { + return nil, time.Time{}, 0, fallbackErr + } + + b.flushPendingAgentResults() return b.messages, b.endedAt, consumed, nil } +// IsIncrementalFullParseFallback reports whether an incremental +// Codex parse error requires the caller to fall back to a full parse. +func IsIncrementalFullParseFallback(err error) bool { + return errors.Is(err, errCodexIncrementalNeedsFullParse) +} + func isCodexSystemMessage(content string) bool { return strings.HasPrefix(content, "# AGENTS.md") || strings.HasPrefix(content, "") || - strings.HasPrefix(content, "") + strings.HasPrefix(content, "") || + isCodexSubagentNotification(content) +} + +func isCodexSubagentNotification(content string) bool { + return strings.HasPrefix( + strings.TrimSpace(content), + "", + ) +} + +func codexIncrementalNeedsFullParse(line string) bool { + if gjson.Get(line, "type").Str != codexTypeResponseItem { + return false + } + + payload := gjson.Get(line, "payload") + switch payload.Get("type").Str { + case "function_call": + return payload.Get("name").Str == "wait" + case "function_call_output": + output, _ := parseCodexFunctionOutput(payload) + return isCodexSubagentFunctionOutput(output) + default: + role := payload.Get("role").Str + if role != "user" { + return false + } + agentID, _, text := parseCodexSubagentNotification( + extractCodexContent(payload), + ) + return agentID != "" && text != "" + } } diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index 5670c56f..e3805319 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -20,6 +20,23 @@ func runCodexParserTest(t *testing.T, fileName, content string, includeExec bool return sess, msgs } +func assertToolResultEvents( + t *testing.T, + got []ParsedToolResultEvent, + want []ParsedToolResultEvent, +) { + t.Helper() + require.Len(t, got, len(want)) + for i := range want { + assert.Equal(t, want[i].ToolUseID, got[i].ToolUseID, "event %d tool_use_id", i) + assert.Equal(t, want[i].AgentID, got[i].AgentID, "event %d agent_id", i) + assert.Equal(t, want[i].SubagentSessionID, got[i].SubagentSessionID, "event %d subagent_session_id", i) + assert.Equal(t, want[i].Source, got[i].Source, "event %d source", i) + assert.Equal(t, want[i].Status, got[i].Status, "event %d status", i) + assert.Equal(t, want[i].Content, got[i].Content, "event %d content", i) + } +} + func TestParseCodexSession_Basic(t *testing.T) { content := loadFixture(t, "codex/standard_session.jsonl") sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) @@ -49,6 +66,31 @@ func TestParseCodexSession_ExecOriginator(t *testing.T) { }) } +func TestCodexInsertMessage_PreservesChronologyOnSameOrdinal(t *testing.T) { + b := newCodexSessionBuilder(false) + b.messages = []ParsedMessage{{ + Ordinal: 2, + Role: RoleAssistant, + Content: "later assistant message", + Timestamp: parseTimestamp("2024-01-01T10:01:06Z"), + }} + + idx := b.insertMessage(ParsedMessage{ + Ordinal: 2, + Role: RoleUser, + Content: "earlier orphan notification", + Timestamp: parseTimestamp("2024-01-01T10:01:05Z"), + }) + + assert.Equal(t, 0, idx) + b.normalizeOrdinals() + require.Len(t, b.messages, 2) + assert.Equal(t, "earlier orphan notification", b.messages[0].Content) + assert.Equal(t, "later assistant message", b.messages[1].Content) + assert.Equal(t, 0, b.messages[0].Ordinal) + assert.Equal(t, 1, b.messages[1].Ordinal) +} + func TestParseCodexSession_FunctionCalls(t *testing.T) { t.Run("function calls", func(t *testing.T) { content := loadFixture(t, "codex/function_calls.jsonl") @@ -129,6 +171,472 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assertToolCalls(t, msgs[1].ToolCalls, []ParsedToolCall{{ToolName: "Agent", Category: "Task"}}) }) + t.Run("spawn_agent links child session and wait output becomes tool result", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + waitSummary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + "timeout_ms": 600000, + }, tsLateS5), + testjsonl.CodexFunctionCallOutputJSON("call_wait", "{\"status\":{\""+childID+"\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}}", "2024-01-01T10:01:06Z"), + testjsonl.CodexMsgJSON("user", notification, "2024-01-01T10:01:07Z"), + testjsonl.CodexMsgJSON("assistant", "continuing", "2024-01-01T10:01:08Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + assert.Equal(t, RoleAssistant, msgs[1].Role) + assertToolCalls(t, msgs[1].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_spawn", + ToolName: "spawn_agent", + Category: "Task", + }}) + assert.Equal(t, RoleAssistant, msgs[2].Role) + assertToolCalls(t, msgs[2].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + }}) + assertToolResultEvents(t, msgs[2].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "wait_output", + Status: "completed", + Content: waitSummary, + }}) + assert.Equal(t, RoleAssistant, msgs[3].Role) + assert.Equal(t, "continuing", msgs[3].Content) + }) + + t.Run("subagent notification without wait result falls back to spawn_agent output", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + summary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-notify", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 2, len(msgs)) + assertToolCalls(t, msgs[1].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_spawn", + ToolName: "spawn_agent", + Category: "Task", + }}) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_spawn", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: summary, + }}) + }) + + t.Run("no-wait fallback preserves chronology before later messages", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + summary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-notify-order", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + testjsonl.CodexMsgJSON("assistant", "continuing", "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_spawn", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: summary, + }}) + assert.Equal(t, RoleAssistant, msgs[2].Role) + assert.Equal(t, "continuing", msgs[2].Content) + }) + + t.Run("duplicate pending notification preserves earliest chronology", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + summary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-notify-dupe-order", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + testjsonl.CodexMsgJSON("assistant", "continuing", "2024-01-01T10:01:06Z"), + testjsonl.CodexMsgJSON("user", notification, "2024-01-01T10:01:07Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_spawn", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: summary, + }}) + assert.Equal(t, RoleAssistant, msgs[2].Role) + assert.Equal(t, "continuing", msgs[2].Content) + }) + + t.Run("running subagent notification does not suppress later completion", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + running := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"running\":\"Still working\"}}\n" + + "" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-running", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", running, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 2, len(msgs)) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{ + { + ToolUseID: "call_spawn", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "running", + Content: "Still working", + }, + { + ToolUseID: "call_spawn", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: "Finished successfully", + }, + }) + }) + + t.Run("notification after wait binds to wait call", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-wait-bind", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolCalls(t, msgs[2].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + }}) + assertToolResultEvents(t, msgs[2].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: "Finished successfully", + }}) + }) + + t.Run("notification before wait binds to later wait call", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-wait-rebind", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", completed, tsLateS5), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolResultEvents(t, msgs[2].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: "Finished successfully", + }}) + }) + + t.Run("late spawn output does not override wait binding", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-late-spawn-output", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, tsLate), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolResultEvents(t, msgs[2].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: "Finished successfully", + }}) + }) + + t.Run("wait output does not duplicate terminal notification result", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-wait-dedupe", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+childID+"\":{\"completed\":\"Finished successfully\"}}}", + "2024-01-01T10:01:07Z", + ), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolResultEvents(t, msgs[2].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "subagent_notification", + Status: "completed", + Content: "Finished successfully", + }}) + }) + + t.Run("mixed wait status preserves later completion for running agent", func(t *testing.T) { + completedID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + runningID := "019c9c96-6ee7-77c0-ba4c-380f844289d6" + laterCompleted := "\n" + + "{\"agent_id\":\"" + runningID + "\",\"status\":{\"completed\":\"Second agent finished\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-mixed-wait", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run child agents", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{completedID, runningID}, + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+completedID+"\":{\"completed\":\"First agent finished\"},\""+runningID+"\":{\"running\":\"Still working\"}}}", + tsLate, + ), + testjsonl.CodexMsgJSON("user", laterCompleted, tsLateS5), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 2, len(msgs)) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: completedID, + SubagentSessionID: "codex:" + completedID, + Source: "wait_output", + Status: "completed", + Content: "First agent finished", + }, + { + ToolUseID: "call_wait", + AgentID: runningID, + SubagentSessionID: "codex:" + runningID, + Source: "wait_output", + Status: "running", + Content: "Still working", + }, + { + ToolUseID: "call_wait", + AgentID: runningID, + SubagentSessionID: "codex:" + runningID, + Source: "subagent_notification", + Status: "completed", + Content: "Second agent finished", + }, + }) + }) + + t.Run("running-only wait output is preserved as a result event", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-running-wait", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+childID+"\":{\"running\":\"Still working\"}}}", + tsLate, + ), + ) + + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 2, len(msgs)) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: childID, + SubagentSessionID: "codex:" + childID, + Source: "wait_output", + Status: "running", + Content: "Still working", + }}) + }) + + t.Run("wait result events preserve JSON order for multiple agents", func(t *testing.T) { + firstID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + secondID := "019c9c96-6ee7-77c0-ba4c-380f844289d6" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-order", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run child agents", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{firstID, secondID}, + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+secondID+"\":{\"completed\":\"Second agent finished\"},\""+firstID+"\":{\"completed\":\"First agent finished\"}}}", + tsLate, + ), + ) + + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 2, len(msgs)) + assertToolResultEvents(t, msgs[1].ToolCalls[0].ResultEvents, []ParsedToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: secondID, + SubagentSessionID: "codex:" + secondID, + Source: "wait_output", + Status: "completed", + Content: "Second agent finished", + }, + { + ToolUseID: "call_wait", + AgentID: firstID, + SubagentSessionID: "codex:" + firstID, + Source: "wait_output", + Status: "completed", + Content: "First agent finished", + }, + }) + }) + + t.Run("orphaned terminal notifications dedupe", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-orphan", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", completed, tsEarlyS1), + testjsonl.CodexMsgJSON("user", completed, tsEarlyS5), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 1, len(msgs)) + assert.Equal(t, "Finished successfully", msgs[0].Content) + }) + t.Run("function call no name skipped", func(t *testing.T) { content := testjsonl.JoinJSONL( testjsonl.CodexSessionMetaJSON("fc-2", "/tmp", "user", tsEarly), @@ -518,3 +1026,155 @@ func TestParseCodexSessionFrom_NoNewData(t *testing.T) { assert.Equal(t, 0, len(newMsgs)) assert.True(t, endedAt.IsZero()) } + +func TestParseCodexSessionFrom_SubagentOutputRequiresFullParse(t *testing.T) { + t.Parallel() + + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-sub", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "run child", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "run it", + }, tsEarlyS5), + ) + path := createTestFile(t, "codex-subagent-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"019c9c96-6ee7-77c0-ba4c-380f844289d5","nickname":"Fennel"}`, tsLate), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + _, _, _, err = ParseCodexSessionFrom(path, offset, 2, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "full parse") +} + +func TestParseCodexSessionFrom_WaitCallRequiresFullParse(t *testing.T) { + t.Parallel() + + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-wait", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "run child", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "run it", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + ) + path := createTestFile(t, "codex-wait-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, "2024-01-01T10:01:06Z"), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + _, _, _, err = ParseCodexSessionFrom(path, offset, 4, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "full parse") +} + +func TestParseCodexSessionFrom_SystemMessageDoesNotRequireFullParse(t *testing.T) { + t.Parallel() + + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-system", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "hello", tsEarlyS1), + ) + path := createTestFile(t, "codex-system-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexMsgJSON("user", "# AGENTS.md\nsome instructions", tsLate), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + newMsgs, endedAt, _, err := ParseCodexSessionFrom(path, offset, 1, false) + require.NoError(t, err) + assert.Equal(t, 0, len(newMsgs)) + assert.False(t, endedAt.IsZero()) +} + +func TestParseCodexSessionFrom_RunningNotificationRequiresFullParse(t *testing.T) { + t.Parallel() + + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + running := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"running\":\"Still working\"}}\n" + + "" + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-running", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "hello", tsEarlyS1), + ) + path := createTestFile(t, "codex-running-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexMsgJSON("user", running, tsLate), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + _, _, _, err = ParseCodexSessionFrom(path, offset, 1, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "full parse") +} + +func TestParseCodexSessionFrom_NonSubagentFunctionOutputDoesNotRequireFullParse(t *testing.T) { + t.Parallel() + + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-nonsubagent-output", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "hello", tsEarlyS1), + ) + path := createTestFile(t, "codex-nonsubagent-output-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexFunctionCallOutputJSON("call_other", `{"status":"ok"}`, tsLate), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + newMsgs, endedAt, _, err := ParseCodexSessionFrom(path, offset, 1, false) + require.NoError(t, err) + assert.Equal(t, 0, len(newMsgs)) + assert.False(t, endedAt.IsZero()) +} diff --git a/internal/parser/taxonomy.go b/internal/parser/taxonomy.go index 657a9d6a..6e0d07c0 100644 --- a/internal/parser/taxonomy.go +++ b/internal/parser/taxonomy.go @@ -31,6 +31,8 @@ func NormalizeToolCategory(rawName string) string { return "Bash" case "apply_patch": return "Edit" + case "spawn_agent": + return "Task" // Gemini tools case "read_file", "list_directory": diff --git a/internal/parser/types.go b/internal/parser/types.go index 14d09989..33f3b3a7 100644 --- a/internal/parser/types.go +++ b/internal/parser/types.go @@ -295,6 +295,7 @@ type ParsedToolCall struct { InputJSON string // raw JSON of the input object SkillName string // skill name when ToolName is "Skill" SubagentSessionID string // linked subagent session file (e.g. "agent-{task_id}") + ResultEvents []ParsedToolResultEvent } // ParsedToolResult holds metadata about a tool result block in a @@ -305,6 +306,18 @@ type ParsedToolResult struct { ContentRaw string // raw JSON of the content field; decode with DecodeContent } +// ParsedToolResultEvent is a canonical chronological update attached +// to a tool call. Used for Codex subagent terminal status updates. +type ParsedToolResultEvent struct { + ToolUseID string + AgentID string + SubagentSessionID string + Source string + Status string + Content string + Timestamp time.Time +} + // ParsedMessage holds a single extracted message. type ParsedMessage struct { Ordinal int diff --git a/internal/postgres/messages.go b/internal/postgres/messages.go index 2dcd18ee..eb2b9f5d 100644 --- a/internal/postgres/messages.go +++ b/internal/postgres/messages.go @@ -383,6 +383,11 @@ func (s *Store) attachToolCalls( return err } } + if err := s.attachToolResultEvents( + ctx, msgs, ordToIdx, sessionID, ordinals, + ); err != nil { + return err + } return nil } @@ -450,6 +455,96 @@ func (s *Store) attachToolCallsBatch( return rows.Err() } +func (s *Store) attachToolResultEvents( + ctx context.Context, + msgs []db.Message, + ordToIdx map[int]int, + sessionID string, + ordinals []int, +) error { + for i := 0; i < len(ordinals); i += attachToolCallBatchSize { + end := min(i+attachToolCallBatchSize, len(ordinals)) + if err := s.attachToolResultEventsBatch( + ctx, msgs, ordToIdx, sessionID, ordinals[i:end], + ); err != nil { + return err + } + } + return nil +} + +func (s *Store) attachToolResultEventsBatch( + ctx context.Context, + msgs []db.Message, + ordToIdx map[int]int, + sessionID string, + ordinals []int, +) error { + if len(ordinals) == 0 { + return nil + } + + args := []any{sessionID} + phs := make([]string, len(ordinals)) + for i, ord := range ordinals { + args = append(args, ord) + phs[i] = fmt.Sprintf("$%d", i+2) + } + + query := fmt.Sprintf(` + SELECT tool_call_message_ordinal, call_index, + COALESCE(tool_use_id, ''), + COALESCE(agent_id, ''), + COALESCE(subagent_session_id, ''), + source, status, content, content_length, + timestamp, event_index + FROM tool_result_events + WHERE session_id = $1 + AND tool_call_message_ordinal IN (%s) + ORDER BY tool_call_message_ordinal, call_index, event_index`, + strings.Join(phs, ",")) + + rows, err := s.pg.QueryContext(ctx, query, args...) + if err != nil { + return fmt.Errorf("querying tool_result_events: %w", err) + } + defer rows.Close() + + for rows.Next() { + var ( + msgOrdinal int + callIndex int + ev db.ToolResultEvent + ts *time.Time + ) + if err := rows.Scan( + &msgOrdinal, &callIndex, + &ev.ToolUseID, &ev.AgentID, + &ev.SubagentSessionID, + &ev.Source, &ev.Status, + &ev.Content, &ev.ContentLength, + &ts, &ev.EventIndex, + ); err != nil { + return fmt.Errorf("scanning tool_result_event: %w", err) + } + if ts != nil { + ev.Timestamp = FormatISO8601(*ts) + } + idx, ok := ordToIdx[msgOrdinal] + if !ok { + continue + } + if callIndex < 0 || callIndex >= len(msgs[idx].ToolCalls) { + continue + } + msgs[idx].ToolCalls[callIndex].ResultEvents = append( + msgs[idx].ToolCalls[callIndex].ResultEvents, + ev, + ) + } + return rows.Err() +} + // scanPGMessages scans message rows from PostgreSQL, // converting TIMESTAMPTZ to string. func scanPGMessages(rows interface { diff --git a/internal/postgres/push.go b/internal/postgres/push.go index 6fec4b16..0853b28f 100644 --- a/internal/postgres/push.go +++ b/internal/postgres/push.go @@ -660,6 +660,14 @@ func (s *Sync) pushMessages( ) } if localCount == 0 { + if _, err := tx.ExecContext(ctx, + `DELETE FROM tool_result_events WHERE session_id = $1`, + sessionID, + ); err != nil { + return 0, fmt.Errorf( + "deleting stale pg tool_result_events: %w", err, + ) + } if _, err := tx.ExecContext(ctx, `DELETE FROM tool_calls WHERE session_id = $1`, sessionID, @@ -755,6 +763,14 @@ func (s *Sync) pushMessages( } } + if _, err := tx.ExecContext(ctx, ` + DELETE FROM tool_result_events + WHERE session_id = $1 + `, sessionID); err != nil { + return 0, fmt.Errorf( + "deleting pg tool_result_events: %w", err, + ) + } if _, err := tx.ExecContext(ctx, ` DELETE FROM tool_calls WHERE session_id = $1 @@ -808,6 +824,11 @@ func (s *Sync) pushMessages( ); err != nil { return count, err } + if err := bulkInsertToolResultEvents( + ctx, tx, sessionID, msgs, + ); err != nil { + return count, err + } count += len(msgs) startOrdinal = nextOrdinal } @@ -938,6 +959,78 @@ func bulkInsertToolCalls( return nil } +func bulkInsertToolResultEvents( + ctx context.Context, tx *sql.Tx, + sessionID string, msgs []db.Message, +) error { + type evRow struct { + ordinal int + index int + ev db.ToolResultEvent + } + var rows []evRow + for _, m := range msgs { + for i, tc := range m.ToolCalls { + for _, ev := range tc.ResultEvents { + rows = append(rows, evRow{m.Ordinal, i, ev}) + } + } + } + if len(rows) == 0 { + return nil + } + + const evBatch = 100 + for i := 0; i < len(rows); i += evBatch { + end := min(i+evBatch, len(rows)) + batch := rows[i:end] + + var b strings.Builder + b.WriteString(`INSERT INTO tool_result_events ( + session_id, tool_call_message_ordinal, call_index, + tool_use_id, agent_id, subagent_session_id, + source, status, content, content_length, + timestamp, event_index) VALUES `) + args := make([]any, 0, len(batch)*12) + for j, r := range batch { + if j > 0 { + b.WriteByte(',') + } + p := j*12 + 1 + fmt.Fprintf(&b, + "($%d,$%d,$%d,$%d,$%d,$%d,"+ + "$%d,$%d,$%d,$%d,$%d,$%d)", + p, p+1, p+2, p+3, p+4, p+5, + p+6, p+7, p+8, p+9, p+10, p+11, + ) + var ts any + if r.ev.Timestamp != "" { + if t, ok := ParseSQLiteTimestamp(r.ev.Timestamp); ok { + ts = t + } + } + args = append(args, + sessionID, + r.ordinal, + r.index, + nilIfEmpty(r.ev.ToolUseID), + nilIfEmpty(r.ev.AgentID), + nilIfEmpty(r.ev.SubagentSessionID), + sanitizePG(r.ev.Source), + sanitizePG(r.ev.Status), + sanitizePG(r.ev.Content), + r.ev.ContentLength, + ts, + r.ev.EventIndex, + ) + } + if _, err := tx.ExecContext(ctx, b.String(), args...); err != nil { + return fmt.Errorf("bulk inserting tool_result_events: %w", err) + } + } + return nil +} + // normalizeSyncTimestamps ensures schema exists and normalizes // local sync state timestamps. func (s *Sync) normalizeSyncTimestamps( diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index fdb58e28..db14f43e 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -72,6 +72,33 @@ CREATE UNIQUE INDEX IF NOT EXISTS idx_tool_calls_dedup CREATE INDEX IF NOT EXISTS idx_tool_calls_session ON tool_calls (session_id); + +CREATE TABLE IF NOT EXISTS tool_result_events ( + id BIGSERIAL PRIMARY KEY, + session_id TEXT NOT NULL, + tool_call_message_ordinal INT NOT NULL, + call_index INT NOT NULL DEFAULT 0, + tool_use_id TEXT, + agent_id TEXT, + subagent_session_id TEXT, + source TEXT NOT NULL, + status TEXT NOT NULL, + content TEXT NOT NULL, + content_length INT NOT NULL DEFAULT 0, + timestamp TIMESTAMPTZ, + event_index INT NOT NULL DEFAULT 0, + FOREIGN KEY (session_id) + REFERENCES sessions(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_tool_result_events_session + ON tool_result_events (session_id); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_tool_result_events_dedup + ON tool_result_events ( + session_id, tool_call_message_ordinal, + call_index, event_index + ); ` // EnsureSchema creates the schema (if needed), then runs @@ -163,6 +190,16 @@ func CheckSchemaCompat( ) } rows.Close() + + rows, err = db.QueryContext(ctx, + `SELECT event_index FROM tool_result_events LIMIT 0`) + if err != nil { + return fmt.Errorf( + "tool_result_events table missing required columns: %w", + err, + ) + } + rows.Close() return nil } diff --git a/internal/postgres/sync_test.go b/internal/postgres/sync_test.go index 137113e5..443940ff 100644 --- a/internal/postgres/sync_test.go +++ b/internal/postgres/sync_test.go @@ -59,6 +59,14 @@ func TestEnsureSchemaIdempotent(t *testing.T) { if err := ps.EnsureSchema(ctx); err != nil { t.Fatalf("second EnsureSchema: %v", err) } + + var eventIndex int + err = ps.pg.QueryRowContext(ctx, + "SELECT event_index FROM tool_result_events LIMIT 0", + ).Scan(&eventIndex) + if err != nil && err != sql.ErrNoRows { + t.Fatalf("tool_result_events schema probe: %v", err) + } } func TestPushSingleSession(t *testing.T) { @@ -302,6 +310,85 @@ func TestPushWithToolCalls(t *testing.T) { } } +func TestPushWithToolResultEvents(t *testing.T) { + pgURL := testPGURL(t) + cleanPGSchema(t, pgURL) + t.Cleanup(func() { cleanPGSchema(t, pgURL) }) + + local := testDB(t) + ps, err := New( + pgURL, "agentsview", local, + "test-machine", true, + ) + if err != nil { + t.Fatalf("creating sync: %v", err) + } + defer ps.Close() + + ctx := context.Background() + if err := ps.EnsureSchema(ctx); err != nil { + t.Fatalf("ensure schema: %v", err) + } + + sess := db.Session{ + ID: "sess-events-001", + Project: "test-project", + Machine: "local", + Agent: "codex", + MessageCount: 1, + } + if err := local.UpsertSession(sess); err != nil { + t.Fatalf("upsert session: %v", err) + } + if err := local.InsertMessages([]db.Message{ + { + SessionID: "sess-events-001", + Ordinal: 0, + Role: "assistant", + Content: "tool use response", + HasToolUse: true, + ToolCalls: []db.ToolCall{ + { + ToolName: "wait", + Category: "Task", + ToolUseID: "call_wait", + ResultEvents: []db.ToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: "agent-1", + SubagentSessionID: "codex:agent-1", + Source: "wait_output", + Status: "completed", + Content: "first result", + ContentLength: len("first result"), + Timestamp: "2026-03-27T10:00:00Z", + EventIndex: 0, + }, + }, + }, + }, + }, + }); err != nil { + t.Fatalf("insert messages: %v", err) + } + + if _, err := ps.Push(ctx, false); err != nil { + t.Fatalf("push: %v", err) + } + + var count int + err = ps.pg.QueryRowContext(ctx, + "SELECT COUNT(*) FROM tool_result_events WHERE session_id = $1", + "sess-events-001", + ).Scan(&count) + if err != nil { + t.Fatalf("querying pg tool_result_events: %v", err) + } + if count != 1 { + t.Fatalf("pg tool_result_events = %d, want 1", count) + } +} + func TestStatus(t *testing.T) { pgURL := testPGURL(t) cleanPGSchema(t, pgURL) diff --git a/internal/sync/engine.go b/internal/sync/engine.go index 657cf2ff..d7015781 100644 --- a/internal/sync/engine.go +++ b/internal/sync/engine.go @@ -1478,6 +1478,13 @@ func (e *Engine) tryIncrementalJSONL( file.Path, inc.FileSize, maxOrd+1, ) if err != nil { + if parser.IsIncrementalFullParseFallback(err) { + log.Printf( + "incremental %s %s: %v (explicit full parse fallback)", + agent, file.Path, err, + ) + return processResult{}, false + } log.Printf( "incremental %s %s: %v (full parse)", agent, file.Path, err, @@ -2473,11 +2480,35 @@ func convertToolCalls( InputJSON: tc.InputJSON, SkillName: tc.SkillName, SubagentSessionID: tc.SubagentSessionID, + ResultEvents: convertToolResultEvents(tc.ResultEvents), } } return calls } +func convertToolResultEvents( + parsed []parser.ParsedToolResultEvent, +) []db.ToolResultEvent { + if len(parsed) == 0 { + return nil + } + events := make([]db.ToolResultEvent, len(parsed)) + for i, ev := range parsed { + events[i] = db.ToolResultEvent{ + ToolUseID: ev.ToolUseID, + AgentID: ev.AgentID, + SubagentSessionID: ev.SubagentSessionID, + Source: ev.Source, + Status: ev.Status, + Content: ev.Content, + ContentLength: len(ev.Content), + Timestamp: timeutil.Format(ev.Timestamp), + EventIndex: i, + } + } + return events +} + // convertToolResults maps parsed tool results to db.ToolResult // structs for use in pairing before DB insert. func convertToolResults( @@ -2502,6 +2533,7 @@ func convertToolResults( // tool_result blocks (no displayable text). func pairAndFilter(msgs []db.Message, blocked map[string]bool) []db.Message { pairToolResults(msgs, blocked) + pairToolResultEventSummaries(msgs, blocked) filtered := msgs[:0] for _, m := range msgs { if m.Role == "user" && @@ -2541,3 +2573,80 @@ func pairToolResults(msgs []db.Message, blocked map[string]bool) { } } } + +func pairToolResultEventSummaries( + msgs []db.Message, blocked map[string]bool, +) { + for i := range msgs { + for j := range msgs[i].ToolCalls { + tc := &msgs[i].ToolCalls[j] + if len(tc.ResultEvents) == 0 { + continue + } + summary := summarizeToolResultEvents(tc.ResultEvents) + tc.ResultContentLength = len(summary) + if blocked[tc.Category] { + tc.ResultContent = "" + tc.ResultEvents = nil + continue + } + tc.ResultContent = summary + } + } +} + +func summarizeToolResultEvents( + events []db.ToolResultEvent, +) string { + if len(events) == 0 { + return "" + } + type agentSummary struct { + order int + content string + } + latestByAgent := map[string]agentSummary{} + orderedAgents := make([]string, 0, len(events)) + lastAnon := "" + allHaveAgentID := true + for _, ev := range events { + if strings.TrimSpace(ev.Content) == "" { + continue + } + agentID := strings.TrimSpace(ev.AgentID) + if agentID == "" { + allHaveAgentID = false + lastAnon = ev.Content + continue + } + if _, ok := latestByAgent[agentID]; !ok { + latestByAgent[agentID] = agentSummary{ + order: len(orderedAgents), + content: ev.Content, + } + orderedAgents = append(orderedAgents, agentID) + continue + } + entry := latestByAgent[agentID] + entry.content = ev.Content + latestByAgent[agentID] = entry + } + if len(latestByAgent) <= 1 { + if len(latestByAgent) == 1 { + summary := latestByAgent[orderedAgents[0]].content + if lastAnon != "" { + return summary + "\n\n" + lastAnon + } + return summary + } + return lastAnon + } + parts := make([]string, 0, len(orderedAgents)) + for _, agentID := range orderedAgents { + parts = append(parts, agentID+":\n"+latestByAgent[agentID].content) + } + if !allHaveAgentID && lastAnon != "" { + parts = append(parts, lastAnon) + } + return strings.Join(parts, "\n\n") +} diff --git a/internal/sync/engine_integration_test.go b/internal/sync/engine_integration_test.go index e4c61e30..5930367d 100644 --- a/internal/sync/engine_integration_test.go +++ b/internal/sync/engine_integration_test.go @@ -2989,6 +2989,92 @@ func TestIncrementalSync_CodexAppend(t *testing.T) { } } +func TestIncrementalSync_CodexSubagentAppendFallsBackToFullParse(t *testing.T) { + env := setupTestEnv(t) + + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON( + "inc-cx-sub", "/tmp/proj", + "codex_cli_rs", tsEarly, + ), + testjsonl.CodexMsgJSON("user", "run child", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "run it", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, "2024-01-01T10:01:00Z"), + ) + path := env.writeCodexSession( + t, filepath.Join("2024", "01", "01"), + "rollout-20240101-inc-cx-sub.jsonl", initial, + ) + env.engine.SyncAll(context.Background(), nil) + + assertSessionMessageCount( + t, env.db, "codex:inc-cx-sub", 2, + ) + + appended := testjsonl.JoinJSONL( + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, "2024-01-01T10:01:06Z"), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+childID+"\":{\"completed\":\"Finished successfully\"}}}", + "2024-01-01T10:01:07Z", + ), + ) + f, err := os.OpenFile( + path, os.O_APPEND|os.O_WRONLY, 0o644, + ) + if err != nil { + t.Fatalf("open for append: %v", err) + } + _, err = f.WriteString(appended) + f.Close() + if err != nil { + t.Fatalf("append: %v", err) + } + + // SyncPaths hits the incremental Codex path first. The appended + // wait call is an explicit full-parse fallback case and should + // still produce the final parsed state successfully. + env.engine.SyncPaths([]string{path}) + + assertSessionMessageCount( + t, env.db, "codex:inc-cx-sub", 3, + ) + msgs := fetchMessages(t, env.db, "codex:inc-cx-sub") + if len(msgs) != 3 { + t.Fatalf("messages len = %d, want 3", len(msgs)) + } + if len(msgs[2].ToolCalls) != 1 { + t.Fatalf("tool calls len = %d, want 1", len(msgs[2].ToolCalls)) + } + waitCall := msgs[2].ToolCalls[0] + if waitCall.ToolName != "wait" { + t.Fatalf("tool name = %q, want %q", waitCall.ToolName, "wait") + } + if len(waitCall.ResultEvents) != 1 { + t.Fatalf("result events len = %d, want 1", len(waitCall.ResultEvents)) + } + if waitCall.ResultEvents[0].AgentID != childID { + t.Fatalf("event agent_id = %q, want %q", waitCall.ResultEvents[0].AgentID, childID) + } + if waitCall.ResultEvents[0].Content != "Finished successfully" { + t.Fatalf( + "event content = %q, want %q", + waitCall.ResultEvents[0].Content, "Finished successfully", + ) + } + if waitCall.ResultContent != "Finished successfully" { + t.Fatalf( + "result_content = %q, want %q", + waitCall.ResultContent, "Finished successfully", + ) + } +} + func TestResyncAllCancelledPreservesOriginalDB(t *testing.T) { env := setupTestEnv(t) diff --git a/internal/sync/engine_test.go b/internal/sync/engine_test.go index 28b73a0d..64daea72 100644 --- a/internal/sync/engine_test.go +++ b/internal/sync/engine_test.go @@ -430,6 +430,212 @@ func TestPairToolResultsContent(t *testing.T) { } } +func TestPairToolResultEventSummaries(t *testing.T) { + tests := []struct { + name string + msgs []db.Message + blocked map[string]bool + want []db.Message + }{ + { + name: "single event becomes summary", + msgs: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + ResultEvents: []db.ToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: "agent-1", + Source: "wait_output", + Status: "completed", + Content: "Finished successfully", + ContentLength: len("Finished successfully"), + }}, + }}, + }}, + want: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + ResultContentLength: len("Finished successfully"), + ResultContent: "Finished successfully", + ResultEvents: []db.ToolResultEvent{{ + ToolUseID: "call_wait", + AgentID: "agent-1", + Source: "wait_output", + Status: "completed", + Content: "Finished successfully", + ContentLength: len("Finished successfully"), + }}, + }}, + }}, + }, + { + name: "multi-agent latest summary keeps one line per agent", + msgs: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + ResultEvents: []db.ToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: "agent-a", + Source: "wait_output", + Status: "completed", + Content: "First finished", + ContentLength: len("First finished"), + }, + { + ToolUseID: "call_wait", + AgentID: "agent-b", + Source: "subagent_notification", + Status: "completed", + Content: "Second finished", + ContentLength: len("Second finished"), + }, + }, + }}, + }}, + want: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + ResultContentLength: len("agent-a:\nFirst finished\n\nagent-b:\nSecond finished"), + ResultContent: "agent-a:\nFirst finished\n\nagent-b:\nSecond finished", + ResultEvents: []db.ToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: "agent-a", + Source: "wait_output", + Status: "completed", + Content: "First finished", + ContentLength: len("First finished"), + }, + { + ToolUseID: "call_wait", + AgentID: "agent-b", + Source: "subagent_notification", + Status: "completed", + Content: "Second finished", + ContentLength: len("Second finished"), + }, + }, + }}, + }}, + }, + { + name: "blocked category keeps length but drops summary content", + msgs: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_read", + ToolName: "Read", + Category: "Read", + ResultEvents: []db.ToolResultEvent{{ + ToolUseID: "call_read", + Source: "wait_output", + Status: "completed", + Content: "secret file body", + ContentLength: len("secret file body"), + }}, + }}, + }}, + blocked: map[string]bool{"Read": true}, + want: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_read", + ToolName: "Read", + Category: "Read", + ResultContentLength: len("secret file body"), + ResultContent: "", + ResultEvents: nil, + }}, + }}, + }, + { + name: "mixed anonymous and multi-agent content keeps both", + msgs: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + ResultEvents: []db.ToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: "agent-a", + Source: "wait_output", + Status: "completed", + Content: "First finished", + ContentLength: len("First finished"), + }, + { + ToolUseID: "call_wait", + AgentID: "agent-b", + Source: "wait_output", + Status: "completed", + Content: "Second finished", + ContentLength: len("Second finished"), + }, + { + ToolUseID: "call_wait", + Source: "subagent_notification", + Status: "completed", + Content: "Detached note", + ContentLength: len("Detached note"), + }, + }, + }}, + }}, + want: []db.Message{{ + ToolCalls: []db.ToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + ResultContentLength: len("agent-a:\nFirst finished\n\nagent-b:\nSecond finished\n\nDetached note"), + ResultContent: "agent-a:\nFirst finished\n\nagent-b:\nSecond finished\n\nDetached note", + ResultEvents: []db.ToolResultEvent{ + { + ToolUseID: "call_wait", + AgentID: "agent-a", + Source: "wait_output", + Status: "completed", + Content: "First finished", + ContentLength: len("First finished"), + }, + { + ToolUseID: "call_wait", + AgentID: "agent-b", + Source: "wait_output", + Status: "completed", + Content: "Second finished", + ContentLength: len("Second finished"), + }, + { + ToolUseID: "call_wait", + Source: "subagent_notification", + Status: "completed", + Content: "Detached note", + ContentLength: len("Detached note"), + }, + }, + }}, + }}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pairToolResultEventSummaries(tt.msgs, tt.blocked) + if diff := cmp.Diff(tt.want, tt.msgs); diff != "" { + t.Fatalf("pairToolResultEventSummaries() mismatch (-want +got):\n%s", diff) + } + }) + } +} + func TestBlockedCategorySet(t *testing.T) { tests := []struct { name string diff --git a/internal/testjsonl/testjsonl.go b/internal/testjsonl/testjsonl.go index ba1b5fc6..1ace7861 100644 --- a/internal/testjsonl/testjsonl.go +++ b/internal/testjsonl/testjsonl.go @@ -217,6 +217,44 @@ func CodexFunctionCallFieldsJSON( return mustMarshal(m) } +// CodexFunctionCallWithCallIDJSON returns a Codex function_call +// response_item with an explicit call_id. +func CodexFunctionCallWithCallIDJSON( + name, callID string, arguments any, timestamp string, +) string { + payload := map[string]any{ + "type": "function_call", + "name": name, + "call_id": callID, + } + if arguments != nil { + payload["arguments"] = arguments + } + m := map[string]any{ + "type": "response_item", + "timestamp": timestamp, + "payload": payload, + } + return mustMarshal(m) +} + +// CodexFunctionCallOutputJSON returns a Codex +// function_call_output response_item. +func CodexFunctionCallOutputJSON( + callID string, output any, timestamp string, +) string { + m := map[string]any{ + "type": "response_item", + "timestamp": timestamp, + "payload": map[string]any{ + "type": "function_call_output", + "call_id": callID, + "output": output, + }, + } + return mustMarshal(m) +} + // CodexTurnContextJSON returns a Codex turn_context entry as a // JSON string with the given model. func CodexTurnContextJSON(model, timestamp string) string {