Skip to content

Commit 2b51466

Browse files
committed
fix: implement persistent multi-model cost tracking (#7755)
- Add CostLedger class with Write-Ahead Logging (WAL) for crash-safe persistence - Integrate cost tracking into Task streaming loop to capture costs after each API call - Preserve cost ledger instance when switching models in ClineProvider - Update UI components to display ledger data instead of recalculated metrics - Add comprehensive unit tests for CostLedger functionality This ensures accurate cumulative cost tracking across model switches, fixing the issue where costs were incorrectly recalculated based on the current model pricing instead of preserving actual historical costs.
1 parent 247da38 commit 2b51466

File tree

6 files changed

+909
-5
lines changed

6 files changed

+909
-5
lines changed

src/core/cost-ledger/CostLedger.ts

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
import * as fs from "fs/promises"
2+
import * as path from "path"
3+
import { v4 as uuidv4 } from "uuid"
4+
import { safeWriteJson } from "../../utils/safeWriteJson"
5+
6+
/**
7+
* Represents a single cost entry in the ledger
8+
*/
9+
export interface CostEntry {
10+
entry_id: string
11+
task_id: string
12+
origin_task_id?: string
13+
root_task_id?: string
14+
provider: string
15+
model_id: string
16+
feature: string
17+
tokens_in: number
18+
tokens_out: number
19+
cache_writes?: number
20+
cache_reads?: number
21+
cost: number
22+
timestamp: string
23+
}
24+
25+
/**
26+
* Model breakdown for cost reporting
27+
*/
28+
export interface ModelCostBreakdown {
29+
provider: string
30+
model_id: string
31+
total_cost: number
32+
total_tokens_in: number
33+
total_tokens_out: number
34+
total_cache_writes: number
35+
total_cache_reads: number
36+
entry_count: number
37+
}
38+
39+
/**
40+
* CostLedger manages persistent cost tracking across model switches
41+
* Uses Write-Ahead Logging (WAL) for crash safety
42+
*/
43+
export class CostLedger {
44+
private entries: CostEntry[] = []
45+
private walPath: string
46+
private snapshotPath: string
47+
private walFileHandle: fs.FileHandle | null = null
48+
private snapshotInterval = 100 // Snapshot every 100 entries
49+
private isInitialized = false
50+
51+
constructor(private storagePath: string) {
52+
this.walPath = path.join(storagePath, "cost-ledger-wal.jsonl")
53+
this.snapshotPath = path.join(storagePath, "cost-ledger.json")
54+
}
55+
56+
/**
57+
* Initialize the ledger by loading existing data
58+
*/
59+
async initialize(): Promise<void> {
60+
if (this.isInitialized) {
61+
return
62+
}
63+
64+
try {
65+
// Ensure storage directory exists
66+
await fs.mkdir(this.storagePath, { recursive: true })
67+
68+
// Load snapshot if exists
69+
await this.loadSnapshot()
70+
71+
// Replay WAL entries after snapshot
72+
await this.replayWAL()
73+
74+
// Open WAL file for appending
75+
try {
76+
this.walFileHandle = await fs.open(this.walPath, "a")
77+
} catch (error: any) {
78+
// If file doesn't exist, create it
79+
if (error.code === "ENOENT") {
80+
await fs.writeFile(this.walPath, "")
81+
this.walFileHandle = await fs.open(this.walPath, "a")
82+
} else {
83+
throw error
84+
}
85+
}
86+
87+
this.isInitialized = true
88+
} catch (error) {
89+
console.error("Failed to initialize CostLedger:", error)
90+
throw error
91+
}
92+
}
93+
94+
/**
95+
* Append a new cost entry to the ledger
96+
*/
97+
async appendEntry(params: {
98+
task_id: string
99+
origin_task_id?: string
100+
root_task_id?: string
101+
provider: string
102+
model_id: string
103+
feature: string
104+
tokens_in: number
105+
tokens_out: number
106+
cache_writes?: number
107+
cache_reads?: number
108+
cost: number
109+
}): Promise<void> {
110+
if (!this.isInitialized) {
111+
await this.initialize()
112+
}
113+
114+
const entry: CostEntry = {
115+
entry_id: uuidv4(),
116+
timestamp: new Date().toISOString(),
117+
...params,
118+
}
119+
120+
// Append to WAL first (for durability)
121+
await this.appendToWAL(entry)
122+
123+
// Add to in-memory entries
124+
this.entries.push(entry)
125+
126+
// Check if we need to create a snapshot
127+
if (this.entries.length % this.snapshotInterval === 0) {
128+
await this.createSnapshot()
129+
}
130+
}
131+
132+
/**
133+
* Get cumulative total cost across all models
134+
*/
135+
getCumulativeTotal(): number {
136+
return this.entries.reduce((total, entry) => total + entry.cost, 0)
137+
}
138+
139+
/**
140+
* Get breakdown of costs by model
141+
*/
142+
getBreakdownByModel(): Record<
143+
string,
144+
{
145+
provider: string
146+
tokens_in: number
147+
tokens_out: number
148+
cache_writes: number
149+
cache_reads: number
150+
cost: number
151+
count: number
152+
}
153+
> {
154+
const breakdown: Record<string, any> = {}
155+
156+
for (const entry of this.entries) {
157+
const key = entry.model_id
158+
if (!breakdown[key]) {
159+
breakdown[key] = {
160+
provider: entry.provider,
161+
tokens_in: 0,
162+
tokens_out: 0,
163+
cache_writes: 0,
164+
cache_reads: 0,
165+
cost: 0,
166+
count: 0,
167+
}
168+
}
169+
170+
breakdown[key].tokens_in += entry.tokens_in
171+
breakdown[key].tokens_out += entry.tokens_out
172+
breakdown[key].cache_writes += entry.cache_writes || 0
173+
breakdown[key].cache_reads += entry.cache_reads || 0
174+
breakdown[key].cost += entry.cost
175+
breakdown[key].count += 1
176+
}
177+
178+
return breakdown
179+
}
180+
181+
/**
182+
* Get all entries for a specific task
183+
*/
184+
getEntriesForTask(taskId: string): CostEntry[] {
185+
return this.entries.filter(
186+
(entry) => entry.task_id === taskId || entry.origin_task_id === taskId || entry.root_task_id === taskId,
187+
)
188+
}
189+
190+
/**
191+
* Get total metrics (for UI display)
192+
*/
193+
getTotalMetrics(): {
194+
totalTokensIn: number
195+
totalTokensOut: number
196+
totalCacheWrites: number
197+
totalCacheReads: number
198+
totalCost: number
199+
} {
200+
const metrics = {
201+
totalTokensIn: 0,
202+
totalTokensOut: 0,
203+
totalCacheWrites: 0,
204+
totalCacheReads: 0,
205+
totalCost: 0,
206+
}
207+
208+
for (const entry of this.entries) {
209+
metrics.totalTokensIn += entry.tokens_in
210+
metrics.totalTokensOut += entry.tokens_out
211+
metrics.totalCacheWrites += entry.cache_writes || 0
212+
metrics.totalCacheReads += entry.cache_reads || 0
213+
metrics.totalCost += entry.cost
214+
}
215+
216+
return metrics
217+
}
218+
219+
/**
220+
* Clear the ledger (for new tasks)
221+
*/
222+
async clear(): Promise<void> {
223+
this.entries = []
224+
225+
// Close and truncate WAL
226+
if (this.walFileHandle) {
227+
await this.walFileHandle.close()
228+
}
229+
await fs.writeFile(this.walPath, "")
230+
this.walFileHandle = await fs.open(this.walPath, "a")
231+
232+
// Remove snapshot
233+
try {
234+
await fs.unlink(this.snapshotPath)
235+
} catch (error) {
236+
// Ignore if file doesn't exist
237+
}
238+
}
239+
240+
/**
241+
* Close the ledger (cleanup)
242+
*/
243+
async close(): Promise<void> {
244+
// Save a final snapshot before closing
245+
if (this.entries.length > 0) {
246+
await this.createSnapshot()
247+
}
248+
249+
if (this.walFileHandle) {
250+
await this.walFileHandle.close()
251+
this.walFileHandle = null
252+
}
253+
this.isInitialized = false
254+
}
255+
256+
/**
257+
* Append entry to WAL file
258+
*/
259+
private async appendToWAL(entry: CostEntry): Promise<void> {
260+
if (!this.walFileHandle) {
261+
throw new Error("WAL file handle not initialized")
262+
}
263+
264+
const line = JSON.stringify(entry) + "\n"
265+
await this.walFileHandle.write(line)
266+
}
267+
268+
/**
269+
* Load snapshot from disk
270+
*/
271+
private async loadSnapshot(): Promise<void> {
272+
try {
273+
const data = await fs.readFile(this.snapshotPath, "utf-8")
274+
const snapshot = JSON.parse(data)
275+
if (Array.isArray(snapshot)) {
276+
this.entries = snapshot
277+
}
278+
} catch (error) {
279+
// Snapshot doesn't exist or is corrupted, start fresh
280+
this.entries = []
281+
}
282+
}
283+
284+
/**
285+
* Replay WAL entries after snapshot
286+
*/
287+
private async replayWAL(): Promise<void> {
288+
try {
289+
const walContent = await fs.readFile(this.walPath, "utf-8")
290+
const lines = walContent.split("\n").filter((line) => line.trim())
291+
292+
// Get the last entry ID from snapshot
293+
const lastSnapshotEntryId = this.entries.length > 0 ? this.entries[this.entries.length - 1].entry_id : null
294+
295+
let foundSnapshot = !lastSnapshotEntryId
296+
for (const line of lines) {
297+
try {
298+
const entry = JSON.parse(line) as CostEntry
299+
300+
// Skip entries until we find the one after snapshot
301+
if (!foundSnapshot) {
302+
if (entry.entry_id === lastSnapshotEntryId) {
303+
foundSnapshot = true
304+
}
305+
continue
306+
}
307+
308+
// Add entries after snapshot
309+
if (!this.entries.find((e) => e.entry_id === entry.entry_id)) {
310+
this.entries.push(entry)
311+
}
312+
} catch (error) {
313+
// Skip malformed lines
314+
console.warn("Skipping malformed WAL entry:", line)
315+
}
316+
}
317+
} catch (error) {
318+
// WAL doesn't exist, that's fine
319+
}
320+
}
321+
322+
/**
323+
* Create a snapshot of current entries
324+
*/
325+
private async createSnapshot(): Promise<void> {
326+
await safeWriteJson(this.snapshotPath, this.entries)
327+
328+
// Truncate WAL after successful snapshot
329+
if (this.walFileHandle) {
330+
await this.walFileHandle.close()
331+
}
332+
await fs.writeFile(this.walPath, "")
333+
this.walFileHandle = await fs.open(this.walPath, "a")
334+
}
335+
}

0 commit comments

Comments
 (0)