
Commit 797ed36

address claude code PR feedback
1 parent a4c6e83 commit 797ed36

File tree

9 files changed: +464 -16 lines changed


packages/redis-worker/src/fair-queue/concurrency.ts

Lines changed: 26 additions & 7 deletions
@@ -119,15 +119,34 @@ export class ConcurrencyManager {
    * Returns the minimum available capacity across all groups.
    */
   async getAvailableCapacity(queue: QueueDescriptor): Promise<number> {
-    let minCapacity = Infinity;
+    if (this.groups.length === 0) {
+      return 0;
+    }
 
-    for (const group of this.groups) {
-      const groupId = group.extractGroupId(queue);
-      const key = this.keys.concurrencyKey(group.name, groupId);
-      const current = await this.redis.scard(key);
-      const limit = (await group.getLimit(groupId)) || group.defaultLimit;
-      const available = Math.max(0, limit - current);
+    // Build group data for parallel fetching
+    const groupData = this.groups.map((group) => ({
+      group,
+      groupId: group.extractGroupId(queue),
+    }));
+
+    // Fetch all current counts and limits in parallel
+    const [currents, limits] = await Promise.all([
+      Promise.all(
+        groupData.map(({ group, groupId }) =>
+          this.redis.scard(this.keys.concurrencyKey(group.name, groupId))
+        )
+      ),
+      Promise.all(
+        groupData.map(({ group, groupId }) =>
+          group.getLimit(groupId).then((limit) => limit || group.defaultLimit)
+        )
+      ),
+    ]);
 
+    // Calculate minimum available capacity across all groups
+    let minCapacity = Infinity;
+    for (let i = 0; i < groupData.length; i++) {
+      const available = Math.max(0, limits[i]! - currents[i]!);
       minCapacity = Math.min(minCapacity, available);
     }
 
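The refactor above replaces a sequential await-per-group loop with parallel fetches. A minimal sketch of the same pattern in isolation, where fetchCount and fetchLimit are hypothetical stand-ins for the SCARD call and the group limit lookup (only the shape of the logic comes from the diff):

// Sketch of the parallel min-capacity pattern; not the package's API.
async function minAvailableCapacity(
  groupIds: string[],
  fetchCount: (id: string) => Promise<number>,
  fetchLimit: (id: string) => Promise<number>
): Promise<number> {
  if (groupIds.length === 0) return 0;

  // Issue every count and limit lookup concurrently instead of awaiting in a loop
  const [counts, limits] = await Promise.all([
    Promise.all(groupIds.map(fetchCount)),
    Promise.all(groupIds.map(fetchLimit)),
  ]);

  // The queue can only claim as much as its most constrained group allows
  return groupIds.reduce(
    (min, _id, i) => Math.min(min, Math.max(0, limits[i]! - counts[i]!)),
    Infinity
  );
}

With N groups this costs roughly one concurrent round of Redis commands instead of 2N sequential awaits, which matters because getAvailableCapacity sits on the message-claim path.
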
packages/redis-worker/src/fair-queue/index.ts

Lines changed: 25 additions & 4 deletions
@@ -84,6 +84,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
   private workerQueueEnabled: boolean;
   private workerQueueBlockingTimeoutSeconds: number;
   private workerQueueResolver?: (message: StoredMessage<z.infer<TPayloadSchema>>) => string;
+  private batchClaimSize: number;
 
   // Cooloff state
   private cooloffEnabled: boolean;
@@ -139,6 +140,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     this.workerQueueBlockingTimeoutSeconds = options.workerQueue?.blockingTimeoutSeconds ?? 10;
     this.workerQueueResolver = options.workerQueue?.resolveWorkerQueue;
 
+    // Batch claiming
+    this.batchClaimSize = options.batchClaimSize ?? 10;
+
     // Cooloff
     this.cooloffEnabled = options.cooloff?.enabled ?? true;
     this.cooloffThreshold = options.cooloff?.threshold ?? 10;
@@ -892,8 +896,20 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
           messagesProcessed += processedFromQueue;
           this.batchedSpanManager.incrementStat(loopId, "messages_claimed", processedFromQueue);
 
-          if (this.scheduler.recordProcessed) {
-            // Record each processed message for DRR deficit tracking
+          // Record processed messages for DRR deficit tracking
+          // Use batch variant if available for efficiency, otherwise fall back to single calls
+          if (this.scheduler.recordProcessedBatch) {
+            await this.telemetry.trace(
+              "recordProcessedBatch",
+              async (span) => {
+                span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
+                span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId);
+                span.setAttribute("count", processedFromQueue);
+                await this.scheduler.recordProcessedBatch!(tenantId, queueId, processedFromQueue);
+              },
+              { kind: SpanKind.INTERNAL }
+            );
+          } else if (this.scheduler.recordProcessed) {
             for (let i = 0; i < processedFromQueue; i++) {
               await this.telemetry.trace(
                 "recordProcessed",
@@ -936,7 +952,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     };
 
     // Determine how many messages we can claim based on concurrency
-    let maxClaimCount = 10; // Default batch size cap
+    let maxClaimCount = this.batchClaimSize;
     if (this.concurrencyManager) {
      const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor);
      if (availableCapacity === 0) {
@@ -974,7 +990,12 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
       return 0;
     }
 
-    // Use single shared worker queue - all messages go to one queue, all consumers pop atomically
+    // Single shared worker queue pattern:
+    // All consumers pop from one queue ("worker-queue") for atomic distribution.
+    // Trade-off: Simpler code and fair distribution vs. potential contention
+    // under very high load (>10k messages/sec). For most workloads, Redis
+    // can handle 100k+ ops/sec on a single key, so this is rarely a bottleneck.
+    // Future: Consider adding optional worker queue sharding if needed.
     const workerQueueId = "worker-queue";
     let processedCount = 0;
 
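The claim loop now prefers recordProcessedBatch and only falls back to per-message recordProcessed calls when the scheduler does not implement the batch hook. A stripped-down sketch of that dispatch, with the telemetry wrapping omitted and the scheduler reduced to the two optional hooks (the hook names mirror the diff; everything else is illustrative):

// Hypothetical minimal view of the scheduler surface used by the claim loop.
interface SchedulerHooks {
  recordProcessed?: (tenantId: string, queueId: string) => Promise<void>;
  recordProcessedBatch?: (tenantId: string, queueId: string, count: number) => Promise<void>;
}

async function recordProcessedMessages(
  scheduler: SchedulerHooks,
  tenantId: string,
  queueId: string,
  count: number
): Promise<void> {
  if (scheduler.recordProcessedBatch) {
    // One call (and one Redis round trip) regardless of how many messages were claimed
    await scheduler.recordProcessedBatch(tenantId, queueId, count);
  } else if (scheduler.recordProcessed) {
    // Fallback: one call per message, matching the previous behaviour
    for (let i = 0; i < count; i++) {
      await scheduler.recordProcessed(tenantId, queueId);
    }
  }
}
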
packages/redis-worker/src/fair-queue/scheduler.ts

Lines changed: 9 additions & 0 deletions
@@ -28,6 +28,15 @@ export abstract class BaseScheduler implements FairScheduler {
     // Default: no state tracking
   }
 
+  /**
+   * Called after processing multiple messages to update scheduler state.
+   * Batch variant for efficiency - reduces Redis calls when processing multiple messages.
+   * Default implementation does nothing.
+   */
+  async recordProcessedBatch(_tenantId: string, _queueId: string, _count: number): Promise<void> {
+    // Default: no state tracking
+  }
+
   /**
    * Initialize the scheduler.
    * Default implementation does nothing.
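
Because BaseScheduler ships a no-op default, existing subclasses keep compiling unchanged; only schedulers that track per-tenant state need to override the batch hook. A simplified, hypothetical sketch of that contract (BaseSchedulerSketch and CountingScheduler are stand-ins, not classes from this package):

// Stand-in base class: both hooks default to doing nothing.
abstract class BaseSchedulerSketch {
  async recordProcessed(_tenantId: string, _queueId: string): Promise<void> {
    // Default: no state tracking
  }

  async recordProcessedBatch(_tenantId: string, _queueId: string, _count: number): Promise<void> {
    // Default: no state tracking
  }
}

// A stateful scheduler only overrides the batch variant.
class CountingScheduler extends BaseSchedulerSketch {
  counts = new Map<string, number>();

  override async recordProcessedBatch(tenantId: string, _queueId: string, count: number): Promise<void> {
    // One update covers the whole batch instead of `count` separate calls
    this.counts.set(tenantId, (this.counts.get(tenantId) ?? 0) + count);
  }
}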

packages/redis-worker/src/fair-queue/schedulers/drr.ts

Lines changed: 50 additions & 0 deletions
@@ -140,6 +140,18 @@ export class DRRScheduler extends BaseScheduler {
     await this.#decrementDeficit(tenantId);
   }
 
+  /**
+   * Record that multiple messages were processed from a tenant.
+   * Decrements the tenant's deficit by count atomically.
+   */
+  override async recordProcessedBatch(
+    tenantId: string,
+    _queueId: string,
+    count: number
+  ): Promise<void> {
+    await this.#decrementDeficitBatch(tenantId, count);
+  }
+
   override async close(): Promise<void> {
     await this.redis.quit();
   }
@@ -249,6 +261,17 @@ export class DRRScheduler extends BaseScheduler {
     return parseFloat(result);
   }
 
+  /**
+   * Decrement deficit for a tenant by a count atomically.
+   */
+  async #decrementDeficitBatch(tenantId: string, count: number): Promise<number> {
+    const key = this.#deficitKey();
+
+    // Use Lua script to decrement by count and ensure non-negative
+    const result = await this.redis.drrDecrementDeficitBatch(key, tenantId, count.toString());
+    return parseFloat(result);
+  }
+
   #registerCommands(): void {
     // Atomic quantum addition with capping for multiple tenants
     this.redis.defineCommand("drrAddQuantum", {
@@ -295,6 +318,27 @@ if newDeficit < 0 then
   newDeficit = 0
 end
 
+return tostring(newDeficit)
+`,
+    });
+
+    // Atomic deficit decrement by count with floor at 0
+    this.redis.defineCommand("drrDecrementDeficitBatch", {
+      numberOfKeys: 1,
+      lua: `
+local deficitKey = KEYS[1]
+local tenantId = ARGV[1]
+local count = tonumber(ARGV[2])
+
+local newDeficit = redis.call('HINCRBYFLOAT', deficitKey, tenantId, -count)
+newDeficit = tonumber(newDeficit)
+
+-- Floor at 0
+if newDeficit < 0 then
+  redis.call('HSET', deficitKey, tenantId, 0)
+  newDeficit = 0
+end
+
 return tostring(newDeficit)
 `,
     });
@@ -312,6 +356,12 @@ declare module "@internal/redis" {
     ): Promise<string[]>;
 
     drrDecrementDeficit(deficitKey: string, tenantId: string): Promise<string>;
+
+    drrDecrementDeficitBatch(
+      deficitKey: string,
+      tenantId: string,
+      count: string
+    ): Promise<string>;
   }
 }
 
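drrDecrementDeficitBatch is the batched counterpart of drrDecrementDeficit: one HINCRBYFLOAT by -count followed by a clamp at 0, inside a single Lua script so the decrement and the clamp are atomic. A pure-TypeScript model of that semantics, useful for reasoning about it outside Redis (an illustration of the script's effect, not the implementation, which runs server-side):

// Models the per-tenant deficit hash as a Map; mirrors the Lua logic only.
function decrementDeficitBatch(
  deficits: Map<string, number>,
  tenantId: string,
  count: number
): number {
  const current = deficits.get(tenantId) ?? 0;
  // HINCRBYFLOAT by -count, then floor at 0 so a large batch cannot drive
  // the deficit negative and skew the tenant's share on the next round
  const next = Math.max(0, current - count);
  deficits.set(tenantId, next);
  return next;
}

So a tenant with deficit 3 that just had 5 messages claimed lands at 0 rather than -2, the same end state as five single decrements, but in one round trip.
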
packages/redis-worker/src/fair-queue/tests/concurrency.test.ts

Lines changed: 161 additions & 0 deletions
@@ -222,6 +222,167 @@ describe("ConcurrencyManager", () => {
     );
   });
 
+  describe("getAvailableCapacity", () => {
+    redisTest(
+      "should return available capacity for single group",
+      { timeout: 10000 },
+      async ({ redisOptions }) => {
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const manager = new ConcurrencyManager({
+          redis: redisOptions,
+          keys,
+          groups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async () => 10,
+              defaultLimit: 10,
+            },
+          ],
+        });
+
+        const queue: QueueDescriptor = {
+          id: "queue-1",
+          tenantId: "t1",
+          metadata: {},
+        };
+
+        // Initial capacity should be full
+        let capacity = await manager.getAvailableCapacity(queue);
+        expect(capacity).toBe(10);
+
+        // Reserve 3 slots
+        await manager.reserve(queue, "msg-1");
+        await manager.reserve(queue, "msg-2");
+        await manager.reserve(queue, "msg-3");
+
+        // Capacity should be reduced
+        capacity = await manager.getAvailableCapacity(queue);
+        expect(capacity).toBe(7);
+
+        await manager.close();
+      }
+    );
+
+    redisTest(
+      "should return minimum capacity across multiple groups",
+      { timeout: 10000 },
+      async ({ redisOptions }) => {
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const manager = new ConcurrencyManager({
+          redis: redisOptions,
+          keys,
+          groups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async () => 5,
+              defaultLimit: 5,
+            },
+            {
+              name: "organization",
+              extractGroupId: (q) => (q.metadata.orgId as string) ?? "default",
+              getLimit: async () => 20,
+              defaultLimit: 20,
+            },
+          ],
+        });
+
+        const queue: QueueDescriptor = {
+          id: "queue-1",
+          tenantId: "t1",
+          metadata: { orgId: "org1" },
+        };
+
+        // Initial capacity should be minimum (5 for tenant, 20 for org)
+        let capacity = await manager.getAvailableCapacity(queue);
+        expect(capacity).toBe(5);
+
+        // Reserve 3 slots
+        await manager.reserve(queue, "msg-1");
+        await manager.reserve(queue, "msg-2");
+        await manager.reserve(queue, "msg-3");
+
+        // Now tenant has 2 left, org has 17 left - minimum is 2
+        capacity = await manager.getAvailableCapacity(queue);
+        expect(capacity).toBe(2);
+
+        await manager.close();
+      }
+    );
+
+    redisTest(
+      "should return 0 when any group is at capacity",
+      { timeout: 10000 },
+      async ({ redisOptions }) => {
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const manager = new ConcurrencyManager({
+          redis: redisOptions,
+          keys,
+          groups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async () => 3,
+              defaultLimit: 3,
+            },
+            {
+              name: "organization",
+              extractGroupId: (q) => (q.metadata.orgId as string) ?? "default",
+              getLimit: async () => 10,
+              defaultLimit: 10,
+            },
+          ],
+        });
+
+        const queue: QueueDescriptor = {
+          id: "queue-1",
+          tenantId: "t1",
+          metadata: { orgId: "org1" },
+        };
+
+        // Fill up tenant capacity
+        await manager.reserve(queue, "msg-1");
+        await manager.reserve(queue, "msg-2");
+        await manager.reserve(queue, "msg-3");
+
+        // Tenant is at 3/3, org is at 3/10
+        const capacity = await manager.getAvailableCapacity(queue);
+        expect(capacity).toBe(0);
+
+        await manager.close();
+      }
+    );
+
+    redisTest(
+      "should return 0 when no groups are configured",
+      { timeout: 10000 },
+      async ({ redisOptions }) => {
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const manager = new ConcurrencyManager({
+          redis: redisOptions,
+          keys,
+          groups: [],
+        });
+
+        const queue: QueueDescriptor = {
+          id: "queue-1",
+          tenantId: "t1",
+          metadata: {},
+        };
+
+        const capacity = await manager.getAvailableCapacity(queue);
+        expect(capacity).toBe(0);
+
+        await manager.close();
+      }
+    );
+  });
+
   describe("atomic reservation", () => {
     redisTest(
       "should atomically reserve across groups",
