Skip to content

Commit e7a5010

Browse files
committed
fix two-stage processing with concurrency limit releasing in batches
1 parent f55f487 commit e7a5010

File tree

4 files changed

+455
-4
lines changed

4 files changed

+455
-4
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -977,14 +977,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
977977
let processedCount = 0;
978978

979979
// Reserve concurrency and push each message to worker queue
980-
for (const message of claimedMessages) {
980+
for (let i = 0; i < claimedMessages.length; i++) {
981+
const message = claimedMessages[i]!;
982+
981983
// Reserve concurrency slot
982984
if (this.concurrencyManager) {
983985
const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId);
984986
if (!reserved) {
985-
// Release message back to queue (and ensure it's in master queue)
986-
await this.visibilityManager.release(
987-
message.messageId,
987+
// Release ALL remaining messages (from index i onward) back to queue
988+
// This prevents messages from being stranded in the in-flight set
989+
await this.visibilityManager.releaseBatch(
990+
claimedMessages.slice(i),
988991
queueId,
989992
queueKey,
990993
queueItemsKey,

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,4 +876,106 @@ describe("FairQueue", () => {
876876
await queue.close();
877877
});
878878
});
879+
880+
describe("two-stage processing with concurrency limits", () => {
881+
redisTest(
882+
"should release remaining claimed messages when concurrency reservation fails",
883+
{ timeout: 30000 },
884+
async ({ redisOptions }) => {
885+
const processed: string[] = [];
886+
const processingMessages = new Set<string>();
887+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
888+
889+
const scheduler = new DRRScheduler({
890+
redis: redisOptions,
891+
keys,
892+
quantum: 10,
893+
maxDeficit: 100,
894+
});
895+
896+
// Create queue with:
897+
// - Worker queue enabled (two-stage processing)
898+
// - Concurrency limit of 2 per tenant
899+
const queue = new FairQueue({
900+
redis: redisOptions,
901+
keys,
902+
scheduler,
903+
payloadSchema: TestPayloadSchema,
904+
shardCount: 1,
905+
consumerCount: 1,
906+
consumerIntervalMs: 50,
907+
visibilityTimeoutMs: 10000,
908+
workerQueue: {
909+
enabled: true,
910+
blockingTimeoutSeconds: 1,
911+
},
912+
concurrency: {
913+
groups: [
914+
{
915+
name: "tenant",
916+
extractGroupId: (q) => q.tenantId,
917+
getLimit: async () => 2, // Limit to 2 concurrent per tenant
918+
defaultLimit: 2,
919+
},
920+
],
921+
},
922+
startConsumers: false,
923+
});
924+
925+
// Message handler that tracks what's being processed
926+
queue.onMessage(async (ctx) => {
927+
const value = ctx.message.payload.value;
928+
processingMessages.add(value);
929+
930+
// Simulate some work
931+
await new Promise((resolve) => setTimeout(resolve, 100));
932+
933+
processed.push(value);
934+
processingMessages.delete(value);
935+
await ctx.complete();
936+
});
937+
938+
// Enqueue 5 messages to the same tenant queue
939+
await queue.enqueueBatch({
940+
queueId: "tenant:t1:queue:q1",
941+
tenantId: "t1",
942+
messages: [
943+
{ payload: { value: "msg-1" } },
944+
{ payload: { value: "msg-2" } },
945+
{ payload: { value: "msg-3" } },
946+
{ payload: { value: "msg-4" } },
947+
{ payload: { value: "msg-5" } },
948+
],
949+
});
950+
951+
// Start processing
952+
queue.start();
953+
954+
// Wait for all messages to be processed
955+
// With concurrency limit of 2, it should process in batches
956+
await vi.waitFor(
957+
() => {
958+
expect(processed.length).toBe(5);
959+
},
960+
{ timeout: 20000 }
961+
);
962+
963+
// All messages should have been processed
964+
expect(processed).toContain("msg-1");
965+
expect(processed).toContain("msg-2");
966+
expect(processed).toContain("msg-3");
967+
expect(processed).toContain("msg-4");
968+
expect(processed).toContain("msg-5");
969+
970+
// Wait a bit for any cleanup to complete
971+
await new Promise((resolve) => setTimeout(resolve, 100));
972+
973+
// No messages should be stuck in-flight
974+
const inflightCount = await queue.getTotalInflightCount();
975+
expect(inflightCount).toBe(0);
976+
977+
await queue.close();
978+
}
979+
);
980+
});
879981
});

packages/redis-worker/src/fair-queue/tests/visibility.test.ts

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,5 +244,236 @@ describe("VisibilityManager", () => {
244244
}
245245
);
246246
});
247+
248+
describe("claimBatch", () => {
249+
redisTest(
250+
"should claim multiple messages atomically",
251+
{ timeout: 10000 },
252+
async ({ redisOptions }) => {
253+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
254+
255+
const manager = new VisibilityManager({
256+
redis: redisOptions,
257+
keys,
258+
shardCount: 1,
259+
defaultTimeoutMs: 5000,
260+
});
261+
262+
const redis = createRedisClient(redisOptions);
263+
const queueId = "tenant:t1:queue:claim-batch";
264+
const queueKey = keys.queueKey(queueId);
265+
const queueItemsKey = keys.queueItemsKey(queueId);
266+
267+
// Add multiple messages to the queue
268+
for (let i = 1; i <= 5; i++) {
269+
const messageId = `msg-${i}`;
270+
const storedMessage = {
271+
id: messageId,
272+
queueId,
273+
tenantId: "t1",
274+
payload: { value: `test-${i}` },
275+
timestamp: Date.now() - (6 - i) * 1000,
276+
attempt: 1,
277+
};
278+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
279+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
280+
}
281+
282+
// Claim batch of 3 messages
283+
const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 3);
284+
285+
expect(claimed).toHaveLength(3);
286+
expect(claimed[0]!.messageId).toBe("msg-1");
287+
expect(claimed[1]!.messageId).toBe("msg-2");
288+
expect(claimed[2]!.messageId).toBe("msg-3");
289+
290+
// Verify messages are in in-flight set
291+
const inflightCount = await manager.getTotalInflightCount();
292+
expect(inflightCount).toBe(3);
293+
294+
// Verify messages are removed from queue
295+
const remainingCount = await redis.zcard(queueKey);
296+
expect(remainingCount).toBe(2);
297+
298+
await manager.close();
299+
await redis.quit();
300+
}
301+
);
302+
303+
redisTest(
304+
"should return empty array when queue is empty",
305+
{ timeout: 10000 },
306+
async ({ redisOptions }) => {
307+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
308+
309+
const manager = new VisibilityManager({
310+
redis: redisOptions,
311+
keys,
312+
shardCount: 1,
313+
defaultTimeoutMs: 5000,
314+
});
315+
316+
const queueId = "tenant:t1:queue:empty";
317+
const queueKey = keys.queueKey(queueId);
318+
const queueItemsKey = keys.queueItemsKey(queueId);
319+
320+
const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 5);
321+
expect(claimed).toHaveLength(0);
322+
323+
await manager.close();
324+
}
325+
);
326+
});
327+
328+
describe("releaseBatch", () => {
329+
redisTest(
330+
"should release multiple messages back to queue atomically",
331+
{ timeout: 10000 },
332+
async ({ redisOptions }) => {
333+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
334+
335+
const manager = new VisibilityManager({
336+
redis: redisOptions,
337+
keys,
338+
shardCount: 1,
339+
defaultTimeoutMs: 5000,
340+
});
341+
342+
const redis = createRedisClient(redisOptions);
343+
const queueId = "tenant:t1:queue:release-batch";
344+
const queueKey = keys.queueKey(queueId);
345+
const queueItemsKey = keys.queueItemsKey(queueId);
346+
const masterQueueKey = keys.masterQueueKey(0);
347+
348+
// Add messages to queue and claim them
349+
for (let i = 1; i <= 5; i++) {
350+
const messageId = `msg-${i}`;
351+
const storedMessage = {
352+
id: messageId,
353+
queueId,
354+
tenantId: "t1",
355+
payload: { value: `test-${i}` },
356+
timestamp: Date.now() - (6 - i) * 1000,
357+
attempt: 1,
358+
};
359+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
360+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
361+
}
362+
363+
// Claim all 5 messages
364+
const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 5);
365+
expect(claimed).toHaveLength(5);
366+
367+
// Verify all messages are in-flight
368+
let inflightCount = await manager.getTotalInflightCount();
369+
expect(inflightCount).toBe(5);
370+
371+
// Queue should be empty
372+
let queueCount = await redis.zcard(queueKey);
373+
expect(queueCount).toBe(0);
374+
375+
// Release messages 3, 4, 5 back to queue (batch release)
376+
const messagesToRelease = claimed.slice(2);
377+
await manager.releaseBatch(
378+
messagesToRelease,
379+
queueId,
380+
queueKey,
381+
queueItemsKey,
382+
masterQueueKey
383+
);
384+
385+
// Verify 2 messages still in-flight
386+
inflightCount = await manager.getTotalInflightCount();
387+
expect(inflightCount).toBe(2);
388+
389+
// Verify 3 messages back in queue
390+
queueCount = await redis.zcard(queueKey);
391+
expect(queueCount).toBe(3);
392+
393+
// Verify the correct messages are back in queue
394+
const queueMembers = await redis.zrange(queueKey, 0, -1);
395+
expect(queueMembers).toContain("msg-3");
396+
expect(queueMembers).toContain("msg-4");
397+
expect(queueMembers).toContain("msg-5");
398+
399+
await manager.close();
400+
await redis.quit();
401+
}
402+
);
403+
404+
redisTest(
405+
"should handle empty messages array",
406+
{ timeout: 10000 },
407+
async ({ redisOptions }) => {
408+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
409+
410+
const manager = new VisibilityManager({
411+
redis: redisOptions,
412+
keys,
413+
shardCount: 1,
414+
defaultTimeoutMs: 5000,
415+
});
416+
417+
const queueId = "tenant:t1:queue:empty-release";
418+
const queueKey = keys.queueKey(queueId);
419+
const queueItemsKey = keys.queueItemsKey(queueId);
420+
const masterQueueKey = keys.masterQueueKey(0);
421+
422+
// Should not throw when releasing empty array
423+
await manager.releaseBatch([], queueId, queueKey, queueItemsKey, masterQueueKey);
424+
425+
await manager.close();
426+
}
427+
);
428+
429+
redisTest(
430+
"should update master queue with oldest message timestamp",
431+
{ timeout: 10000 },
432+
async ({ redisOptions }) => {
433+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
434+
435+
const manager = new VisibilityManager({
436+
redis: redisOptions,
437+
keys,
438+
shardCount: 1,
439+
defaultTimeoutMs: 5000,
440+
});
441+
442+
const redis = createRedisClient(redisOptions);
443+
const queueId = "tenant:t1:queue:master-update";
444+
const queueKey = keys.queueKey(queueId);
445+
const queueItemsKey = keys.queueItemsKey(queueId);
446+
const masterQueueKey = keys.masterQueueKey(0);
447+
448+
// Add and claim messages
449+
const baseTime = Date.now();
450+
for (let i = 1; i <= 3; i++) {
451+
const messageId = `msg-${i}`;
452+
const storedMessage = {
453+
id: messageId,
454+
queueId,
455+
tenantId: "t1",
456+
payload: { value: `test-${i}` },
457+
timestamp: baseTime + i * 1000, // Different timestamps
458+
attempt: 1,
459+
};
460+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
461+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
462+
}
463+
464+
const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 3);
465+
466+
// Release all messages back
467+
await manager.releaseBatch(claimed, queueId, queueKey, queueItemsKey, masterQueueKey);
468+
469+
// Master queue should have been updated
470+
const masterScore = await redis.zscore(masterQueueKey, queueId);
471+
expect(masterScore).not.toBeNull();
472+
473+
await manager.close();
474+
await redis.quit();
475+
}
476+
);
477+
});
247478
});
248479

0 commit comments

Comments
 (0)