From 33d134f72ef8bda152bd2e8e503b29f3936e6713 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Fri, 22 Aug 2025 12:55:05 +0100
Subject: [PATCH 1/2] fix(v3): eagerly dequeue messages from a queue when
 messages are added to or removed from it (v4 backport)

---
 apps/webapp/app/env.server.ts            |   8 ++
 apps/webapp/app/v3/marqs/index.server.ts | 144 +++++++++++++++++++++++
 2 files changed, 152 insertions(+)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index d22a2ba7f1..d5151f9ab0 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -413,6 +413,14 @@ const EnvironmentSchema = z.object({
   MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250),
   MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10),
 
+  MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"),
+  MARQS_WORKER_ENABLED: z.string().default("0"),
+  MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
+  MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
+  MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
+  MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
+  MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+
   PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
 
   VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts
index 98ff128509..49752e517e 100644
--- a/apps/webapp/app/v3/marqs/index.server.ts
+++ b/apps/webapp/app/v3/marqs/index.server.ts
@@ -44,6 +44,9 @@ import {
 } from "./constants.server";
 import { setInterval } from "node:timers/promises";
 import { tryCatch } from "@trigger.dev/core/utils";
+import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
+import z from "zod";
+import { Logger } from "@trigger.dev/core/logger";
 
 const KEY_PREFIX = "marqs:";
 
