From 1e305dce567570ac74569000a25e806ebaaf9f56 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 18 Apr 2025 09:10:29 +0100 Subject: [PATCH] fix: prevent unbounded looping while dequeueing by exiting early when no queues have messages to dequeue --- .../run-engine/src/run-queue/index.ts | 19 +++- .../dequeueMessageFromMasterQueue.test.ts | 93 ++++++++++++++++++- 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 6dc4a3b76e..57ba1b75b7 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -49,7 +49,7 @@ export type RunQueueOptions = { keys: RunQueueKeyProducer; queueSelectionStrategy: RunQueueSelectionStrategy; verbose?: boolean; - logger: Logger; + logger?: Logger; retryOptions?: RetryOptions; }; @@ -88,7 +88,7 @@ export class RunQueue { }); }, }); - this.logger = options.logger; + this.logger = options.logger ?? new Logger("RunQueue", "warn"); this.keys = options.keys; this.queueSelectionStrategy = options.queueSelectionStrategy; @@ -404,11 +404,17 @@ export class RunQueue { tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array } + // Track if we successfully dequeued any message in a complete cycle + let successfulDequeueInCycle = false; + // Continue until we've hit max count or all tenants have empty queue lists while ( messages.length < maxCount && Object.values(tenantQueues).some((queues) => queues.length > 0) ) { + // Reset the success flag at the start of each cycle + successfulDequeueInCycle = false; + for (const env of envQueues) { attemptedEnvs++; @@ -428,6 +434,7 @@ export class RunQueue { if (message) { messages.push(message); + successfulDequeueInCycle = true; // Re-add this queue at the end, since it might have more messages tenantQueues[env.envId].push(queue); } @@ -438,6 +445,14 @@ export class RunQueue { break; } } + + // If we completed a full cycle through all tenants with no successful dequeues, + // exit early as we're likely hitting concurrency limits or have no ready messages + if (!successfulDequeueInCycle) { + // IMPORTANT: Keep this log message as it's used in tests + this.logger.log("No successful dequeues in a full cycle, exiting..."); + break; + } } span.setAttributes({ diff --git a/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts b/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts index cb23b0dd39..3d756d7692 100644 --- a/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts @@ -1,6 +1,5 @@ import { redisTest } from "@internal/testcontainers"; import { trace } from "@internal/tracing"; -import { Logger } from "@trigger.dev/core/logger"; import { describe } from "node:test"; import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js"; import { RunQueue } from "../index.js"; @@ -12,7 +11,6 @@ const testOptions = { tracer: trace.getTracer("rq"), workers: 1, defaultEnvConcurrency: 25, - logger: new Logger("RunQueue", "warn"), retryOptions: { maxAttempts: 5, factor: 1.1, @@ -264,4 +262,95 @@ describe("RunQueue.dequeueMessageFromMasterQueue", () => { } } ); + + redisTest( + "should exit early when no messages can be dequeued in a full cycle", + async ({ redisContainer }) => { + const mockLogger = { + log: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + name: "test-logger", + level: "debug", + filteredKeys: [], + additionalFields: {}, + setLevel: vi.fn(), + setFilteredKeys: vi.fn(), + setAdditionalFields: vi.fn(), + child: vi.fn(), + }; + + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + // @ts-expect-error + logger: mockLogger, + }); + + try { + const envMasterQueue = `env:${authenticatedEnvDev.id}`; + const queueCount = 10; // Reduced for simplicity + + // First, create all queues and enqueue initial messages + for (let i = 0; i < queueCount; i++) { + const queueName = `${messageDev.queue}_${i}`; + // Set each queue's concurrency limit to 0 (this guarantees dequeue will fail) + await queue.updateQueueConcurrencyLimits(authenticatedEnvDev, queueName, 0); + + // Enqueue a message to each queue + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: { ...messageDev, runId: `r${4321 + i}`, queue: queueName }, + masterQueues: ["main", envMasterQueue], + }); + } + + // Try to dequeue messages - this should exit early due to concurrency limits + const startTime = Date.now(); + const dequeued = await queue.dequeueMessageFromMasterQueue( + "test_12345", + envMasterQueue, + queueCount + ); + const endTime = Date.now(); + + // Verify no messages were dequeued + expect(dequeued.length).toBe(0); + + // Verify the operation completed quickly (under 1000ms) + const duration = endTime - startTime; + expect(duration).toBeLessThan(1000); + + // Verify we only logged one early exit message + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining("No successful dequeues in a full cycle, exiting") + ); + expect(mockLogger.log.mock.calls.length).toBeLessThanOrEqual(2); + + // Verify all messages are still in queues + let totalRemaining = 0; + for (let i = 0; i < queueCount; i++) { + const queueName = `${messageDev.queue}_${i}`; + const length = await queue.lengthOfQueue(authenticatedEnvDev, queueName); + totalRemaining += length; + } + expect(totalRemaining).toBe(queueCount); + } finally { + await queue.quit(); + } + } + ); });