7 changes: 7 additions & 0 deletions packages/types/src/global-settings.ts
@@ -29,6 +29,13 @@ export const DEFAULT_WRITE_DELAY_MS = 1000
*/
export const DEFAULT_TERMINAL_OUTPUT_CHARACTER_LIMIT = 50_000

/**
* Default timeout for background usage collection in milliseconds.
* This timeout prevents the background task from running indefinitely
* when collecting usage data from streaming API responses.
*/
export const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 30_000

/**
* GlobalSettings
*/
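A minimal sketch of the guard this constant drives, assuming a generic async iterator — the helper name and signature here are illustrative, not part of the PR (the real loop lives in Task.ts below):

```ts
import { DEFAULT_USAGE_COLLECTION_TIMEOUT_MS } from "@roo-code/types"

// Illustrative helper: drain an async iterator, bailing out once the
// elapsed time exceeds the configured ceiling.
async function drainWithTimeout<T>(iterator: AsyncIterator<T>): Promise<T[]> {
	const startTime = Date.now()
	const collected: T[] = []
	let item = await iterator.next()
	while (!item.done) {
		if (Date.now() - startTime > DEFAULT_USAGE_COLLECTION_TIMEOUT_MS) {
			// Let the underlying stream release its resources before bailing.
			await iterator.return?.(undefined)
			break
		}
		collected.push(item.value)
		item = await iterator.next()
	}
	return collected
}
```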
191 changes: 166 additions & 25 deletions src/core/task/Task.ts
@@ -29,6 +29,7 @@ import {
getApiProtocol,
getModelId,
DEFAULT_CONSECUTIVE_MISTAKE_LIMIT,
DEFAULT_USAGE_COLLECTION_TIMEOUT_MS,
isBlockingAsk,
} from "@roo-code/types"
import { TelemetryService } from "@roo-code/telemetry"
@@ -1522,6 +1523,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
// of prices in tasks from history (it's worth removing a few months
// from now).
const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => {
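// The background usage task can invoke this after the task state has
// moved on, so bail out if the tracked API request message is gone.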
if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) {
return
}

const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}")
this.clineMessages[lastApiReqIndex].text = JSON.stringify({
...existingData,
@@ -1612,7 +1617,11 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
this.isStreaming = true

try {
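// Take the iterator manually instead of using `for await`: exiting a
// `for await` loop early (via break) calls iterator.return() and closes
// the stream, whereas holding the iterator directly keeps it open so it
// can be handed to drainStreamInBackgroundToFindAllUsage() below.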
for await (const chunk of stream) {
const iterator = stream[Symbol.asyncIterator]()
let item = await iterator.next()
while (!item.done) {
const chunk = item.value
item = await iterator.next()
if (!chunk) {
// Sometimes chunk is undefined; no idea what can cause
// it, but this workaround seems to fix it.
@@ -1680,16 +1689,165 @@
break
}

// PREV: We need to let the request finish for openrouter to
// get generation details.
// UPDATE: It's better UX to interrupt the request at the
// cost of the API cost not being retrieved.
[Comment from Contributor Author] The comment calls out OpenRouter specifically, but I think I've seen usage being misreported with other providers as well.

if (this.didAlreadyUseTool) {
assistantMessage +=
"\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]"
break
}
}

// Create a copy of current token values to avoid race conditions
const currentTokens = {
input: inputTokens,
output: outputTokens,
cacheWrite: cacheWriteTokens,
cacheRead: cacheReadTokens,
total: totalCost,
}

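// Drains the rest of the response stream off the critical path. Providers
// can emit usage chunks after the main loop has already broken out (e.g.
// on tool use), and that data is still wanted for cost tracking and telemetry.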
const drainStreamInBackgroundToFindAllUsage = async (apiReqIndex: number) => {
const timeoutMs = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS
const startTime = Date.now()
const modelId = getModelId(this.apiConfiguration)

// Local variables to accumulate usage data without affecting the main flow
let bgInputTokens = currentTokens.input
let bgOutputTokens = currentTokens.output
let bgCacheWriteTokens = currentTokens.cacheWrite
let bgCacheReadTokens = currentTokens.cacheRead
let bgTotalCost = currentTokens.total

// Helper function to capture telemetry and update messages
const captureUsageData = async (
tokens: {
input: number
output: number
cacheWrite: number
cacheRead: number
total?: number
},
messageIndex: number = apiReqIndex,
) => {
if (tokens.input > 0 || tokens.output > 0 || tokens.cacheWrite > 0 || tokens.cacheRead > 0) {
// Update the shared variables atomically
inputTokens = tokens.input
outputTokens = tokens.output
cacheWriteTokens = tokens.cacheWrite
cacheReadTokens = tokens.cacheRead
totalCost = tokens.total

// Update the API request message with the latest usage data
updateApiReqMsg()
await this.saveClineMessages()

// Update the specific message in the webview
const apiReqMessage = this.clineMessages[messageIndex]
if (apiReqMessage) {
await this.updateClineMessage(apiReqMessage)
}

// Capture telemetry
TelemetryService.instance.captureLlmCompletion(this.taskId, {
inputTokens: tokens.input,
outputTokens: tokens.output,
cacheWriteTokens: tokens.cacheWrite,
cacheReadTokens: tokens.cacheRead,
cost:
tokens.total ??
calculateApiCostAnthropic(
this.api.getModel().info,
tokens.input,
tokens.output,
tokens.cacheWrite,
tokens.cacheRead,
),
})
}
}

try {
// Continue processing the original stream from where the main loop left off
let usageFound = false
let chunkCount = 0

// Use the same iterator that the main loop was using
while (!item.done) {
// Check for timeout
if (Date.now() - startTime > timeoutMs) {
console.warn(
`[Background Usage Collection] Timed out after ${timeoutMs}ms for model: ${modelId}, processed ${chunkCount} chunks`,
)
// Clean up the iterator before breaking
if (iterator.return) {
await iterator.return(undefined)
}
break
}

const chunk = item.value
item = await iterator.next()
chunkCount++

if (chunk && chunk.type === "usage") {
usageFound = true
bgInputTokens += chunk.inputTokens
bgOutputTokens += chunk.outputTokens
bgCacheWriteTokens += chunk.cacheWriteTokens ?? 0
bgCacheReadTokens += chunk.cacheReadTokens ?? 0
bgTotalCost = chunk.totalCost
}
}

if (
usageFound ||
bgInputTokens > 0 ||
bgOutputTokens > 0 ||
bgCacheWriteTokens > 0 ||
bgCacheReadTokens > 0
) {
// We have usage data either from a usage chunk or accumulated tokens
await captureUsageData(
{
input: bgInputTokens,
output: bgOutputTokens,
cacheWrite: bgCacheWriteTokens,
cacheRead: bgCacheReadTokens,
total: bgTotalCost,
},
lastApiReqIndex,
)
} else {
console.warn(
`[Background Usage Collection] Suspicious: request ${apiReqIndex} is complete, but no usage info was found. Model: ${modelId}`,
)
}
} catch (error) {
console.error("Error draining stream for usage data:", error)
// Still try to capture whatever usage data we have collected so far
if (
bgInputTokens > 0 ||
bgOutputTokens > 0 ||
bgCacheWriteTokens > 0 ||
bgCacheReadTokens > 0
) {
await captureUsageData(
{
input: bgInputTokens,
output: bgOutputTokens,
cacheWrite: bgCacheWriteTokens,
cacheRead: bgCacheReadTokens,
total: bgTotalCost,
},
lastApiReqIndex,
)
}
}
}

// Start the background task without awaiting it; the .catch() prevents
// an unhandled rejection if draining fails
drainStreamInBackgroundToFindAllUsage(lastApiReqIndex).catch((error) => {
console.error("Background usage collection failed:", error)
})
} catch (error) {
// Abandoned happens when extension is no longer waiting for the
// Cline instance to finish aborting (error is thrown here when
@@ -1723,24 +1881,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
this.isStreaming = false
}

if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
TelemetryService.instance.captureLlmCompletion(this.taskId, {
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
cost:
totalCost ??
calculateApiCostAnthropic(
this.api.getModel().info,
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
),
})
}

// Need to call here in case the stream was aborted.
if (this.abort || this.abandoned) {
throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
@@ -1777,9 +1917,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
presentAssistantMessage(this)
}

// Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage
// to ensure usage data is captured even when the stream is interrupted. The background task
// uses local variables to accumulate usage data before atomically updating the shared state.
await this.persistGpt5Metadata(reasoningMessage)

updateApiReqMsg()
await this.saveClineMessages()
await this.providerRef.deref()?.postStateToWebview()

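For reference, the hand-off pattern the PR relies on reduces to a few lines. This is a hedged sketch, not the PR's code: the `Chunk` type and function name are hypothetical, and only the iterator mechanics mirror what Task.ts does above.

```ts
type Chunk =
	| { type: "text"; text: string }
	| { type: "usage"; inputTokens: number; outputTokens: number }

async function consumeWithBackgroundDrain(stream: AsyncIterable<Chunk>) {
	const iterator = stream[Symbol.asyncIterator]()
	let item = await iterator.next()

	// Foreground: stop early, the way the main loop breaks on tool use.
	while (!item.done) {
		const chunk = item.value
		item = await iterator.next()
		if (chunk.type === "text") break
	}

	// Background: keep draining the same iterator for trailing usage chunks.
	void (async () => {
		let inputTokens = 0
		let outputTokens = 0
		while (!item.done) {
			const chunk = item.value
			item = await iterator.next()
			if (chunk.type === "usage") {
				inputTokens += chunk.inputTokens
				outputTokens += chunk.outputTokens
			}
		}
		console.log(`[sketch] usage: ${inputTokens} in / ${outputTokens} out`)
	})().catch((error) => console.error("Background drain failed:", error))
}
```

Because `item` is shared between the foreground loop and the background closure, the drain resumes exactly where the main loop stopped, with no chunks dropped or read twice.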