diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index 1b046e0708..81f708f645 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -7,7 +7,7 @@ import { execSync } from "node:child_process"; import { Config } from "@/node/config"; import { HistoryService } from "@/node/services/historyService"; import { PartialService } from "@/node/services/partialService"; -import { TaskService } from "@/node/services/taskService"; +import { TaskService, type AgentTaskStatus } from "@/node/services/taskService"; import { createRuntime } from "@/node/runtime/runtimeFactory"; import { Ok, Err, type Result } from "@/common/types/result"; import type { StreamEndEvent } from "@/common/types/stream"; @@ -15,8 +15,21 @@ import { createMuxMessage } from "@/common/types/message"; import type { WorkspaceMetadata } from "@/common/types/workspace"; import type { AIService } from "@/node/services/aiService"; import type { WorkspaceService } from "@/node/services/workspaceService"; -import type { InitStateManager } from "@/node/services/initStateManager"; -import { InitStateManager as RealInitStateManager } from "@/node/services/initStateManager"; +import { + InitStateManager as RealInitStateManager, + type InitStateManager, +} from "@/node/services/initStateManager"; +interface TaskServiceStatusInternal { + setTaskStatus: ( + workspaceId: string, + status: AgentTaskStatus, + options?: { allowMissing?: boolean } + ) => Promise; +} + +interface TaskServiceWaiterInternal extends TaskServiceStatusInternal { + resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; +} function initGitRepo(projectPath: string): void { execSync("git init -b main", { cwd: projectPath, stdio: "ignore" }); @@ -121,6 +134,7 @@ function createWorkspaceServiceMocks( resumeStream: ReturnType; remove: ReturnType; emit: ReturnType; + on: ReturnType; }> ): { workspaceService: WorkspaceService; @@ -128,6 +142,7 @@ function createWorkspaceServiceMocks( resumeStream: ReturnType; remove: ReturnType; emit: ReturnType; + on: ReturnType; } { const sendMessage = overrides?.sendMessage ?? mock((): Promise> => Promise.resolve(Ok(undefined))); @@ -135,19 +150,47 @@ function createWorkspaceServiceMocks( overrides?.resumeStream ?? mock((): Promise> => Promise.resolve(Ok(undefined))); const remove = overrides?.remove ?? mock((): Promise> => Promise.resolve(Ok(undefined))); - const emit = overrides?.emit ?? mock(() => true); + + const listenersByEvent = new Map void>>(); + + const workspaceService = {} as unknown as WorkspaceService; + + const emit = + overrides?.emit ?? + mock((event: string, ...args: unknown[]) => { + const listeners = listenersByEvent.get(event); + if (listeners) { + for (const listener of listeners) { + listener(...args); + } + } + return true; + }); + + const on = + overrides?.on ?? + mock((event: string, listener: (...args: unknown[]) => void) => { + const list = listenersByEvent.get(event) ?? []; + list.push(listener); + listenersByEvent.set(event, list); + return workspaceService; + }); + + Object.assign(workspaceService as unknown as Record, { + sendMessage, + resumeStream, + remove, + emit, + on, + }); return { - workspaceService: { - sendMessage, - resumeStream, - remove, - emit, - } as unknown as WorkspaceService, + workspaceService, sendMessage, resumeStream, remove, emit, + on, }; } @@ -354,15 +397,8 @@ describe("TaskService", () => { expect(queued.data.status).toBe("queued"); // Free the slot by marking the first task as reported. - await config.editConfig((cfg) => { - for (const [_project, project] of cfg.projects) { - const ws = project.workspaces.find((w) => w.id === running.data.taskId); - if (ws) { - ws.taskStatus = "reported"; - } - } - return cfg; - }); + const internal = taskService as unknown as TaskServiceStatusInternal; + await internal.setTaskStatus(running.data.taskId, "reported"); await taskService.initialize(); @@ -458,9 +494,8 @@ describe("TaskService", () => { requestingWorkspaceId: parentTask.data.taskId, }); - const internal = taskService as unknown as { + const internal = taskService as unknown as TaskServiceWaiterInternal & { maybeStartQueuedTasks: () => Promise; - resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; }; await internal.maybeStartQueuedTasks(); @@ -579,15 +614,8 @@ describe("TaskService", () => { ); // Free slot and start queued tasks. - await config.editConfig((cfg) => { - for (const [_project, project] of cfg.projects) { - const ws = project.workspaces.find((w) => w.id === running.data.taskId); - if (ws) { - ws.taskStatus = "reported"; - } - } - return cfg; - }); + const internal = taskService as unknown as TaskServiceStatusInternal; + await internal.setTaskStatus(running.data.taskId, "reported"); await taskService.initialize(); @@ -894,7 +922,16 @@ describe("TaskService", () => { const { aiService } = createAIServiceMocks(config); const { workspaceService, resumeStream } = createWorkspaceServiceMocks(); - const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + const { historyService, taskService } = createTaskServiceHarness(config, { + aiService, + workspaceService, + }); + + const writeHistory = await historyService.appendToHistory( + rootWorkspaceId, + createMuxMessage("user-root", "user", "hi", { timestamp: Date.now() }) + ); + expect(writeHistory.success).toBe(true); const internal = taskService as unknown as { handleStreamEnd: (event: StreamEndEvent) => Promise; @@ -929,6 +966,102 @@ describe("TaskService", () => { expect(additionalSystemInstructions).toContain(childTaskId); }); + test("defers auto-resume during compaction until history is rewritten", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const childTaskId = "task-222"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { + path: path.join(projectPath, "root"), + id: rootWorkspaceId, + name: "root", + aiSettings: { model: "openai:gpt-5.2", thinkingLevel: "medium" }, + }, + { + path: path.join(projectPath, "child-task"), + id: childTaskId, + name: "agent_explore_child", + parentWorkspaceId: rootWorkspaceId, + agentType: "explore", + taskStatus: "running", + taskModelString: "openai:gpt-5.2", + taskThinkingLevel: "medium", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const { aiService } = createAIServiceMocks(config); + const { workspaceService, resumeStream } = createWorkspaceServiceMocks(); + const { historyService, taskService } = createTaskServiceHarness(config, { + aiService, + workspaceService, + }); + + const writeHistory = await historyService.appendToHistory( + rootWorkspaceId, + createMuxMessage("compaction-request", "user", "/compact", { + timestamp: Date.now(), + muxMetadata: { + type: "compaction-request", + rawCommand: "/compact", + parsed: {}, + }, + }) + ); + expect(writeHistory.success).toBe(true); + + const internal = taskService as unknown as { + handleStreamEnd: (event: StreamEndEvent) => Promise; + }; + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: rootWorkspaceId, + messageId: "assistant-compaction", + metadata: { model: "openai:gpt-5.2" }, + parts: [], + }); + + // Should not resume immediately while compaction is in progress. + expect(resumeStream).toHaveBeenCalledTimes(0); + + const clearResult = await historyService.clearHistory(rootWorkspaceId); + expect(clearResult.success).toBe(true); + + const appendSummary = await historyService.appendToHistory( + rootWorkspaceId, + createMuxMessage("summary", "assistant", "summary", { timestamp: Date.now() }) + ); + expect(appendSummary.success).toBe(true); + + await new Promise((resolve) => setTimeout(resolve, 250)); + + expect(resumeStream).toHaveBeenCalledTimes(1); + + const resumeCalls = (resumeStream as unknown as { mock: { calls: unknown[][] } }).mock.calls; + const options = resumeCalls[0]?.[1]; + if (!options || typeof options !== "object") { + throw new Error("Expected resumeStream to be called with an options object"); + } + + const additionalSystemInstructions = (options as { additionalSystemInstructions?: unknown }) + .additionalSystemInstructions; + expect(typeof additionalSystemInstructions).toBe("string"); + expect(additionalSystemInstructions).toContain(childTaskId); + }); + test("terminateDescendantAgentTask stops stream, removes workspace, and rejects waiters", async () => { const config = await createTestConfig(rootDir); @@ -1038,6 +1171,107 @@ describe("TaskService", () => { expect(remove).toHaveBeenNthCalledWith(2, parentTaskId, true); }); + test("initialize cleans up orphaned agent tasks when parent workspace is missing", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const missingParentId = "missing-parent"; + const orphanId = "task-orphan"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { + path: path.join(projectPath, "orphan"), + id: orphanId, + name: "agent_explore_orphan", + parentWorkspaceId: missingParentId, + agentType: "explore", + taskStatus: "queued", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { aiService } = createAIServiceMocks(config); + const remove = mock(async (workspaceId: string): Promise> => { + await config.removeWorkspace(workspaceId); + return Ok(undefined); + }); + const { workspaceService } = createWorkspaceServiceMocks({ remove }); + const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + + await taskService.initialize(); + + expect(remove).toHaveBeenCalledWith(orphanId, true); + expect(config.findWorkspace(orphanId)).toBeNull(); + }); + + test("handleWorkspaceRemoved terminates descendant tasks for a removed workspace", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const parentTaskId = "task-parent"; + const childTaskId = "task-child"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "parent-task"), + id: parentTaskId, + name: "agent_exec_parent", + parentWorkspaceId: rootWorkspaceId, + agentType: "exec", + taskStatus: "running", + }, + { + path: path.join(projectPath, "child-task"), + id: childTaskId, + name: "agent_explore_child", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "running", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const { aiService } = createAIServiceMocks(config); + const remove = mock(async (workspaceId: string): Promise> => { + await config.removeWorkspace(workspaceId); + return Ok(undefined); + }); + const { workspaceService } = createWorkspaceServiceMocks({ remove }); + const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + + const internal = taskService as unknown as { + handleWorkspaceRemoved: (workspaceId: string) => Promise; + }; + + await internal.handleWorkspaceRemoved(rootWorkspaceId); + + expect(remove).toHaveBeenNthCalledWith(1, childTaskId, true); + expect(remove).toHaveBeenNthCalledWith(2, parentTaskId, true); + + expect(config.findWorkspace(childTaskId)).toBeNull(); + expect(config.findWorkspace(parentTaskId)).toBeNull(); + }); + test("initialize resumes awaiting_report tasks after restart", async () => { const config = await createTestConfig(rootDir); @@ -1118,10 +1352,7 @@ describe("TaskService", () => { // Wait longer than timeout while task is still queued. await new Promise((r) => setTimeout(r, 100)); - const internal = taskService as unknown as { - setTaskStatus: (workspaceId: string, status: "queued" | "running") => Promise; - resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; - }; + const internal = taskService as unknown as TaskServiceWaiterInternal; await internal.setTaskStatus(childId, "running"); internal.resolveWaiters(childId, { reportMarkdown: "ok" }); @@ -1130,6 +1361,155 @@ describe("TaskService", () => { expect(report.reportMarkdown).toBe("ok"); }); + test("setTaskStatus refuses invalid transitions out of reported", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const parentId = "parent-111"; + const childId = "child-222"; + const reportedAt = new Date().toISOString(); + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, + { + path: path.join(projectPath, "child"), + id: childId, + name: "agent_explore_child", + parentWorkspaceId: parentId, + agentType: "explore", + taskStatus: "reported", + reportedAt, + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { taskService } = createTaskServiceHarness(config); + + const internal = taskService as unknown as TaskServiceStatusInternal; + + await internal.setTaskStatus(childId, "running"); + + const cfg = config.loadConfigOrDefault(); + const child = Array.from(cfg.projects.values()) + .flatMap((p) => p.workspaces) + .find((w) => w.id === childId); + + expect(child?.taskStatus).toBe("reported"); + expect(child?.reportedAt).toBe(reportedAt); + }); + + test("setTaskStatus stamps reportedAt when marking reported", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const parentId = "parent-111"; + const childId = "child-222"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, + { + path: path.join(projectPath, "child"), + id: childId, + name: "agent_explore_child", + parentWorkspaceId: parentId, + agentType: "explore", + taskStatus: "running", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { taskService } = createTaskServiceHarness(config); + + const internal = taskService as unknown as TaskServiceStatusInternal; + + await internal.setTaskStatus(childId, "reported"); + + const cfg = config.loadConfigOrDefault(); + const child = Array.from(cfg.projects.values()) + .flatMap((p) => p.workspaces) + .find((w) => w.id === childId); + + expect(child?.taskStatus).toBe("reported"); + expect(child?.reportedAt).toBeTruthy(); + expect(Number.isNaN(Date.parse(child!.reportedAt!))).toBe(false); + }); + + test("waitForAgentReport returns immediately for reported tasks (no hang)", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const parentId = "parent-111"; + const childId = "child-222"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, + { + path: path.join(projectPath, "child"), + id: childId, + name: "agent_explore_child", + parentWorkspaceId: parentId, + agentType: "explore", + taskStatus: "reported", + reportedAt: new Date().toISOString(), + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { historyService, taskService } = createTaskServiceHarness(config); + + // Persist the agent_report tool-call so waitForAgentReport can recover after restarts. + const historyMsg = createMuxMessage( + "assistant-child-history", + "assistant", + "", + { timestamp: Date.now() }, + [ + { + type: "dynamic-tool", + toolCallId: "agent-report-call-1", + toolName: "agent_report", + input: { reportMarkdown: "Hello from child", title: "Result" }, + state: "output-available", + output: { success: true }, + }, + ] + ); + + const append = await historyService.appendToHistory(childId, historyMsg); + expect(append.success).toBe(true); + + const report = await taskService.waitForAgentReport(childId, { timeoutMs: 25 }); + expect(report.reportMarkdown).toBe("Hello from child"); + expect(report.title).toBe("Result"); + }); + test("waitForAgentReport returns cached report even after workspace is removed", async () => { const config = await createTestConfig(rootDir); @@ -1161,9 +1541,7 @@ describe("TaskService", () => { const { taskService } = createTaskServiceHarness(config); - const internal = taskService as unknown as { - resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; - }; + const internal = taskService as unknown as TaskServiceWaiterInternal; internal.resolveWaiters(childId, { reportMarkdown: "ok", title: "t" }); await config.removeWorkspace(childId); @@ -1204,9 +1582,7 @@ describe("TaskService", () => { const { taskService } = createTaskServiceHarness(config); - const internal = taskService as unknown as { - resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; - }; + const internal = taskService as unknown as TaskServiceWaiterInternal; internal.resolveWaiters(childId, { reportMarkdown: "ok", title: "t" }); await config.removeWorkspace(childId); @@ -1246,9 +1622,7 @@ describe("TaskService", () => { const { taskService } = createTaskServiceHarness(config); - const internal = taskService as unknown as { - resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; - }; + const internal = taskService as unknown as TaskServiceWaiterInternal; internal.resolveWaiters(childId, { reportMarkdown: "ok", title: "t" }); await config.removeWorkspace(childId); @@ -1288,8 +1662,7 @@ describe("TaskService", () => { const { taskService } = createTaskServiceHarness(config); - const internal = taskService as unknown as { - resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void; + const internal = taskService as unknown as TaskServiceWaiterInternal & { cleanupExpiredCompletedReports: (nowMs: number) => void; }; internal.resolveWaiters(childId, { reportMarkdown: "ok", title: "t" }); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index d264f465c0..1502633ee6 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -80,9 +80,33 @@ export interface DescendantAgentTaskInfo { type AgentTaskWorkspaceEntry = WorkspaceConfigEntry & { projectPath: string }; +const POST_COMPACTION_PARENT_AWAIT_DELAY_MS = 50; +const POST_COMPACTION_PARENT_AWAIT_MAX_RETRIES = 10; const COMPLETED_REPORT_CACHE_TTL_MS = 60 * 60 * 1000; // 1 hour const COMPLETED_REPORT_CACHE_MAX_ENTRIES = 128; +const AGENT_TASK_STATUS_TRANSITIONS: Record = { + queued: ["queued", "running", "awaiting_report", "reported"], + running: ["running", "awaiting_report", "reported"], + awaiting_report: ["awaiting_report", "running", "reported"], + reported: ["reported"], +}; + +function isAllowedAgentTaskStatusTransition(from: AgentTaskStatus, to: AgentTaskStatus): boolean { + return AGENT_TASK_STATUS_TRANSITIONS[from].includes(to); +} + +function normalizeAgentTaskStatus(status: WorkspaceConfigEntry["taskStatus"]): AgentTaskStatus { + // Backward compatibility: older task workspaces may omit taskStatus. + // Treat missing as running so parallelism accounting is conservative. + return (status ?? "running") as AgentTaskStatus; +} +const ACTIVE_AGENT_TASK_STATUSES: ReadonlySet = new Set([ + "queued", + "running", + "awaiting_report", +]); + interface AgentTaskIndex { byId: Map; childrenByParent: Map; @@ -173,6 +197,49 @@ function buildAgentWorkspaceName(agentType: string, workspaceId: string): string return name.length <= 64 ? name : `agent_${workspaceId}`.slice(0, 64); } +function shouldDeferParentResumeForCompaction(messages: MuxMessage[]): boolean { + if (messages.length === 0) { + return true; + } + + const lastUserMsg = [...messages].reverse().find((m) => m.role === "user"); + const muxMeta = lastUserMsg?.metadata?.muxMetadata; + const type = muxMeta && typeof muxMeta === "object" ? (muxMeta as { type?: unknown }).type : null; + return type === "compaction-request"; +} + +function buildParentAwaitSystemInstructions(activeTaskIds: string[]): string { + const taskIdsJson = JSON.stringify(activeTaskIds); + + return ( + `You have active background sub-agent task(s) (${activeTaskIds.join(", ")}). ` + + "Before writing your final response, wait for these sub-agent tasks to finish and report. " + + `Call task_await with task_ids=${taskIdsJson} (do not omit task_ids; omitting may include long-running bash/background processes like dev servers). ` + + "If any are still queued/running/awaiting_report after that, call task_await again with the same task_ids. " + + "Once all are completed, write your final response integrating their reports." + ); +} + +// Shared helper for walking up the task ancestry chain with a fixed hop limit. +// Keeping this centralized ensures consistent cycle detection across callers. +function* iterateAncestorWorkspaceIds( + parentById: Map, + taskId: string +): Generator { + let current = taskId; + for (let i = 0; i < 32; i++) { + const parent = parentById.get(current); + if (!parent) { + return; + } + yield parent; + current = parent; + } + + throw new Error( + `iterateAncestorWorkspaceIds: possible parentWorkspaceId cycle starting at ${taskId}` + ); +} function getIsoNow(): string { return new Date().toISOString(); } @@ -192,6 +259,7 @@ export class TaskService { // Bounded by TTL + max entries (see COMPLETED_REPORT_CACHE_*). private readonly completedReportsByTaskId = new Map(); private readonly remindedAwaitingReport = new Set(); + private readonly postCompactionAwaitTimers = new Map>(); constructor( private readonly config: Config, @@ -227,6 +295,19 @@ export class TaskService { log.error("TaskService.handleStreamEnd failed", { error }); }); }); + + this.workspaceService.on("metadata", (event) => { + // If the parent workspace is removed, any descendant tasks become unreachable/unawaitable. + // Clean them up to avoid leaving orphaned sub-agent workspaces around. + if (event.metadata !== null) return; + + void this.handleWorkspaceRemoved(event.workspaceId).catch((error: unknown) => { + log.error("TaskService.handleWorkspaceRemoved failed", { + workspaceId: event.workspaceId, + error, + }); + }); + }); } private async emitWorkspaceMetadata(workspaceId: string): Promise { @@ -264,7 +345,172 @@ export class TaskService { return found; } + private async cleanupOrphanedAgentTasks(): Promise { + const config = this.config.loadConfigOrDefault(); + + const orphanRoots: Array<{ taskId: string; parentWorkspaceId: string }> = []; + + for (const task of this.listAgentTaskWorkspaces(config)) { + const taskId = coerceNonEmptyString(task.id); + const parentWorkspaceId = coerceNonEmptyString(task.parentWorkspaceId); + if (!taskId || !parentWorkspaceId) continue; + + if (this.config.findWorkspace(parentWorkspaceId)) { + continue; + } + + orphanRoots.push({ taskId, parentWorkspaceId }); + } + + if (orphanRoots.length === 0) { + return; + } + + log.warn("Cleaning up orphaned agent tasks (parent workspace missing)", { + count: orphanRoots.length, + taskIds: orphanRoots.map((t) => t.taskId), + }); + + for (const orphan of orphanRoots) { + const terminateResult = await this.terminateDescendantAgentTask( + orphan.parentWorkspaceId, + orphan.taskId + ); + if (!terminateResult.success) { + if (/not found/i.test(terminateResult.error)) { + continue; + } + log.error("Failed to clean up orphaned agent task", { + taskId: orphan.taskId, + parentWorkspaceId: orphan.parentWorkspaceId, + error: terminateResult.error, + }); + } + } + } + + private clearPostCompactionAwaitTimer(workspaceId: string): void { + const trimmed = workspaceId.trim(); + if (trimmed.length === 0) { + return; + } + + const existingTimer = this.postCompactionAwaitTimers.get(trimmed); + if (!existingTimer) { + return; + } + + clearTimeout(existingTimer); + this.postCompactionAwaitTimers.delete(trimmed); + } + + private async handleWorkspaceRemoved(workspaceId: string): Promise { + assert(workspaceId.length > 0, "handleWorkspaceRemoved: workspaceId must be non-empty"); + + this.clearPostCompactionAwaitTimer(workspaceId); + + const config = this.config.loadConfigOrDefault(); + const directChildTaskIds = + this.buildAgentTaskIndex(config).childrenByParent.get(workspaceId) ?? []; + + if (directChildTaskIds.length === 0) { + return; + } + + for (const taskId of directChildTaskIds) { + const terminateResult = await this.terminateDescendantAgentTask(workspaceId, taskId); + if (!terminateResult.success) { + if (/not found/i.test(terminateResult.error)) { + continue; + } + log.error("Failed to terminate descendant agent task for removed workspace", { + workspaceId, + taskId, + error: terminateResult.error, + }); + } + } + } + + private scheduleParentAwaitAfterCompaction(workspaceId: string, retryCount = 0): void { + const trimmed = workspaceId.trim(); + if (trimmed.length === 0) { + return; + } + + this.clearPostCompactionAwaitTimer(trimmed); + + const timer = setTimeout(() => { + this.postCompactionAwaitTimers.delete(trimmed); + void this.ensureParentAwaitingTasksPostCompaction(trimmed, retryCount).catch( + (error: unknown) => { + log.error("ensureParentAwaitingTasksPostCompaction failed", { + workspaceId: trimmed, + error, + }); + } + ); + }, POST_COMPACTION_PARENT_AWAIT_DELAY_MS); + + this.postCompactionAwaitTimers.set(trimmed, timer); + } + + private async ensureParentAwaitingTasksPostCompaction( + workspaceId: string, + retryCount: number + ): Promise { + // Only relevant for root/parent workspaces. + const cfg = this.config.loadConfigOrDefault(); + const entry = this.findWorkspaceEntry(cfg, workspaceId); + if (!entry) { + return; + } + if (entry.workspace.parentWorkspaceId) { + return; + } + + // Skip if workspace is gone (or being removed) or already streaming again. + if (!this.config.findWorkspace(workspaceId)) { + return; + } + if (this.aiService.isStreaming(workspaceId)) { + return; + } + + // If compaction is still in progress (history empty or last user message is still the + // compaction-request), wait and retry. + const historyResult = await this.historyService.getHistory(workspaceId); + if (historyResult.success && shouldDeferParentResumeForCompaction(historyResult.data)) { + if (retryCount < POST_COMPACTION_PARENT_AWAIT_MAX_RETRIES) { + this.scheduleParentAwaitAfterCompaction(workspaceId, retryCount + 1); + } + return; + } + + const freshCfg = this.config.loadConfigOrDefault(); + const hasActiveDescendants = this.hasActiveDescendantAgentTasks(freshCfg, workspaceId); + if (!hasActiveDescendants) { + return; + } + + const activeTaskIds = this.listActiveDescendantAgentTaskIds(workspaceId); + const model = entry.workspace.aiSettings?.model ?? defaultModel; + + const resumeResult = await this.workspaceService.resumeStream(workspaceId, { + model, + thinkingLevel: entry.workspace.aiSettings?.thinkingLevel, + additionalSystemInstructions: buildParentAwaitSystemInstructions(activeTaskIds), + }); + if (!resumeResult.success) { + log.error("Failed to resume parent after compaction with active background tasks", { + workspaceId, + error: resumeResult.error, + }); + } + } + async initialize(): Promise { + await this.cleanupOrphanedAgentTasks(); await this.maybeStartQueuedTasks(); const config = this.config.loadConfigOrDefault(); @@ -789,35 +1035,42 @@ export class TaskService { }; } - waitForAgentReport( + async waitForAgentReport( taskId: string, options?: { timeoutMs?: number; abortSignal?: AbortSignal; requestingWorkspaceId?: string } ): Promise<{ reportMarkdown: string; title?: string }> { assert(taskId.length > 0, "waitForAgentReport: taskId must be non-empty"); - const cached = this.completedReportsByTaskId.get(taskId); + const cached = this.getCompletedReportCacheEntry(taskId); if (cached) { - const nowMs = Date.now(); - if (cached.expiresAtMs > nowMs) { - return Promise.resolve({ reportMarkdown: cached.reportMarkdown, title: cached.title }); - } - this.completedReportsByTaskId.delete(taskId); + return { reportMarkdown: cached.reportMarkdown, title: cached.title }; } const timeoutMs = options?.timeoutMs ?? 10 * 60 * 1000; // 10 minutes assert(Number.isFinite(timeoutMs) && timeoutMs > 0, "waitForAgentReport: timeoutMs invalid"); + // Validate existence early to avoid waiting on never-resolving task IDs. + const cfg = this.config.loadConfigOrDefault(); + const taskWorkspaceEntry = this.findWorkspaceEntry(cfg, taskId); + if (!taskWorkspaceEntry) { + throw new Error("Task not found"); + } + + // If the task already reported but we missed caching the report (e.g. restart + TTL expiry), + // avoid blocking until timeout (task_await should not hang on already-reported tasks). + // Instead, best-effort fetch the report payload from persisted history/partial and return immediately. + if (taskWorkspaceEntry.workspace.taskStatus === "reported") { + const reportArgs = await this.readLatestAgentReportArgs(taskId); + if (!reportArgs) { + throw new Error("Task already reported (agent_report payload not found)"); + } + this.cacheCompletedReport(taskId, reportArgs); + return reportArgs; + } + const requestingWorkspaceId = coerceNonEmptyString(options?.requestingWorkspaceId); return new Promise<{ reportMarkdown: string; title?: string }>((resolve, reject) => { - // Validate existence early to avoid waiting on never-resolving task IDs. - const cfg = this.config.loadConfigOrDefault(); - const taskWorkspaceEntry = this.findWorkspaceEntry(cfg, taskId); - if (!taskWorkspaceEntry) { - reject(new Error("Task not found")); - return; - } - let timeout: ReturnType | null = null; let startWaiter: PendingTaskStartWaiter | null = null; let abortListener: (() => void) | null = null; @@ -964,6 +1217,34 @@ export class TaskService { return this.hasActiveDescendantAgentTasks(cfg, workspaceId); } + private *iterateDescendantAgentTasksFromIndex( + index: AgentTaskIndex, + workspaceId: string + ): Generator<{ taskId: string; entry: AgentTaskWorkspaceEntry; depth: number }, void> { + assert( + workspaceId.length > 0, + "iterateDescendantAgentTasksFromIndex: workspaceId must be non-empty" + ); + + const stack: Array<{ taskId: string; depth: number }> = []; + for (const childTaskId of index.childrenByParent.get(workspaceId) ?? []) { + stack.push({ taskId: childTaskId, depth: 1 }); + } + + while (stack.length > 0) { + const next = stack.pop()!; + + const entry = index.byId.get(next.taskId); + if (entry) { + yield { taskId: next.taskId, entry, depth: next.depth }; + } + + for (const childTaskId of index.childrenByParent.get(next.taskId) ?? []) { + stack.push({ taskId: childTaskId, depth: next.depth + 1 }); + } + } + } + listActiveDescendantAgentTaskIds(workspaceId: string): string[] { assert( workspaceId.length > 0, @@ -973,20 +1254,11 @@ export class TaskService { const cfg = this.config.loadConfigOrDefault(); const index = this.buildAgentTaskIndex(cfg); - const activeStatuses = new Set(["queued", "running", "awaiting_report"]); const result: string[] = []; - const stack: string[] = [...(index.childrenByParent.get(workspaceId) ?? [])]; - while (stack.length > 0) { - const next = stack.pop()!; - const status = index.byId.get(next)?.taskStatus; - if (status && activeStatuses.has(status)) { - result.push(next); - } - const children = index.childrenByParent.get(next); - if (children) { - for (const child of children) { - stack.push(child); - } + for (const { taskId, entry } of this.iterateDescendantAgentTasksFromIndex(index, workspaceId)) { + const status = entry.taskStatus; + if (status && ACTIVE_AGENT_TASK_STATUSES.has(status)) { + result.push(taskId); } } return result; @@ -1006,25 +1278,19 @@ export class TaskService { const result: DescendantAgentTaskInfo[] = []; - const stack: Array<{ taskId: string; depth: number }> = []; - for (const childTaskId of index.childrenByParent.get(workspaceId) ?? []) { - stack.push({ taskId: childTaskId, depth: 1 }); - } - - while (stack.length > 0) { - const next = stack.pop()!; - const entry = index.byId.get(next.taskId); - if (!entry) continue; - + for (const { taskId, entry, depth } of this.iterateDescendantAgentTasksFromIndex( + index, + workspaceId + )) { assert( entry.parentWorkspaceId, - `listDescendantAgentTasks: task ${next.taskId} is missing parentWorkspaceId` + `listDescendantAgentTasks: task ${taskId} is missing parentWorkspaceId` ); - const status: AgentTaskStatus = entry.taskStatus ?? "running"; + const status = normalizeAgentTaskStatus(entry.taskStatus); if (!statusFilter || statusFilter.has(status)) { result.push({ - taskId: next.taskId, + taskId, status, parentWorkspaceId: entry.parentWorkspaceId, agentType: entry.agentType, @@ -1033,13 +1299,9 @@ export class TaskService { createdAt: entry.createdAt, modelString: entry.aiSettings?.model, thinkingLevel: entry.aiSettings?.thinkingLevel, - depth: next.depth, + depth, }); } - - for (const childTaskId of index.childrenByParent.get(next.taskId) ?? []) { - stack.push({ taskId: childTaskId, depth: next.depth + 1 }); - } } // Stable ordering: oldest first, then depth (ties by taskId for determinism). @@ -1065,7 +1327,6 @@ export class TaskService { const parentById = this.buildAgentTaskIndex(cfg).parentById; const nowMs = Date.now(); - this.cleanupExpiredCompletedReports(nowMs); const result: string[] = []; for (const taskId of taskIds) { @@ -1076,11 +1337,9 @@ export class TaskService { } // Preserve scope checks for tasks whose workspace was cleaned up after completion. - const cached = this.completedReportsByTaskId.get(taskId); - if (cached && cached.expiresAtMs > nowMs) { - if (cached.ancestorWorkspaceIds.includes(ancestorWorkspaceId)) { - result.push(taskId); - } + const cached = this.getCompletedReportCacheEntry(taskId, nowMs); + if (cached?.ancestorWorkspaceIds.includes(ancestorWorkspaceId)) { + result.push(taskId); } } @@ -1123,10 +1382,8 @@ export class TaskService { // The task workspace may have been removed after it reported (cleanup). Preserve scope checks // by consulting the completed-report cache, which tracks the task's ancestor chain. - const nowMs = Date.now(); - this.cleanupExpiredCompletedReports(nowMs); - const cached = this.completedReportsByTaskId.get(taskId); - if (cached && cached.expiresAtMs > nowMs) { + const cached = this.getCompletedReportCacheEntry(taskId); + if (cached) { return cached.ancestorWorkspaceIds.includes(ancestorWorkspaceId); } @@ -1138,17 +1395,13 @@ export class TaskService { ancestorWorkspaceId: string, taskId: string ): boolean { - let current = taskId; - for (let i = 0; i < 32; i++) { - const parent = parentById.get(current); - if (!parent) return false; - if (parent === ancestorWorkspaceId) return true; - current = parent; + for (const parent of iterateAncestorWorkspaceIds(parentById, taskId)) { + if (parent === ancestorWorkspaceId) { + return true; + } } - throw new Error( - `isDescendantAgentTaskUsingParentById: possible parentWorkspaceId cycle starting at ${taskId}` - ); + return false; } // --- Internal orchestration --- @@ -1157,19 +1410,7 @@ export class TaskService { parentById: Map, taskId: string ): string[] { - const ancestors: string[] = []; - - let current = taskId; - for (let i = 0; i < 32; i++) { - const parent = parentById.get(current); - if (!parent) return ancestors; - ancestors.push(parent); - current = parent; - } - - throw new Error( - `listAncestorWorkspaceIdsUsingParentById: possible parentWorkspaceId cycle starting at ${taskId}` - ); + return [...iterateAncestorWorkspaceIds(parentById, taskId)]; } private listAgentTaskWorkspaces( @@ -1210,7 +1451,7 @@ export class TaskService { private countActiveAgentTasks(config: ReturnType): number { let activeCount = 0; for (const task of this.listAgentTaskWorkspaces(config)) { - const status: AgentTaskStatus = task.taskStatus ?? "running"; + const status = normalizeAgentTaskStatus(task.taskStatus); // If this task workspace is blocked in a foreground wait, do not count it towards parallelism. // This prevents deadlocks where a task spawns a nested task in the foreground while // maxParallelAgentTasks is low (e.g. 1). @@ -1243,20 +1484,11 @@ export class TaskService { const index = this.buildAgentTaskIndex(config); - const activeStatuses = new Set(["queued", "running", "awaiting_report"]); - const stack: string[] = [...(index.childrenByParent.get(workspaceId) ?? [])]; - while (stack.length > 0) { - const next = stack.pop()!; - const status = index.byId.get(next)?.taskStatus; - if (status && activeStatuses.has(status)) { + for (const { entry } of this.iterateDescendantAgentTasksFromIndex(index, workspaceId)) { + const status = entry.taskStatus; + if (status && ACTIVE_AGENT_TASK_STATUSES.has(status)) { return true; } - const children = index.childrenByParent.get(next); - if (children) { - for (const child of children) { - stack.push(child); - } - } } return false; @@ -1576,28 +1808,64 @@ export class TaskService { } } - private async setTaskStatus(workspaceId: string, status: AgentTaskStatus): Promise { + // Centralized taskStatus mutation point. + // Keeping lifecycle side-effects here (clearing taskPrompt, stamping reportedAt, waking start waiters) + // prevents ad-hoc status edits from breaking task_await / scheduling invariants. + private async setTaskStatus( + workspaceId: string, + nextStatus: AgentTaskStatus, + options?: { allowMissing?: boolean } + ): Promise { assert(workspaceId.length > 0, "setTaskStatus: workspaceId must be non-empty"); - await this.editWorkspaceEntry(workspaceId, (ws) => { - ws.taskStatus = status; - if (status === "running") { - ws.taskPrompt = undefined; - } - }); + let shouldStartWaiters = false; - await this.emitWorkspaceMetadata(workspaceId); + const found = await this.editWorkspaceEntry( + workspaceId, + (ws) => { + const currentStatus = normalizeAgentTaskStatus(ws.taskStatus); + if (!isAllowedAgentTaskStatusTransition(currentStatus, nextStatus)) { + log.error("Invalid agent task status transition", { + workspaceId, + currentStatus, + nextStatus, + parentWorkspaceId: ws.parentWorkspaceId ?? null, + persistedStatus: ws.taskStatus ?? null, + }); + return; + } - if (status === "running") { - const waiters = this.pendingStartWaitersByTaskId.get(workspaceId); - if (!waiters || waiters.length === 0) return; - this.pendingStartWaitersByTaskId.delete(workspaceId); - for (const waiter of waiters) { - try { - waiter.start(); - } catch (error: unknown) { - log.error("Task start waiter callback failed", { workspaceId, error }); + ws.taskStatus = nextStatus; + shouldStartWaiters = nextStatus === "running"; + + if (nextStatus !== "queued") { + ws.taskPrompt = undefined; } + + if (nextStatus === "reported") { + ws.reportedAt ??= getIsoNow(); + } else if (ws.reportedAt !== undefined) { + // Defensive: reportedAt is only meaningful when taskStatus is reported. + ws.reportedAt = undefined; + } + }, + { allowMissing: options?.allowMissing } + ); + + await this.emitWorkspaceMetadata(workspaceId); + + if (!found || !shouldStartWaiters) { + return; + } + + const waiters = this.pendingStartWaitersByTaskId.get(workspaceId); + if (!waiters || waiters.length === 0) return; + this.pendingStartWaitersByTaskId.delete(workspaceId); + for (const waiter of waiters) { + try { + waiter.start(); + } catch (error: unknown) { + log.error("Task start waiter callback failed", { workspaceId, error }); } } } @@ -1621,18 +1889,21 @@ export class TaskService { return; } + const historyResult = await this.historyService.getHistory(workspaceId); + // If the parent is undergoing compaction, avoid resuming mid-compaction (history is being + // rewritten). Instead, schedule a follow-up nudge after compaction completes. + if (historyResult.success && shouldDeferParentResumeForCompaction(historyResult.data)) { + this.scheduleParentAwaitAfterCompaction(workspaceId); + return; + } + const activeTaskIds = this.listActiveDescendantAgentTaskIds(workspaceId); const model = entry.workspace.aiSettings?.model ?? defaultModel; const resumeResult = await this.workspaceService.resumeStream(workspaceId, { model, thinkingLevel: entry.workspace.aiSettings?.thinkingLevel, - additionalSystemInstructions: - `You have active background sub-agent task(s) (${activeTaskIds.join(", ")}). ` + - "You MUST NOT end your turn while any sub-agent tasks are queued/running/awaiting_report. " + - "Call task_await now to wait for them to finish (omit timeout_secs to wait up to 10 minutes). " + - "If any tasks are still queued/running/awaiting_report after that, call task_await again. " + - "Only once all tasks are completed should you write your final response, integrating their reports.", + additionalSystemInstructions: buildParentAwaitSystemInstructions(activeTaskIds), }); if (!resumeResult.success) { log.error("Failed to resume parent with active background tasks", { @@ -1689,8 +1960,7 @@ export class TaskService { workspace: WorkspaceConfigEntry; }): Promise { const childWorkspaceId = entry.workspace.id; - const parentWorkspaceId = entry.workspace.parentWorkspaceId; - if (!childWorkspaceId || !parentWorkspaceId) { + if (!childWorkspaceId) { return; } @@ -1702,44 +1972,10 @@ export class TaskService { "posting its last assistant output as a fallback.)*\n\n" + (lastText?.trim().length ? lastText : "(No assistant output found.)"); - // Notify clients immediately even if we can't delete the workspace yet. - await this.editWorkspaceEntry( - childWorkspaceId, - (ws) => { - ws.taskStatus = "reported"; - ws.reportedAt = getIsoNow(); - }, - { allowMissing: true } - ); - - await this.emitWorkspaceMetadata(childWorkspaceId); - - await this.deliverReportToParent(parentWorkspaceId, entry, { - reportMarkdown, - title: `Subagent (${agentType}) report (fallback)`, - }); - - this.resolveWaiters(childWorkspaceId, { + await this.finalizeAgentTaskReport(childWorkspaceId, entry, { reportMarkdown, title: `Subagent (${agentType}) report (fallback)`, }); - - await this.maybeStartQueuedTasks(); - await this.cleanupReportedLeafTask(childWorkspaceId); - - const postCfg = this.config.loadConfigOrDefault(); - const hasActiveDescendants = this.hasActiveDescendantAgentTasks(postCfg, parentWorkspaceId); - if (!hasActiveDescendants && !this.aiService.isStreaming(parentWorkspaceId)) { - const resumeResult = await this.workspaceService.resumeStream(parentWorkspaceId, { - model: entry.workspace.taskModelString ?? defaultModel, - }); - if (!resumeResult.success) { - log.error("Failed to auto-resume parent after fallback report", { - parentWorkspaceId, - error: resumeResult.error, - }); - } - } } private async readLatestAssistantText(workspaceId: string): Promise { @@ -1841,16 +2077,7 @@ export class TaskService { } // Notify clients immediately even if we can't delete the workspace yet. - await this.editWorkspaceEntry( - childWorkspaceId, - (ws) => { - ws.taskStatus = "reported"; - ws.reportedAt = getIsoNow(); - }, - { allowMissing: true } - ); - - await this.emitWorkspaceMetadata(childWorkspaceId); + await this.setTaskStatus(childWorkspaceId, "reported", { allowMissing: true }); if (options?.stopStream) { // `agent_report` is terminal. Stop the child stream immediately to prevent any further token @@ -1931,6 +2158,23 @@ export class TaskService { } } + private getCompletedReportCacheEntry( + taskId: string, + nowMs = Date.now() + ): CompletedAgentReportCacheEntry | null { + const cached = this.completedReportsByTaskId.get(taskId); + if (!cached) { + return null; + } + + if (cached.expiresAtMs <= nowMs) { + this.completedReportsByTaskId.delete(taskId); + return null; + } + + return cached; + } + private enforceCompletedReportCacheLimit(): void { while (this.completedReportsByTaskId.size > COMPLETED_REPORT_CACHE_MAX_ENTRIES) { const first = this.completedReportsByTaskId.keys().next(); @@ -1939,12 +2183,17 @@ export class TaskService { } } - private resolveWaiters(taskId: string, report: { reportMarkdown: string; title?: string }): void { + private cacheCompletedReport( + taskId: string, + report: { reportMarkdown: string; title?: string } + ): void { const nowMs = Date.now(); this.cleanupExpiredCompletedReports(nowMs); const cfg = this.config.loadConfigOrDefault(); const parentById = this.buildAgentTaskIndex(cfg).parentById; + + // Capture ancestor chain so scope checks keep working even if we later clean up the task workspace. const ancestorWorkspaceIds = this.listAncestorWorkspaceIdsUsingParentById(parentById, taskId); this.completedReportsByTaskId.set(taskId, { @@ -1954,6 +2203,10 @@ export class TaskService { ancestorWorkspaceIds, }); this.enforceCompletedReportCacheLimit(); + } + + private resolveWaiters(taskId: string, report: { reportMarkdown: string; title?: string }): void { + this.cacheCompletedReport(taskId, report); const waiters = this.pendingWaitersByTaskId.get(taskId); if (!waiters || waiters.length === 0) { @@ -2218,9 +2471,8 @@ export class TaskService { if (!parentWorkspaceId) return; if (ws.taskStatus !== "reported") return; - const hasChildren = this.listAgentTaskWorkspaces(config).some( - (t) => t.parentWorkspaceId === currentWorkspaceId - ); + const index = this.buildAgentTaskIndex(config); + const hasChildren = (index.childrenByParent.get(currentWorkspaceId) ?? []).length > 0; if (hasChildren) return; const removeResult = await this.workspaceService.remove(currentWorkspaceId, true);