diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 10bb697c7f..d22a2ba7f1 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -410,6 +410,8 @@ const EnvironmentSchema = z.object({
   MARQS_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
   MARQS_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
   MARQS_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
+  MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250),
+  MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10),
 
   PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
 
diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts
index 978991fabc..98ff128509 100644
--- a/apps/webapp/app/v3/marqs/index.server.ts
+++ b/apps/webapp/app/v3/marqs/index.server.ts
@@ -42,6 +42,8 @@ import {
   MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
   MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
 } from "./constants.server";
+import { setInterval as setIntervalAsync } from "node:timers/promises";
+import { tryCatch } from "@trigger.dev/core/utils";
 
 const KEY_PREFIX = "marqs:";
 
@@ -70,6 +72,8 @@ export type MarQSOptions = {
   enableRebalancing?: boolean;
   verbose?: boolean;
   subscriber?: MessageQueueSubscriber;
+  sharedWorkerQueueConsumerIntervalMs?: number;
+  sharedWorkerQueueMaxMessageCount?: number;
 };
 
 /**
@@ -488,15 +492,18 @@ export class MarQS {
         span.setAttribute("queue_count", queues.length);
 
         for (const messageQueue of queues) {
-          const messageData = await this.#callDequeueMessage({
+          const messages = await this.#callDequeueMessages({
             messageQueue,
             parentQueue,
+            maxCount: 1,
           });
 
-          if (!messageData) {
+          if (!messages || messages.length === 0) {
             return;
           }
 
+          const messageData = messages[0];
+
           const message = await this.readMessage(messageData.messageId);
 
           if (message) {
@@ -554,11 +561,198 @@ export class MarQS {
   }
 
   /**
-   * Dequeue a message from the shared queue (this should be used in production environments)
+   * Dequeue a message from the shared worker queue (this should be used in production environments)
+   *
+   * Pops one message id from the worker queue list, reads the message, notifies the subscriber
+   * (when one is configured) and starts the visibility timeout heartbeat for the message.
+   *
+   * @returns the message, or undefined when the list is empty or the message cannot be read
    */
