diff --git a/src/browser/components/ChatMetaSidebar.tsx b/src/browser/components/ChatMetaSidebar.tsx
index 64555cb98..7b539b1cd 100644
--- a/src/browser/components/ChatMetaSidebar.tsx
+++ b/src/browser/components/ChatMetaSidebar.tsx
@@ -19,7 +19,7 @@ const ChatMetaSidebarComponent: React.FC = ({ workspaceId,
   const use1M = options.anthropic?.use1MContext ?? false;
   const chatAreaSize = useResizeObserver(chatAreaRef);
 
-  const lastUsage = usage?.usageHistory[usage.usageHistory.length - 1];
+  const lastUsage = usage?.liveUsage ?? usage?.usageHistory[usage.usageHistory.length - 1];
 
   // Memoize vertical meter data calculation to prevent unnecessary re-renders
   const verticalMeterData = React.useMemo(() => {
diff --git a/src/browser/components/RightSidebar.tsx b/src/browser/components/RightSidebar.tsx
index fb032c785..2308c9d26 100644
--- a/src/browser/components/RightSidebar.tsx
+++ b/src/browser/components/RightSidebar.tsx
@@ -134,7 +134,7 @@ const RightSidebarComponent: React.FC = ({
   const costsPanelId = `${baseId}-panel-costs`;
   const reviewPanelId = `${baseId}-panel-review`;
 
-  const lastUsage = usage?.usageHistory[usage.usageHistory.length - 1];
+  const lastUsage = usage?.liveUsage ?? usage?.usageHistory[usage.usageHistory.length - 1];
 
   // Memoize vertical meter data calculation to prevent unnecessary re-renders
   const verticalMeterData = React.useMemo(() => {
diff --git a/src/browser/components/RightSidebar/CostsTab.tsx b/src/browser/components/RightSidebar/CostsTab.tsx
index f5053c8d5..a48b5786b 100644
--- a/src/browser/components/RightSidebar/CostsTab.tsx
+++ b/src/browser/components/RightSidebar/CostsTab.tsx
@@ -63,8 +63,15 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => {
   const { options } = useProviderOptions();
   const use1M = options.anthropic?.use1MContext ?? false;
 
-  // Check if we have any data to display
-  const hasUsageData = usage && usage.usageHistory.length > 0;
+  // Session usage for cost
+  const sessionUsage = React.useMemo(() => {
+    const historicalSum = sumUsageHistory(usage.usageHistory);
+    if (!usage.liveUsage) return historicalSum;
+    if (!historicalSum) return usage.liveUsage;
+    return sumUsageHistory([historicalSum, usage.liveUsage]);
+  }, [usage.usageHistory, usage.liveUsage]);
+
+  const hasUsageData = usage && (usage.usageHistory.length > 0 || usage.liveUsage !== undefined);
   const hasConsumerData = consumers && (consumers.totalTokens > 0 || consumers.isCalculating);
   const hasAnyData = hasUsageData || hasConsumerData;
 
@@ -80,16 +87,11 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => {
     );
   }
 
-  // Context Usage always shows Last Request data
-  const lastRequestUsage = hasUsageData
-    ? usage.usageHistory[usage.usageHistory.length - 1]
-    : undefined;
+  // Last Request (for Cost section): always the last completed request
+  const lastRequestUsage = usage.usageHistory[usage.usageHistory.length - 1];
 
   // Cost and Details table use viewMode
-  const displayUsage =
-    viewMode === "last-request"
-      ? usage.usageHistory[usage.usageHistory.length - 1]
-      : sumUsageHistory(usage.usageHistory);
+  const displayUsage = viewMode === "last-request" ? lastRequestUsage : sessionUsage;
 
   return (
@@ -97,11 +99,10 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => {
           {(() => {
-            // Context Usage always uses last request
-            const contextUsage = lastRequestUsage;
-
-            // Get model from last request (for context window display)
-            const model = lastRequestUsage?.model ?? "unknown";
+            // Context usage: live when streaming, else last historical
+            const contextUsage =
+              usage.liveUsage ?? usage.usageHistory[usage.usageHistory.length - 1];
+            const model = contextUsage?.model ?? "unknown";
 
             // Get max tokens for the model from the model stats database
             const modelStats = getModelStats(model);
diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts
index 4576fb3fd..4b0be45f8 100644
--- a/src/browser/stores/WorkspaceStore.ts
+++ b/src/browser/stores/WorkspaceStore.ts
@@ -17,7 +17,7 @@ import {
   isRestoreToInput,
 } from "@/common/types/ipc";
 import { MapStore } from "./MapStore";
-import { collectUsageHistory } from "@/common/utils/tokens/displayUsage";
+import { collectUsageHistory, createDisplayUsage } from "@/common/utils/tokens/displayUsage";
 import { WorkspaceConsumerManager } from "./WorkspaceConsumerManager";
 import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator";
 import type { TokenConsumer } from "@/common/types/chatStats";
@@ -63,6 +63,8 @@ type DerivedState = Record;
 export interface WorkspaceUsageState {
   usageHistory: ChatUsageDisplay[];
   totalTokens: number;
+  /** Live usage during streaming (inputTokens = current context window) */
+  liveUsage?: ChatUsageDisplay;
 }
 
 /**
@@ -178,6 +180,10 @@ export class WorkspaceStore {
       aggregator.handleReasoningEnd(data as never);
       this.states.bump(workspaceId);
     },
+    "usage-delta": (workspaceId, aggregator, data) => {
+      aggregator.handleUsageDelta(data as never);
+      this.usageStore.bump(workspaceId);
+    },
     "init-start": (workspaceId, aggregator, data) => {
       aggregator.handleMessage(data);
       this.states.bump(workspaceId);
@@ -449,7 +455,12 @@ export class WorkspaceStore {
       0
     );
 
-    return { usageHistory, totalTokens };
+    // Include active stream usage if currently streaming (converted to display format)
+    const activeStreamId = aggregator.getActiveStreamMessageId();
+    const rawUsage = activeStreamId ? aggregator.getActiveStreamUsage(activeStreamId) : undefined;
+    const liveUsage = rawUsage && model ? createDisplayUsage(rawUsage, model) : undefined;
+
+    return { usageHistory, totalTokens, liveUsage };
   });
 }
 
diff --git a/src/browser/utils/messages/StreamingMessageAggregator.test.ts b/src/browser/utils/messages/StreamingMessageAggregator.test.ts
index 21f0af510..cac12d623 100644
--- a/src/browser/utils/messages/StreamingMessageAggregator.test.ts
+++ b/src/browser/utils/messages/StreamingMessageAggregator.test.ts
@@ -379,4 +379,96 @@ describe("StreamingMessageAggregator", () => {
       expect(aggregator.getCurrentTodos()).toHaveLength(0);
     });
   });
+
+  describe("usage-delta handling", () => {
+    test("handleUsageDelta stores usage by messageId", () => {
+      const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
+
+      aggregator.handleUsageDelta({
+        type: "usage-delta",
+        workspaceId: "ws-1",
+        messageId: "msg-1",
+        usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
+      });
+
+      expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
+        inputTokens: 1000,
+        outputTokens: 50,
+        totalTokens: 1050,
+      });
+    });
+
+    test("clearTokenState removes usage", () => {
+      const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
+
+      aggregator.handleUsageDelta({
+        type: "usage-delta",
+        workspaceId: "ws-1",
+        messageId: "msg-1",
+        usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
+      });
+
+      expect(aggregator.getActiveStreamUsage("msg-1")).toBeDefined();
+
+      aggregator.clearTokenState("msg-1");
+
+      expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined();
+    });
+
+    test("latest usage-delta replaces previous for same messageId", () => {
+      const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
+
+      // First step usage
+      aggregator.handleUsageDelta({
+        type: "usage-delta",
+        workspaceId: "ws-1",
+        messageId: "msg-1",
+        usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
+      });
+
+      // Second step usage (larger context after tool result added)
+      aggregator.handleUsageDelta({
+        type: "usage-delta",
+        workspaceId: "ws-1",
+        messageId: "msg-1",
+        usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 },
+      });
+
+      // Should have latest values, not summed
+      expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
+        inputTokens: 1500,
+        outputTokens: 100,
+        totalTokens: 1600,
+      });
+    });
+
+    test("tracks usage independently per messageId", () => {
+      const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
+
+      aggregator.handleUsageDelta({
+        type: "usage-delta",
+        workspaceId: "ws-1",
+        messageId: "msg-1",
+        usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
+      });
+
+      aggregator.handleUsageDelta({
+        type: "usage-delta",
+        workspaceId: "ws-1",
+        messageId: "msg-2",
+        usage: { inputTokens: 2000, outputTokens: 100, totalTokens: 2100 },
+      });
+
+      expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
+        inputTokens: 1000,
+        outputTokens: 50,
+        totalTokens: 1050,
+      });
+      expect(aggregator.getActiveStreamUsage("msg-2")).toEqual({
+        inputTokens: 2000,
+        outputTokens: 100,
+        totalTokens: 2100,
+      });
+    });
+  });
 });
diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts
index 10bccd05d..7e5a47269 100644
--- a/src/browser/utils/messages/StreamingMessageAggregator.ts
+++ b/src/browser/utils/messages/StreamingMessageAggregator.ts
@@ -9,6 +9,7 @@ import { createMuxMessage } from "@/common/types/message";
 import type {
   StreamStartEvent,
   StreamDeltaEvent,
+  UsageDeltaEvent,
   StreamEndEvent,
   StreamAbortEvent,
   ToolCallStartEvent,
@@ -17,6 +18,7 @@ import type {
   ReasoningDeltaEvent,
   ReasoningEndEvent,
 } from "@/common/types/stream";
+import type { LanguageModelV2Usage } from "@ai-sdk/provider";
 import type { TodoItem, StatusSetToolResult } from "@/common/types/tools";
 import type { WorkspaceChatMessage, StreamErrorMessage, DeleteMessage } from "@/common/types/ipc";
 
@@ -72,6 +74,10 @@ export class StreamingMessageAggregator {
   // Delta history for token counting and TPS calculation
   private deltaHistory = new Map();
 
+  // Active stream step usage (updated on each usage-delta event, i.e. per finish-step)
+  // Tracks the latest usage reported for the currently streaming message
+  private activeStreamStepUsage = new Map<string, LanguageModelV2Usage>();
+
   // Current TODO list (updated when todo_write succeeds, cleared on stream end)
   // Stream-scoped: automatically reset when stream completes
   // On reload: only reconstructed if reconnecting to active stream
@@ -992,5 +998,20 @@ export class StreamingMessageAggregator {
    */
   clearTokenState(messageId: string): void {
     this.deltaHistory.delete(messageId);
+    this.activeStreamStepUsage.delete(messageId);
+  }
+
+  /**
+   * Handle usage-delta event: store the latest step usage for the active stream
+   */
+  handleUsageDelta(data: UsageDeltaEvent): void {
+    this.activeStreamStepUsage.set(data.messageId, data.usage);
+  }
+
+  /**
+   * Get active stream usage for a message (if streaming)
+   */
+  getActiveStreamUsage(messageId: string): LanguageModelV2Usage | undefined {
+    return this.activeStreamStepUsage.get(messageId);
   }
 }
diff --git a/src/common/types/ipc.ts b/src/common/types/ipc.ts
index a6ea39192..d25760081 100644
--- a/src/common/types/ipc.ts
+++ b/src/common/types/ipc.ts
@@ -21,6 +21,7 @@ import type {
   StreamDeltaEvent,
   StreamEndEvent,
   StreamAbortEvent,
+  UsageDeltaEvent,
   ToolCallStartEvent,
   ToolCallDeltaEvent,
   ToolCallEndEvent,
@@ -103,6 +104,7 @@ export type WorkspaceChatMessage =
   | DeleteMessage
   | StreamStartEvent
   | StreamDeltaEvent
+  | UsageDeltaEvent
   | StreamEndEvent
   | StreamAbortEvent
   | ToolCallStartEvent
@@ -149,6 +151,11 @@ export function isStreamAbort(msg: WorkspaceChatMessage): msg is StreamAbortEven
   return "type" in msg && msg.type === "stream-abort";
 }
 
+// Type guard for usage delta events
+export function isUsageDelta(msg: WorkspaceChatMessage): msg is UsageDeltaEvent {
+  return "type" in msg && msg.type === "usage-delta";
+}
+
 // Type guard for tool call start events
 export function isToolCallStart(msg: WorkspaceChatMessage): msg is ToolCallStartEvent {
   return "type" in msg && msg.type === "tool-call-start";
diff --git a/src/common/types/stream.ts b/src/common/types/stream.ts
index 4639329a7..e667f7a7e 100644
--- a/src/common/types/stream.ts
+++ b/src/common/types/stream.ts
@@ -121,6 +121,17 @@ export interface ReasoningEndEvent {
   messageId: string;
 }
 
+/**
+ * Emitted on each AI SDK finish-step event, providing incremental usage updates.
+ * Allows the UI to update the token display as steps complete (after each tool call and at stream end).
+ */
+export interface UsageDeltaEvent {
+  type: "usage-delta";
+  workspaceId: string;
+  messageId: string;
+  usage: LanguageModelV2Usage; // This step's usage (inputTokens = full context)
+}
+
 export type AIServiceEvent =
   | StreamStartEvent
   | StreamDeltaEvent
@@ -132,4 +143,5 @@ export type AIServiceEvent =
   | ToolCallEndEvent
   | ReasoningStartEvent
   | ReasoningDeltaEvent
-  | ReasoningEndEvent;
+  | ReasoningEndEvent
+  | UsageDeltaEvent;
diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts
index a7b55b3ab..94697c46d 100644
--- a/src/node/services/agentSession.ts
+++ b/src/node/services/agentSession.ts
@@ -464,6 +464,7 @@ export class AgentSession {
     });
     forward("reasoning-delta", (payload) => this.emitChatEvent(payload));
     forward("reasoning-end", (payload) => this.emitChatEvent(payload));
+    forward("usage-delta", (payload) => this.emitChatEvent(payload));
     forward("stream-end", async (payload) => {
       const handled = await this.compactionHandler.handleCompletion(payload as StreamEndEvent);
 
diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts
index 6cb748f91..f32037661 100644
--- a/src/node/services/aiService.ts
+++ b/src/node/services/aiService.ts
@@ -203,6 +203,7 @@ export class AIService extends EventEmitter {
     // Forward reasoning events
     this.streamManager.on("reasoning-delta", (data) => this.emit("reasoning-delta", data));
     this.streamManager.on("reasoning-end", (data) => this.emit("reasoning-end", data));
+    this.streamManager.on("usage-delta", (data) => this.emit("usage-delta", data));
   }
 
   private async ensureSessionsDir(): Promise<void> {
diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts
index 3f3f14960..3a10893cc 100644
--- a/src/node/services/streamManager.ts
+++ b/src/node/services/streamManager.ts
@@ -18,6 +18,7 @@ import { log } from "./log";
 import type {
   StreamStartEvent,
   StreamEndEvent,
+  UsageDeltaEvent,
   ErrorEvent,
   ToolCallEndEvent,
   CompletedMessagePart,
@@ -851,10 +852,30 @@ export class StreamManager extends EventEmitter {
       case "start":
       case "start-step":
       case "text-start":
-      case "finish":
-      case "finish-step":
         // These events can be logged or handled if needed
         break;
+
+      case "finish-step": {
+        // Emit usage-delta event with usage from this step
+        const finishStepPart = part as {
+          type: "finish-step";
+          usage: LanguageModelV2Usage;
+        };
+
+        const usageEvent: UsageDeltaEvent = {
+          type: "usage-delta",
+          workspaceId: workspaceId as string,
+          messageId: streamInfo.messageId,
+          usage: finishStepPart.usage,
+        };
+        this.emit("usage-delta", usageEvent);
+        break;
+      }
+
+      case "finish":
+        // No usage-delta here: totalUsage sums usage across all steps, not the current context.
+        // The last finish-step already carries the correct context-window usage.
+        break;
     }
   }
 
diff --git a/tests/ipcMain/sendMessage.basic.test.ts b/tests/ipcMain/sendMessage.basic.test.ts
index 7e73c94ae..5a9fa585f 100644
--- a/tests/ipcMain/sendMessage.basic.test.ts
+++ b/tests/ipcMain/sendMessage.basic.test.ts
@@ -471,5 +471,53 @@ describeIntegration("IpcMain sendMessage integration tests", () => {
     );
   });
+// Test usage-delta events during multi-step streams
+describeIntegration("usage-delta events", () => {
+  configureTestRetries(3);
+
+  // Only test with Anthropic - more reliable multi-step behavior
+  test.concurrent(
+    "should emit usage-delta events during multi-step tool call streams",
+    async () => {
+      await withSharedWorkspace("anthropic", async ({ env, workspaceId }) => {
+        // Ask the model to read a file - guaranteed to trigger tool use
+        const result = await sendMessageWithModel(
+          env.mockIpcRenderer,
+          workspaceId,
+          "Use the file_read tool to read README.md. Only read the first 5 lines.",
+          modelString("anthropic", KNOWN_MODELS.SONNET.providerModelId)
+        );
+
+        expect(result.success).toBe(true);
+
+        // Collect events and wait for stream completion
+        const collector = createEventCollector(env.sentEvents, workspaceId);
+        await collector.waitForEvent("stream-end", 15000);
+
+        // Verify usage-delta events were emitted
+        const allEvents = collector.getEvents();
+        const usageDeltas = allEvents.filter(
+          (e) => "type" in e && e.type === "usage-delta"
+        ) as Array<{ type: "usage-delta"; usage: { inputTokens: number; outputTokens: number } }>;
+
+        // Multi-step stream should emit at least one usage-delta (on finish-step)
+        expect(usageDeltas.length).toBeGreaterThan(0);
+
+        // Each usage-delta should have valid usage data
+        for (const delta of usageDeltas) {
+          expect(delta.usage).toBeDefined();
+          expect(delta.usage.inputTokens).toBeGreaterThan(0);
+          // outputTokens may be 0 for some steps, but should be defined
+          expect(typeof delta.usage.outputTokens).toBe("number");
+        }
+
+        // Verify stream completed successfully
+        assertStreamSuccess(collector);
+      });
+    },
+    30000
+  );
+});
+
 
 // Test image support across providers
 describe.each(PROVIDER_CONFIGS)("%s:%s image support", (provider, model) => {});
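
Note (illustrative, not part of the diff): the renderer-side pattern the components above share is "prefer liveUsage while a stream is in flight, otherwise fall back to the completed usageHistory", and for session totals the two are merged. Below is a minimal TypeScript sketch of that logic, assuming the ChatUsageDisplay type, WorkspaceUsageState interface, and sumUsageHistory helper referenced in the diff; the standalone function names and the import path for sumUsageHistory are not shown in the diff and are guesses here.

import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator";
import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; // path assumed
import type { WorkspaceUsageState } from "@/browser/stores/WorkspaceStore";

// Session cost: completed requests plus the in-flight request, mirroring CostsTab's sessionUsage memo.
export function sessionUsageFor(state: WorkspaceUsageState): ChatUsageDisplay | undefined {
  const historicalSum = sumUsageHistory(state.usageHistory);
  if (!state.liveUsage) return historicalSum; // nothing streaming: historical only
  if (!historicalSum) return state.liveUsage; // first request still streaming
  // sumUsageHistory also serves to merge the historical total with the live value
  return sumUsageHistory([historicalSum, state.liveUsage]);
}

// Context-window display: the latest context size (live if streaming), never a sum.
export function contextUsageFor(state: WorkspaceUsageState): ChatUsageDisplay | undefined {
  return state.liveUsage ?? state.usageHistory[state.usageHistory.length - 1];
}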