From de891d69e31f6cfd6a7d54d4d331d549a44f8695 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 19 Feb 2025 14:55:11 +0000 Subject: [PATCH 1/2] New MarQS method requeueMessage, an atomic version of replace message --- apps/webapp/app/v3/marqs/index.server.ts | 220 ++++++++++++++++-- apps/webapp/app/v3/marqs/types.ts | 1 + .../app/v3/services/completeAttempt.server.ts | 4 +- .../v3/services/createCheckpoint.server.ts | 14 +- .../app/v3/services/resumeBatchRun.server.ts | 2 +- .../services/resumeTaskDependency.server.ts | 2 +- .../taskRunConcurrencyTracker.server.ts | 26 +++ 7 files changed, 237 insertions(+), 32 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index cef0e95024..5f87a76ab5 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -260,9 +260,7 @@ export class MarQS { public async replaceMessage( messageId: string, messageData: Record, - timestamp?: number, - priority?: number, - inplace?: boolean + timestamp?: number ) { return this.#trace( "replaceMessage", @@ -273,6 +271,57 @@ export class MarQS { return; } + span.setAttributes({ + [SemanticAttributes.QUEUE]: oldMessage.queue, + [SemanticAttributes.MESSAGE_ID]: oldMessage.messageId, + [SemanticAttributes.CONCURRENCY_KEY]: oldMessage.concurrencyKey, + [SemanticAttributes.PARENT_QUEUE]: oldMessage.parentQueue, + }); + + const traceContext = { + traceparent: oldMessage.data.traceparent, + tracestate: oldMessage.data.tracestate, + }; + + const newMessage: MessagePayload = { + version: "1", + // preserve original trace context + data: { ...oldMessage.data, ...messageData, ...traceContext, queue: oldMessage.queue }, + queue: oldMessage.queue, + concurrencyKey: oldMessage.concurrencyKey, + timestamp: timestamp ?? 
Date.now(), + messageId, + parentQueue: oldMessage.parentQueue, + }; + + await this.#callReplaceMessage(newMessage); + }, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "replace", + [SEMATTRS_MESSAGE_ID]: messageId, + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + } + + public async requeueMessage( + messageId: string, + messageData: Record, + timestamp?: number, + priority?: number + ) { + return this.#trace( + "requeueMessage", + async (span) => { + const oldMessage = await this.readMessage(messageId); + + if (!oldMessage) { + return; + } + const queue = this.keys.queueKeyFromQueue(oldMessage.queue, priority); span.setAttributes({ @@ -298,27 +347,16 @@ export class MarQS { parentQueue: oldMessage.parentQueue, }; - if (inplace) { - await this.#callReplaceMessage(newMessage); - return; - } - await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); - await this.#callAcknowledgeMessage({ - parentQueue: oldMessage.parentQueue, - messageQueue: oldMessage.queue, - messageId, - }); + await this.#callRequeueMessage(oldMessage.queue, newMessage); - await this.#callEnqueueMessage(newMessage); - - await this.options.subscriber?.messageReplaced(newMessage); + await this.options.subscriber?.messageRequeued(oldMessage.queue, newMessage); }, { kind: SpanKind.CONSUMER, attributes: { - [SEMATTRS_MESSAGING_OPERATION]: "replace", + [SEMATTRS_MESSAGING_OPERATION]: "requeue", [SEMATTRS_MESSAGE_ID]: messageId, [SEMATTRS_MESSAGING_SYSTEM]: "marqs", }, @@ -602,7 +640,7 @@ export class MarQS { }); if (updates) { - await this.replaceMessage(messageId, updates, retryAt, undefined, true); + await this.replaceMessage(messageId, updates, retryAt); } await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); @@ -1163,6 +1201,78 @@ export class MarQS { ); } + async #callRequeueMessage(oldQueue: string, message: MessagePayload) { + const queueKey = message.queue; + const oldQueueKey = oldQueue; + const parentQueueKey = 
message.parentQueue; + const messageKey = this.keys.messageKey(message.messageId); + const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue); + const queueReserveConcurrencyKey = this.keys.queueReserveConcurrencyKeyFromQueue(message.queue); + const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue); + const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(message.queue); + const envQueueKey = this.keys.envQueueKeyFromQueue(message.queue); + + const queueName = message.queue; + const oldQueueName = oldQueue; + const messageId = message.messageId; + const messageData = JSON.stringify(message); + const messageScore = String(message.timestamp); + + logger.debug("Calling requeueMessage", { + service: this.name, + queueKey, + oldQueueKey, + parentQueueKey, + messageKey, + queueCurrentConcurrencyKey, + queueReserveConcurrencyKey, + envCurrentConcurrencyKey, + envReserveConcurrencyKey, + envQueueKey, + queueName, + oldQueueName, + messageId, + messageData, + messageScore, + }); + + const result = await this.redis.requeueMessage( + queueKey, + oldQueueKey, + parentQueueKey, + messageKey, + queueCurrentConcurrencyKey, + queueReserveConcurrencyKey, + envCurrentConcurrencyKey, + envReserveConcurrencyKey, + envQueueKey, + queueName, + oldQueueName, + messageId, + messageData, + messageScore + ); + + logger.debug("requeueMessage result", { + service: this.name, + queueKey, + parentQueueKey, + messageKey, + queueCurrentConcurrencyKey, + queueReserveConcurrencyKey, + envCurrentConcurrencyKey, + envReserveConcurrencyKey, + envQueueKey, + queueName, + messageId, + messageData, + messageScore, + result, + }); + + return true; + } + async #callAcknowledgeMessage({ parentQueue, messageQueue, @@ -1587,6 +1697,62 @@ redis.call('DEL', messageKey) `, }); + this.redis.defineCommand("requeueMessage", { + numberOfKeys: 9, + lua: ` +local queueKey = KEYS[1] +local oldQueueKey = KEYS[2] +local 
parentQueueKey = KEYS[3] +local messageKey = KEYS[4] +local queueCurrentConcurrencyKey = KEYS[5] +local queueReserveConcurrencyKey = KEYS[6] +local envCurrentConcurrencyKey = KEYS[7] +local envReserveConcurrencyKey = KEYS[8] +local envQueueKey = KEYS[9] + +local queueName = ARGV[1] +local oldQueueName = ARGV[2] +local messageId = ARGV[3] +local messageData = ARGV[4] +local messageScore = ARGV[5] + +-- First remove the old message from the sorted sets +redis.call('ZREM', oldQueueKey, messageId) +redis.call('ZREM', envQueueKey, messageId) + +-- Write the new message data +redis.call('SET', messageKey, messageData) + +-- Add the message to the queues with new score +redis.call('ZADD', queueKey, messageScore, messageId) +redis.call('ZADD', envQueueKey, messageScore, messageId) + +-- Rebalance the parent queue (for the new queue) +local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #earliestMessage == 0 then + redis.call('ZREM', parentQueueKey, queueName) +else + redis.call('ZADD', parentQueueKey, earliestMessage[2], queueName) +end + +-- Rebalance the parent queue (for the old queue) +local earliestMessage = redis.call('ZRANGE', oldQueueKey, 0, 0, 'WITHSCORES') +if #earliestMessage == 0 then + redis.call('ZREM', parentQueueKey, oldQueueName) +else + redis.call('ZADD', parentQueueKey, earliestMessage[2], oldQueueName) +end + +-- Clear all concurrency sets (combined from both scripts) +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', queueReserveConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', envReserveConcurrencyKey, messageId) + +return true +`, + }); + this.redis.defineCommand("nackMessage", { numberOfKeys: 7, lua: ` @@ -1749,6 +1915,24 @@ declare module "ioredis" { callback?: Callback ): Result; + requeueMessage( + queueKey: string, + oldQueueKey: string, + parentQueueKey: string, + messageKey: string, + queueCurrentConcurrencyKey: string, + 
queueReserveConcurrencyKey: string, + envCurrentConcurrencyKey: string, + envReserveConcurrencyKey: string, + envQueueKey: string, + queueName: string, + oldQueueName: string, + messageId: string, + messageData: string, + messageScore: string, + callback?: Callback + ): Result; + acknowledgeMessage( parentQueue: string, messageKey: string, diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts index 339f1bded9..284453c77f 100644 --- a/apps/webapp/app/v3/marqs/types.ts +++ b/apps/webapp/app/v3/marqs/types.ts @@ -100,6 +100,7 @@ export interface MessageQueueSubscriber { messageAcked(message: MessagePayload): Promise; messageNacked(message: MessagePayload): Promise; messageReplaced(message: MessagePayload): Promise; + messageRequeued(oldQueue: string, message: MessagePayload): Promise; } export interface VisibilityTimeoutStrategy { diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index ae5beb5bbc..635bb2b5e8 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -468,7 +468,7 @@ export class CompleteAttemptService extends BaseService { logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id }); // We have to replace a potential RESUME with EXECUTE to correctly retry the attempt - return marqs?.replaceMessage( + return marqs?.requeueMessage( run.id, { type: "EXECUTE", @@ -615,7 +615,7 @@ export class CompleteAttemptService extends BaseService { }); if (environment.type === "DEVELOPMENT") { - marqs.replaceMessage( + marqs.requeueMessage( taskRunAttempt.taskRunId, {}, executionRetry.timestamp, diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 15cdf697d9..84d695e80f 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts 
@@ -167,7 +167,7 @@ export class CreateCheckpointService extends BaseService { }); if (checkpointEvent) { - await marqs?.replaceMessage( + await marqs.requeueMessage( attempt.taskRunId, { type: "RESUME_AFTER_DURATION", @@ -297,15 +297,9 @@ export class CreateCheckpointService extends BaseService { } //if there's a message in the queue, we make sure the checkpoint event is on it - await marqs?.replaceMessage( - attempt.taskRun.id, - { - checkpointEventId: checkpointEvent.id, - }, - undefined, - undefined, - true - ); + await marqs.replaceMessage(attempt.taskRun.id, { + checkpointEventId: checkpointEvent.id, + }); await ResumeBatchRunService.enqueue( batchRun.id, diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index 68f5a59cf7..f9c03681ea 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -252,7 +252,7 @@ export class ResumeBatchRunService extends BaseService { hasCheckpointEvent: !!batchRun.checkpointEventId, }); - await marqs?.replaceMessage( + await marqs?.requeueMessage( dependentRun.id, { type: "RESUME", diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index 3151de18a0..c941cd693c 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -89,7 +89,7 @@ export class ResumeTaskDependencyService extends BaseService { return; } - await marqs?.replaceMessage( + await marqs?.requeueMessage( dependentRun.id, { type: "RESUME", diff --git a/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts b/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts index 451a5e20ba..4f3b0400ad 100644 --- a/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts +++ b/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts @@ -121,6 +121,32 @@ 
class TaskRunConcurrencyTracker implements MessageQueueSubscriber { }); } + async messageRequeued(oldQueue: string, message: MessagePayload): Promise { + logger.debug("TaskRunConcurrencyTracker.messageRequeued()", { + data: message.data, + messageId: message.messageId, + oldQueue, + }); + + const data = this.getMessageData(message); + + if (!data) { + logger.info( + `TaskRunConcurrencyTracker.messageRequeued(): could not parse message data`, + message + ); + return; + } + + await this.executionFinished({ + projectId: data.projectId, + taskId: data.taskIdentifier, + runId: message.messageId, + environmentId: data.environmentId, + deployed: data.environmentType !== "DEVELOPMENT", + }); + } + private getMessageData(message: MessagePayload) { const result = ConcurrentMessageData.safeParse(message.data); if (result.success) { From 76d14d6e4afad6f8a9095a32e76364cf6f4e43ae Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 19 Feb 2025 15:05:50 +0000 Subject: [PATCH 2/2] Remove redundant call to remove the message from the env queue in requeueMessage --- apps/webapp/app/v3/marqs/index.server.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 5f87a76ab5..f8c36248d0 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -1716,14 +1716,13 @@ local messageId = ARGV[3] local messageData = ARGV[4] local messageScore = ARGV[5] --- First remove the old message from the sorted sets +-- First remove the message from the old queue redis.call('ZREM', oldQueueKey, messageId) -redis.call('ZREM', envQueueKey, messageId) -- Write the new message data redis.call('SET', messageKey, messageData) --- Add the message to the queues with new score +-- Add the message to the new queue with a new score redis.call('ZADD', queueKey, messageScore, messageId) redis.call('ZADD', envQueueKey, messageScore, messageId)