-  public async dequeueMessageInSharedQueue(consumerId: string) {
+  public async dequeueMessageFromSharedWorkerQueue(consumerId: string) {
     return this.#trace(
-      "dequeueMessageInSharedQueue",
+      "dequeueMessageFromSharedWorkerQueue",
+      async (span) => {
+        span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId);
+
+        const workerQueueKey = this.keys.sharedWorkerQueueKey();
+
+        span.setAttribute(SemanticAttributes.PARENT_QUEUE, workerQueueKey);
+
+        // Try and pop a message from the worker queue (redis list)
+        const messageId = await this.#trace("popMessageFromWorkerQueue", async (innerSpan) => {
+          innerSpan.setAttribute(SemanticAttributes.PARENT_QUEUE, workerQueueKey);
+          innerSpan.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId);
+
+          const results = await this.redis.popMessageFromWorkerQueue(workerQueueKey);
+
+          if (!results) {
+            return null;
+          }
+
+          const [messageId, queueLength] = results;
+
+          innerSpan.setAttribute("queue_length", Number(queueLength));
+
+          return messageId;
+        });
+
+        if (!messageId) {
+          return;
+        }
+
+        const message = await this.readMessage(messageId);
+
+        if (!message) {
+          return;
+        }
+
+        if (this.options.subscriber) {
+          await this.#trace(
+            "postMessageDequeued",
+            async (subscriberSpan) => {
+              subscriberSpan.setAttributes({
+                [SemanticAttributes.MESSAGE_ID]: message.messageId,
+                [SemanticAttributes.QUEUE]: message.queue,
+                [SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
+              });
+
+              return await this.options.subscriber?.messageDequeued(message);
+            },
+            {
+              kind: SpanKind.INTERNAL,
+              attributes: {
+                [SEMATTRS_MESSAGING_OPERATION]: "receive",
+                [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
+              },
+            }
+          );
+        }
+
+        await this.#trace(
+          "startHeartbeat",
+          async (heartbeatSpan) => {
+            heartbeatSpan.setAttributes({
+              [SemanticAttributes.MESSAGE_ID]: message.messageId,
+              visibility_timeout_ms: this.visibilityTimeoutInMs,
+            });
+
+            return await this.options.visibilityTimeoutStrategy.startHeartbeat(
+              message.messageId,
+              this.visibilityTimeoutInMs
+            );
+          },
+          {
+            kind: SpanKind.INTERNAL,
+            attributes: {
+              [SEMATTRS_MESSAGING_OPERATION]: "receive",
+              [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
+            },
+          }
+        );
+
+        return message;
+      },
+      {
+        kind: SpanKind.CONSUMER,
+        attributes: {
+          [SEMATTRS_MESSAGING_OPERATION]: "receive",
+          [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
+        },
+      }
+    );
+  }
+
+  /**
+   * Starts the loop that moves messages from the queues into the shared worker queue list.
+   *
+   * @returns a function that aborts the loop
+   */
+  public startSharedWorkerQueueConsumer(consumerId: string) {
+    const abortController = new AbortController();
+
+    this.#startSharedWorkerQueueConsumer(consumerId, abortController).catch((error) => {
+      logger.error("Failed to start shared worker queue consumer", {
+        error,
+        service: this.name,
+        consumerId,
+      });
+    });
+
+    return () => {
+      abortController.abort();
+    };
+  }
+
+  /**
+   * Calls #processSharedWorkerQueue on every interval tick until the abort signal fires. A failed
+   * iteration is logged and skipped; only an abort ends the loop quietly.
+   */
+  async #startSharedWorkerQueueConsumer(consumerId: string, abortController: AbortController) {
+    let lastProcessedAt = Date.now();
+    let processedCount = 0;
+
+    try {
+      for await (const _ of setIntervalAsync(
+        this.options.sharedWorkerQueueConsumerIntervalMs ?? 250,
+        null,
+        {
+          signal: abortController.signal,
+        }
+      )) {
+        logger.debug(`Processing shared worker queue`, {
+          processedCount,
+          lastProcessedAt,
+          service: this.name,
+          consumerId,
+        });
+
+        const now = performance.now();
+
+        const [error, results] = await tryCatch(this.#processSharedWorkerQueue(consumerId));
+
+        if (error) {
+          logger.error(`Failed to process shared worker queue`, {
+            error,
+            service: this.name,
+            consumerId,
+          });
+
+          continue;
+        }
+
+        const duration = performance.now() - now;
+
+        logger.debug(`Processed shared worker queue`, {
+          processedCount,
+          lastProcessedAt,
+          service: this.name,
+          duration,
+          results,
+          consumerId,
+        });
+
+        processedCount++;
+        lastProcessedAt = Date.now();
+      }
+    } catch (error) {
+      if (!(error instanceof Error) || error.name !== "AbortError") {
+        throw error;
+      }
+
+      logger.debug(`Shared worker queue consumer stopped`, {
+        service: this.name,
+        processedCount,
+        lastProcessedAt,
+      });
+    }
+  }
+
+  /**
+   * Dequeue as many messages as possible from queues into the shared worker queue list
+   */
+  async #processSharedWorkerQueue(consumerId: string) {
+    return this.#trace(
+      "processSharedWorkerQueue",
       async (span) => {
         span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId);
 
