diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 659e7772d5d..899ec414014 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -102,6 +102,7 @@ import { restoreTodoListForTask } from "../tools/updateTodoListTool" import { AutoApprovalHandler } from "./AutoApprovalHandler" const MAX_EXPONENTIAL_BACKOFF_SECONDS = 600 // 10 minutes +const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 5000 // 5 seconds export type TaskOptions = { provider: ClineProvider @@ -1306,6 +1307,13 @@ export class Task extends EventEmitter implements TaskLike { public dispose(): void { console.log(`[Task] disposing task ${this.taskId}.${this.instanceId}`) + // Remove all event listeners to prevent memory leaks + try { + this.removeAllListeners() + } catch (error) { + console.error("Error removing event listeners:", error) + } + // Stop waiting for child task completion. if (this.pauseInterval) { clearInterval(this.pauseInterval) @@ -1449,465 +1457,639 @@ export class Task extends EventEmitter implements TaskLike { userContent: Anthropic.Messages.ContentBlockParam[], includeFileDetails: boolean = false, ): Promise<boolean> { - if (this.abort) { - throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`) + interface StackItem { + userContent: Anthropic.Messages.ContentBlockParam[] + includeFileDetails: boolean } - if (this.consecutiveMistakeLimit > 0 && this.consecutiveMistakeCount >= this.consecutiveMistakeLimit) { - const { response, text, images } = await this.ask( - "mistake_limit_reached", - t("common:errors.mistake_limit_guidance"), - ) - - if (response === "messageResponse") { - userContent.push( - ...[ - { type: "text" as const, text: formatResponse.tooManyMistakes(text) }, - ...formatResponse.imageBlocks(images), - ], - ) + const stack: StackItem[] = [{ userContent, includeFileDetails }] - await this.say("user_feedback", text, images) + while (stack.length > 0) { + const currentItem = stack.pop()! + const currentUserContent = currentItem.userContent + const currentIncludeFileDetails = currentItem.includeFileDetails - // Track consecutive mistake errors in telemetry. - TelemetryService.instance.captureConsecutiveMistakeError(this.taskId) + if (this.abort) { + throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`) } - this.consecutiveMistakeCount = 0 - } - - // In this Cline request loop, we need to check if this task instance - // has been asked to wait for a subtask to finish before continuing. - const provider = this.providerRef.deref() + if (this.consecutiveMistakeLimit > 0 && this.consecutiveMistakeCount >= this.consecutiveMistakeLimit) { + const { response, text, images } = await this.ask( + "mistake_limit_reached", + t("common:errors.mistake_limit_guidance"), + ) - if (this.isPaused && provider) { - provider.log(`[subtasks] paused ${this.taskId}.${this.instanceId}`) - await this.waitForResume() - provider.log(`[subtasks] resumed ${this.taskId}.${this.instanceId}`) - const currentMode = (await provider.getState())?.mode ?? defaultModeSlug + if (response === "messageResponse") { + currentUserContent.push( + ...[ + { type: "text" as const, text: formatResponse.tooManyMistakes(text) }, + ...formatResponse.imageBlocks(images), + ], + ) - if (currentMode !== this.pausedModeSlug) { - // The mode has changed, we need to switch back to the paused mode. 
- await provider.handleModeSwitch(this.pausedModeSlug) + await this.say("user_feedback", text, images) - // Delay to allow mode change to take effect before next tool is executed. - await delay(500) + // Track consecutive mistake errors in telemetry. + TelemetryService.instance.captureConsecutiveMistakeError(this.taskId) + } - provider.log( - `[subtasks] task ${this.taskId}.${this.instanceId} has switched back to '${this.pausedModeSlug}' from '${currentMode}'`, - ) + this.consecutiveMistakeCount = 0 } - } - // Getting verbose details is an expensive operation, it uses ripgrep to - // top-down build file structure of project which for large projects can - // take a few seconds. For the best UX we show a placeholder api_req_started - // message with a loading spinner as this happens. + // In this Cline request loop, we need to check if this task instance + // has been asked to wait for a subtask to finish before continuing. + const provider = this.providerRef.deref() - // Determine API protocol based on provider and model - const modelId = getModelId(this.apiConfiguration) - const apiProtocol = getApiProtocol(this.apiConfiguration.apiProvider, modelId) + if (this.isPaused && provider) { + provider.log(`[subtasks] paused ${this.taskId}.${this.instanceId}`) + await this.waitForResume() + provider.log(`[subtasks] resumed ${this.taskId}.${this.instanceId}`) + const currentMode = (await provider.getState())?.mode ?? defaultModeSlug - await this.say( - "api_req_started", - JSON.stringify({ - request: - userContent.map((block) => formatContentBlockToMarkdown(block)).join("\n\n") + "\n\nLoading...", - apiProtocol, - }), - ) + if (currentMode !== this.pausedModeSlug) { + // The mode has changed, we need to switch back to the paused mode. + await provider.handleModeSwitch(this.pausedModeSlug) - const { - showRooIgnoredFiles = true, - includeDiagnosticMessages = true, - maxDiagnosticMessages = 50, - maxReadFileLine = -1, - } = (await this.providerRef.deref()?.getState()) ?? {} - - const parsedUserContent = await processUserContentMentions({ - userContent, - cwd: this.cwd, - urlContentFetcher: this.urlContentFetcher, - fileContextTracker: this.fileContextTracker, - rooIgnoreController: this.rooIgnoreController, - showRooIgnoredFiles, - includeDiagnosticMessages, - maxDiagnosticMessages, - maxReadFileLine, - }) + // Delay to allow mode change to take effect before next tool is executed. + await delay(500) + + provider.log( + `[subtasks] task ${this.taskId}.${this.instanceId} has switched back to '${this.pausedModeSlug}' from '${currentMode}'`, + ) + } + } - const environmentDetails = await getEnvironmentDetails(this, includeFileDetails) + // Getting verbose details is an expensive operation, it uses ripgrep to + // top-down build file structure of project which for large projects can + // take a few seconds. For the best UX we show a placeholder api_req_started + // message with a loading spinner as this happens. + + // Determine API protocol based on provider and model + const modelId = getModelId(this.apiConfiguration) + const apiProtocol = getApiProtocol(this.apiConfiguration.apiProvider, modelId) + + await this.say( + "api_req_started", + JSON.stringify({ + request: + currentUserContent.map((block) => formatContentBlockToMarkdown(block)).join("\n\n") + + "\n\nLoading...", + apiProtocol, + }), + ) - // Add environment details as its own text block, separate from tool - // results. 
- const finalUserContent = [...parsedUserContent, { type: "text" as const, text: environmentDetails }] + const { + showRooIgnoredFiles = true, + includeDiagnosticMessages = true, + maxDiagnosticMessages = 50, + maxReadFileLine = -1, + } = (await this.providerRef.deref()?.getState()) ?? {} + + const parsedUserContent = await processUserContentMentions({ + userContent: currentUserContent, + cwd: this.cwd, + urlContentFetcher: this.urlContentFetcher, + fileContextTracker: this.fileContextTracker, + rooIgnoreController: this.rooIgnoreController, + showRooIgnoredFiles, + includeDiagnosticMessages, + maxDiagnosticMessages, + maxReadFileLine, + }) - await this.addToApiConversationHistory({ role: "user", content: finalUserContent }) - TelemetryService.instance.captureConversationMessage(this.taskId, "user") + const environmentDetails = await getEnvironmentDetails(this, currentIncludeFileDetails) - // Since we sent off a placeholder api_req_started message to update the - // webview while waiting to actually start the API request (to load - // potential details for example), we need to update the text of that - // message. - const lastApiReqIndex = findLastIndex(this.clineMessages, (m) => m.say === "api_req_started") + // Add environment details as its own text block, separate from tool + // results. + const finalUserContent = [...parsedUserContent, { type: "text" as const, text: environmentDetails }] - this.clineMessages[lastApiReqIndex].text = JSON.stringify({ - request: finalUserContent.map((block) => formatContentBlockToMarkdown(block)).join("\n\n"), - apiProtocol, - } satisfies ClineApiReqInfo) + await this.addToApiConversationHistory({ role: "user", content: finalUserContent }) + TelemetryService.instance.captureConversationMessage(this.taskId, "user") - await this.saveClineMessages() - await provider?.postStateToWebview() + // Since we sent off a placeholder api_req_started message to update the + // webview while waiting to actually start the API request (to load + // potential details for example), we need to update the text of that + // message. + const lastApiReqIndex = findLastIndex(this.clineMessages, (m) => m.say === "api_req_started") - try { - let cacheWriteTokens = 0 - let cacheReadTokens = 0 - let inputTokens = 0 - let outputTokens = 0 - let totalCost: number | undefined - - // We can't use `api_req_finished` anymore since it's a unique case - // where it could come after a streaming message (i.e. in the middle - // of being updated or executed). - // Fortunately `api_req_finished` was always parsed out for the GUI - // anyways, so it remains solely for legacy purposes to keep track - // of prices in tasks from history (it's worth removing a few months - // from now). - const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => { - const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}") - this.clineMessages[lastApiReqIndex].text = JSON.stringify({ - ...existingData, - tokensIn: inputTokens, - tokensOut: outputTokens, - cacheWrites: cacheWriteTokens, - cacheReads: cacheReadTokens, - cost: - totalCost ?? 
- calculateApiCostAnthropic( - this.api.getModel().info, - inputTokens, - outputTokens, - cacheWriteTokens, - cacheReadTokens, - ), - cancelReason, - streamingFailedMessage, - } satisfies ClineApiReqInfo) - } + this.clineMessages[lastApiReqIndex].text = JSON.stringify({ + request: finalUserContent.map((block) => formatContentBlockToMarkdown(block)).join("\n\n"), + apiProtocol, + } satisfies ClineApiReqInfo) - const abortStream = async (cancelReason: ClineApiReqCancelReason, streamingFailedMessage?: string) => { - if (this.diffViewProvider.isEditing) { - await this.diffViewProvider.revertChanges() // closes diff view - } + await this.saveClineMessages() + await provider?.postStateToWebview() - // if last message is a partial we need to update and save it - const lastMessage = this.clineMessages.at(-1) + try { + let cacheWriteTokens = 0 + let cacheReadTokens = 0 + let inputTokens = 0 + let outputTokens = 0 + let totalCost: number | undefined + + // We can't use `api_req_finished` anymore since it's a unique case + // where it could come after a streaming message (i.e. in the middle + // of being updated or executed). + // Fortunately `api_req_finished` was always parsed out for the GUI + // anyways, so it remains solely for legacy purposes to keep track + // of prices in tasks from history (it's worth removing a few months + // from now). + const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => { + if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) { + return + } - if (lastMessage && lastMessage.partial) { - // lastMessage.ts = Date.now() DO NOT update ts since it is used as a key for virtuoso list - lastMessage.partial = false - // instead of streaming partialMessage events, we do a save and post like normal to persist to disk - console.log("updating partial message", lastMessage) - // await this.saveClineMessages() + const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}") + this.clineMessages[lastApiReqIndex].text = JSON.stringify({ + ...existingData, + tokensIn: inputTokens, + tokensOut: outputTokens, + cacheWrites: cacheWriteTokens, + cacheReads: cacheReadTokens, + cost: + totalCost ?? + calculateApiCostAnthropic( + this.api.getModel().info, + inputTokens, + outputTokens, + cacheWriteTokens, + cacheReadTokens, + ), + cancelReason, + streamingFailedMessage, + } satisfies ClineApiReqInfo) } - // Let assistant know their response was interrupted for when task is resumed - await this.addToApiConversationHistory({ - role: "assistant", - content: [ - { - type: "text", - text: - assistantMessage + - `\n\n[${ - cancelReason === "streaming_failed" - ? "Response interrupted by API Error" - : "Response interrupted by user" - }]`, - }, - ], - }) + const abortStream = async (cancelReason: ClineApiReqCancelReason, streamingFailedMessage?: string) => { + if (this.diffViewProvider.isEditing) { + await this.diffViewProvider.revertChanges() // closes diff view + } - // Update `api_req_started` to have cancelled and cost, so that - // we can display the cost of the partial stream. - updateApiReqMsg(cancelReason, streamingFailedMessage) - await this.saveClineMessages() + // if last message is a partial we need to update and save it + const lastMessage = this.clineMessages.at(-1) - // Signals to provider that it can retrieve the saved messages - // from disk, as abortTask can not be awaited on in nature. 
- this.didFinishAbortingStream = true - } + if (lastMessage && lastMessage.partial) { + // lastMessage.ts = Date.now() DO NOT update ts since it is used as a key for virtuoso list + lastMessage.partial = false + // instead of streaming partialMessage events, we do a save and post like normal to persist to disk + console.log("updating partial message", lastMessage) + // await this.saveClineMessages() + } - // Reset streaming state. - this.currentStreamingContentIndex = 0 - this.currentStreamingDidCheckpoint = false - this.assistantMessageContent = [] - this.didCompleteReadingStream = false - this.userMessageContent = [] - this.userMessageContentReady = false - this.didRejectTool = false - this.didAlreadyUseTool = false - this.presentAssistantMessageLocked = false - this.presentAssistantMessageHasPendingUpdates = false - if (this.assistantMessageParser) { - this.assistantMessageParser.reset() - } + // Let assistant know their response was interrupted for when task is resumed + await this.addToApiConversationHistory({ + role: "assistant", + content: [ + { + type: "text", + text: + assistantMessage + + `\n\n[${ + cancelReason === "streaming_failed" + ? "Response interrupted by API Error" + : "Response interrupted by user" + }]`, + }, + ], + }) - await this.diffViewProvider.reset() + // Update `api_req_started` to have cancelled and cost, so that + // we can display the cost of the partial stream. + updateApiReqMsg(cancelReason, streamingFailedMessage) + await this.saveClineMessages() - // Yields only if the first chunk is successful, otherwise will - // allow the user to retry the request (most likely due to rate - // limit error, which gets thrown on the first chunk). - const stream = this.attemptApiRequest() - let assistantMessage = "" - let reasoningMessage = "" - this.isStreaming = true + // Signals to provider that it can retrieve the saved messages + // from disk, as abortTask can not be awaited on in nature. + this.didFinishAbortingStream = true + } - try { - for await (const chunk of stream) { - if (!chunk) { - // Sometimes chunk is undefined, no idea that can cause - // it, but this workaround seems to fix it. - continue - } + // Reset streaming state. + this.currentStreamingContentIndex = 0 + this.currentStreamingDidCheckpoint = false + this.assistantMessageContent = [] + this.didCompleteReadingStream = false + this.userMessageContent = [] + this.userMessageContentReady = false + this.didRejectTool = false + this.didAlreadyUseTool = false + this.presentAssistantMessageLocked = false + this.presentAssistantMessageHasPendingUpdates = false + if (this.assistantMessageParser) { + this.assistantMessageParser.reset() + } - switch (chunk.type) { - case "reasoning": - reasoningMessage += chunk.text - await this.say("reasoning", reasoningMessage, undefined, true) - break - case "usage": - inputTokens += chunk.inputTokens - outputTokens += chunk.outputTokens - cacheWriteTokens += chunk.cacheWriteTokens ?? 0 - cacheReadTokens += chunk.cacheReadTokens ?? 0 - totalCost = chunk.totalCost - break - case "text": { - assistantMessage += chunk.text + await this.diffViewProvider.reset() + + // Yields only if the first chunk is successful, otherwise will + // allow the user to retry the request (most likely due to rate + // limit error, which gets thrown on the first chunk). 
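+ // NOTE: The stream is consumed through an explicit iterator (rather than
+ // for-await) so that, when this loop exits early, the same iterator can be
+ // handed off to drainStreamInBackgroundToFindAllUsage() below to keep
+ // collecting any late usage chunks.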
+ const stream = this.attemptApiRequest() + let assistantMessage = "" + let reasoningMessage = "" + this.isStreaming = true + + try { + const iterator = stream[Symbol.asyncIterator]() + let item = await iterator.next() + while (!item.done) { + const chunk = item.value + item = await iterator.next() + if (!chunk) { + // Sometimes chunk is undefined, no idea what can cause + // it, but this workaround seems to fix it. + continue + } + + switch (chunk.type) { + case "reasoning": + reasoningMessage += chunk.text + await this.say("reasoning", reasoningMessage, undefined, true) + break + case "usage": + inputTokens += chunk.inputTokens + outputTokens += chunk.outputTokens + cacheWriteTokens += chunk.cacheWriteTokens ?? 0 + cacheReadTokens += chunk.cacheReadTokens ?? 0 + totalCost = chunk.totalCost + break + case "text": { + assistantMessage += chunk.text + + // Parse raw assistant message chunk into content blocks. + const prevLength = this.assistantMessageContent.length + if (this.isAssistantMessageParserEnabled && this.assistantMessageParser) { + this.assistantMessageContent = this.assistantMessageParser.processChunk(chunk.text) + } else { + // Use the old parsing method when experiment is disabled + this.assistantMessageContent = parseAssistantMessage(assistantMessage) + } + + if (this.assistantMessageContent.length > prevLength) { + // New content we need to present, reset to + // false in case previous content set this to true. + this.userMessageContentReady = false + } + + // Present content to user. + presentAssistantMessage(this) + break } + } + + if (this.abort) { + console.log(`aborting stream, this.abandoned = ${this.abandoned}`) + + if (!this.abandoned) { + // Only need to gracefully abort if this instance + // isn't abandoned (sometimes OpenRouter stream + // hangs, in which case this would affect future + // instances of Cline). + await abortStream("user_cancelled") + } + + break // Aborts the stream. + } + + if (this.didRejectTool) { + // `userContent` has a tool rejection, so interrupt the + // assistant's response to present the user's feedback. + assistantMessage += "\n\n[Response interrupted by user feedback]" + // Instead of setting this preemptively, we allow the + // present iterator to finish and set + // userMessageContentReady when it's ready. + // this.userMessageContentReady = true + break + } + + if (this.didAlreadyUseTool) { + assistantMessage += + "\n\n[Response interrupted by a tool use result. 
Only one tool may be used at a time and should be placed at the end of the message.]" break } } - if (this.abort) { - console.log(`aborting stream, this.abandoned = ${this.abandoned}`) + // Create a copy of current token values to avoid race conditions + const currentTokens = { + input: inputTokens, + output: outputTokens, + cacheWrite: cacheWriteTokens, + cacheRead: cacheReadTokens, + total: totalCost, + } - if (!this.abandoned) { - // Only need to gracefully abort if this instance - // isn't abandoned (sometimes OpenRouter stream - // hangs, in which case this would affect future - // instances of Cline). - await abortStream("user_cancelled") + const drainStreamInBackgroundToFindAllUsage = async (apiReqIndex: number) => { + const timeoutMs = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS + const startTime = Date.now() + const modelId = getModelId(this.apiConfiguration) + + // Local variables to accumulate usage data without affecting the main flow + let bgInputTokens = currentTokens.input + let bgOutputTokens = currentTokens.output + let bgCacheWriteTokens = currentTokens.cacheWrite + let bgCacheReadTokens = currentTokens.cacheRead + let bgTotalCost = currentTokens.total + + // Helper function to capture telemetry and update messages + const captureUsageData = async ( + tokens: { + input: number + output: number + cacheWrite: number + cacheRead: number + total?: number + }, + messageIndex: number = apiReqIndex, + ) => { + if ( + tokens.input > 0 || + tokens.output > 0 || + tokens.cacheWrite > 0 || + tokens.cacheRead > 0 + ) { + // Update the shared variables atomically + inputTokens = tokens.input + outputTokens = tokens.output + cacheWriteTokens = tokens.cacheWrite + cacheReadTokens = tokens.cacheRead + totalCost = tokens.total + + // Update the API request message with the latest usage data + updateApiReqMsg() + await this.saveClineMessages() + + // Update the specific message in the webview + const apiReqMessage = this.clineMessages[messageIndex] + if (apiReqMessage) { + await this.updateClineMessage(apiReqMessage) + } + + // Capture telemetry + TelemetryService.instance.captureLlmCompletion(this.taskId, { + inputTokens: tokens.input, + outputTokens: tokens.output, + cacheWriteTokens: tokens.cacheWrite, + cacheReadTokens: tokens.cacheRead, + cost: + tokens.total ?? + calculateApiCostAnthropic( + this.api.getModel().info, + tokens.input, + tokens.output, + tokens.cacheWrite, + tokens.cacheRead, + ), + }) + } } - break // Aborts the stream. - } + try { + // Continue processing the original stream from where the main loop left off + let usageFound = false + let chunkCount = 0 + + // Use the same iterator that the main loop was using + while (!item.done) { + // Check for timeout + if (Date.now() - startTime > timeoutMs) { + console.warn( + `[Background Usage Collection] Timed out after ${timeoutMs}ms for model: ${modelId}, processed ${chunkCount} chunks`, + ) + // Clean up the iterator before breaking + if (iterator.return) { + await iterator.return(undefined) + } + break + } + + const chunk = item.value + item = await iterator.next() + chunkCount++ + + if (chunk && chunk.type === "usage") { + usageFound = true + bgInputTokens += chunk.inputTokens + bgOutputTokens += chunk.outputTokens + bgCacheWriteTokens += chunk.cacheWriteTokens ?? 0 + bgCacheReadTokens += chunk.cacheReadTokens ?? 0 + bgTotalCost = chunk.totalCost + } + } - if (this.didRejectTool) { - // `userContent` has a tool rejection, so interrupt the - // assistant's response to present the user's feedback. 
- assistantMessage += "\n\n[Response interrupted by user feedback]" - // Instead of setting this preemptively, we allow the - // present iterator to finish and set - // userMessageContentReady when its ready. - // this.userMessageContentReady = true - break + if ( + usageFound || + bgInputTokens > 0 || + bgOutputTokens > 0 || + bgCacheWriteTokens > 0 || + bgCacheReadTokens > 0 + ) { + // We have usage data either from a usage chunk or accumulated tokens + await captureUsageData( + { + input: bgInputTokens, + output: bgOutputTokens, + cacheWrite: bgCacheWriteTokens, + cacheRead: bgCacheReadTokens, + total: bgTotalCost, + }, + lastApiReqIndex, + ) + } else { + console.warn( + `[Background Usage Collection] Suspicious: request ${apiReqIndex} is complete, but no usage info was found. Model: ${modelId}`, + ) + } + } catch (error) { + console.error("Error draining stream for usage data:", error) + // Still try to capture whatever usage data we have collected so far + if ( + bgInputTokens > 0 || + bgOutputTokens > 0 || + bgCacheWriteTokens > 0 || + bgCacheReadTokens > 0 + ) { + await captureUsageData( + { + input: bgInputTokens, + output: bgOutputTokens, + cacheWrite: bgCacheWriteTokens, + cacheRead: bgCacheReadTokens, + total: bgTotalCost, + }, + lastApiReqIndex, + ) + } + } } - // PREV: We need to let the request finish for openrouter to - // get generation details. - // UPDATE: It's better UX to interrupt the request at the - // cost of the API cost not being retrieved. - if (this.didAlreadyUseTool) { - assistantMessage += - "\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]" - break + // Start the background task and handle any errors + drainStreamInBackgroundToFindAllUsage(lastApiReqIndex).catch((error) => { + console.error("Background usage collection failed:", error) + }) + } catch (error) { + // Abandoned happens when extension is no longer waiting for the + // Cline instance to finish aborting (error is thrown here when + // any function in the for loop throws due to this.abort). + if (!this.abandoned) { + // If the stream failed, there's various states the task + // could be in (i.e. could have streamed some tools the user + // may have executed), so we just resort to replicating a + // cancel task. + + // Check if this was a user-initiated cancellation BEFORE calling abortTask + // If this.abort is already true, it means the user clicked cancel, so we should + // treat this as "user_cancelled" rather than "streaming_failed" + const cancelReason = this.abort ? "user_cancelled" : "streaming_failed" + + const streamingFailedMessage = this.abort + ? undefined + : (error.message ?? JSON.stringify(serializeError(error), null, 2)) + + // Now call abortTask after determining the cancel reason. + await this.abortTask() + await abortStream(cancelReason, streamingFailedMessage) + + const history = await provider?.getTaskWithId(this.taskId) + + if (history) { + await provider?.initClineWithHistoryItem(history.historyItem) + } } + } finally { + this.isStreaming = false } - } catch (error) { - // Abandoned happens when extension is no longer waiting for the - // Cline instance to finish aborting (error is thrown here when - // any function in the for loop throws due to this.abort). - if (!this.abandoned) { - // If the stream failed, there's various states the task - // could be in (i.e. could have streamed some tools the user - // may have executed), so we just resort to replicating a - // cancel task. 
- 
- // Check if this was a user-initiated cancellation BEFORE calling abortTask - // If this.abort is already true, it means the user clicked cancel, so we should - // treat this as "user_cancelled" rather than "streaming_failed" - const cancelReason = this.abort ? "user_cancelled" : "streaming_failed" - - const streamingFailedMessage = this.abort - ? undefined - : (error.message ?? JSON.stringify(serializeError(error), null, 2)) - - // Now call abortTask after determining the cancel reason. - await this.abortTask() - await abortStream(cancelReason, streamingFailedMessage) - - const history = await provider?.getTaskWithId(this.taskId) - - if (history) { - await provider?.initClineWithHistoryItem(history.historyItem) - } } - } finally { - this.isStreaming = false - } - if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) { - TelemetryService.instance.captureLlmCompletion(this.taskId, { - inputTokens, - outputTokens, - cacheWriteTokens, - cacheReadTokens, - cost: - totalCost ?? - calculateApiCostAnthropic( - this.api.getModel().info, - inputTokens, - outputTokens, - cacheWriteTokens, - cacheReadTokens, - ), - }) - } + + // Need to call here in case the stream was aborted. + if (this.abort || this.abandoned) { + throw new Error( + `[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`, + ) } - // Need to call here in case the stream was aborted. - if (this.abort || this.abandoned) { - throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`) - } + this.didCompleteReadingStream = true + + // Set any blocks to be complete to allow `presentAssistantMessage` + // to finish and set `userMessageContentReady` to true. + // (Could be a text block that had no subsequent tool uses, or a + // text block at the very end, or an invalid tool use, etc. Whatever + // the case, `presentAssistantMessage` relies on these blocks either + // to be completed or the user to reject a block in order to proceed + // and eventually set userMessageContentReady to true.) + const partialBlocks = this.assistantMessageContent.filter((block) => block.partial) + partialBlocks.forEach((block) => (block.partial = false)) + + // Can't just do this b/c a tool could be in the middle of executing. + // this.assistantMessageContent.forEach((e) => (e.partial = false)) + + // Now that the stream is complete, finalize any remaining partial content blocks + if (this.isAssistantMessageParserEnabled && this.assistantMessageParser) { + this.assistantMessageParser.finalizeContentBlocks() + this.assistantMessageContent = this.assistantMessageParser.getContentBlocks() + } + // When using old parser, no finalization needed - parsing already happened during streaming + + if (partialBlocks.length > 0) { + // If there is content to update then it will complete and + // update `this.userMessageContentReady` to true, which we + // `pWaitFor` before making the next request. All this is really + // doing is presenting the last partial message that we just set + // to complete. + presentAssistantMessage(this) + } + // Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage + // to ensure usage data is captured even when the stream is interrupted. The background task + // uses local variables to accumulate usage data before atomically updating the shared state. 
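+ // If the background drain is still running at this point, captureUsageData()
+ // will update, save, and re-post the api_req_started message once the final
+ // token counts arrive, so the cost shown in the UI may refresh a moment later.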
+ await this.persistGpt5Metadata(reasoningMessage) + await this.saveClineMessages() + await this.providerRef.deref()?.postStateToWebview() - this.didCompleteReadingStream = true - - // Set any blocks to be complete to allow `presentAssistantMessage` - // to finish and set `userMessageContentReady` to true. - // (Could be a text block that had no subsequent tool uses, or a - // text block at the very end, or an invalid tool use, etc. Whatever - // the case, `presentAssistantMessage` relies on these blocks either - // to be completed or the user to reject a block in order to proceed - // and eventually set userMessageContentReady to true.) - const partialBlocks = this.assistantMessageContent.filter((block) => block.partial) - partialBlocks.forEach((block) => (block.partial = false)) - - // Can't just do this b/c a tool could be in the middle of executing. - // this.assistantMessageContent.forEach((e) => (e.partial = false)) - - // Now that the stream is complete, finalize any remaining partial content blocks - if (this.isAssistantMessageParserEnabled && this.assistantMessageParser) { - this.assistantMessageParser.finalizeContentBlocks() - this.assistantMessageContent = this.assistantMessageParser.getContentBlocks() - } - // When using old parser, no finalization needed - parsing already happened during streaming - - if (partialBlocks.length > 0) { - // If there is content to update then it will complete and - // update `this.userMessageContentReady` to true, which we - // `pWaitFor` before making the next request. All this is really - // doing is presenting the last partial message that we just set - // to complete. - presentAssistantMessage(this) - } - await this.persistGpt5Metadata(reasoningMessage) + // Reset parser after each complete conversation round + if (this.assistantMessageParser) { + this.assistantMessageParser.reset() + } - updateApiReqMsg() - await this.saveClineMessages() - await this.providerRef.deref()?.postStateToWebview() + // Now add to apiConversationHistory. + // Need to save assistant responses to file before proceeding to + // tool use since user can exit at any moment and we wouldn't be + // able to save the assistant's response. + let didEndLoop = false - // Reset parser after each complete conversation round - if (this.assistantMessageParser) { - this.assistantMessageParser.reset() - } + if (assistantMessage.length > 0) { + await this.addToApiConversationHistory({ + role: "assistant", + content: [{ type: "text", text: assistantMessage }], + }) - // Now add to apiConversationHistory. - // Need to save assistant responses to file before proceeding to - // tool use since user can exit at any moment and we wouldn't be - // able to save the assistant's response. - let didEndLoop = false + TelemetryService.instance.captureConversationMessage(this.taskId, "assistant") - if (assistantMessage.length > 0) { - await this.addToApiConversationHistory({ - role: "assistant", - content: [{ type: "text", text: assistantMessage }], - }) + // NOTE: This comment is here for future reference - this was a + // workaround for `userMessageContent` not getting set to true. + // It was due to it not recursively calling for partial blocks + // when `didRejectTool`, so it would get stuck waiting for a + // partial block to complete before it could continue. + // In case the content blocks finished it may be the api stream + // finished after the last parsed content block was executed, so + // we are able to detect out of bounds and set + // `userMessageContentReady` to true (note you should not call + // `presentAssistantMessage` since if the last block is + // completed it will be presented again). + // const completeBlocks = this.assistantMessageContent.filter((block) => !block.partial) // If there are any partial blocks after the stream ended we can consider them invalid. 
+ // if (this.currentStreamingContentIndex >= completeBlocks.length) { + // this.userMessageContentReady = true + // } + + await pWaitFor(() => this.userMessageContentReady) + + // If the model did not use a tool, then we need to tell it to + // either use a tool or attempt_completion. + const didToolUse = this.assistantMessageContent.some((block) => block.type === "tool_use") + + if (!didToolUse) { + this.userMessageContent.push({ type: "text", text: formatResponse.noToolsUsed() }) + this.consecutiveMistakeCount++ + } - // Now add to apiConversationHistory. - // Need to save assistant responses to file before proceeding to - // tool use since user can exit at any moment and we wouldn't be - // able to save the assistant's response. - let didEndLoop = false + if (this.userMessageContent.length > 0) { + stack.push({ + userContent: [...this.userMessageContent], // Create a copy to avoid mutation issues + includeFileDetails: false, // Subsequent iterations don't need file details + }) - if (assistantMessage.length > 0) { - await this.addToApiConversationHistory({ - role: "assistant", - content: [{ type: "text", text: assistantMessage }], - }) + // Add periodic yielding to prevent blocking + await new Promise((resolve) => setImmediate(resolve)) + }
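+ // Pushing onto `stack` and continuing replaces the old recursive call to
+ // recursivelyMakeClineRequests, so long tool-use conversations no longer
+ // grow the JS call stack.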
+ // Continue to next iteration instead of setting didEndLoop from recursive call + continue + } else { + // If there's no assistant_responses, that means we got no text + // or tool_use content blocks from API which we should assume is + // an error. + await this.say( + "error", + "Unexpected API Response: The language model did not provide any assistant messages. This may indicate an issue with the API or the model's output.", ) - TelemetryService.instance.captureConversationMessage(this.taskId, "assistant") - - // NOTE: This comment is here for future reference - this was a - // workaround for `userMessageContent` not getting set to true. - // It was due to it not recursively calling for partial blocks - // when `didRejectTool`, so it would get stuck waiting for a - // partial block to complete before it could continue. - // In case the content blocks finished it may be the api stream - // finished after the last parsed content block was executed, so - // we are able to detect out of bounds and set - // `userMessageContentReady` to true (note you should not call - // `presentAssistantMessage` since if the last block i - // completed it will be presented again). - // const completeBlocks = this.assistantMessageContent.filter((block) => !block.partial) // If there are any partial blocks after the stream ended we can consider them invalid. - // if (this.currentStreamingContentIndex >= completeBlocks.length) { - // this.userMessageContentReady = true - // } - - await pWaitFor(() => this.userMessageContentReady) - - // If the model did not tool use, then we need to tell it to - // either use a tool or attempt_completion. - const didToolUse = this.assistantMessageContent.some((block) => block.type === "tool_use") - - if (!didToolUse) { - this.userMessageContent.push({ type: "text", text: formatResponse.noToolsUsed() }) - this.consecutiveMistakeCount++ + await this.addToApiConversationHistory({ + role: "assistant", + content: [{ type: "text", text: "Failure: I did not provide a response." }], + }) } - const recDidEndLoop = await this.recursivelyMakeClineRequests(this.userMessageContent) - didEndLoop = recDidEndLoop - } else { - // If there's no assistant_responses, that means we got no text - // or tool_use content blocks from API which we should assume is - // an error. - await this.say( - "error", - "Unexpected API Response: The language model did not provide any assistant messages. This may indicate an issue with the API or the model's output.", - ) - - await this.addToApiConversationHistory({ - role: "assistant", - content: [{ type: "text", text: "Failure: I did not provide a response." }], - }) + // If we reach here without continuing, return false (will always be false for now) + return false + } catch (error) { + // This should never happen since the only thing that can throw an + // error is the attemptApiRequest, which is wrapped in a try catch + // that sends an ask where if noButtonClicked, will clear current + // task and destroy this instance. However to avoid unhandled + // promise rejection, we will end this loop which will end execution + // of this instance (see `startTask`). + return true // Needs to be true so parent loop knows to end task. } - - return didEndLoop // Will always be false for now. - } catch (error) { - // This should never happen since the only thing that can throw an - // error is the attemptApiRequest, which is wrapped in a try catch - // that sends an ask where if noButtonClicked, will clear current - // task and destroy this instance. However to avoid unhandled - // promise rejection, we will end this loop which will end execution - // of this instance (see `startTask`). - return true // Needs to be true so parent loop knows to end task. } + + // If we exit the while loop normally (stack is empty), return false + return false } private async getSystemPrompt(): Promise<string> { diff --git a/src/core/task/__tests__/Task.dispose.test.ts b/src/core/task/__tests__/Task.dispose.test.ts new file mode 100644 index 00000000000..1d93d148a4b --- /dev/null +++ b/src/core/task/__tests__/Task.dispose.test.ts @@ -0,0 +1,201 @@ +import { Task } from "../Task" +import { ClineProvider } from "../../webview/ClineProvider" +import { ProviderSettings } from "@roo-code/types" +import { vi, describe, test, expect, beforeEach, afterEach } from "vitest" + +// Mock dependencies +vi.mock("../../webview/ClineProvider") +vi.mock("../../../integrations/terminal/TerminalRegistry", () => ({ + TerminalRegistry: { + releaseTerminalsForTask: vi.fn(), + }, +})) +vi.mock("../../ignore/RooIgnoreController") +vi.mock("../../protect/RooProtectedController") +vi.mock("../../context-tracking/FileContextTracker") +vi.mock("../../../services/browser/UrlContentFetcher") +vi.mock("../../../services/browser/BrowserSession") +vi.mock("../../../integrations/editor/DiffViewProvider") +vi.mock("../../tools/ToolRepetitionDetector") +vi.mock("../../../api", () => ({ + buildApiHandler: vi.fn(() => ({ + getModel: () => ({ info: {}, id: "test-model" }), + })), +})) +vi.mock("../AutoApprovalHandler") + +// Mock TelemetryService +vi.mock("@roo-code/telemetry", () => ({ + TelemetryService: { + instance: { + captureTaskCreated: vi.fn(), + captureTaskRestarted: vi.fn(), + }, + }, +})) + +describe("Task dispose method", () => { + let mockProvider: any + let mockApiConfiguration: ProviderSettings + let task: Task + + beforeEach(() => { + // Reset all mocks + vi.clearAllMocks() + + // Mock provider + mockProvider = { + context: { + globalStorageUri: { fsPath: "/test/path" }, + }, + getState: 
vi.fn().mockResolvedValue({ mode: "code" }), + log: vi.fn(), + } + + // Mock API configuration + mockApiConfiguration = { + apiProvider: "anthropic", + apiKey: "test-key", + } as ProviderSettings + + // Create task instance without starting it + task = new Task({ + provider: mockProvider as ClineProvider, + apiConfiguration: mockApiConfiguration, + startTask: false, + }) + }) + + afterEach(() => { + // Clean up + if (task && !task.abort) { + task.dispose() + } + }) + + test("should remove all event listeners when dispose is called", () => { + // Add some event listeners using type assertion to bypass strict typing for testing + const listener1 = vi.fn(() => {}) + const listener2 = vi.fn(() => {}) + const listener3 = vi.fn((taskId: string) => {}) + + // Use type assertion to bypass strict event typing for testing + ;(task as any).on("TaskStarted", listener1) + ;(task as any).on("TaskAborted", listener2) + ;(task as any).on("TaskIdle", listener3) + + // Verify listeners are added + expect(task.listenerCount("TaskStarted")).toBe(1) + expect(task.listenerCount("TaskAborted")).toBe(1) + expect(task.listenerCount("TaskIdle")).toBe(1) + + // Spy on removeAllListeners method + const removeAllListenersSpy = vi.spyOn(task, "removeAllListeners") + + // Call dispose + task.dispose() + + // Verify removeAllListeners was called + expect(removeAllListenersSpy).toHaveBeenCalledOnce() + + // Verify all listeners are removed + expect(task.listenerCount("TaskStarted")).toBe(0) + expect(task.listenerCount("TaskAborted")).toBe(0) + expect(task.listenerCount("TaskIdle")).toBe(0) + }) + + test("should handle errors when removing event listeners", () => { + // Mock removeAllListeners to throw an error + const originalRemoveAllListeners = task.removeAllListeners + task.removeAllListeners = vi.fn(() => { + throw new Error("Test error") + }) + + // Spy on console.error + const consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {}) + + // Call dispose - should not throw + expect(() => task.dispose()).not.toThrow() + + // Verify error was logged + expect(consoleErrorSpy).toHaveBeenCalledWith("Error removing event listeners:", expect.any(Error)) + + // Restore + task.removeAllListeners = originalRemoveAllListeners + consoleErrorSpy.mockRestore() + }) + + test("should clean up all resources in correct order", () => { + const removeAllListenersSpy = vi.spyOn(task, "removeAllListeners") + const consoleLogSpy = vi.spyOn(console, "log").mockImplementation(() => {}) + + // Call dispose + task.dispose() + + // Verify dispose was called and logged + expect(consoleLogSpy).toHaveBeenCalledWith( + expect.stringContaining(`[Task] disposing task ${task.taskId}.${task.instanceId}`), + ) + + // Verify removeAllListeners was called first (before other cleanup) + expect(removeAllListenersSpy).toHaveBeenCalledOnce() + + // Clean up + consoleLogSpy.mockRestore() + }) + + test("should prevent memory leaks by removing listeners before other cleanup", () => { + // Add multiple listeners of different types using type assertion for testing + const listeners = { + TaskStarted: vi.fn(() => {}), + TaskAborted: vi.fn(() => {}), + TaskIdle: vi.fn((taskId: string) => {}), + TaskActive: vi.fn((taskId: string) => {}), + TaskAskResponded: vi.fn(() => {}), + Message: vi.fn((data: { action: "created" | "updated"; message: any }) => {}), + TaskTokenUsageUpdated: vi.fn((taskId: string, tokenUsage: any) => {}), + TaskToolFailed: vi.fn((taskId: string, tool: any, error: string) => {}), + TaskUnpaused: vi.fn(() => {}), + } + + // Add all 
listeners using type assertion to bypass strict typing for testing + const taskAny = task as any + taskAny.on("TaskStarted", listeners.TaskStarted) + taskAny.on("TaskAborted", listeners.TaskAborted) + taskAny.on("TaskIdle", listeners.TaskIdle) + taskAny.on("TaskActive", listeners.TaskActive) + taskAny.on("TaskAskResponded", listeners.TaskAskResponded) + taskAny.on("Message", listeners.Message) + taskAny.on("TaskTokenUsageUpdated", listeners.TaskTokenUsageUpdated) + taskAny.on("TaskToolFailed", listeners.TaskToolFailed) + taskAny.on("TaskUnpaused", listeners.TaskUnpaused) + + // Verify all listeners are added + expect(task.listenerCount("TaskStarted")).toBe(1) + expect(task.listenerCount("TaskAborted")).toBe(1) + expect(task.listenerCount("TaskIdle")).toBe(1) + expect(task.listenerCount("TaskActive")).toBe(1) + expect(task.listenerCount("TaskAskResponded")).toBe(1) + expect(task.listenerCount("Message")).toBe(1) + expect(task.listenerCount("TaskTokenUsageUpdated")).toBe(1) + expect(task.listenerCount("TaskToolFailed")).toBe(1) + expect(task.listenerCount("TaskUnpaused")).toBe(1) + + // Call dispose + task.dispose() + + // Verify all listeners are removed + expect(task.listenerCount("TaskStarted")).toBe(0) + expect(task.listenerCount("TaskAborted")).toBe(0) + expect(task.listenerCount("TaskIdle")).toBe(0) + expect(task.listenerCount("TaskActive")).toBe(0) + expect(task.listenerCount("TaskAskResponded")).toBe(0) + expect(task.listenerCount("Message")).toBe(0) + expect(task.listenerCount("TaskTokenUsageUpdated")).toBe(0) + expect(task.listenerCount("TaskToolFailed")).toBe(0) + expect(task.listenerCount("TaskUnpaused")).toBe(0) + + // Verify total listener count is 0 + expect(task.eventNames().length).toBe(0) + }) +})