diff --git a/packages/types/src/global-settings.ts b/packages/types/src/global-settings.ts
index f5e9fc32bd..c657499e7a 100644
--- a/packages/types/src/global-settings.ts
+++ b/packages/types/src/global-settings.ts
@@ -29,6 +29,13 @@ export const DEFAULT_WRITE_DELAY_MS = 1000
  */
 export const DEFAULT_TERMINAL_OUTPUT_CHARACTER_LIMIT = 50_000
 
+/**
+ * Default timeout for background usage collection in milliseconds.
+ * This timeout prevents the background task from running indefinitely
+ * when collecting usage data from streaming API responses.
+ */
+export const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 30_000
+
 /**
  * GlobalSettings
  */
diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts
index 5e96b6fb16..8921997dc7 100644
--- a/src/core/task/Task.ts
+++ b/src/core/task/Task.ts
@@ -29,6 +29,7 @@ import {
 	getApiProtocol,
 	getModelId,
 	DEFAULT_CONSECUTIVE_MISTAKE_LIMIT,
+	DEFAULT_USAGE_COLLECTION_TIMEOUT_MS,
 	isBlockingAsk,
 } from "@roo-code/types"
 import { TelemetryService } from "@roo-code/telemetry"
@@ -1522,6 +1523,10 @@ export class Task extends EventEmitter implements TaskLike {
 		// of prices in tasks from history (it's worth removing a few months
 		// from now).
 		const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => {
+			if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) {
+				return
+			}
+
 			const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}")
 			this.clineMessages[lastApiReqIndex].text = JSON.stringify({
 				...existingData,
@@ -1612,7 +1617,11 @@ export class Task extends EventEmitter implements TaskLike {
 			this.isStreaming = true
 
 			try {
-				for await (const chunk of stream) {
+				const iterator = stream[Symbol.asyncIterator]()
+				let item = await iterator.next()
+				while (!item.done) {
+					const chunk = item.value
+					item = await iterator.next()
 					if (!chunk) {
 						// Sometimes chunk is undefined, no idea that can cause
 						// it, but this workaround seems to fix it.
@@ -1680,16 +1689,165 @@ export class Task extends EventEmitter implements TaskLike {
 						break
 					}
 
-					// PREV: We need to let the request finish for openrouter to
-					// get generation details.
-					// UPDATE: It's better UX to interrupt the request at the
-					// cost of the API cost not being retrieved.
 					if (this.didAlreadyUseTool) {
 						assistantMessage +=
 							"\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]"
 						break
 					}
 				}
+
+				// Create a copy of current token values to avoid race conditions
+				const currentTokens = {
+					input: inputTokens,
+					output: outputTokens,
+					cacheWrite: cacheWriteTokens,
+					cacheRead: cacheReadTokens,
+					total: totalCost,
+				}
+
+				const drainStreamInBackgroundToFindAllUsage = async (apiReqIndex: number) => {
+					const timeoutMs = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS
+					const startTime = Date.now()
+					const modelId = getModelId(this.apiConfiguration)
+
+					// Local variables to accumulate usage data without affecting the main flow
+					let bgInputTokens = currentTokens.input
+					let bgOutputTokens = currentTokens.output
+					let bgCacheWriteTokens = currentTokens.cacheWrite
+					let bgCacheReadTokens = currentTokens.cacheRead
+					let bgTotalCost = currentTokens.total
+
+					// Helper function to capture telemetry and update messages
+					const captureUsageData = async (
+						tokens: {
+							input: number
+							output: number
+							cacheWrite: number
+							cacheRead: number
+							total?: number
+						},
+						messageIndex: number = apiReqIndex,
+					) => {
+						if (tokens.input > 0 || tokens.output > 0 || tokens.cacheWrite > 0 || tokens.cacheRead > 0) {
+							// Update the shared variables atomically
+							inputTokens = tokens.input
+							outputTokens = tokens.output
+							cacheWriteTokens = tokens.cacheWrite
+							cacheReadTokens = tokens.cacheRead
+							totalCost = tokens.total
+
+							// Update the API request message with the latest usage data
+							updateApiReqMsg()
+							await this.saveClineMessages()
+
+							// Update the specific message in the webview
+							const apiReqMessage = this.clineMessages[messageIndex]
+							if (apiReqMessage) {
+								await this.updateClineMessage(apiReqMessage)
+							}
+
+							// Capture telemetry
+							TelemetryService.instance.captureLlmCompletion(this.taskId, {
+								inputTokens: tokens.input,
+								outputTokens: tokens.output,
+								cacheWriteTokens: tokens.cacheWrite,
+								cacheReadTokens: tokens.cacheRead,
+								cost:
+									tokens.total ??
+									calculateApiCostAnthropic(
+										this.api.getModel().info,
+										tokens.input,
+										tokens.output,
+										tokens.cacheWrite,
+										tokens.cacheRead,
+									),
+							})
+						}
+					}
+
+					try {
+						// Continue processing the original stream from where the main loop left off
+						let usageFound = false
+						let chunkCount = 0
+
+						// Use the same iterator that the main loop was using
+						while (!item.done) {
+							// Check for timeout
+							if (Date.now() - startTime > timeoutMs) {
+								console.warn(
+									`[Background Usage Collection] Timed out after ${timeoutMs}ms for model: ${modelId}, processed ${chunkCount} chunks`,
+								)
+								// Clean up the iterator before breaking
+								if (iterator.return) {
+									await iterator.return(undefined)
+								}
+								break
+							}
+
+							const chunk = item.value
+							item = await iterator.next()
+							chunkCount++
+
+							if (chunk && chunk.type === "usage") {
+								usageFound = true
+								bgInputTokens += chunk.inputTokens
+								bgOutputTokens += chunk.outputTokens
+								bgCacheWriteTokens += chunk.cacheWriteTokens ?? 0
+								bgCacheReadTokens += chunk.cacheReadTokens ?? 0
+								bgTotalCost = chunk.totalCost
+							}
+						}
+
+						if (
+							usageFound ||
+							bgInputTokens > 0 ||
+							bgOutputTokens > 0 ||
+							bgCacheWriteTokens > 0 ||
+							bgCacheReadTokens > 0
+						) {
+							// We have usage data either from a usage chunk or accumulated tokens
+							await captureUsageData(
+								{
+									input: bgInputTokens,
+									output: bgOutputTokens,
+									cacheWrite: bgCacheWriteTokens,
+									cacheRead: bgCacheReadTokens,
+									total: bgTotalCost,
+								},
+								lastApiReqIndex,
+							)
+						} else {
+							console.warn(
+								`[Background Usage Collection] Suspicious: request ${apiReqIndex} is complete, but no usage info was found. Model: ${modelId}`,
+							)
+						}
+					} catch (error) {
+						console.error("Error draining stream for usage data:", error)
+						// Still try to capture whatever usage data we have collected so far
+						if (
+							bgInputTokens > 0 ||
+							bgOutputTokens > 0 ||
+							bgCacheWriteTokens > 0 ||
+							bgCacheReadTokens > 0
+						) {
+							await captureUsageData(
+								{
+									input: bgInputTokens,
+									output: bgOutputTokens,
+									cacheWrite: bgCacheWriteTokens,
+									cacheRead: bgCacheReadTokens,
+									total: bgTotalCost,
+								},
+								lastApiReqIndex,
+							)
+						}
+					}
+				}
+
+				// Start the background task and handle any errors
+				drainStreamInBackgroundToFindAllUsage(lastApiReqIndex).catch((error) => {
+					console.error("Background usage collection failed:", error)
+				})
 			} catch (error) {
 				// Abandoned happens when extension is no longer waiting for the
 				// Cline instance to finish aborting (error is thrown here when
@@ -1723,24 +1881,6 @@ export class Task extends EventEmitter implements TaskLike {
 				this.isStreaming = false
 			}
 
-			if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
-				TelemetryService.instance.captureLlmCompletion(this.taskId, {
-					inputTokens,
-					outputTokens,
-					cacheWriteTokens,
-					cacheReadTokens,
-					cost:
-						totalCost ??
-						calculateApiCostAnthropic(
-							this.api.getModel().info,
-							inputTokens,
-							outputTokens,
-							cacheWriteTokens,
-							cacheReadTokens,
-						),
-				})
-			}
-
 			// Need to call here in case the stream was aborted.
 			if (this.abort || this.abandoned) {
 				throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
@@ -1777,9 +1917,10 @@ export class Task extends EventEmitter implements TaskLike {
 				presentAssistantMessage(this)
 			}
 
+			// Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage
+			// to ensure usage data is captured even when the stream is interrupted. The background task
+			// uses local variables to accumulate usage data before atomically updating the shared state.
 			await this.persistGpt5Metadata(reasoningMessage)
-
-			updateApiReqMsg()
 			await this.saveClineMessages()
 			await this.providerRef.deref()?.postStateToWebview()
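
The diff hinges on one pattern: the main loop takes the stream's async iterator explicitly instead of using `for await`, so that when it breaks early (e.g. on tool use), the same iterator, already advanced past the consumed chunks, can be handed to a fire-and-forget drainer that keeps reading until the provider's trailing usage chunk arrives or a timeout fires. Below is a minimal, self-contained sketch of that hand-off; the `Chunk` type and the `processStream`/`onText`/`onUsage` names are hypothetical stand-ins, not Roo-Code's actual API:

```ts
// Hypothetical chunk shape; the real stream chunks carry more fields.
type Chunk =
	| { type: "text"; text: string }
	| { type: "tool_use" }
	| { type: "usage"; inputTokens: number; outputTokens: number }

async function processStream(
	stream: AsyncIterable<Chunk>,
	onText: (text: string) => void,
	onUsage: (usage: { inputTokens: number; outputTokens: number }) => void,
	timeoutMs = 30_000,
): Promise<void> {
	// Take the iterator explicitly so the foreground loop and the background
	// drainer advance one shared cursor and never re-read a chunk.
	const iterator = stream[Symbol.asyncIterator]()
	let item = await iterator.next()

	// Foreground: surface content immediately; break early on tool use,
	// leaving the rest of the stream unread.
	while (!item.done) {
		const chunk = item.value
		// Prefetch so `item` always holds the next unprocessed result.
		item = await iterator.next()
		if (chunk.type === "text") onText(chunk.text)
		if (chunk.type === "tool_use") break
	}

	// Background: drain the remainder to find the trailing usage chunk,
	// bounded by a timeout so a stalled stream cannot keep the task alive.
	const drain = async () => {
		const startTime = Date.now()
		let usage = { inputTokens: 0, outputTokens: 0 }
		while (!item.done) {
			if (Date.now() - startTime > timeoutMs) {
				await iterator.return?.(undefined) // release the underlying connection
				break
			}
			const chunk = item.value
			item = await iterator.next()
			if (chunk.type === "usage") usage = chunk
		}
		onUsage(usage)
	}
	// Fire-and-forget; the catch prevents an unhandled promise rejection.
	void drain().catch((error) => console.error("Background drain failed:", error))
}
```

Note the ordering inside both loops: the next result is awaited before the current chunk is inspected, so whenever a `break` happens, `item` already holds the first unconsumed result and the drainer resumes without skipping or duplicating a chunk. That mirrors why the diff replaces `for await` with manual `iterator.next()` calls: `for await` would call `iterator.return()` on early exit and close the stream before the usage data arrives.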