Commit b30372d

Fix token usage / cost often being underreported (#6122)
Co-authored-by: Daniel Riccio <[email protected]>
1 parent 1018b88 commit b30372d

Previously, exiting the streaming loop early (for example, after a tool use result) discarded the rest of the API stream, including the usage chunks that many providers emit only at the very end, so token counts and cost were often underreported. The stream is now consumed through a manually obtained async iterator; when the main loop exits early, a background task keeps draining the remainder, bounded by DEFAULT_USAGE_COLLECTION_TIMEOUT_MS, to pick up the final usage and cost data.

2 files changed: +173 additions, -25 deletions

packages/types/src/global-settings.ts
Lines changed: 7 additions & 0 deletions

```diff
@@ -29,6 +29,13 @@ export const DEFAULT_WRITE_DELAY_MS = 1000
  */
 export const DEFAULT_TERMINAL_OUTPUT_CHARACTER_LIMIT = 50_000
 
+/**
+ * Default timeout for background usage collection in milliseconds.
+ * This timeout prevents the background task from running indefinitely
+ * when collecting usage data from streaming API responses.
+ */
+export const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 30_000
+
 /**
  * GlobalSettings
  */
```
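For orientation, this constant is consumed in Task.ts below as a plain wall-clock deadline inside the stream-drain loop. A minimal standalone sketch of that pattern, assuming nothing beyond the constant itself (the helper name and callback are illustrative, not part of the commit):

```typescript
const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 30_000

// Drain an async iterator, but give up once a wall-clock deadline passes.
// Mirrors the deadline check used in Task.ts below.
async function drainWithDeadline<T>(
	iterator: AsyncIterator<T>,
	onChunk: (chunk: T) => void,
	timeoutMs: number = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS,
): Promise<void> {
	const startTime = Date.now()
	let item = await iterator.next()
	while (!item.done) {
		if (Date.now() - startTime > timeoutMs) {
			// Let the underlying stream release its resources before bailing out.
			await iterator.return?.(undefined)
			break
		}
		onChunk(item.value)
		item = await iterator.next()
	}
}
```

Checking `Date.now()` once per chunk is cheap and avoids a separate timer that would need clearing.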

src/core/task/Task.ts
Lines changed: 166 additions & 25 deletions

```diff
@@ -29,6 +29,7 @@ import {
 	getApiProtocol,
 	getModelId,
 	DEFAULT_CONSECUTIVE_MISTAKE_LIMIT,
+	DEFAULT_USAGE_COLLECTION_TIMEOUT_MS,
 	isBlockingAsk,
 } from "@roo-code/types"
 import { TelemetryService } from "@roo-code/telemetry"
```
```diff
@@ -1522,6 +1523,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 		// of prices in tasks from history (it's worth removing a few months
 		// from now).
 		const updateApiReqMsg = (cancelReason?: ClineApiReqCancelReason, streamingFailedMessage?: string) => {
+			if (lastApiReqIndex < 0 || !this.clineMessages[lastApiReqIndex]) {
+				return
+			}
+
 			const existingData = JSON.parse(this.clineMessages[lastApiReqIndex].text || "{}")
 			this.clineMessages[lastApiReqIndex].text = JSON.stringify({
 				...existingData,
```
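The early return guards `updateApiReqMsg` now that it can also be invoked from the background drain task, where the target message may no longer exist. A minimal sketch of the failure mode being guarded against (types and values here are illustrative):

```typescript
type ClineMessage = { text?: string }

const clineMessages: ClineMessage[] = []
const lastApiReqIndex = -1 // no api_req_started message was ever recorded

// Without the guard, reading `.text` off clineMessages[-1] (undefined)
// would throw a TypeError before JSON.parse even runs:
if (lastApiReqIndex >= 0 && clineMessages[lastApiReqIndex]) {
	const existingData = JSON.parse(clineMessages[lastApiReqIndex].text || "{}")
	console.log(existingData)
}
```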
```diff
@@ -1612,7 +1617,11 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 		this.isStreaming = true
 
 		try {
-			for await (const chunk of stream) {
+			const iterator = stream[Symbol.asyncIterator]()
+			let item = await iterator.next()
+			while (!item.done) {
+				const chunk = item.value
+				item = await iterator.next()
 				if (!chunk) {
 					// Sometimes chunk is undefined, no idea that can cause
 					// it, but this workaround seems to fix it.
```
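This swap is the core of the fix. Breaking out of a `for await` loop implicitly calls the iterator's `return()` method, which closes the underlying stream, so any usage chunks the provider emits after the break point are lost. Pulling chunks through a manually obtained iterator keeps the stream handle alive across the break, so it can be handed to a background drainer. A self-contained sketch of the difference (the fake generator stands in for the real API stream):

```typescript
// Stand-in for a provider stream; many providers emit the usage chunk last.
async function* fakeStream() {
	try {
		yield { type: "text", text: "hello" } as const
		yield { type: "usage", inputTokens: 100, outputTokens: 50 } as const
	} finally {
		console.log("stream closed")
	}
}

async function demo() {
	// for-await: `break` implicitly calls the iterator's return(), which
	// closes the stream; the trailing usage chunk is never observed.
	for await (const chunk of fakeStream()) {
		if (chunk.type === "text") break
	}

	// Manual iterator: the handle survives the early exit, so a second
	// loop can resume from `item` and still recover the usage chunk.
	const iterator = fakeStream()[Symbol.asyncIterator]()
	let item = await iterator.next()
	while (!item.done) {
		const chunk = item.value
		item = await iterator.next()
		if (chunk.type === "text") break // main loop stops early
	}
	while (!item.done) {
		if (item.value.type === "usage") console.log("recovered:", item.value)
		item = await iterator.next()
	}
}

demo()
```

This mirrors the shape of the new main loop above: the next item is fetched before the current chunk is processed, so `item` always points one chunk ahead when the loop breaks.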
```diff
@@ -1680,16 +1689,165 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 					break
 				}
 
-				// PREV: We need to let the request finish for openrouter to
-				// get generation details.
-				// UPDATE: It's better UX to interrupt the request at the
-				// cost of the API cost not being retrieved.
 				if (this.didAlreadyUseTool) {
 					assistantMessage +=
 						"\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]"
 					break
 				}
 			}
+
+			// Create a copy of current token values to avoid race conditions
+			const currentTokens = {
+				input: inputTokens,
+				output: outputTokens,
+				cacheWrite: cacheWriteTokens,
+				cacheRead: cacheReadTokens,
+				total: totalCost,
+			}
+
+			const drainStreamInBackgroundToFindAllUsage = async (apiReqIndex: number) => {
+				const timeoutMs = DEFAULT_USAGE_COLLECTION_TIMEOUT_MS
+				const startTime = Date.now()
+				const modelId = getModelId(this.apiConfiguration)
+
+				// Local variables to accumulate usage data without affecting the main flow
+				let bgInputTokens = currentTokens.input
+				let bgOutputTokens = currentTokens.output
+				let bgCacheWriteTokens = currentTokens.cacheWrite
+				let bgCacheReadTokens = currentTokens.cacheRead
+				let bgTotalCost = currentTokens.total
+
+				// Helper function to capture telemetry and update messages
+				const captureUsageData = async (
+					tokens: {
+						input: number
+						output: number
+						cacheWrite: number
+						cacheRead: number
+						total?: number
+					},
+					messageIndex: number = apiReqIndex,
+				) => {
+					if (tokens.input > 0 || tokens.output > 0 || tokens.cacheWrite > 0 || tokens.cacheRead > 0) {
+						// Update the shared variables atomically
+						inputTokens = tokens.input
+						outputTokens = tokens.output
+						cacheWriteTokens = tokens.cacheWrite
+						cacheReadTokens = tokens.cacheRead
+						totalCost = tokens.total
+
+						// Update the API request message with the latest usage data
+						updateApiReqMsg()
+						await this.saveClineMessages()
+
+						// Update the specific message in the webview
+						const apiReqMessage = this.clineMessages[messageIndex]
+						if (apiReqMessage) {
+							await this.updateClineMessage(apiReqMessage)
+						}
+
+						// Capture telemetry
+						TelemetryService.instance.captureLlmCompletion(this.taskId, {
+							inputTokens: tokens.input,
+							outputTokens: tokens.output,
+							cacheWriteTokens: tokens.cacheWrite,
+							cacheReadTokens: tokens.cacheRead,
+							cost:
+								tokens.total ??
+								calculateApiCostAnthropic(
+									this.api.getModel().info,
+									tokens.input,
+									tokens.output,
+									tokens.cacheWrite,
+									tokens.cacheRead,
+								),
+						})
+					}
+				}
+
+				try {
+					// Continue processing the original stream from where the main loop left off
+					let usageFound = false
+					let chunkCount = 0
+
+					// Use the same iterator that the main loop was using
+					while (!item.done) {
+						// Check for timeout
+						if (Date.now() - startTime > timeoutMs) {
+							console.warn(
+								`[Background Usage Collection] Timed out after ${timeoutMs}ms for model: ${modelId}, processed ${chunkCount} chunks`,
+							)
+							// Clean up the iterator before breaking
+							if (iterator.return) {
+								await iterator.return(undefined)
+							}
+							break
+						}
+
+						const chunk = item.value
+						item = await iterator.next()
+						chunkCount++
+
+						if (chunk && chunk.type === "usage") {
+							usageFound = true
+							bgInputTokens += chunk.inputTokens
+							bgOutputTokens += chunk.outputTokens
+							bgCacheWriteTokens += chunk.cacheWriteTokens ?? 0
+							bgCacheReadTokens += chunk.cacheReadTokens ?? 0
+							bgTotalCost = chunk.totalCost
+						}
+					}
+
+					if (
+						usageFound ||
+						bgInputTokens > 0 ||
+						bgOutputTokens > 0 ||
+						bgCacheWriteTokens > 0 ||
+						bgCacheReadTokens > 0
+					) {
+						// We have usage data either from a usage chunk or accumulated tokens
+						await captureUsageData(
+							{
+								input: bgInputTokens,
+								output: bgOutputTokens,
+								cacheWrite: bgCacheWriteTokens,
+								cacheRead: bgCacheReadTokens,
+								total: bgTotalCost,
+							},
+							lastApiReqIndex,
+						)
+					} else {
+						console.warn(
+							`[Background Usage Collection] Suspicious: request ${apiReqIndex} is complete, but no usage info was found. Model: ${modelId}`,
+						)
+					}
+				} catch (error) {
+					console.error("Error draining stream for usage data:", error)
+					// Still try to capture whatever usage data we have collected so far
+					if (
+						bgInputTokens > 0 ||
+						bgOutputTokens > 0 ||
+						bgCacheWriteTokens > 0 ||
+						bgCacheReadTokens > 0
+					) {
+						await captureUsageData(
+							{
+								input: bgInputTokens,
+								output: bgOutputTokens,
+								cacheWrite: bgCacheWriteTokens,
+								cacheRead: bgCacheReadTokens,
+								total: bgTotalCost,
+							},
+							lastApiReqIndex,
+						)
+					}
+				}
+			}
+
+			// Start the background task and handle any errors
+			drainStreamInBackgroundToFindAllUsage(lastApiReqIndex).catch((error) => {
+				console.error("Background usage collection failed:", error)
+			})
 		} catch (error) {
 			// Abandoned happens when extension is no longer waiting for the
 			// Cline instance to finish aborting (error is thrown here when
```
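The handoff at the end of the hunk is deliberately fire-and-forget: the drain promise is not awaited, so tool execution and the rest of the turn continue immediately, while the attached `.catch` keeps a drain failure from becoming an unhandled promise rejection. The shape in isolation (function names here are illustrative):

```typescript
// Fire-and-forget with an attached catch: the caller returns immediately,
// and a failure in the background work is logged rather than escaping as
// an unhandled promise rejection.
async function collectRemainingUsage(requestIndex: number): Promise<void> {
	// Placeholder for the real drain loop over the shared iterator.
	console.log(`draining usage for request ${requestIndex}`)
}

function finishMainLoop(requestIndex: number): void {
	collectRemainingUsage(requestIndex).catch((error) => {
		console.error("Background usage collection failed:", error)
	})
	// Execution continues here without waiting for the drain to complete.
}
```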
```diff
@@ -1723,24 +1881,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 			this.isStreaming = false
 		}
 
-		if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
-			TelemetryService.instance.captureLlmCompletion(this.taskId, {
-				inputTokens,
-				outputTokens,
-				cacheWriteTokens,
-				cacheReadTokens,
-				cost:
-					totalCost ??
-					calculateApiCostAnthropic(
-						this.api.getModel().info,
-						inputTokens,
-						outputTokens,
-						cacheWriteTokens,
-						cacheReadTokens,
-					),
-			})
-		}
-
 		// Need to call here in case the stream was aborted.
 		if (this.abort || this.abandoned) {
 			throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
```
```diff
@@ -1777,9 +1917,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 			presentAssistantMessage(this)
 		}
 
+		// Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage
+		// to ensure usage data is captured even when the stream is interrupted. The background task
+		// uses local variables to accumulate usage data before atomically updating the shared state.
 		await this.persistGpt5Metadata(reasoningMessage)
-
-		updateApiReqMsg()
 		await this.saveClineMessages()
 		await this.providerRef.deref()?.postStateToWebview()
 
```