diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d22a2ba7f1..bdf0f49d39 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -413,6 +413,15 @@ const EnvironmentSchema = z.object({ MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250), MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10), + MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"), + MARQS_WORKER_ENABLED: z.string().default("0"), + MARQS_WORKER_COUNT: z.coerce.number().int().default(2), + MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), + MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(5), + MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100), + MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100), + MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), + PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(), VERBOSE_GRAPHILE_LOGGING: z.string().default("false"), diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 98ff128509..5bd4e495e3 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -8,6 +8,7 @@ import { trace, Tracer, } from "@opentelemetry/api"; +import { type RedisOptions } from "@internal/redis"; import { SEMATTRS_MESSAGE_ID, SEMATTRS_MESSAGING_SYSTEM, @@ -44,6 +45,9 @@ import { } from "./constants.server"; import { setInterval } from "node:timers/promises"; import { tryCatch } from "@trigger.dev/core/utils"; +import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker"; +import z from "zod"; +import { Logger } from "@trigger.dev/core/logger"; const KEY_PREFIX = "marqs:"; @@ -74,6 +78,25 @@ export type MarQSOptions = { subscriber?: MessageQueueSubscriber; sharedWorkerQueueConsumerIntervalMs?: number; 
sharedWorkerQueueMaxMessageCount?: number; + eagerDequeuingEnabled?: boolean; + workerOptions: { + pollIntervalMs?: number; + immediatePollIntervalMs?: number; + shutdownTimeoutMs?: number; + concurrency?: WorkerConcurrencyOptions; + enabled?: boolean; + redisOptions: RedisOptions; + }; +}; + +const workerCatalog = { + processQueueForWorkerQueue: { + schema: z.object({ + queueKey: z.string(), + parentQueueKey: z.string(), + }), + visibilityTimeoutMs: 30_000, + }, }; /** @@ -83,6 +106,7 @@ export class MarQS { private redis: Redis; public keys: MarQSKeyProducer; #rebalanceWorkers: Array = []; + private worker: Worker; constructor(private readonly options: MarQSOptions) { this.redis = options.redis; @@ -91,6 +115,26 @@ export class MarQS { this.#startRebalanceWorkers(); this.#registerCommands(); + + this.worker = new Worker({ + name: "marqs-worker", + redisOptions: options.workerOptions.redisOptions, + catalog: workerCatalog, + concurrency: options.workerOptions?.concurrency, + pollIntervalMs: options.workerOptions?.pollIntervalMs ?? 1000, + immediatePollIntervalMs: options.workerOptions?.immediatePollIntervalMs ?? 100, + shutdownTimeoutMs: options.workerOptions?.shutdownTimeoutMs ?? 
10_000, + logger: new Logger("MarQSWorker", "info"), + jobs: { + processQueueForWorkerQueue: async (job) => { + await this.#processQueueForWorkerQueue(job.payload.queueKey, job.payload.parentQueueKey); + }, + }, + }); + + if (options.workerOptions?.enabled) { + this.worker.start(); + } } get name() { @@ -280,6 +324,21 @@ export class MarQS { span.setAttribute("reserve_recursive_queue", reserve.recursiveQueue); } + if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) { + // This will move the message to the worker queue so it can be dequeued + await this.worker.enqueueOnce({ + id: messageQueue, // dedupe by environment, queue, and concurrency key + job: "processQueueForWorkerQueue", + payload: { + queueKey: messageQueue, + parentQueueKey: parentQueue, + }, + // Add a small delay to dedupe messages so at most one of these will be processed, + // every 500ms per queue, concurrency key, and environment + availableAt: new Date(Date.now() + 500), // 500ms from now + }); + } + const result = await this.#callEnqueueMessage(messagePayload, reserve); if (result) { @@ -870,6 +929,64 @@ export class MarQS { ); } + async #processQueueForWorkerQueue(queueKey: string, parentQueueKey: string) { + return this.#trace("processQueueForWorkerQueue", async (span) => { + span.setAttributes({ + [SemanticAttributes.QUEUE]: queueKey, + [SemanticAttributes.PARENT_QUEUE]: parentQueueKey, + }); + + const maxCount = this.options.sharedWorkerQueueMaxMessageCount ?? 
10; + + const dequeuedMessages = await this.#callDequeueMessages({ + messageQueue: queueKey, + parentQueue: parentQueueKey, + maxCount, + }); + + if (!dequeuedMessages || dequeuedMessages.length === 0) { + return; + } + + await this.#trace( + "addToWorkerQueue", + async (addToWorkerQueueSpan) => { + const workerQueueKey = this.keys.sharedWorkerQueueKey(); + + addToWorkerQueueSpan.setAttributes({ + message_count: dequeuedMessages.length, + [SemanticAttributes.PARENT_QUEUE]: workerQueueKey, + }); + + await this.redis.rpush( + workerQueueKey, + ...dequeuedMessages.map((message) => message.messageId) + ); + }, + { + kind: SpanKind.INTERNAL, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "receive", + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + + // If we dequeued the max count, we need to enqueue another job to dequeue the next batch + if (dequeuedMessages.length === maxCount) { + await this.worker.enqueueOnce({ + id: queueKey, + job: "processQueueForWorkerQueue", + payload: { + queueKey, + parentQueueKey, + }, + availableAt: new Date(Date.now() + 500), // 500ms from now + }); + } + }); + } + public async acknowledgeMessage(messageId: string, reason: string = "unknown") { return this.#trace( "acknowledgeMessage", @@ -901,6 +1018,20 @@ export class MarQS { messageId, }); + const sharedQueueKey = this.keys.sharedQueueKey(); + + if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) { + await this.worker.enqueueOnce({ + id: message.queue, + job: "processQueueForWorkerQueue", + payload: { + queueKey: message.queue, + parentQueueKey: message.parentQueue, + }, + availableAt: new Date(Date.now() + 500), // 500ms from now + }); + } + await this.options.subscriber?.messageAcked(message); }, { @@ -2482,5 +2613,26 @@ function getMarQSClient() { subscriber: concurrencyTracker, sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS, sharedWorkerQueueMaxMessageCount: 
env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT, + eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1", + workerOptions: { + enabled: env.MARQS_WORKER_ENABLED === "1", + pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS, + immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS, + shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS, + concurrency: { + workers: env.MARQS_WORKER_COUNT, + tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER, + limit: env.MARQS_WORKER_CONCURRENCY_LIMIT, + }, + redisOptions: { + keyPrefix: KEY_PREFIX, + port: env.REDIS_PORT ?? undefined, + host: env.REDIS_HOST ?? undefined, + username: env.REDIS_USERNAME ?? undefined, + password: env.REDIS_PASSWORD ?? undefined, + enableAutoPipelining: true, + ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + }, }); }