From 88326b394e1ab1cafc3c03168ac0de44273f5fb2 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Fri, 22 Aug 2025 14:11:15 +0100
Subject: [PATCH 1/2] fix(v3): prevent saturated queues from dominating
 dequeue attempts by adding a cooloff period after successive failed dequeues

---
 apps/webapp/app/v3/marqs/index.server.ts | 53 +++++++++++++++++++++++-
 1 file changed, 51 insertions(+), 2 deletions(-)

diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts
index 5bd4e495e3..b66eca767e 100644
--- a/apps/webapp/app/v3/marqs/index.server.ts
+++ b/apps/webapp/app/v3/marqs/index.server.ts
@@ -43,7 +43,7 @@ import {
   MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
   MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
 } from "./constants.server";
-import { setInterval } from "node:timers/promises";
+import { setInterval as setIntervalAsync } from "node:timers/promises";
 import { tryCatch } from "@trigger.dev/core/utils";
 import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
 import z from "zod";
@@ -78,6 +78,8 @@ export type MarQSOptions = {
   subscriber?: MessageQueueSubscriber;
   sharedWorkerQueueConsumerIntervalMs?: number;
   sharedWorkerQueueMaxMessageCount?: number;
+  sharedWorkerQueueCooloffPeriodMs?: number;
+  sharedWorkerQueueCooloffCountThreshold?: number;
   eagerDequeuingEnabled?: boolean;
   workerOptions: {
     pollIntervalMs?: number;
@@ -107,6 +109,9 @@ export class MarQS {
   public keys: MarQSKeyProducer;
   #rebalanceWorkers: Array<AsyncWorker> = [];
   private worker: Worker;
+  private queueDequeueCooloffPeriod: Map<string, number> = new Map();
+  private queueDequeueCooloffCounts: Map<string, number> = new Map();
+  private clearCooloffPeriodInterval: NodeJS.Timeout;
 
   constructor(private readonly options: MarQSOptions) {
     this.redis = options.redis;
@@ -116,6 +121,12 @@ export class MarQS {
     this.#startRebalanceWorkers();
     this.#registerCommands();
 
+    // Periodically clear the cooloff maps so they cannot grow indefinitely
+    this.clearCooloffPeriodInterval = setInterval(() => {
+      this.queueDequeueCooloffCounts.clear();
+      this.queueDequeueCooloffPeriod.clear();
+    }, 60_000 * 10); // 10 minutes
+
     this.worker = new Worker({
       name: "marqs-worker",
       redisOptions: options.workerOptions.redisOptions,
@@ -737,7 +748,7 @@ export class MarQS {
     let processedCount = 0;
 
     try {
-      for await (const _ of setInterval(
+      for await (const _ of setIntervalAsync(
         this.options.sharedWorkerQueueConsumerIntervalMs ?? 500,
         null,
         {
@@ -821,6 +832,7 @@ export class MarQS {
         let attemptedEnvs = 0;
         let attemptedQueues = 0;
         let messageCount = 0;
+        let coolOffPeriodCount = 0;
 
         // Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
         for (const env of envQueues) {
@@ -829,6 +841,20 @@ export class MarQS {
           for (const messageQueue of env.queues) {
             attemptedQueues++;
 
+            const cooloffPeriod = this.queueDequeueCooloffPeriod.get(messageQueue);
+
+            // If the queue has a cooloff entry, check whether it is still active
+            if (cooloffPeriod) {
+              // Still active: skip attempting to dequeue from this queue
+              if (cooloffPeriod > Date.now()) {
+                coolOffPeriodCount++;
+                continue;
+              } else {
+                // Expired: clear the entry and attempt to dequeue as normal
+                this.queueDequeueCooloffPeriod.delete(messageQueue);
+              }
+            }
+
             await this.#trace(
               "attemptDequeue",
               async (attemptDequeueSpan) => {
@@ -862,10 +888,32 @@ export class MarQS {
                 );
 
                 if (!messages || messages.length === 0) {
+                  const cooloffCount = this.queueDequeueCooloffCounts.get(messageQueue) ?? 0;
+
+                  const cooloffCountThreshold = Math.max(
+                    10,
+                    this.options.sharedWorkerQueueCooloffCountThreshold ?? 10
+                  ); // minimum of 10
+
+                  if (cooloffCount >= cooloffCountThreshold) {
+                    // After this many consecutive empty dequeues, put the queue into
+                    // a cooloff period so it is skipped for a while, giving other
+                    // queues a better chance to dequeue messages
+                    this.queueDequeueCooloffPeriod.set(
+                      messageQueue,
+                      Date.now() + (this.options.sharedWorkerQueueCooloffPeriodMs ?? 10_000) // defaults to 10 seconds
+                    );
+                    this.queueDequeueCooloffCounts.delete(messageQueue);
+                  } else {
+                    this.queueDequeueCooloffCounts.set(messageQueue, cooloffCount + 1);
+                  }
+
                   attemptDequeueSpan.setAttribute("message_count", 0);
                   return null; // Try next queue if no message was dequeued
                 }
 
+                this.queueDequeueCooloffCounts.delete(messageQueue);
+
                 messageCount += messages.length;
                 attemptDequeueSpan.setAttribute("message_count", messages.length);
 
@@ -916,6 +964,7 @@ export class MarQS {
         span.setAttribute("attempted_queues", attemptedQueues);
         span.setAttribute("attempted_envs", attemptedEnvs);
         span.setAttribute("message_count", messageCount);
+        span.setAttribute("cooloff_period_count", coolOffPeriodCount);
 
         return;
       },

From dafbf6756b3eac37a17c79d5b5044e24a4702742 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Fri, 22 Aug 2025 14:37:00 +0100
Subject: [PATCH 2/2] Add env vars and shut down marqs on SIGTERM/SIGINT

---
 apps/webapp/app/env.server.ts            |  2 ++
 apps/webapp/app/v3/marqs/index.server.ts | 43 ++++++++++++++++--------
 2 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index bdf0f49d39..fdd343c90b 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -421,6 +421,8 @@ const EnvironmentSchema = z.object({
   MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
   MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
   MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
+  MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(5_000),
 
   PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
 
diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts
index b66eca767e..89dfa1e3ff 100644
--- a/apps/webapp/app/v3/marqs/index.server.ts
+++ b/apps/webapp/app/v3/marqs/index.server.ts
@@ -1,3 +1,4 @@
+import { type RedisOptions } from "@internal/redis";
 import {
   context,
   propagation,
@@ -8,21 +9,32 @@ import {
   trace,
   Tracer,
 } from "@opentelemetry/api";
-import { type RedisOptions } from "@internal/redis";
 import {
   SEMATTRS_MESSAGE_ID,
-  SEMATTRS_MESSAGING_SYSTEM,
   SEMATTRS_MESSAGING_OPERATION,
+  SEMATTRS_MESSAGING_SYSTEM,
 } from "@opentelemetry/semantic-conventions";
+import { Logger } from "@trigger.dev/core/logger";
+import { tryCatch } from "@trigger.dev/core/utils";
 import { flattenAttributes } from "@trigger.dev/core/v3";
+import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
 import Redis, { type Callback, type Result } from "ioredis";
+import { setInterval as setIntervalAsync } from "node:timers/promises";
+import z from "zod";
 import { env } from "~/env.server";
 import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
 import { logger } from "~/services/logger.server";
 import { singleton } from "~/utils/singleton";
+import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
 import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server";
 import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
 import { AsyncWorker } from "./asyncWorker.server";
+import {
+  MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
+  MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
+  MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
+  MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
+} from "./constants.server";
 import { FairDequeuingStrategy } from "./fairDequeuingStrategy.server";
 import { MarQSShortKeyProducer } from "./marqsKeyProducer";
 import {
@@ -36,18 +48,6 @@ import {
   VisibilityTimeoutStrategy,
 } from "./types";
 import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";
-import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
-import {
-  MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
-  MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
-  MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
-  MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
-} from "./constants.server";
-import { setInterval as setIntervalAsync } from "node:timers/promises";
-import { tryCatch } from "@trigger.dev/core/utils";
-import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
-import z from "zod";
-import { Logger } from "@trigger.dev/core/logger";
 
 const KEY_PREFIX = "marqs:";
 
@@ -146,6 +146,19 @@ export class MarQS {
     if (options.workerOptions?.enabled) {
       this.worker.start();
     }
+
+    this.#setupShutdownHandlers();
+  }
+
+  #setupShutdownHandlers() {
+    process.on("SIGTERM", () => this.shutdown("SIGTERM"));
+    process.on("SIGINT", () => this.shutdown("SIGINT"));
+  }
+
+  async shutdown(signal: NodeJS.Signals) {
+    console.log("👇 Shutting down marqs", this.name, signal);
+    clearInterval(this.clearCooloffPeriodInterval);
+    this.#rebalanceWorkers.forEach((worker) => worker.stop());
   }
 
   get name() {
@@ -2663,6 +2676,8 @@ function getMarQSClient() {
     sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
     sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
     eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1",
+    sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
+    sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
     workerOptions: {
       enabled: env.MARQS_WORKER_ENABLED === "1",
       pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS,
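
Reviewer note: the core of PATCH 1/2 is a small piece of in-memory bookkeeping
layered over the dequeue loop: count consecutive empty dequeues per queue, and
once a threshold is reached, skip that queue for a fixed period. The sketch
below pulls that bookkeeping out of MarQS into a standalone class so the
lifecycle is easier to follow. The class and method names (CooloffTracker,
shouldSkip, recordEmptyDequeue, recordSuccess) are illustrative, not part of
the patch; the defaults mirror the patch's in-code fallbacks (minimum threshold
of 10 empty attempts, 10s cooloff, maps cleared every 10 minutes). Note that
PATCH 2/2 sets the env default for the period to 5s, while the in-code
fallback stays at 10s.

type CooloffOptions = {
  cooloffPeriodMs?: number; // how long a queue in cooloff is skipped
  cooloffCountThreshold?: number; // consecutive empty dequeues before cooloff
};

class CooloffTracker {
  private cooloffUntil = new Map<string, number>();
  private emptyDequeueCounts = new Map<string, number>();
  private clearIntervalHandle: NodeJS.Timeout;

  constructor(private readonly options: CooloffOptions = {}) {
    // Clear both maps periodically so they cannot grow without bound,
    // mirroring clearCooloffPeriodInterval in the patch
    this.clearIntervalHandle = setInterval(() => {
      this.cooloffUntil.clear();
      this.emptyDequeueCounts.clear();
    }, 60_000 * 10); // 10 minutes
  }

  // True if the queue is inside an active cooloff period and should be
  // skipped; an expired entry is removed on the way through
  shouldSkip(queue: string, now = Date.now()): boolean {
    const until = this.cooloffUntil.get(queue);
    if (until === undefined) return false;
    if (until > now) return true;
    this.cooloffUntil.delete(queue);
    return false;
  }

  // Called when a dequeue attempt returned no messages. Once the threshold
  // of consecutive empty attempts is reached, the queue enters cooloff and
  // its counter resets
  recordEmptyDequeue(queue: string, now = Date.now()): void {
    const count = this.emptyDequeueCounts.get(queue) ?? 0;
    const threshold = Math.max(10, this.options.cooloffCountThreshold ?? 10);
    if (count >= threshold) {
      this.cooloffUntil.set(queue, now + (this.options.cooloffPeriodMs ?? 10_000));
      this.emptyDequeueCounts.delete(queue);
    } else {
      this.emptyDequeueCounts.set(queue, count + 1);
    }
  }

  // Called when messages were dequeued: success resets the empty counter
  recordSuccess(queue: string): void {
    this.emptyDequeueCounts.delete(queue);
  }

  stop(): void {
    clearInterval(this.clearIntervalHandle);
  }
}

In the patched dequeue loop this maps onto: shouldSkip guarding each queue
before attemptDequeue, recordEmptyDequeue when a dequeue returns no messages,
and recordSuccess whenever messages come back.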