@@ -581,34 +775,36 @@ export class MarQS {
 
         let attemptedEnvs = 0;
         let attemptedQueues = 0;
+        let messageCount = 0;
 
-        // Try each queue in order until we successfully dequeue a message
+        // Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
         for (const env of envQueues) {
           attemptedEnvs++;
 
           for (const messageQueue of env.queues) {
             attemptedQueues++;
 
-            const result = await this.#trace(
+            await this.#trace(
               "attemptDequeue",
-              async (innerSpan) => {
+              async (attemptDequeueSpan) => {
                 try {
-                  innerSpan.setAttributes({
+                  attemptDequeueSpan.setAttributes({
                     [SemanticAttributes.QUEUE]: messageQueue,
                     [SemanticAttributes.PARENT_QUEUE]: parentQueue,
                   });
 
-                  const messageData = await this.#trace(
-                    "callDequeueMessage",
+                  const messages = await this.#trace(
+                    "callDequeueMessages",
                     async (dequeueSpan) => {
                       dequeueSpan.setAttributes({
                         [SemanticAttributes.QUEUE]: messageQueue,
                         [SemanticAttributes.PARENT_QUEUE]: parentQueue,
                       });
 
-                      return await this.#callDequeueMessage({
+                      return await this.#callDequeueMessages({
                         messageQueue,
                         parentQueue,
+                        maxCount: this.options.sharedWorkerQueueMaxMessageCount ?? 10,
                       });
                     },
                     {
@@ -620,76 +816,40 @@ export class MarQS {
                     }
                   );
 
-                  if (!messageData) {
+                  if (!messages || messages.length === 0) {
+                    attemptDequeueSpan.setAttribute("message_count", 0);
                     return null; // Try next queue if no message was dequeued
                   }
 
-                  const message = await this.readMessage(messageData.messageId);
-
-                  if (message) {
-                    const attributes = {
-                      [SEMATTRS_MESSAGE_ID]: message.messageId,
-                      [SemanticAttributes.QUEUE]: message.queue,
-                      [SemanticAttributes.MESSAGE_ID]: message.messageId,
-                      [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
-                      [SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
-                      attempted_queues: attemptedQueues, // How many queues we tried before success
-                      attempted_envs: attemptedEnvs, // How many environments we tried before success
-                      message_timestamp: message.timestamp,
-                      message_age: this.#calculateMessageAge(message),
-                      message_priority: message.priority,
-                      message_enqueue_method: message.enqueueMethod,
-                      message_available_at: message.availableAt,
-                      ...flattenAttributes(message.data, "message.data"),
-                    };
-
-                    span.setAttributes(attributes);
-                    innerSpan.setAttributes(attributes);
-
-                    await this.#trace(
-                      "messageDequeued",
-                      async (subscriberSpan) => {
-                        subscriberSpan.setAttributes({
-                          [SemanticAttributes.MESSAGE_ID]: message.messageId,
-                          [SemanticAttributes.QUEUE]: message.queue,
-                          [SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
-                        });
-
-                        return await this.options.subscriber?.messageDequeued(message);
-                      },
-                      {
-                        kind: SpanKind.INTERNAL,
-                        attributes: {
-                          [SEMATTRS_MESSAGING_OPERATION]: "receive",
-                          [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
-                        },
-                      }
-                    );
-
-                    await this.#trace(
-                      "startHeartbeat",
-                      async (heartbeatSpan) => {
-                        heartbeatSpan.setAttributes({
-                          [SemanticAttributes.MESSAGE_ID]: messageData.messageId,
-                          visibility_timeout_ms: this.visibilityTimeoutInMs,
-                        });
-
-                        return await this.options.visibilityTimeoutStrategy.startHeartbeat(
-                          messageData.messageId,
-                          this.visibilityTimeoutInMs
-                        );
+                  messageCount += messages.length;
+
+                  attemptDequeueSpan.setAttribute("message_count", messages.length);
+
+                  await this.#trace(
+                    "addToWorkerQueue",
+                    async (addToWorkerQueueSpan) => {
+                      const workerQueueKey = this.keys.sharedWorkerQueueKey();
+
+                      addToWorkerQueueSpan.setAttributes({
+                        message_count: messages.length,
+                        [SemanticAttributes.PARENT_QUEUE]: workerQueueKey,
+                      });
+
+                      // NOTE(review): dequeueMessages already removed these ids from the queue; if
+                      // this RPUSH fails the catch below only logs it - confirm they get recovered.
+                      await this.redis.rpush(
+                        workerQueueKey,
+                        ...messages.map((message) => message.messageId)
+                      );
+                    },
+                    {
+                      kind: SpanKind.INTERNAL,
+                      attributes: {
+                        [SEMATTRS_MESSAGING_OPERATION]: "receive",
+                        [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
                       },
-                      {
-                        kind: SpanKind.INTERNAL,
-                        attributes: {
-                          [SEMATTRS_MESSAGING_OPERATION]: "receive",
-                          [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
-                        },
-                      }
-                    );
-
-                    return message;
-                  }
+                    }
+                  );
                 } catch (error) {
                   // Log error but continue trying other queues
                   logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, {
@@ -706,16 +866,13 @@ export class MarQS {
                 },
               }
             );
-
-            if (result) {
-              return result;
-            }
           }
         }
 
-        // If we get here, we tried all queues but couldn't dequeue a message
+        // Every queue has been attempted; record how many messages were moved to the worker queue
         span.setAttribute("attempted_queues", attemptedQueues);
         span.setAttribute("attempted_envs", attemptedEnvs);
+        span.setAttribute("message_count", messageCount);
 
         return;
       },
@@ -1334,12 +1491,14 @@ export class MarQS {
     }
   }
 
-  async #callDequeueMessage({
+  async #callDequeueMessages({
     messageQueue,
     parentQueue,
+    maxCount,
   }: {
     messageQueue: string;
     parentQueue: string;
+    maxCount: number;
   }) {
     const queueConcurrencyLimitKey = this.keys.queueConcurrencyLimitKeyFromQueue(messageQueue);
     const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(messageQueue);
@@ -1349,7 +1508,7 @@ export class MarQS {
     const queueReserveConcurrencyKey = this.keys.queueReserveConcurrencyKeyFromQueue(messageQueue);
     const envQueueKey = this.keys.envQueueKeyFromQueue(messageQueue);
 
-    logger.debug("Calling dequeueMessage", {
+    logger.debug("Calling dequeueMessages", {
       messageQueue,
       parentQueue,
       queueConcurrencyLimitKey,
@@ -1362,7 +1521,7 @@ export class MarQS {
       service: this.name,
     });
 
-    const result = await this.redis.dequeueMessage(
+    const result = await this.redis.dequeueMessages(
       messageQueue,
       parentQueue,
       queueConcurrencyLimitKey,
@@ -1374,7 +1533,8 @@ export class MarQS {
       envQueueKey,
       messageQueue,
       String(Date.now()),
-      String(this.options.defaultEnvConcurrency)
+      String(this.options.defaultEnvConcurrency),
+      String(maxCount)
     );
 
     if (!result) {
@@ -1386,14 +1546,24 @@ export class MarQS {
       service: this.name,
     });
 
-    if (result.length !== 2) {
-      return;
+    const messages = [];
+    // result is a flat [id, score, id, score, ...] list; a trailing id without a score is skipped
+    for (let i = 0; i + 1 < result.length; i += 2) {
+      const messageId = result[i];
+      const messageScore = result[i + 1];
+
+      messages.push({
+        messageId,
+        messageScore,
+      });
     }
 
-    return {
-      messageId: result[0],
-      messageScore: result[1],
-    };
+    logger.debug("dequeueMessages parsed result", {
+      messages,
+      service: this.name,
+    });
+
+    return messages;
   }
 
   async #callRequeueMessage(message: MessagePayload) {
@@ -1824,7 +1994,7 @@ return true
       `,
     });
 
-    this.redis.defineCommand("dequeueMessage", {
+    this.redis.defineCommand("dequeueMessages", {
       numberOfKeys: 9,
       lua: `
 local queueKey = KEYS[1]
@@ -1840,6 +2010,7 @@ local envQueueKey = KEYS[9]
 local queueName = ARGV[1]
 local currentTime = tonumber(ARGV[2])
 local defaultEnvConcurrencyLimit = ARGV[3]
+local maxCount = tonumber(ARGV[4] or '1')
 
 -- Check current env concurrency against the limit
 local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0')
@@ -1862,27 +2033,38 @@ if queueCurrentConcurrency >= totalQueueConcurrencyLimit then
   return nil
 end
 
--- Attempt to dequeue the next message
-local messages = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'WITHSCORES', 'LIMIT', 0, 1)
+-- Calculate how many messages we can actually dequeue based on concurrency limits
+local envAvailableCapacity = totalEnvConcurrencyLimit - envCurrentConcurrency
+local queueAvailableCapacity = totalQueueConcurrencyLimit - queueCurrentConcurrency
+local actualMaxCount = math.min(maxCount, envAvailableCapacity, queueAvailableCapacity)
 
-if #messages == 0 then
+if actualMaxCount <= 0 then
   return nil
 end
 
-local messageId = messages[1]
-local messageScore = tonumber(messages[2])
+-- Attempt to dequeue messages up to actualMaxCount
+local messagesWithScores = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'WITHSCORES', 'LIMIT', 0, actualMaxCount)
 
--- Remove the message from the queue and update concurrency
-redis.call('ZREM', queueKey, messageId)
-redis.call('ZREM', envQueueKey, messageId)
-redis.call('SADD', queueCurrentConcurrencyKey, messageId)
-redis.call('SADD', envCurrentConcurrencyKey, messageId)
+if #messagesWithScores == 0 then
+  return nil
+end
+
+local messageIds = {}
+for i = 1, #messagesWithScores, 2 do
+  table.insert(messageIds, messagesWithScores[i])
+end
+
+-- Remove the messages from the queue and update concurrency
+redis.call('ZREM', queueKey, unpack(messageIds))
+redis.call('ZREM', envQueueKey, unpack(messageIds))
+redis.call('SADD', queueCurrentConcurrencyKey, unpack(messageIds))
+redis.call('SADD', envCurrentConcurrencyKey, unpack(messageIds))
 
 -- Remove the message from the reserve concurrency set
-redis.call('SREM', envReserveConcurrencyKey, messageId)
+redis.call('SREM', envReserveConcurrencyKey, unpack(messageIds))
 
 -- Remove the message from the queue reserve concurrency set
-redis.call('SREM', queueReserveConcurrencyKey, messageId)
+redis.call('SREM', queueReserveConcurrencyKey, unpack(messageIds))
 
 -- Rebalance the parent queue
 local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -1892,7 +2074,27 @@ else
   redis.call('ZADD', parentQueueKey, earliestMessage[2], queueName)
 end
 
-return {messageId, messageScore} -- Return message details
+return messagesWithScores
+      `,
+    });
+
+    this.redis.defineCommand("popMessageFromWorkerQueue", {
+      numberOfKeys: 1,
+      lua: `
+local workerQueueKey = KEYS[1]
+
+-- lpop the first message from the worker queue
+local messageId = redis.call('LPOP', workerQueueKey)
+
+-- if there is no messageId, return nil
+if not messageId then
+  return nil
+end
+
+-- get the length of the worker queue
+local queueLength = tonumber(redis.call('LLEN', workerQueueKey) or '0')
+
+return {messageId, queueLength} -- Return message details
       `,
     });
 
@@ -2139,7 +2341,7 @@ declare module "ioredis" {
       callback?: Callback
     ): Result;
 
-    dequeueMessage(
+    dequeueMessages(
       queueKey: string,
       parentQueueKey: string,
       queueConcurrencyLimitKey: string,
@@ -2152,7 +2354,13 @@ declare module "ioredis" {
       queueName: string,
       currentTime: string,
       defaultEnvConcurrencyLimit: string,
-      callback?: Callback<[string, string]>
+      maxCount: string,
+      callback?: Callback<string[] | null>
+    ): Result<string[] | null, Context>;
+
+    popMessageFromWorkerQueue(
+      workerQueueKey: string,
+      callback?: Callback<[string, string] | null>
     ): Result<[string, string] | null, Context>;
 
     requeueMessage(
@@ -2289,5 +2497,7 @@ function getMarQSClient() {
     enableRebalancing: !env.MARQS_DISABLE_REBALANCING,
     maximumNackCount: env.MARQS_MAXIMUM_NACK_COUNT,
     subscriber: concurrencyTracker,
+    sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
+    sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
   });
 }
diff --git a/apps/webapp/app/v3/marqs/marqsKeyProducer.ts b/apps/webapp/app/v3/marqs/marqsKeyProducer.ts
index 673d180229..5c9c7238ad 100644
--- a/apps/webapp/app/v3/marqs/marqsKeyProducer.ts
+++ b/apps/webapp/app/v3/marqs/marqsKeyProducer.ts
@@ -2,6 +2,7 @@ import { MarQSKeyProducer, MarQSKeyProducerEnv, QueueDescriptor } from "./types"
 
 const constants = {
   SHARED_QUEUE: "sharedQueue",
+  SHARED_WORKER_QUEUE: "sharedWorkerQueue",
   CURRENT_CONCURRENCY_PART: "currentConcurrency",
   CONCURRENCY_LIMIT_PART: "concurrency",
   DISABLED_CONCURRENCY_LIMIT_PART: "disabledConcurrency",
@@ -108,6 +109,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
     return constants.SHARED_QUEUE;
   }
 
+  sharedWorkerQueueKey(): string {
+    return constants.SHARED_WORKER_QUEUE;
+  }
+
   queueConcurrencyLimitKeyFromQueue(queue: string) {
     const descriptor = this.queueDescriptorFromQueue(queue);
 
diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
index a075a17847..20abf87b32 100644
--- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
+++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
@@ -166,6 +166,7 @@ export class SharedQueueConsumer {
   private _runningDurationInMs = 0;
   private _currentMessage: MessagePayload | undefined;
   private _currentMessageData: SharedQueueMessageBody | undefined;
+  private _stopWorkerQueueConsumer?: () => void;
 
   constructor(
     private _providerSender: ZodMessageSender,
@@ -173,7 +174,7 @@ export class SharedQueueConsumer {
   ) {
     this._options = {
       maximumItemsPerTrace: options.maximumItemsPerTrace ?? 500,
-      traceTimeoutSeconds: options.traceTimeoutSeconds ?? 10,
+      traceTimeoutSeconds: options.traceTimeoutSeconds ?? 1,
       nextTickInterval: options.nextTickInterval ?? 1000, // 1 second
       interval: options.interval ?? 100, // 100ms
     };
@@ -233,6 +234,10 @@ export class SharedQueueConsumer {
       return;
     }
 
+    logger.debug("Stopping shared worker queue consumer");
+
+    this._stopWorkerQueueConsumer?.();
+
     logger.debug("Stopping shared queue consumer");
 
     this._enabled = false;
@@ -252,6 +257,9 @@ export class SharedQueueConsumer {
     this._reasonStats = {};
     this._actionStats = {};
     this._outcomeStats = {};
+    this._stopWorkerQueueConsumer = marqs?.startSharedWorkerQueueConsumer(this._id);
+
+    logger.debug("Started shared worker queue consumer");
 
     this.#doWork().finally(() => {});
   }
@@ -429,7 +437,7 @@ export class SharedQueueConsumer {
     this._currentMessage = undefined;
     this._currentMessageData = undefined;
 
-    const message = await marqs?.dequeueMessageInSharedQueue(this._id);
+    const message = await marqs?.dequeueMessageFromSharedWorkerQueue(this._id);
 
     if (!message) {
       return {
diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts
index 98792a3099..69e75ac44a 100644
--- a/apps/webapp/app/v3/marqs/types.ts
+++ b/apps/webapp/app/v3/marqs/types.ts
@@ -36,6 +36,7 @@ export interface MarQSKeyProducer {
   envSharedQueueKey(env: MarQSKeyProducerEnv): string;
   sharedQueueKey(): string;
   sharedQueueScanPattern(): string;
+  sharedWorkerQueueKey(): string;
   queueCurrentConcurrencyScanPattern(): string;
   queueConcurrencyLimitKeyFromQueue(queue: string): string;
   queueCurrentConcurrencyKeyFromQueue(queue: string): string;