Skip to content

Commit 5946a08

Browse files
committed
fix(convex): reduce write conflicts across hot paths
- downloads:increment: remove unnecessary db.get that added skill doc to read set, causing conflicts with the stat processing cron - users:ensure: only patch when there are real field changes, skip unconditional updatedAt bump that forced a write on every call - comments: route stats through event sourcing (insertStatEvent) instead of synchronous read-modify-write on the skill doc - rateLimits: split into query-first check + conditional mutation so denied requests are conflict-free reads - skillStatEvents: reduce MAX_SKILLS_PER_RUN from 500 to 50 to shrink the write set and lower conflict probability with concurrent mutations Co-Authored-By: theonejvo <theonejvo@users.noreply.github.com>
1 parent b997f5e commit 5946a08

File tree

8 files changed

+100
-27
lines changed

8 files changed

+100
-27
lines changed

convex/comments.ts

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { Doc } from './_generated/dataModel'
33
import { mutation, query } from './_generated/server'
44
import { assertModerator, requireUser } from './lib/access'
55
import { type PublicUser, toPublicUser } from './lib/public'
6+
import { insertStatEvent } from './skillStatEvents'
67

78
export const listBySkill = query({
89
args: { skillId: v.id('skills'), limit: v.optional(v.number()) },
@@ -43,10 +44,7 @@ export const add = mutation({
4344
deletedBy: undefined,
4445
})
4546

46-
await ctx.db.patch(skill._id, {
47-
stats: { ...skill.stats, comments: skill.stats.comments + 1 },
48-
updatedAt: Date.now(),
49-
})
47+
await insertStatEvent(ctx, { skillId: skill._id, kind: 'comment' })
5048
},
5149
})
5250

@@ -68,13 +66,7 @@ export const remove = mutation({
6866
deletedBy: user._id,
6967
})
7068

71-
const skill = await ctx.db.get(comment.skillId)
72-
if (skill) {
73-
await ctx.db.patch(skill._id, {
74-
stats: { ...skill.stats, comments: Math.max(0, skill.stats.comments - 1) },
75-
updatedAt: Date.now(),
76-
})
77-
}
69+
await insertStatEvent(ctx, { skillId: comment.skillId, kind: 'uncomment' })
7870