@@ -74,6 +77,24 @@ export type MarQSOptions = {
   subscriber?: MessageQueueSubscriber;
   sharedWorkerQueueConsumerIntervalMs?: number;
   sharedWorkerQueueMaxMessageCount?: number;
+  eagerDequeuingEnabled?: boolean;
+  workerOptions: {
+    pollIntervalMs?: number;
+    immediatePollIntervalMs?: number;
+    shutdownTimeoutMs?: number;
+    concurrency?: WorkerConcurrencyOptions;
+    enabled?: boolean;
+  };
+};
+
+const workerCatalog = {
+  processQueueForWorkerQueue: {
+    schema: z.object({
+      queueKey: z.string(),
+      parentQueueKey: z.string(),
+    }),
+    visibilityTimeoutMs: 30_000,
+  },
 };
 
 /**
@@ -83,6 +104,7 @@ export class MarQS {
   private redis: Redis;
   public keys: MarQSKeyProducer;
   #rebalanceWorkers: Array<AsyncWorker> = [];
+  private worker: Worker<typeof workerCatalog>;
 
   constructor(private readonly options: MarQSOptions) {
     this.redis = options.redis;
@@ -91,6 +113,29 @@ export class MarQS {
 
     this.#startRebalanceWorkers();
     this.#registerCommands();
+
+    this.worker = new Worker({
+      name: "marqs-worker",
+      redisOptions: {
+        ...options.redis.options,
+        keyPrefix: `${options.redis.options.keyPrefix}:worker`,
+      },
+      catalog: workerCatalog,
+      concurrency: options.workerOptions?.concurrency,
+      pollIntervalMs: options.workerOptions?.pollIntervalMs ?? 1000,
+      immediatePollIntervalMs: options.workerOptions?.immediatePollIntervalMs ?? 100,
+      shutdownTimeoutMs: options.workerOptions?.shutdownTimeoutMs ?? 10_000,
+      logger: new Logger("MarQSWorker", "info"),
+      jobs: {
+        processQueueForWorkerQueue: async (job) => {
+          await this.#processQueueForWorkerQueue(job.payload.queueKey, job.payload.parentQueueKey);
+        },
+      },
+    });
+
+    if (options.workerOptions?.enabled) {
+      this.worker.start();
+    }
   }
 
   get name() {
@@ -280,6 +325,21 @@
         span.setAttribute("reserve_recursive_queue", reserve.recursiveQueue);
       }
 
+      if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
+        // This will move the message to the worker queue so it can be dequeued
+        await this.worker.enqueueOnce({
+          id: messageQueue, // dedupe by environment, queue, and concurrency key
+          job: "processQueueForWorkerQueue",
+          payload: {
+            queueKey: messageQueue,
+            parentQueueKey: parentQueue,
+          },
+          // Add a small delay to dedupe messages so at most one of these will be processed
+          // every 500ms per queue, concurrency key, and environment
+          availableAt: new Date(Date.now() + 500), // 500ms from now
+        });
+      }
+
       const result = await this.#callEnqueueMessage(messagePayload, reserve);
 
       if (result) {
@@ -870,6 +930,64 @@
     );
   }
 
+  async #processQueueForWorkerQueue(queueKey: string, parentQueueKey: string) {
+    return this.#trace("processQueueForWorkerQueue", async (span) => {
+      span.setAttributes({
+        [SemanticAttributes.QUEUE]: queueKey,
+        [SemanticAttributes.PARENT_QUEUE]: parentQueueKey,
+      });
+
+      const maxCount = this.options.sharedWorkerQueueMaxMessageCount ?? 10;
+
+      const dequeuedMessages = await this.#callDequeueMessages({
+        messageQueue: queueKey,
+        parentQueue: parentQueueKey,
+        maxCount,
+      });
+
+      if (!dequeuedMessages || dequeuedMessages.length === 0) {
+        return;
+      }
+
+      await this.#trace(
+        "addToWorkerQueue",
+        async (addToWorkerQueueSpan) => {
+          const workerQueueKey = this.keys.sharedWorkerQueueKey();
+
+          addToWorkerQueueSpan.setAttributes({
+            message_count: dequeuedMessages.length,
+            [SemanticAttributes.PARENT_QUEUE]: workerQueueKey,
+          });
+
+          await this.redis.rpush(
+            workerQueueKey,
+            ...dequeuedMessages.map((message) => message.messageId)
+          );
+        },
+        {
+          kind: SpanKind.INTERNAL,
+          attributes: {
+            [SEMATTRS_MESSAGING_OPERATION]: "receive",
+            [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
+          },
+        }
+      );
+
+      // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
+      if (dequeuedMessages.length === maxCount) {
+        await this.worker.enqueueOnce({
+          id: queueKey,
+          job: "processQueueForWorkerQueue",
+          payload: {
+            queueKey,
+            parentQueueKey,
+          },
+          availableAt: new Date(Date.now() + 500), // 500ms from now
+        });
+      }
+    });
+  }
+
   public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
     return this.#trace(
       "acknowledgeMessage",
@@ -901,6 +1019,20 @@
           messageId,
         });
 
+        const sharedQueueKey = this.keys.sharedQueueKey();
+
+        if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) {
+          await this.worker.enqueueOnce({
+            id: message.queue,
+            job: "processQueueForWorkerQueue",
+            payload: {
+              queueKey: message.queue,
+              parentQueueKey: message.parentQueue,
+            },
+            availableAt: new Date(Date.now() + 500), // 500ms from now
+          });
+        }
+
         await this.options.subscriber?.messageAcked(message);
       },
       {
@@ -2482,5 +2614,17 @@ function getMarQSClient() {
     subscriber: concurrencyTracker,
     sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
     sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
+    eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1",
=== "1", + workerOptions: { + enabled: env.MARQS_WORKER_ENABLED === "1", + pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS, + immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS, + shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS, + concurrency: { + workers: env.MARQS_WORKER_CONCURRENCY_LIMIT, + tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER, + limit: env.MARQS_WORKER_CONCURRENCY_LIMIT, + }, + }, }); } From d44a525a8512ddec6d409090c658e9aee56e054a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 22 Aug 2025 13:20:51 +0100 Subject: [PATCH 2/2] fixed configuration --- apps/webapp/app/env.server.ts | 5 +++-- apps/webapp/app/v3/marqs/index.server.ts | 18 +++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d5151f9ab0..bdf0f49d39 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -415,8 +415,9 @@ const EnvironmentSchema = z.object({ MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"), MARQS_WORKER_ENABLED: z.string().default("0"), - MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(10), - MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), + MARQS_WORKER_COUNT: z.coerce.number().int().default(2), + MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), + MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(5), MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100), MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100), MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 49752e517e..5bd4e495e3 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -8,6 +8,7 @@ import { trace, Tracer, } from "@opentelemetry/api"; +import { type RedisOptions } from "@internal/redis"; import { SEMATTRS_MESSAGE_ID, SEMATTRS_MESSAGING_SYSTEM, @@ -84,6 +85,7 @@ export type MarQSOptions = { shutdownTimeoutMs?: number; concurrency?: WorkerConcurrencyOptions; enabled?: boolean; + redisOptions: RedisOptions; }; }; @@ -116,10 +118,7 @@ export class MarQS { this.worker = new Worker({ name: "marqs-worker", - redisOptions: { - ...options.redis.options, - keyPrefix: `${options.redis.options.keyPrefix}:worker`, - }, + redisOptions: options.workerOptions.redisOptions, catalog: workerCatalog, concurrency: options.workerOptions?.concurrency, pollIntervalMs: options.workerOptions?.pollIntervalMs ?? 1000, @@ -2621,10 +2620,19 @@ function getMarQSClient() { immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS, shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS, concurrency: { - workers: env.MARQS_WORKER_CONCURRENCY_LIMIT, + workers: env.MARQS_WORKER_COUNT, tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER, limit: env.MARQS_WORKER_CONCURRENCY_LIMIT, }, + redisOptions: { + keyPrefix: KEY_PREFIX, + port: env.REDIS_PORT ?? undefined, + host: env.REDIS_HOST ?? undefined, + username: env.REDIS_USERNAME ?? undefined, + password: env.REDIS_PASSWORD ?? undefined, + enableAutoPipelining: true, + ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, }, }); }