diff --git a/packages/types/src/global-settings.ts b/packages/types/src/global-settings.ts
index 39480e5a3d..c071726d8a 100644
--- a/packages/types/src/global-settings.ts
+++ b/packages/types/src/global-settings.ts
@@ -29,13 +29,6 @@ export const DEFAULT_WRITE_DELAY_MS = 1000
  */
 export const DEFAULT_TERMINAL_OUTPUT_CHARACTER_LIMIT = 50_000
 
-/**
- * Default timeout for background usage collection in milliseconds.
- * This timeout prevents the background task from running indefinitely
- * when collecting usage data from streaming API responses.
- */
-export const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 30_000
-
 /**
  * GlobalSettings
  */
diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts
index cb6694b7f0..2103dacb27 100644
--- a/src/core/task/Task.ts
+++ b/src/core/task/Task.ts
@@ -29,7 +29,6 @@ import {
 	getApiProtocol,
 	getModelId,
 	DEFAULT_CONSECUTIVE_MISTAKE_LIMIT,
-	DEFAULT_USAGE_COLLECTION_TIMEOUT_MS,
 	isBlockingAsk,
 } from "@roo-code/types"
 import { TelemetryService } from "@roo-code/telemetry"
@@ -1566,10 +1565,6 @@ export class Task extends EventEmitter implements TaskLike {
 		// of prices in tasks from history (it's worth removing a few months
 		// from now).
 		const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => {
-			if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) {
-				return
-			}
-
 			const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}")
 			this.clineMessages[lastApiReqIndex].text = JSON.stringify({
 				...existingData,
@@ -1660,11 +1655,7 @@ export class Task extends EventEmitter implements TaskLike {
 		this.isStreaming = true
 
 		try {
-			const iterator = stream[Symbol.asyncIterator]()
-			let item = await iterator.next()
-			while (!item.done) {
-				const chunk = item.value
-				item = await iterator.next()
+			for await (const chunk of stream) {
 				if (!chunk) {
 					// Sometimes chunk is undefined, no idea that can cause
 					// it, but this workaround seems to fix it.
@@ -1732,165 +1723,16 @@ export class Task extends EventEmitter implements TaskLike {
 					break
 				}
 
+				// PREV: We need to let the request finish for openrouter to
+				// get generation details.
+				// UPDATE: It's better UX to interrupt the request at the
+				// cost of the API cost not being retrieved.
 				if (this.didAlreadyUseTool) {
 					assistantMessage +=
 						"\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]"
 					break
 				}
 			}
-
-			// Create a copy of current token values to avoid race conditions
-			const currentTokens = {
-				input: inputTokens,
-				output: outputTokens,
-				cacheWrite: cacheWriteTokens,
-				cacheRead: cacheReadTokens,
-				total: totalCost,
-			}
-
-			const drainStreamInBackgroundToFindAllUsage = async (apiReqIndex: number) => {
-				const timeoutMs = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS
-				const startTime = Date.now()
-				const modelId = getModelId(this.apiConfiguration)
-
-				// Local variables to accumulate usage data without affecting the main flow
-				let bgInputTokens = currentTokens.input
-				let bgOutputTokens = currentTokens.output
-				let bgCacheWriteTokens = currentTokens.cacheWrite
-				let bgCacheReadTokens = currentTokens.cacheRead
-				let bgTotalCost = currentTokens.total
-
-				// Helper function to capture telemetry and update messages
-				const captureUsageData = async (
-					tokens: {
-						input: number
-						output: number
-						cacheWrite: number
-						cacheRead: number
-						total?: number
-					},
-					messageIndex: number = apiReqIndex,
-				) => {
-					if (tokens.input > 0 || tokens.output > 0 || tokens.cacheWrite > 0 || tokens.cacheRead > 0) {
-						// Update the shared variables atomically
-						inputTokens = tokens.input
-						outputTokens = tokens.output
-						cacheWriteTokens = tokens.cacheWrite
-						cacheReadTokens = tokens.cacheRead
-						totalCost = tokens.total
-
-						// Update the API request message with the latest usage data
-						updateApiReqMsg()
-						await this.saveClineMessages()
-
-						// Update the specific message in the webview
-						const apiReqMessage = this.clineMessages[messageIndex]
-						if (apiReqMessage) {
-							await this.updateClineMessage(apiReqMessage)
-						}
-
-						// Capture telemetry
-						TelemetryService.instance.captureLlmCompletion(this.taskId, {
-							inputTokens: tokens.input,
-							outputTokens: tokens.output,
-							cacheWriteTokens: tokens.cacheWrite,
-							cacheReadTokens: tokens.cacheRead,
-							cost:
-								tokens.total ??
-								calculateApiCostAnthropic(
-									this.api.getModel().info,
-									tokens.input,
-									tokens.output,
-									tokens.cacheWrite,
-									tokens.cacheRead,
-								),
-						})
-					}
-				}
-
-				try {
-					// Continue processing the original stream from where the main loop left off
-					let usageFound = false
-					let chunkCount = 0
-
-					// Use the same iterator that the main loop was using
-					while (!item.done) {
-						// Check for timeout
-						if (Date.now() - startTime > timeoutMs) {
-							console.warn(
-								`[Background Usage Collection] Timed out after ${timeoutMs}ms for model: ${modelId}, processed ${chunkCount} chunks`,
-							)
-							// Clean up the iterator before breaking
-							if (iterator.return) {
-								await iterator.return(undefined)
-							}
-							break
-						}
-
-						const chunk = item.value
-						item = await iterator.next()
-						chunkCount++
-
-						if (chunk && chunk.type === "usage") {
-							usageFound = true
-							bgInputTokens += chunk.inputTokens
-							bgOutputTokens += chunk.outputTokens
-							bgCacheWriteTokens += chunk.cacheWriteTokens ?? 0
-							bgCacheReadTokens += chunk.cacheReadTokens ?? 0
-							bgTotalCost = chunk.totalCost
-						}
-					}
-
-					if (
-						usageFound ||
-						bgInputTokens > 0 ||
-						bgOutputTokens > 0 ||
-						bgCacheWriteTokens > 0 ||
-						bgCacheReadTokens > 0
-					) {
-						// We have usage data either from a usage chunk or accumulated tokens
-						await captureUsageData(
-							{
-								input: bgInputTokens,
-								output: bgOutputTokens,
-								cacheWrite: bgCacheWriteTokens,
-								cacheRead: bgCacheReadTokens,
-								total: bgTotalCost,
-							},
-							lastApiReqIndex,
-						)
-					} else {
-						console.warn(
-							`[Background Usage Collection] Suspicious: request ${apiReqIndex} is complete, but no usage info was found. Model: ${modelId}`,
-						)
-					}
-				} catch (error) {
-					console.error("Error draining stream for usage data:", error)
-					// Still try to capture whatever usage data we have collected so far
-					if (
-						bgInputTokens > 0 ||
-						bgOutputTokens > 0 ||
-						bgCacheWriteTokens > 0 ||
-						bgCacheReadTokens > 0
-					) {
-						await captureUsageData(
-							{
-								input: bgInputTokens,
-								output: bgOutputTokens,
-								cacheWrite: bgCacheWriteTokens,
-								cacheRead: bgCacheReadTokens,
-								total: bgTotalCost,
-							},
-							lastApiReqIndex,
-						)
-					}
-				}
-			}
-
-			// Start the background task and handle any errors
-			drainStreamInBackgroundToFindAllUsage(lastApiReqIndex).catch((error) => {
-				console.error("Background usage collection failed:", error)
-			})
 		} catch (error) {
 			// Abandoned happens when extension is no longer waiting for the
 			// Cline instance to finish aborting (error is thrown here when
@@ -1924,6 +1766,24 @@ export class Task extends EventEmitter implements TaskLike {
 			this.isStreaming = false
 		}
 
+		if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
+			TelemetryService.instance.captureLlmCompletion(this.taskId, {
+				inputTokens,
+				outputTokens,
+				cacheWriteTokens,
+				cacheReadTokens,
+				cost:
+					totalCost ??
+					calculateApiCostAnthropic(
+						this.api.getModel().info,
+						inputTokens,
+						outputTokens,
+						cacheWriteTokens,
+						cacheReadTokens,
+					),
+			})
+		}
+
 		// Need to call here in case the stream was aborted.
 		if (this.abort || this.abandoned) {
 			throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
@@ -1960,10 +1820,9 @@ export class Task extends EventEmitter implements TaskLike {
 			presentAssistantMessage(this)
 		}
 
-		// Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage
-		// to ensure usage data is captured even when the stream is interrupted. The background task
-		// uses local variables to accumulate usage data before atomically updating the shared state.
 		await this.persistGpt5Metadata(reasoningMessage)
+
+		updateApiReqMsg()
 		await this.saveClineMessages()
 		await this.providerRef.deref()?.postStateToWebview()
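
For reference, the streaming flow after this patch reduces to the pattern below: consume the stream with `for await`, accumulate usage as chunks arrive, optionally `break` early, then report telemetry once after the loop instead of handing the iterator off to a background drain task. This is a minimal sketch; `StreamChunk`, `captureTelemetry`, and `consumeStream` are invented names for illustration, not the real Roo Code APIs.

```ts
// Illustrative chunk union; the real ApiStream yields more chunk types.
type StreamChunk =
	| { type: "text"; text: string }
	| { type: "usage"; inputTokens: number; outputTokens: number; totalCost?: number }

// Stand-in for TelemetryService.instance.captureLlmCompletion(...).
function captureTelemetry(usage: { inputTokens: number; outputTokens: number; cost?: number }): void {
	console.log("llm completion:", usage)
}

async function consumeStream(stream: AsyncIterable<StreamChunk>, shouldInterrupt: () => boolean): Promise<void> {
	let inputTokens = 0
	let outputTokens = 0
	let totalCost: number | undefined

	for await (const chunk of stream) {
		if (!chunk) continue // the real loop also tolerates occasional undefined chunks

		if (chunk.type === "usage") {
			inputTokens += chunk.inputTokens
			outputTokens += chunk.outputTokens
			totalCost = chunk.totalCost
		}

		// Breaking out ends the request early; a usage chunk the provider
		// would have emitted later is deliberately given up (see the
		// PREV/UPDATE comment in the diff).
		if (shouldInterrupt()) break
	}

	// Telemetry is captured exactly once, after streaming ends.
	if (inputTokens > 0 || outputTokens > 0) {
		captureTelemetry({ inputTokens, outputTokens, cost: totalCost })
	}
}
```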
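One property of `for await` makes the deleted cleanup code unnecessary: exiting the loop via `break` (or `return`, or a thrown error) automatically calls the iterator's `return()` method, so the underlying generator's `finally` blocks run and provider SDKs typically get a chance to tear down the HTTP connection. The manual `iterator.return(undefined)` call in the removed drain helper was only needed because it consumed the iterator by hand. A small self-contained demonstration:

```ts
async function* numbers(): AsyncGenerator<number> {
	try {
		yield 1
		yield 2
		yield 3
	} finally {
		// Runs as soon as the consumer breaks out of the loop.
		console.log("stream closed")
	}
}

async function main(): Promise<void> {
	for await (const n of numbers()) {
		console.log(n)
		if (n === 2) break // triggers the generator's finally block via return()
	}
}

main() // prints: 1, 2, "stream closed"
```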
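The `totalCost ?? calculateApiCostAnthropic(...)` expression in the new telemetry block keeps the precedence the drain helper had: a cost reported by the provider on a usage chunk wins, and only when none arrived is the cost derived from token counts and model pricing. A simplified model of that fallback (the `ModelPricing` shape and per-million-token math are assumptions for illustration, not the real `calculateApiCostAnthropic` signature, which also accounts for cache reads and writes):

```ts
interface ModelPricing {
	inputPricePerMTok: number // USD per million input tokens
	outputPricePerMTok: number // USD per million output tokens
}

// Simplified stand-in for calculateApiCostAnthropic.
function estimateCost(pricing: ModelPricing, inputTokens: number, outputTokens: number): number {
	return (
		(inputTokens / 1_000_000) * pricing.inputPricePerMTok +
		(outputTokens / 1_000_000) * pricing.outputPricePerMTok
	)
}

const reportedCost: number | undefined = undefined // no usage chunk carried a cost
const cost = reportedCost ?? estimateCost({ inputPricePerMTok: 3, outputPricePerMTok: 15 }, 1200, 800)
// cost === 0.0156
```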