7971
await ctx.db.insert('auditLogs', {
8072
actorUserId: user._id,

convex/downloads.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,11 @@ export const downloadZip = httpAction(async (ctx, request) => {
9292
export const increment = mutation({
9393
args: { skillId: v.id('skills') },
9494
handler: async (ctx, args) => {
95-
const skill = await ctx.db.get(args.skillId)
96-
if (!skill) return
95+
// Skip db.get to avoid adding the skill doc to the read set.
96+
// The calling HTTP action already validated the skill exists,
97+
// and the stat processor handles deleted skills gracefully.
9798
await insertStatEvent(ctx, {
98-
skillId: skill._id,
99+
skillId: args.skillId,
99100
kind: 'download',
100101
})
101102
},

convex/httpApiV1.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -827,11 +827,30 @@ async function checkRateLimit(
827827
key: string,
828828
limit: number,
829829
): Promise<RateLimitResult> {
830-
return (await ctx.runMutation(internal.rateLimits.checkRateLimitInternal, {
830+
// Step 1: Read-only check — no write conflicts for denied requests
831+
const status = (await ctx.runQuery(internal.rateLimits.getRateLimitStatusInternal, {
831832
key,
832833
limit,
833834
windowMs: RATE_LIMIT_WINDOW_MS,
834835
})) as RateLimitResult
836+
837+
if (!status.allowed) {
838+
return status
839+
}
840+
841+
// Step 2: Consume a token (only when allowed, with double-check for races)
842+
const result = (await ctx.runMutation(internal.rateLimits.consumeRateLimitInternal, {
843+
key,
844+
limit,
845+
windowMs: RATE_LIMIT_WINDOW_MS,
846+
})) as { allowed: boolean; remaining: number }
847+
848+
return {
849+
allowed: result.allowed,
850+
remaining: result.remaining,
851+
limit: status.limit,
852+
resetAt: status.resetAt,
853+
}
835854
}
836855

837856
function pickMostRestrictive(primary: RateLimitResult, secondary: RateLimitResult | null) {

convex/lib/skillStats.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { toDayKey } from './leaderboards'
55
type SkillStatDeltas = {
66
downloads?: number
77
stars?: number
8+
comments?: number
89
installsCurrent?: number
910
installsAllTime?: number
1011
}
@@ -22,8 +23,10 @@ export function applySkillStatDeltas(skill: Doc<'skills'>, deltas: SkillStatDelt
2223
? skill.statsInstallsAllTime
2324
: (skill.stats.installsAllTime ?? 0)
2425

26+
const currentComments = skill.stats.comments
2527
const nextDownloads = Math.max(0, currentDownloads + (deltas.downloads ?? 0))
2628
const nextStars = Math.max(0, currentStars + (deltas.stars ?? 0))
29+
const nextComments = Math.max(0, currentComments + (deltas.comments ?? 0))
2730
const nextInstallsCurrent = Math.max(0, currentInstallsCurrent + (deltas.installsCurrent ?? 0))
2831
const nextInstallsAllTime = Math.max(0, currentInstallsAllTime + (deltas.installsAllTime ?? 0))
2932

@@ -36,6 +39,7 @@ export function applySkillStatDeltas(skill: Doc<'skills'>, deltas: SkillStatDelt
3639
...skill.stats,
3740
downloads: nextDownloads,
3841
stars: nextStars,
42+
comments: nextComments,
3943
installsCurrent: nextInstallsCurrent,
4044
installsAllTime: nextInstallsAllTime,
4145
},

convex/rateLimits.ts

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { v } from 'convex/values'
2-
import { internalMutation } from './_generated/server'
2+
import { internalMutation, internalQuery } from './_generated/server'
33

4-
export const checkRateLimitInternal = internalMutation({
4+
/**
5+
* Read-only rate limit check. Returns current status without writing anything.
6+
* This eliminates write conflicts for denied requests entirely.
7+
*/
8+
export const getRateLimitStatusInternal = internalQuery({
59
args: {
610
key: v.string(),
711
limit: v.number(),
@@ -20,6 +24,43 @@ export const checkRateLimitInternal = internalMutation({
2024
.withIndex('by_key_window', (q) => q.eq('key', args.key).eq('windowStart', windowStart))
2125
.unique()
2226

27+
const count = existing?.count ?? 0
28+
const allowed = count < args.limit
29+
return {
30+
allowed,
31+
remaining: Math.max(0, args.limit - count),
32+
limit: args.limit,
33+
resetAt,
34+
}
35+
},
36+
})
37+
38+
/**
39+
* Consume one rate limit token. Only call this after getRateLimitStatusInternal
40+
* returns allowed=true. Includes a double-check to handle races between the
41+
* query and this mutation.
42+
*/
43+
export const consumeRateLimitInternal = internalMutation({
44+
args: {
45+
key: v.string(),
46+
limit: v.number(),
47+
windowMs: v.number(),
48+
},
49+
handler: async (ctx, args) => {
50+
const now = Date.now()
51+
const windowStart = Math.floor(now / args.windowMs) * args.windowMs
52+
53+
const existing = await ctx.db
54+
.query('rateLimits')
55+
.withIndex('by_key_window', (q) => q.eq('key', args.key).eq('windowStart', windowStart))
56+
.unique()
57+
58+
// Double-check: another request may have consumed the last token
59+
// between our query and this mutation
60+
if (existing && existing.count >= args.limit) {
61+
return { allowed: false, remaining: 0 }
62+
}
63+
2364
if (!existing) {
2465
await ctx.db.insert('rateLimits', {
2566
key: args.key,
@@ -28,11 +69,7 @@ export const checkRateLimitInternal = internalMutation({
2869
limit: args.limit,
2970
updatedAt: now,
3071
})
31-
return { allowed: true, remaining: Math.max(0, args.limit - 1), limit: args.limit, resetAt }
32-
}
33-
34-
if (existing.count >= args.limit) {
35-
return { allowed: false, remaining: 0, limit: args.limit, resetAt }
72+
return { allowed: true, remaining: Math.max(0, args.limit - 1) }
3673
}
3774

3875
await ctx.db.patch(existing._id, {
@@ -43,8 +80,6 @@ export const checkRateLimitInternal = internalMutation({
4380
return {
4481
allowed: true,
4582
remaining: Math.max(0, args.limit - existing.count - 1),
46-
limit: args.limit,
47-
resetAt,
4883
}
4984
},
5085
})

convex/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ const skillStatEvents = defineTable({
294294
v.literal('download'),
295295
v.literal('star'),
296296
v.literal('unstar'),
297+
v.literal('comment'),
298+
v.literal('uncomment'),
297299
v.literal('install_new'),
298300
v.literal('install_reactivate'),
299301
v.literal('install_deactivate'),

convex/skillStatEvents.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ export type StatEventKind =
3535
| 'download'
3636
| 'star'
3737
| 'unstar'
38+
| 'comment'
39+
| 'uncomment'
3840
| 'install_new'
3941
| 'install_reactivate'
4042
| 'install_deactivate'
@@ -86,6 +88,7 @@ export async function insertStatEvent(
8688
type AggregatedDeltas = {
8789
downloads: number
8890
stars: number
91+
comments: number
8992
installsAllTime: number
9093
installsCurrent: number
9194
/** Original timestamps for each download event (for daily stats bucketing) */
@@ -117,6 +120,7 @@ function aggregateEvents(events: Doc<'skillStatEvents'>[]): AggregatedDeltas {
117120
const result: AggregatedDeltas = {
118121
downloads: 0,
119122
stars: 0,
123+
comments: 0,
120124
installsAllTime: 0,
121125
installsCurrent: 0,
122126
downloadEvents: [],
@@ -135,6 +139,12 @@ function aggregateEvents(events: Doc<'skillStatEvents'>[]): AggregatedDeltas {
135139
case 'unstar':
136140
result.stars -= 1
137141
break
142+
case 'comment':
143+
result.comments += 1
144+
break
145+
case 'uncomment':
146+
result.comments -= 1
147+
break
138148
case 'install_new':
139149
// New user installing for the first time: count toward both lifetime and current
140150
result.installsAllTime += 1
@@ -231,12 +241,14 @@ export const processSkillStatEventsInternal = internalMutation({
231241
if (
232242
deltas.downloads !== 0 ||
233243
deltas.stars !== 0 ||
244+
deltas.comments !== 0 ||
234245
deltas.installsAllTime !== 0 ||
235246
deltas.installsCurrent !== 0
236247
) {
237248
const patch = applySkillStatDeltas(skill, {
238249
downloads: deltas.downloads,
239250
stars: deltas.stars,
251+
comments: deltas.comments,
240252
installsAllTime: deltas.installsAllTime,
241253
installsCurrent: deltas.installsCurrent,
242254
})
@@ -285,7 +297,7 @@ export const processSkillStatEventsInternal = internalMutation({
285297

286298
const CURSOR_KEY = 'skill_stat_events'
287299
const EVENT_BATCH_SIZE = 500
288-
const MAX_SKILLS_PER_RUN = 500
300+
const MAX_SKILLS_PER_RUN = 50
289301

290302
/**
291303
* Fetch a batch of events after the given cursor (by _creationTime).
@@ -332,6 +344,7 @@ const skillDeltaValidator = v.object({
332344
skillId: v.id('skills'),
333345
downloads: v.number(),
334346
stars: v.number(),
347+
comments: v.number(),
335348
installsAllTime: v.number(),
336349
installsCurrent: v.number(),
337350
downloadEvents: v.array(v.number()),
@@ -438,6 +451,7 @@ export const processSkillStatEventsAction = internalAction({
438451
{
439452
downloads: number
440453
stars: number
454+
comments: number
441455
installsAllTime: number
442456
installsCurrent: number
443457
downloadEvents: number[]
@@ -471,6 +485,7 @@ export const processSkillStatEventsAction = internalAction({
471485
skillDelta = {
472486
downloads: 0,
473487
stars: 0,
488+
comments: 0,
474489
installsAllTime: 0,
475490
installsCurrent: 0,
476491
downloadEvents: [],
@@ -491,6 +506,12 @@ export const processSkillStatEventsAction = internalAction({
491506
case 'unstar':
492507
skillDelta.stars -= 1
493508
break
509+
case 'comment':
510+
skillDelta.comments += 1
511+
break
512+
case 'uncomment':
513+
skillDelta.comments -= 1
514+
break
494515
case 'install_new':
495516
skillDelta.installsAllTime += 1
496517
skillDelta.installsCurrent += 1

convex/users.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ export const ensure = mutation({
7676
args: {},
7777
handler: async (ctx) => {
7878
const { userId, user } = await requireUser(ctx)
79-
const now = Date.now()
8079
const updates: Record<string, unknown> = {}
8180

8281
const handle = user.handle || user.name || user.email?.split('@')[0]
@@ -86,9 +85,9 @@ export const ensure = mutation({
8685
updates.role = handle === ADMIN_HANDLE ? 'admin' : DEFAULT_ROLE
8786
}
8887
if (!user.createdAt) updates.createdAt = user._creationTime
89-
updates.updatedAt = now
9088

9189
if (Object.keys(updates).length > 0) {
90+
updates.updatedAt = Date.now()
9291
await ctx.db.patch(userId, updates)
9392
}
9493

0 commit comments

Comments
 (0)