Skip to content

Commit d1a2723

Browse files
committed
Acking should also cause the master queue to be processed
1 parent c969676 commit d1a2723

File tree

3 files changed

+138
-25
lines changed

3 files changed

+138
-25
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ export class RunEngine {
112112
logger: new Logger("RunQueue", "debug"),
113113
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
114114
retryOptions: options.queue?.retryOptions,
115+
workerOptions: {
116+
disabled: options.worker.disabled,
117+
concurrency: options.worker,
118+
pollIntervalMs: options.worker.pollIntervalMs,
119+
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
120+
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
121+
},
115122
});
116123

117124
this.worker = new Worker({

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,113 @@ describe("RunQueue", () => {
375375
}
376376
);
377377

378+
redisTest(
379+
"Dequeue a message when another message on the same queue is acked",
380+
async ({ redisContainer }) => {
381+
const queue = new RunQueue({
382+
...testOptions,
383+
masterQueueConsumersDisabled: true,
384+
processWorkerQueueDebounceMs: 50,
385+
queueSelectionStrategy: new FairQueueSelectionStrategy({
386+
redis: {
387+
keyPrefix: "runqueue:test:",
388+
host: redisContainer.getHost(),
389+
port: redisContainer.getPort(),
390+
},
391+
keys: testOptions.keys,
392+
}),
393+
redis: {
394+
keyPrefix: "runqueue:test:",
395+
host: redisContainer.getHost(),
396+
port: redisContainer.getPort(),
397+
},
398+
});
399+
400+
try {
401+
// Set queue concurrency limit to 1
402+
await queue.updateQueueConcurrencyLimits(authenticatedEnvProd, messageProd.queue, 1);
403+
404+
//initial queue length
405+
const result = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
406+
expect(result).toBe(0);
407+
const envQueueLength = await queue.lengthOfEnvQueue(authenticatedEnvProd);
408+
expect(envQueueLength).toBe(0);
409+
410+
//initial oldest message
411+
const oldestScore = await queue.oldestMessageInQueue(
412+
authenticatedEnvProd,
413+
messageProd.queue
414+
);
415+
expect(oldestScore).toBe(undefined);
416+
417+
//enqueue message
418+
await queue.enqueueMessage({
419+
env: authenticatedEnvProd,
420+
message: messageProd,
421+
workerQueue: "main",
422+
skipDequeueProcessing: true,
423+
});
424+
425+
// Enqueue another message
426+
await queue.enqueueMessage({
427+
env: authenticatedEnvProd,
428+
message: { ...messageProd, runId: "r4322" },
429+
workerQueue: "main",
430+
skipDequeueProcessing: true,
431+
});
432+
433+
//queue length
434+
const queueLength = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
435+
expect(queueLength).toBe(2);
436+
const envLength = await queue.lengthOfEnvQueue(authenticatedEnvProd);
437+
expect(envLength).toBe(2);
438+
439+
//oldest message
440+
const oldestScore2 = await queue.oldestMessageInQueue(
441+
authenticatedEnvProd,
442+
messageProd.queue
443+
);
444+
expect(oldestScore2).toBe(messageProd.timestamp);
445+
446+
//concurrencies
447+
const queueConcurrency = await queue.currentConcurrencyOfQueue(
448+
authenticatedEnvProd,
449+
messageProd.queue
450+
);
451+
expect(queueConcurrency).toBe(0);
452+
const envConcurrency = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
453+
expect(envConcurrency).toBe(0);
454+
455+
// Process the message so it can be dequeued
456+
await queue.processMasterQueueForEnvironment(authenticatedEnvProd.id, 1);
457+
458+
//dequeue
459+
const dequeued = await queue.dequeueMessageFromWorkerQueue("test_12345", "main");
460+
461+
assertNonNullable(dequeued);
462+
expect(dequeued).toBeDefined();
463+
expect(dequeued!.messageId).toEqual(messageProd.runId);
464+
expect(dequeued!.message.orgId).toEqual(messageProd.orgId);
465+
expect(dequeued!.message.version).toEqual("2");
466+
467+
// Now lets ack the message
468+
await queue.acknowledgeMessage(messageProd.orgId, messageProd.runId);
469+
470+
await setTimeout(1000);
471+
472+
// Now we can dequeue the other message
473+
const dequeued2 = await queue.dequeueMessageFromWorkerQueue("test_12345", "main");
474+
assertNonNullable(dequeued2);
475+
expect(dequeued2).toBeDefined();
476+
expect(dequeued2!.messageId).toEqual("r4322");
477+
expect(dequeued2!.message.orgId).toEqual(messageProd.orgId);
478+
expect(dequeued2!.message.version).toEqual("2");
479+
} finally {
480+
await queue.quit();
481+
}
482+
}
483+
);
484+
378485
redisTest("Enqueue/Dequeue a 8 shards", async ({ redisContainer }) => {
379486
const queue = new RunQueue({
380487
...testOptions,

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,8 @@ const defaultRetrySettings = {
8888
const workerCatalog = {
8989
processQueueForWorkerQueue: {
9090
schema: z.object({
91-
queue: z.string(),
92-
concurrencyKey: z.string().optional(),
91+
queueKey: z.string(),
9392
environmentId: z.string(),
94-
projectId: z.string(),
95-
orgId: z.string(),
9693
}),
9794
visibilityTimeoutMs: 30_000,
9895
},
@@ -148,13 +145,7 @@ export class RunQueue {
148145
logger: new Logger("RunQueueWorker", options.logLevel ?? "log"),
149146
jobs: {
150147
processQueueForWorkerQueue: async (job) => {
151-
await this.#processQueueForWorkerQueue(
152-
job.payload.queue,
153-
job.payload.environmentId,
154-
job.payload.projectId,
155-
job.payload.orgId,
156-
job.payload.concurrencyKey
157-
);
148+
await this.#processQueueForWorkerQueue(job.payload.queueKey, job.payload.environmentId);
158149
},
159150
},
160151
});
@@ -426,11 +417,8 @@ export class RunQueue {
426417
id: queueKey, // dedupe by environment, queue, and concurrency key
427418
job: "processQueueForWorkerQueue",
428419
payload: {
429-
queue: message.queue,
430-
concurrencyKey,
420+
queueKey,
431421
environmentId: env.id,
432-
orgId: env.organization.id,
433-
projectId: env.project.id,
434422
},
435423
// Add a small delay to dedupe messages so at most one of these will processed,
436424
// every 500ms per queue, concurrency key, and environment
@@ -501,7 +489,11 @@ export class RunQueue {
501489
* This is done when the run is in a final state.
502490
* @param messageId
503491
*/
504-
public async acknowledgeMessage(orgId: string, messageId: string) {
492+
public async acknowledgeMessage(
493+
orgId: string,
494+
messageId: string,
495+
options?: { skipDequeueProcessing?: boolean }
496+
) {
505497
return this.#trace(
506498
"acknowledgeMessage",
507499
async (span) => {
@@ -519,6 +511,21 @@ export class RunQueue {
519511
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
520512
});
521513

514+
if (!options?.skipDequeueProcessing) {
515+
// This will move the message to the worker queue so it can be dequeued
516+
await this.worker.enqueueOnce({
517+
id: message.queue, // dedupe by environment, queue, and concurrency key
518+
job: "processQueueForWorkerQueue",
519+
payload: {
520+
queueKey: message.queue,
521+
environmentId: message.environmentId,
522+
},
523+
// Add a small delay to dedupe messages so at most one of these will processed,
524+
// every 500ms per queue, concurrency key, and environment
525+
availableAt: new Date(Date.now() + (this.options.processWorkerQueueDebounceMs ?? 500)), // 500ms from now
526+
});
527+
}
528+
522529
await this.#callAcknowledgeMessage({
523530
message,
524531
});
@@ -1035,15 +1042,7 @@ export class RunQueue {
10351042
);
10361043
}
10371044

1038-
async #processQueueForWorkerQueue(
1039-
queue: string,
1040-
environmentId: string,
1041-
projectId: string,
1042-
orgId: string,
1043-
concurrencyKey?: string
1044-
) {
1045-
const queueKey = this.keys.queueKey(orgId, projectId, environmentId, queue, concurrencyKey);
1046-
1045+
async #processQueueForWorkerQueue(queueKey: string, environmentId: string) {
10471046
const shard = this.keys.masterQueueShardForEnvironment(environmentId, this.shardCount);
10481047

10491048
this.logger.debug("processQueueForWorkerQueue", {

0 commit comments

Comments
 (0)