Skip to content

Commit ec5793f

Browse files
fix(billing): drop transaction wrapper in recordUsage to relieve pool contention (#4494)
* fix(billing): drop transaction wrapper in recordUsage to relieve pool contention * fix(billing): always warn on userStats drift in recordUsage
1 parent 49713f8 commit ec5793f

1 file changed

Lines changed: 47 additions & 36 deletions

File tree

apps/sim/lib/billing/core/usage-log.ts

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,17 @@ export interface RecordUsageParams {
7171
}
7272

7373
/**
74-
* Records usage in a single atomic transaction.
74+
* Records usage by inserting into usage_log and incrementing userStats counters.
7575
*
76-
* Inserts all entries into usage_log and updates userStats counters
77-
* (totalCost, currentPeriodCost, lastActive) within one Postgres transaction.
78-
* The total cost added to userStats is derived from summing entry costs,
79-
* ensuring usage_log and currentPeriodCost can never drift apart.
76+
* The two writes are intentionally not wrapped in a transaction: under high
77+
* concurrency for the same userId, holding BEGIN/COMMIT across the user_stats
78+
* row-lock wait pins pgbouncer connections and exhausts the pool.
8079
*
81-
* If billing is disabled, total cost is zero, or no entries have positive cost,
82-
* this function returns early without writing anything.
80+
* usage_log is the source of truth and the INSERT propagates errors to the
81+
* caller. The userStats UPDATE is best-effort: failures (and missing-row
82+
* cases) are logged as warnings and swallowed. Counter drift is acceptable
83+
* here — the long-term plan is to derive counters from usage_log directly.
84+
* Any drift warning in logs is a signal that needs investigation.
8385
*/
8486
export async function recordUsage(params: RecordUsageParams): Promise<void> {
8587
if (!isBillingEnabled) {
@@ -103,47 +105,56 @@ export async function recordUsage(params: RecordUsageParams): Promise<void> {
103105
? Object.fromEntries(Object.entries(additionalStats).filter(([k]) => !RESERVED_KEYS.has(k)))
104106
: undefined
105107

106-
await db.transaction(async (tx) => {
107-
if (validEntries.length > 0) {
108-
await tx.insert(usageLog).values(
109-
validEntries.map((entry) => ({
110-
id: generateId(),
111-
userId,
112-
category: entry.category,
113-
source: entry.source,
114-
description: entry.description,
115-
metadata: entry.metadata ?? null,
116-
cost: entry.cost.toString(),
117-
workspaceId: workspaceId ?? null,
118-
workflowId: workflowId ?? null,
119-
executionId: executionId ?? null,
120-
}))
121-
)
122-
}
108+
if (validEntries.length > 0) {
109+
await db.insert(usageLog).values(
110+
validEntries.map((entry) => ({
111+
id: generateId(),
112+
userId,
113+
category: entry.category,
114+
source: entry.source,
115+
description: entry.description,
116+
metadata: entry.metadata ?? null,
117+
cost: entry.cost.toString(),
118+
workspaceId: workspaceId ?? null,
119+
workflowId: workflowId ?? null,
120+
executionId: executionId ?? null,
121+
}))
122+
)
123+
}
123124

124-
const updateFields: Record<string, SQL | Date> = {
125-
lastActive: new Date(),
126-
...(totalCost > 0 && {
127-
totalCost: sql`total_cost + ${totalCost}`,
128-
currentPeriodCost: sql`current_period_cost + ${totalCost}`,
129-
}),
130-
...safeStats,
131-
}
125+
const updateFields: Record<string, SQL | Date> = {
126+
lastActive: new Date(),
127+
...(totalCost > 0 && {
128+
totalCost: sql`total_cost + ${totalCost}`,
129+
currentPeriodCost: sql`current_period_cost + ${totalCost}`,
130+
}),
131+
...safeStats,
132+
}
132133

133-
const result = await tx
134+
try {
135+
const result = await db
134136
.update(userStats)
135137
.set(updateFields)
136138
.where(eq(userStats.userId, userId))
137139
.returning({ userId: userStats.userId })
138140

139141
if (result.length === 0) {
140-
logger.warn('recordUsage: userStats row not found, transaction will roll back', {
142+
logger.warn('recordUsage: userStats row not found; counter increment dropped', {
141143
userId,
142144
totalCost,
145+
hadEntries: validEntries.length > 0,
146+
additionalStatsKeys: safeStats ? Object.keys(safeStats) : [],
143147
})
144-
throw new Error(`userStats row not found for userId: ${userId}`)
145148
}
146-
})
149+
} catch (error) {
150+
logger.warn('recordUsage: userStats update failed; counter increment dropped', {
151+
error: toError(error).message,
152+
userId,
153+
totalCost,
154+
hadEntries: validEntries.length > 0,
155+
additionalStatsKeys: safeStats ? Object.keys(safeStats) : [],
156+
})
157+
}
147158

148159
logger.debug('Recorded usage', {
149160
userId,

0 commit comments

Comments
 (0)