diff --git a/.changeset/orange-pens-smile.md b/.changeset/orange-pens-smile.md new file mode 100644 index 0000000000..9a4948cda5 --- /dev/null +++ b/.changeset/orange-pens-smile.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Now each worker gets its own pLimit concurrency limiter, and we will only ever dequeue items where there is concurrency capacity, preventing incorrectly retried jobs due to visibility timeout expiry diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 543f13ddb8..2b6bad2a61 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -637,7 +637,7 @@ const EnvironmentSchema = z.object({ LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1), LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), - LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z .enum(["log", "error", "warn", "info", "debug"]) @@ -683,7 +683,7 @@ const EnvironmentSchema = z.object({ COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), - COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), @@ -722,7 +722,7 @@ const EnvironmentSchema = z.object({ BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER:
z.coerce.number().int().default(10), BATCH_TRIGGER_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), - BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20), BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), BATCH_TRIGGER_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), @@ -763,7 +763,7 @@ const EnvironmentSchema = z.object({ ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), ALERTS_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100), - ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 96c41ed900..58db6bef56 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -113,7 +113,7 @@ class Worker { private shutdownTimeoutMs: number; // The p-limit limiter to control overall concurrency. - private limiter: ReturnType<typeof pLimit>; + private limiters: Record<string, ReturnType<typeof pLimit>> = {}; constructor(private options: WorkerOptions) { this.logger = options.logger ?? new Logger("Worker", "debug"); @@ -138,9 +138,6 @@ class Worker { const { workers = 1, tasksPerWorker = 1, limit = 10 } = options.concurrency ?? {}; this.concurrency = { workers, tasksPerWorker, limit }; - // Create a p-limit instance using this limit.
- this.limiter = pLimit(this.concurrency.limit); - const masterQueueObservableGauge = this.meter.createObservableGauge("redis_worker.queue.size", { description: "The number of items in the queue", unit: "items", @@ -203,15 +200,21 @@ class Worker { } async #updateConcurrencyLimitActiveMetric(observableResult: ObservableResult) { - observableResult.observe(this.limiter.activeCount, { - worker_name: this.options.name, - }); + for (const [workerId, limiter] of Object.entries(this.limiters)) { + observableResult.observe(limiter.activeCount, { + worker_name: this.options.name, + worker_id: workerId, + }); + } } async #updateConcurrencyLimitPendingMetric(observableResult: ObservableResult) { - observableResult.observe(this.limiter.pendingCount, { - worker_name: this.options.name, - }); + for (const [workerId, limiter] of Object.entries(this.limiters)) { + observableResult.observe(limiter.pendingCount, { + worker_name: this.options.name, + worker_id: workerId, + }); + } } public start() { @@ -417,6 +420,9 @@ class Worker { workerIndex: number, totalWorkers: number ): Promise<void> { + const limiter = pLimit(this.concurrency.limit); + this.limiters[workerId] = limiter; + const pollIntervalMs = this.options.pollIntervalMs ?? 1000; const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100; @@ -438,12 +444,12 @@ class Worker { while (!this.isShuttingDown) { // Check overall load. If at capacity, wait a bit before trying to dequeue more.
- if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) { + if (limiter.activeCount + limiter.pendingCount >= this.concurrency.limit) { this.logger.debug("Worker at capacity, waiting", { workerId, concurrencyOptions: this.concurrency, - activeCount: this.limiter.activeCount, - pendingCount: this.limiter.pendingCount, + activeCount: limiter.activeCount, + pendingCount: limiter.pendingCount, }); await Worker.delay(pollIntervalMs); @@ -451,13 +457,20 @@ class Worker { continue; } + // If taskCount is 10, concurrency limit is 100, and there are 98 active workers, we should dequeue 2 items at most. + // If taskCount is 10, concurrency limit is 100, and there are 12 active workers, we should dequeue 10 items at most. + const $taskCount = Math.min( + taskCount, + this.concurrency.limit - limiter.activeCount - limiter.pendingCount + ); + try { const items = await this.withHistogram( this.metrics.dequeueDuration, - this.queue.dequeue(taskCount), + this.queue.dequeue($taskCount), { worker_id: workerId, - task_count: taskCount, + task_count: $taskCount, } ); @@ -465,8 +478,8 @@ class Worker { this.logger.debug("No items to dequeue", { workerId, concurrencyOptions: this.concurrency, - activeCount: this.limiter.activeCount, - pendingCount: this.limiter.pendingCount, + activeCount: limiter.activeCount, + pendingCount: limiter.pendingCount, }); await Worker.delay(pollIntervalMs); @@ -477,17 +490,17 @@ class Worker { workerId, itemCount: items.length, concurrencyOptions: this.concurrency, - activeCount: this.limiter.activeCount, - pendingCount: this.limiter.pendingCount, + activeCount: limiter.activeCount, + pendingCount: limiter.pendingCount, }); // Schedule each item using the limiter. 
for (const item of items) { - this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch( - (err) => { - this.logger.error("Unhandled error in processItem:", { error: err, workerId, item }); - } - ); + limiter(() => + this.processItem(item as AnyQueueItem, items.length, workerId, limiter) + ).catch((err) => { + this.logger.error("Unhandled error in processItem:", { error: err, workerId, item }); + }); } } catch (error) { this.logger.error("Error dequeuing items:", { name: this.options.name, error }); @@ -508,7 +521,8 @@ class Worker { private async processItem( { id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey }: AnyQueueItem, batchSize: number, - workerId: string + workerId: string, + limiter: ReturnType<typeof pLimit> ): Promise<void> { const catalogItem = this.options.catalog[job as any]; const handler = this.jobs[job as any]; @@ -553,9 +567,9 @@ class Worker { job_timestamp: timestamp.getTime(), job_age_in_ms: Date.now() - timestamp.getTime(), worker_id: workerId, - worker_limit_concurrency: this.limiter.concurrency, - worker_limit_active: this.limiter.activeCount, - worker_limit_pending: this.limiter.pendingCount, + worker_limit_concurrency: limiter.concurrency, + worker_limit_active: limiter.activeCount, + worker_limit_pending: limiter.pendingCount, worker_name: this.options.name, batch_size: batchSize, },