Skip to content

Commit 1d94d73

Browse files
2 parents ae46570 + 152351f commit 1d94d73

File tree

3 files changed

+91
-39
lines changed

3 files changed

+91
-39
lines changed

src/actions/credits.action.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import { requireVerifiedEmail } from "@/utils/auth";
44
import {
55
getCreditTransactions,
6-
updateUserCredits,
6+
addUserCredits,
77
logTransaction,
88
getCreditPackage,
99
} from "@/utils/credits";
@@ -12,6 +12,7 @@ import { getStripe } from "@/lib/stripe";
1212
import { MAX_TRANSACTIONS_PER_PAGE, CREDITS_EXPIRATION_YEARS } from "@/constants";
1313
import ms from "ms";
1414
import { withRateLimit, RATE_LIMITS } from "@/utils/with-rate-limit";
15+
import { updateAllSessionsOfUser } from "@/utils/kv-session";
1516

1617
// Action types
1718
type GetTransactionsInput = {
@@ -130,7 +131,7 @@ export async function confirmPayment({ packageId, paymentIntentId }: PurchaseCre
130131
}
131132

132133
// Add credits and log transaction
133-
await updateUserCredits(session.user.id, creditPackage.credits);
134+
await addUserCredits(session.user.id, creditPackage.credits);
134135
await logTransaction({
135136
userId: session.user.id,
136137
amount: creditPackage.credits,
@@ -140,6 +141,9 @@ export async function confirmPayment({ packageId, paymentIntentId }: PurchaseCre
140141
paymentIntentId: paymentIntent?.id
141142
});
142143

144+
// Update all KV sessions to reflect the new credit balance
145+
await updateAllSessionsOfUser(session.user.id);
146+
143147
return { success: true };
144148
} catch (error) {
145149
console.error("Purchase error:", error);

src/utils/credits.ts

Lines changed: 84 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,25 @@ async function processExpiredCredits(userId: string, currentTime: Date) {
4747
// Process each expired transaction
4848
for (const transaction of expiredTransactions) {
4949
try {
50-
// First, mark the transaction as processed to prevent double processing
51-
await db
50+
// Atomically mark the transaction as processed ONLY if it hasn't been processed yet
51+
// This prevents race conditions where multiple requests try to process the same transaction
52+
const updateResult = await db
5253
.update(creditTransactionTable)
5354
.set({
5455
expirationDateProcessedAt: currentTime,
5556
remainingAmount: 0, // All remaining credits are expired
5657
})
57-
.where(eq(creditTransactionTable.id, transaction.id));
58+
.where(and(
59+
eq(creditTransactionTable.id, transaction.id),
60+
isNull(creditTransactionTable.expirationDateProcessedAt),
61+
eq(creditTransactionTable.remainingAmount, transaction.remainingAmount)
62+
))
63+
.returning({ id: creditTransactionTable.id });
64+
65+
// If no rows were updated, another request already processed this transaction
66+
if (!updateResult || updateResult.length === 0) {
67+
continue;
68+
}
5869

5970
// Then deduct the expired credits from user's balance
6071
await db
@@ -70,27 +81,14 @@ async function processExpiredCredits(userId: string, currentTime: Date) {
7081
}
7182
}
7283

73-
export async function updateUserCredits(userId: string, creditsToAdd: number) {
84+
export async function addUserCredits(userId: string, creditsToAdd: number) {
7485
const db = getDB();
7586
await db
7687
.update(userTable)
7788
.set({
7889
currentCredits: sql`${userTable.currentCredits} + ${creditsToAdd}`,
7990
})
8091
.where(eq(userTable.id, userId));
81-
82-
// Update all KV sessions to reflect the new credit balance
83-
await updateAllSessionsOfUser(userId);
84-
}
85-
86-
async function updateLastRefreshDate(userId: string, date: Date) {
87-
const db = getDB();
88-
await db
89-
.update(userTable)
90-
.set({
91-
lastCreditRefreshAt: date,
92-
})
93-
.where(eq(userTable.id, userId));
9492
}
9593

9694
export async function logTransaction({
@@ -140,14 +138,45 @@ export async function addFreeMonthlyCreditsIfNeeded(session: KVSession): Promise
140138
return user?.currentCredits ?? 0;
141139
}
142140

141+
// Calculate one month ago from current time (using calendar month logic)
142+
const oneMonthAgo = new Date(currentTime);
143+
oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);
144+
145+
// Update last refresh date FIRST to act as a distributed lock
146+
// This prevents race conditions where multiple requests try to add credits simultaneously
147+
const updateResult = await db
148+
.update(userTable)
149+
.set({
150+
lastCreditRefreshAt: currentTime,
151+
})
152+
.where(and(
153+
eq(userTable.id, session.userId),
154+
or(
155+
isNull(userTable.lastCreditRefreshAt),
156+
lt(userTable.lastCreditRefreshAt, oneMonthAgo) // More than 1 calendar month ago
157+
)
158+
))
159+
.returning({ lastCreditRefreshAt: userTable.lastCreditRefreshAt });
160+
161+
// If no rows were updated, another request already processed the refresh
162+
if (!updateResult || updateResult.length === 0) {
163+
const currentUser = await db.query.userTable.findFirst({
164+
where: eq(userTable.id, session.userId),
165+
columns: {
166+
currentCredits: true,
167+
},
168+
});
169+
return currentUser?.currentCredits ?? 0;
170+
}
171+
143172
// Process any expired credits first
144173
await processExpiredCredits(session.userId, currentTime);
145174

146175
// Add free monthly credits with 1 month expiration
147176
const expirationDate = new Date(currentTime);
148177
expirationDate.setMonth(expirationDate.getMonth() + 1);
149178

150-
await updateUserCredits(session.userId, FREE_MONTHLY_CREDITS);
179+
await addUserCredits(session.userId, FREE_MONTHLY_CREDITS);
151180
await logTransaction({
152181
userId: session.userId,
153182
amount: FREE_MONTHLY_CREDITS,
@@ -156,8 +185,8 @@ export async function addFreeMonthlyCreditsIfNeeded(session: KVSession): Promise
156185
expirationDate
157186
});
158187

159-
// Update last refresh date
160-
await updateLastRefreshDate(session.userId, currentTime);
188+
// Update all KV sessions to reflect the new credit balance and lastCreditRefreshAt
189+
await updateAllSessionsOfUser(session.userId);
161190

162191
// Get the updated credit balance from the database
163192
const updatedUser = await db.query.userTable.findFirst({
@@ -215,30 +244,57 @@ export async function consumeCredits({ userId, amount, description }: { userId:
215244
});
216245

217246
let remainingToDeduct = amount;
247+
let actuallyDeducted = 0;
218248

219249
// Deduct from each transaction until we've deducted the full amount
220250
for (const transaction of activeTransactionsWithBalance) {
221251
if (remainingToDeduct <= 0) break;
222252

223253
const deductFromThis = Math.min(transaction.remainingAmount, remainingToDeduct);
254+
const newRemainingAmount = transaction.remainingAmount - deductFromThis;
224255

225-
await db
256+
// Atomically update ONLY if the remainingAmount hasn't changed
257+
// This prevents race conditions where multiple requests try to deduct from the same transaction
258+
const updateResult = await db
226259
.update(creditTransactionTable)
227260
.set({
228-
remainingAmount: transaction.remainingAmount - deductFromThis,
261+
remainingAmount: newRemainingAmount,
229262
})
230-
.where(eq(creditTransactionTable.id, transaction.id));
263+
.where(and(
264+
eq(creditTransactionTable.id, transaction.id),
265+
eq(creditTransactionTable.remainingAmount, transaction.remainingAmount)
266+
))
267+
.returning({ remainingAmount: creditTransactionTable.remainingAmount });
268+
269+
// If the update succeeded, count the deduction
270+
if (updateResult && updateResult.length > 0) {
271+
actuallyDeducted += deductFromThis;
272+
remainingToDeduct -= deductFromThis;
273+
}
274+
// If update failed, another request modified this transaction, re-fetch and continue
275+
}
231276

232-
remainingToDeduct -= deductFromThis;
277+
// Verify we were able to deduct the full amount
278+
if (actuallyDeducted < amount) {
279+
throw new Error("Insufficient credits - concurrent modification detected");
233280
}
234281

235-
// Update total credits
236-
await db
282+
// Update total credits using SQL to ensure atomicity and prevent negative balance
283+
const userUpdateResult = await db
237284
.update(userTable)
238285
.set({
239286
currentCredits: sql`${userTable.currentCredits} - ${amount}`,
240287
})
241-
.where(eq(userTable.id, userId));
288+
.where(and(
289+
eq(userTable.id, userId),
290+
sql`${userTable.currentCredits} >= ${amount}` // Ensure we don't go negative
291+
))
292+
.returning({ currentCredits: userTable.currentCredits });
293+
294+
// If no rows were updated, we don't have enough credits (race condition)
295+
if (!userUpdateResult || userUpdateResult.length === 0) {
296+
throw new Error("Insufficient credits");
297+
}
242298

243299
// Log the usage transaction
244300
await db.insert(creditTransactionTable).values({
@@ -251,18 +307,10 @@ export async function consumeCredits({ userId, amount, description }: { userId:
251307
updatedAt: new Date(),
252308
});
253309

254-
// Get updated credit balance
255-
const updatedUser = await db.query.userTable.findFirst({
256-
where: eq(userTable.id, userId),
257-
columns: {
258-
currentCredits: true,
259-
},
260-
});
261-
262310
// Update all KV sessions to reflect the new credit balance
263311
await updateAllSessionsOfUser(userId);
264312

265-
return updatedUser?.currentCredits ?? 0;
313+
return userUpdateResult[0].currentCredits;
266314
}
267315

268316
export async function getCreditTransactions({

src/utils/kv-session.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export interface KVSession {
6363
* IF YOU MAKE ANY CHANGES TO THE KVSESSION TYPE ABOVE, YOU NEED TO INCREMENT THIS VERSION.
6464
* THIS IS HOW WE TRACK WHEN WE NEED TO UPDATE THE SESSIONS IN THE KV STORE.
6565
*/
66-
export const CURRENT_SESSION_VERSION = 2;
66+
export const CURRENT_SESSION_VERSION = 3;
6767

6868
export async function getKV() {
6969
const { env } = getCloudflareContext();

0 commit comments

Comments
 (0)