diff --git a/internal-packages/run-engine/src/run-queue/index.test.ts b/internal-packages/run-engine/src/run-queue/index.test.ts index a587a0a06d..5480b8d2ed 100644 --- a/internal-packages/run-engine/src/run-queue/index.test.ts +++ b/internal-packages/run-engine/src/run-queue/index.test.ts @@ -578,7 +578,7 @@ describe("RunQueue", () => { } }); - redisTest("Acking", { timeout: 5_000 }, async ({ redisContainer, redisOptions }) => { + redisTest("Acking", async ({ redisContainer, redisOptions }) => { const queue = new RunQueue({ ...testOptions, queueSelectionStrategy: new FairQueueSelectionStrategy({ @@ -659,7 +659,7 @@ describe("RunQueue", () => { } }); - redisTest("Ack (before dequeue)", { timeout: 5_000 }, async ({ redisContainer }) => { + redisTest("Ack (before dequeue)", async ({ redisContainer }) => { const queue = new RunQueue({ ...testOptions, queueSelectionStrategy: new FairQueueSelectionStrategy({ @@ -718,7 +718,6 @@ describe("RunQueue", () => { redisTest( "Ack after moving to workerQueue with removeFromWorkerQueue = undefined", - { timeout: 5_000 }, async ({ redisContainer }) => { const queue = new RunQueue({ ...testOptions, @@ -763,7 +762,6 @@ describe("RunQueue", () => { redisTest( "Ack after moving to workerQueue with removeFromWorkerQueue = true", - { timeout: 5_000 }, async ({ redisContainer }) => { const queue = new RunQueue({ ...testOptions, @@ -808,7 +806,7 @@ describe("RunQueue", () => { } ); - redisTest("Nacking", { timeout: 15_000 }, async ({ redisContainer, redisOptions }) => { + redisTest("Nacking", async ({ redisContainer, redisOptions }) => { const queue = new RunQueue({ ...testOptions, queueSelectionStrategy: new FairQueueSelectionStrategy({ diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index fbcfe291bd..f5772a9d6c 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -438,6 +438,39 @@ export class RunQueue { ); } + public async readMessageFromKey(messageKey: string) { + return this.#trace( + "readMessageFromKey", + async (span) => { + const rawMessage = await this.redis.get(messageKey); + + if (!rawMessage) { + return; + } + + const message = OutputPayload.safeParse(JSON.parse(rawMessage)); + + if (!message.success) { + this.logger.error(`[${this.name}] Failed to parse message`, { + messageKey, + error: message.error, + service: this.name, + }); + + return; + } + + return message.data; + }, + { + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + } + public async enqueueMessage({ env, message, @@ -1409,17 +1442,18 @@ export class RunQueue { this.abortController.signal.addEventListener("abort", cleanup); - const result = await blockingClient.dequeueMessageFromWorkerQueue( - //keys + const result = await blockingClient.blpop( workerQueueKey, - //args - this.options.redis.keyPrefix ?? "", - String(this.options.dequeueBlockingTimeoutSeconds ?? 10) + this.options.dequeueBlockingTimeoutSeconds ?? 10 ); this.abortController.signal.removeEventListener("abort", cleanup); - await cleanup(); + cleanup().then(() => { + this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { + service: this.name, + }); + }); if (!result) { return; @@ -1447,24 +1481,16 @@ export class RunQueue { return; } - const [messageId, rawMessage] = result; + const [, messageKey] = result; - //read message - const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); - if (!parsedMessage.success) { - this.logger.error(`[${this.name}] Failed to parse message`, { - messageId, - error: parsedMessage.error, - service: this.name, - }); + const message = await this.readMessageFromKey(messageKey); + if (!message) { return; } - const message = parsedMessage.data; - return { - messageId, + messageId: message.runId, messageScore: String(message.timestamp), message, }; @@ -1622,45 +1648,6 @@ export class RunQueue { #createBlockingDequeueClient() { const blockingClient = this.redis.duplicate(); - blockingClient.defineCommand("dequeueMessageFromWorkerQueue", { - numberOfKeys: 1, - lua: ` -local workerQueueKey = KEYS[1] - -local keyPrefix = ARGV[1] -local timeoutInSeconds = tonumber(ARGV[2]) - --- Attempt to dequeue using BLPOP --- result is either nil or [queueName, messageId] -local result = redis.call('BLPOP', workerQueueKey, timeoutInSeconds) - -if not result or type(result) ~= "table" then - return nil -end - -local messageKeyValue = result[2] - --- Get the message payload -local messageKey = keyPrefix .. messageKeyValue - -local messagePayload = redis.call('GET', messageKey) - --- if the messagePayload is nil, then the message is not in the queue -if not messagePayload then - return nil -end - --- messageKeyValue is {org:}:message: and we want to extract the messageId -local messageId = messageKeyValue:match("([^:]+)$") - -if not messageId then - return nil -end - -return {messageId, messagePayload} -- Return message details - `, - }); - return blockingClient; }