Merged
7 changes: 0 additions & 7 deletions packages/types/src/global-settings.ts
@@ -29,13 +29,6 @@ export const DEFAULT_WRITE_DELAY_MS = 1000
  */
 export const DEFAULT_TERMINAL_OUTPUT_CHARACTER_LIMIT = 50_000
 
-/**
- * Default timeout for background usage collection in milliseconds.
- * This timeout prevents the background task from running indefinitely
- * when collecting usage data from streaming API responses.
- */
-export const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 30_000
-
 /**
  * GlobalSettings
  */
191 changes: 25 additions & 166 deletions src/core/task/Task.ts
@@ -29,7 +29,6 @@ import {
 	getApiProtocol,
 	getModelId,
 	DEFAULT_CONSECUTIVE_MISTAKE_LIMIT,
-	DEFAULT_USAGE_COLLECTION_TIMEOUT_MS,
 	isBlockingAsk,
 } from "@roo-code/types"
 import { TelemetryService } from "@roo-code/telemetry"

@@ -1566,10 +1565,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 		// of prices in tasks from history (it's worth removing a few months
 		// from now).
 		const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => {

Contributor comment:
The removal of the lastApiReqIndex < 0 check could potentially cause issues if this function is called with an invalid index. Consider keeping this defensive check even though the background processing is removed:

Suggested change:
	const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => {
		if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) {
			return
		}
		const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}")

-			if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) {
-				return
-			}
-
 			const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}")
 			this.clineMessages[lastApiReqIndex].text = JSON.stringify({
 				...existingData,
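For context on why that guard matters: assuming lastApiReqIndex comes from a findLastIndex-style lookup (the lookup itself is not shown in this diff), a miss yields -1, indexing the array with -1 yields undefined, and the unguarded .text access then throws. A small sketch under that assumption:

	type ClineMessage = { say?: string; text?: string }

	const clineMessages: ClineMessage[] = [{ say: "text", text: "hello" }]

	// findLastIndex returns -1 when no element matches.
	const lastApiReqIndex = clineMessages.findLastIndex((m) => m.say === "api_req_started")

	console.log(lastApiReqIndex) // -1
	console.log(clineMessages[lastApiReqIndex]) // undefined: negative indices miss

	// Unguarded, the next line throws "TypeError: Cannot read properties of undefined":
	// JSON.parse(clineMessages[lastApiReqIndex].text || "{}")

	// The suggested guard turns the crash into a no-op:
	if (lastApiReqIndex < 0 || !clineMessages[lastApiReqIndex]) {
		// return early
	}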
@@ -1660,11 +1655,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 		this.isStreaming = true
 
 		try {
-			const iterator = stream[Symbol.asyncIterator]()
-			let item = await iterator.next()
-			while (!item.done) {
-				const chunk = item.value
-				item = await iterator.next()
+			for await (const chunk of stream) {
 				if (!chunk) {
 					// Sometimes chunk is undefined, no idea what can cause
 					// it, but this workaround seems to fix it.
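A note on this refactor: for await implicitly calls the iterator's return() method when the loop exits early (via break or throw), which closes the underlying stream. The old manual-iterator form kept the stream open on break, which is what allowed the now-removed background task to keep draining it for usage chunks. A minimal sketch of the difference, using a hypothetical generator rather than the real ApiStream:

	// Hypothetical stream; the finally block stands in for stream cleanup.
	async function* chunks(): AsyncGenerator<string> {
		try {
			yield "a"
			yield "b"
			yield "c"
		} finally {
			console.log("stream closed")
		}
	}

	async function demo() {
		// for-await: break triggers iterator.return(), so "stream closed" logs here.
		for await (const chunk of chunks()) {
			if (chunk === "b") break
		}

		// Manual iterator: break leaves the generator suspended and the stream
		// open, which is what allowed a background drain in the old code.
		const iterator = chunks()[Symbol.asyncIterator]()
		let item = await iterator.next()
		while (!item.done) {
			if (item.value === "b") break
			item = await iterator.next()
		}
		// iterator.next() here would still yield "c".
	}

	void demo()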
@@ -1732,165 +1723,16 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 					break
 				}
 
 				// PREV: We need to let the request finish for openrouter to
 				// get generation details.
 				// UPDATE: It's better UX to interrupt the request at the
 				// cost of the API cost not being retrieved.

Contributor comment:
The comment here specifically mentions OpenRouter, but this behavior affects all providers that return usage data at the end of streams. Consider making it more generic:

Suggested change:
	// PREV: We need to let the request finish for providers to
	// get complete usage/generation details.
	// UPDATE: It's better UX to interrupt the request at the
	// cost of the API cost not being retrieved.

Contributor comment:
Consider adding a comment here explaining the trade-off decision. Something like:

Suggested change:
	// PREV: We need to let the request finish for openrouter to
	// get generation details.
	// UPDATE: It's better UX to interrupt the request at the
	// cost of the API cost not being retrieved.
	// This prioritizes user experience (immediate interruption) over
	// perfect cost tracking based on user feedback.

This would help future developers understand why this approach was chosen.
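Both comments hinge on the same stream property: providers that report usage typically emit it as a final chunk, after all text chunks, so breaking out of the read loop early means the usage chunk is never observed. A simplified illustration (the chunk shapes here are trimmed-down stand-ins for the real stream types):

	type StreamChunk =
		| { type: "text"; text: string }
		| { type: "usage"; inputTokens: number; outputTokens: number; totalCost?: number }

	// Mock provider stream: usage arrives only after the last text chunk.
	async function* mockStream(): AsyncGenerator<StreamChunk> {
		yield { type: "text", text: "Hello" }
		yield { type: "text", text: " world" }
		yield { type: "usage", inputTokens: 12, outputTokens: 2, totalCost: 0.0003 }
	}

	async function readUntilInterrupted() {
		for await (const chunk of mockStream()) {
			if (chunk.type === "usage") {
				console.log("usage:", chunk)
			}
			// Simulating an interruption such as didAlreadyUseTool: breaking on the
			// first text chunk means the trailing usage chunk is never read.
			if (chunk.type === "text") break
		}
	}

	void readUntilInterrupted()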

 				if (this.didAlreadyUseTool) {
 					assistantMessage +=
 						"\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]"
 					break
 				}
 			}

-			// Create a copy of current token values to avoid race conditions
-			const currentTokens = {
-				input: inputTokens,
-				output: outputTokens,
-				cacheWrite: cacheWriteTokens,
-				cacheRead: cacheReadTokens,
-				total: totalCost,
-			}
-
-			const drainStreamInBackgroundToFindAllUsage = async (apiReqIndex: number) => {
-				const timeoutMs = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS
-				const startTime = Date.now()
-				const modelId = getModelId(this.apiConfiguration)
-
-				// Local variables to accumulate usage data without affecting the main flow
-				let bgInputTokens = currentTokens.input
-				let bgOutputTokens = currentTokens.output
-				let bgCacheWriteTokens = currentTokens.cacheWrite
-				let bgCacheReadTokens = currentTokens.cacheRead
-				let bgTotalCost = currentTokens.total
-
-				// Helper function to capture telemetry and update messages
-				const captureUsageData = async (
-					tokens: {
-						input: number
-						output: number
-						cacheWrite: number
-						cacheRead: number
-						total?: number
-					},
-					messageIndex: number = apiReqIndex,
-				) => {
-					if (tokens.input > 0 || tokens.output > 0 || tokens.cacheWrite > 0 || tokens.cacheRead > 0) {
-						// Update the shared variables atomically
-						inputTokens = tokens.input
-						outputTokens = tokens.output
-						cacheWriteTokens = tokens.cacheWrite
-						cacheReadTokens = tokens.cacheRead
-						totalCost = tokens.total
-
-						// Update the API request message with the latest usage data
-						updateApiReqMsg()
-						await this.saveClineMessages()
-
-						// Update the specific message in the webview
-						const apiReqMessage = this.clineMessages[messageIndex]
-						if (apiReqMessage) {
-							await this.updateClineMessage(apiReqMessage)
-						}
-
-						// Capture telemetry
-						TelemetryService.instance.captureLlmCompletion(this.taskId, {
-							inputTokens: tokens.input,
-							outputTokens: tokens.output,
-							cacheWriteTokens: tokens.cacheWrite,
-							cacheReadTokens: tokens.cacheRead,
-							cost:
-								tokens.total ??
-								calculateApiCostAnthropic(
-									this.api.getModel().info,
-									tokens.input,
-									tokens.output,
-									tokens.cacheWrite,
-									tokens.cacheRead,
-								),
-						})
-					}
-				}
-
-				try {
-					// Continue processing the original stream from where the main loop left off
-					let usageFound = false
-					let chunkCount = 0
-
-					// Use the same iterator that the main loop was using
-					while (!item.done) {
-						// Check for timeout
-						if (Date.now() - startTime > timeoutMs) {
-							console.warn(
-								`[Background Usage Collection] Timed out after ${timeoutMs}ms for model: ${modelId}, processed ${chunkCount} chunks`,
-							)
-							// Clean up the iterator before breaking
-							if (iterator.return) {
-								await iterator.return(undefined)
-							}
-							break
-						}
-
-						const chunk = item.value
-						item = await iterator.next()
-						chunkCount++
-
-						if (chunk && chunk.type === "usage") {
-							usageFound = true
-							bgInputTokens += chunk.inputTokens
-							bgOutputTokens += chunk.outputTokens
-							bgCacheWriteTokens += chunk.cacheWriteTokens ?? 0
-							bgCacheReadTokens += chunk.cacheReadTokens ?? 0
-							bgTotalCost = chunk.totalCost
-						}
-					}
-
-					if (
-						usageFound ||
-						bgInputTokens > 0 ||
-						bgOutputTokens > 0 ||
-						bgCacheWriteTokens > 0 ||
-						bgCacheReadTokens > 0
-					) {
-						// We have usage data either from a usage chunk or accumulated tokens
-						await captureUsageData(
-							{
-								input: bgInputTokens,
-								output: bgOutputTokens,
-								cacheWrite: bgCacheWriteTokens,
-								cacheRead: bgCacheReadTokens,
-								total: bgTotalCost,
-							},
-							lastApiReqIndex,
-						)
-					} else {
-						console.warn(
-							`[Background Usage Collection] Suspicious: request ${apiReqIndex} is complete, but no usage info was found. Model: ${modelId}`,
-						)
-					}
-				} catch (error) {
-					console.error("Error draining stream for usage data:", error)
-					// Still try to capture whatever usage data we have collected so far
-					if (
-						bgInputTokens > 0 ||
-						bgOutputTokens > 0 ||
-						bgCacheWriteTokens > 0 ||
-						bgCacheReadTokens > 0
-					) {
-						await captureUsageData(
-							{
-								input: bgInputTokens,
-								output: bgOutputTokens,
-								cacheWrite: bgCacheWriteTokens,
-								cacheRead: bgCacheReadTokens,
-								total: bgTotalCost,
-							},
-							lastApiReqIndex,
-						)
-					}
-				}
-			}
-
-			// Start the background task and handle any errors
-			drainStreamInBackgroundToFindAllUsage(lastApiReqIndex).catch((error) => {
-				console.error("Background usage collection failed:", error)
-			})
 		} catch (error) {
 			// Abandoned happens when extension is no longer waiting for the
 			// Cline instance to finish aborting (error is thrown here when
@@ -1924,6 +1766,24 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 			this.isStreaming = false
 		}
 
+		if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
+			TelemetryService.instance.captureLlmCompletion(this.taskId, {
+				inputTokens,
+				outputTokens,
+				cacheWriteTokens,
+				cacheReadTokens,
+				cost:
+					totalCost ??
+					calculateApiCostAnthropic(
+						this.api.getModel().info,
+						inputTokens,
+						outputTokens,
+						cacheWriteTokens,
+						cacheReadTokens,
+					),
+			})
+		}
+
 		// Need to call here in case the stream was aborted.
 		if (this.abort || this.abandoned) {
 			throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
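One detail in the added telemetry block: using ?? (nullish coalescing) rather than || means a provider-reported cost of 0 is kept, and the local calculateApiCostAnthropic fallback only runs when the stream never reported a total. A minimal sketch of that rule (the values are made up):

	// Keep a legitimate zero cost; compute locally only for null/undefined.
	function resolveCost(reportedCost: number | undefined, computedCost: number): number {
		return reportedCost ?? computedCost
	}

	console.log(resolveCost(0, 0.42)) // 0 (provider said the request was free)
	console.log(resolveCost(undefined, 0.42)) // 0.42 (no usage chunk arrived, use the estimate)
	console.log(0 || 0.42) // 0.42 (|| would wrongly discard the zero)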
@@ -1960,10 +1820,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 			presentAssistantMessage(this)
 		}
 
-		// Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage
-		// to ensure usage data is captured even when the stream is interrupted. The background task
-		// uses local variables to accumulate usage data before atomically updating the shared state.
 		await this.persistGpt5Metadata(reasoningMessage)
 
+		updateApiReqMsg()
 		await this.saveClineMessages()
 		await this.providerRef.deref()?.postStateToWebview()
