Skip to content

Commit 469a8df

Browse files
authored
Prevent redis worker from enqueuing pending jobs (#2235)
1 parent 922c7cb commit 469a8df

File tree

3 files changed

+51
-32
lines changed

3 files changed

+51
-32
lines changed

.changeset/orange-pens-smile.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Now each worker gets its own pLimit concurrency limiter, and we will only ever dequeue items where there is concurrency capacity, preventing incorrectly retried jobs due to visibility timeout expiry

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ const EnvironmentSchema = z.object({
637637
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
638638
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
639639
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
640-
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
640+
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
641641
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
642642
LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z
643643
.enum(["log", "error", "warn", "info", "debug"])
@@ -683,7 +683,7 @@ const EnvironmentSchema = z.object({
683683
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
684684
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
685685
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
686-
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
686+
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
687687
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
688688
COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
689689

@@ -727,7 +727,7 @@ const EnvironmentSchema = z.object({
727727
BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
728728
BATCH_TRIGGER_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
729729
BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
730-
BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
730+
BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
731731
BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
732732
BATCH_TRIGGER_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
733733

@@ -768,7 +768,7 @@ const EnvironmentSchema = z.object({
768768
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
769769
ALERTS_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
770770
ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
771-
ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
771+
ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
772772
ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
773773
ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
774774

packages/redis-worker/src/worker.ts

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class Worker<TCatalog extends WorkerCatalog> {
113113
private shutdownTimeoutMs: number;
114114

115115
// The p-limit limiter to control overall concurrency.
116-
private limiter: ReturnType<typeof pLimit>;
116+
private limiters: Record<string, ReturnType<typeof pLimit>> = {};
117117

118118
constructor(private options: WorkerOptions<TCatalog>) {
119119
this.logger = options.logger ?? new Logger("Worker", "debug");
@@ -138,9 +138,6 @@ class Worker<TCatalog extends WorkerCatalog> {
138138
const { workers = 1, tasksPerWorker = 1, limit = 10 } = options.concurrency ?? {};
139139
this.concurrency = { workers, tasksPerWorker, limit };
140140

141-
// Create a p-limit instance using this limit.
142-
this.limiter = pLimit(this.concurrency.limit);
143-
144141
const masterQueueObservableGauge = this.meter.createObservableGauge("redis_worker.queue.size", {
145142
description: "The number of items in the queue",
146143
unit: "items",
@@ -203,15 +200,21 @@ class Worker<TCatalog extends WorkerCatalog> {
203200
}
204201

205202
async #updateConcurrencyLimitActiveMetric(observableResult: ObservableResult<Attributes>) {
206-
observableResult.observe(this.limiter.activeCount, {
207-
worker_name: this.options.name,
208-
});
203+
for (const [workerId, limiter] of Object.entries(this.limiters)) {
204+
observableResult.observe(limiter.activeCount, {
205+
worker_name: this.options.name,
206+
worker_id: workerId,
207+
});
208+
}
209209
}
210210

211211
async #updateConcurrencyLimitPendingMetric(observableResult: ObservableResult<Attributes>) {
212-
observableResult.observe(this.limiter.pendingCount, {
213-
worker_name: this.options.name,
214-
});
212+
for (const [workerId, limiter] of Object.entries(this.limiters)) {
213+
observableResult.observe(limiter.pendingCount, {
214+
worker_name: this.options.name,
215+
worker_id: workerId,
216+
});
217+
}
215218
}
216219

217220
public start() {
@@ -417,6 +420,9 @@ class Worker<TCatalog extends WorkerCatalog> {
417420
workerIndex: number,
418421
totalWorkers: number
419422
): Promise<void> {
423+
const limiter = pLimit(this.concurrency.limit);
424+
this.limiters[workerId] = limiter;
425+
420426
const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
421427
const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;
422428

@@ -438,35 +444,42 @@ class Worker<TCatalog extends WorkerCatalog> {
438444

439445
while (!this.isShuttingDown) {
440446
// Check overall load. If at capacity, wait a bit before trying to dequeue more.
441-
if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
447+
if (limiter.activeCount + limiter.pendingCount >= this.concurrency.limit) {
442448
this.logger.debug("Worker at capacity, waiting", {
443449
workerId,
444450
concurrencyOptions: this.concurrency,
445-
activeCount: this.limiter.activeCount,
446-
pendingCount: this.limiter.pendingCount,
451+
activeCount: limiter.activeCount,
452+
pendingCount: limiter.pendingCount,
447453
});
448454

449455
await Worker.delay(pollIntervalMs);
450456

451457
continue;
452458
}
453459

460+
// If taskCount is 10, concurrency limit is 100, and there are 98 active workers, we should dequeue 2 items at most.
461+
// If taskCount is 10, concurrency limit is 100, and there are 12 active workers, we should dequeue 10 items at most.
462+
const $taskCount = Math.min(
463+
taskCount,
464+
this.concurrency.limit - limiter.activeCount - limiter.pendingCount
465+
);
466+
454467
try {
455468
const items = await this.withHistogram(
456469
this.metrics.dequeueDuration,
457-
this.queue.dequeue(taskCount),
470+
this.queue.dequeue($taskCount),
458471
{
459472
worker_id: workerId,
460-
task_count: taskCount,
473+
task_count: $taskCount,
461474
}
462475
);
463476

464477
if (items.length === 0) {
465478
this.logger.debug("No items to dequeue", {
466479
workerId,
467480
concurrencyOptions: this.concurrency,
468-
activeCount: this.limiter.activeCount,
469-
pendingCount: this.limiter.pendingCount,
481+
activeCount: limiter.activeCount,
482+
pendingCount: limiter.pendingCount,
470483
});
471484

472485
await Worker.delay(pollIntervalMs);
@@ -477,17 +490,17 @@ class Worker<TCatalog extends WorkerCatalog> {
477490
workerId,
478491
itemCount: items.length,
479492
concurrencyOptions: this.concurrency,
480-
activeCount: this.limiter.activeCount,
481-
pendingCount: this.limiter.pendingCount,
493+
activeCount: limiter.activeCount,
494+
pendingCount: limiter.pendingCount,
482495
});
483496

484497
// Schedule each item using the limiter.
485498
for (const item of items) {
486-
this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
487-
(err) => {
488-
this.logger.error("Unhandled error in processItem:", { error: err, workerId, item });
489-
}
490-
);
499+
limiter(() =>
500+
this.processItem(item as AnyQueueItem, items.length, workerId, limiter)
501+
).catch((err) => {
502+
this.logger.error("Unhandled error in processItem:", { error: err, workerId, item });
503+
});
491504
}
492505
} catch (error) {
493506
this.logger.error("Error dequeuing items:", { name: this.options.name, error });
@@ -508,7 +521,8 @@ class Worker<TCatalog extends WorkerCatalog> {
508521
private async processItem(
509522
{ id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey }: AnyQueueItem,
510523
batchSize: number,
511-
workerId: string
524+
workerId: string,
525+
limiter: ReturnType<typeof pLimit>
512526
): Promise<void> {
513527
const catalogItem = this.options.catalog[job as any];
514528
const handler = this.jobs[job as any];
@@ -553,9 +567,9 @@ class Worker<TCatalog extends WorkerCatalog> {
553567
job_timestamp: timestamp.getTime(),
554568
job_age_in_ms: Date.now() - timestamp.getTime(),
555569
worker_id: workerId,
556-
worker_limit_concurrency: this.limiter.concurrency,
557-
worker_limit_active: this.limiter.activeCount,
558-
worker_limit_pending: this.limiter.pendingCount,
570+
worker_limit_concurrency: limiter.concurrency,
571+
worker_limit_active: limiter.activeCount,
572+
worker_limit_pending: limiter.pendingCount,
559573
worker_name: this.options.name,
560574
batch_size: batchSize,
561575
},

0 commit comments

Comments
 (0)