5 changes: 5 additions & 0 deletions .changeset/orange-pens-smile.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

Each worker now gets its own pLimit concurrency limiter, and we only ever dequeue items when there is concurrency capacity, preventing jobs from being incorrectly retried due to visibility timeout expiry.
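
A minimal sketch of the pattern this changeset describes, assuming a generic queue with a `dequeue(max)` method and a `process(item)` handler (names here are illustrative; the real implementation is in `packages/redis-worker/src/worker.ts` below): each worker loop owns its own `pLimit` instance and only dequeues as many items as it has spare capacity for, so nothing sits in memory past its visibility timeout waiting for a free slot.

```ts
import pLimit from "p-limit";

// Illustrative sketch only; `queue` and `process` stand in for the worker's
// real dequeue and job-handling logic.
async function runWorkerLoop(
  queue: { dequeue: (max: number) => Promise<unknown[]> },
  process: (item: unknown) => Promise<void>,
  limit: number,
  tasksPerWorker: number
) {
  const limiter = pLimit(limit); // each worker loop gets its own limiter

  while (true) {
    // Only ask the queue for what we can start right away.
    const capacity = limit - limiter.activeCount - limiter.pendingCount;
    if (capacity <= 0) {
      await new Promise((resolve) => setTimeout(resolve, 1_000));
      continue;
    }

    const items = await queue.dequeue(Math.min(tasksPerWorker, capacity));
    for (const item of items) {
      limiter(() => process(item)).catch((err) => console.error(err));
    }
  }
}
```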
8 changes: 4 additions & 4 deletions apps/webapp/app/env.server.ts
@@ -637,7 +637,7 @@ const EnvironmentSchema = z.object({
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
- LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
+ LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
@@ -683,7 +683,7 @@ const EnvironmentSchema = z.object({
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
- COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
+ COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

@@ -722,7 +722,7 @@ const EnvironmentSchema = z.object({
BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
BATCH_TRIGGER_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
- BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
+ BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
BATCH_TRIGGER_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

@@ -763,7 +763,7 @@ const EnvironmentSchema = z.object({
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
ALERTS_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
- ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
+ ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

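The lowered defaults above (100 → 50, and 20 for the batch trigger worker) presumably compensate for the limit now applying per worker loop rather than per Worker instance (see the worker.ts changes below); they can still be overridden through the corresponding environment variables. A small sketch of how these values are parsed, assuming the same zod coercion pattern used throughout this schema:

```ts
import { z } from "zod";

// Hedged example using two of the variables changed above; the real schema
// lives in apps/webapp/app/env.server.ts.
const WorkerEnv = z.object({
  COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
  BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
});

// Env vars arrive as strings and are coerced to integers; unset values fall
// back to the new defaults.
const env = WorkerEnv.parse({ COMMON_WORKER_CONCURRENCY_LIMIT: "75" });
// env.COMMON_WORKER_CONCURRENCY_LIMIT === 75
// env.BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT === 20
```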
70 changes: 42 additions & 28 deletions packages/redis-worker/src/worker.ts
@@ -113,7 +113,7 @@ class Worker<TCatalog extends WorkerCatalog> {
private shutdownTimeoutMs: number;

// The p-limit limiter to control overall concurrency.
- private limiter: ReturnType<typeof pLimit>;
+ private limiters: Record<string, ReturnType<typeof pLimit>> = {};

constructor(private options: WorkerOptions<TCatalog>) {
this.logger = options.logger ?? new Logger("Worker", "debug");
@@ -138,9 +138,6 @@ class Worker<TCatalog extends WorkerCatalog> {
const { workers = 1, tasksPerWorker = 1, limit = 10 } = options.concurrency ?? {};
this.concurrency = { workers, tasksPerWorker, limit };

- // Create a p-limit instance using this limit.
- this.limiter = pLimit(this.concurrency.limit);

const masterQueueObservableGauge = this.meter.createObservableGauge("redis_worker.queue.size", {
description: "The number of items in the queue",
unit: "items",
@@ -203,15 +200,21 @@
}

async #updateConcurrencyLimitActiveMetric(observableResult: ObservableResult<Attributes>) {
- observableResult.observe(this.limiter.activeCount, {
- worker_name: this.options.name,
- });
+ for (const [workerId, limiter] of Object.entries(this.limiters)) {
+ observableResult.observe(limiter.activeCount, {
+ worker_name: this.options.name,
+ worker_id: workerId,
+ });
+ }
}

async #updateConcurrencyLimitPendingMetric(observableResult: ObservableResult<Attributes>) {
- observableResult.observe(this.limiter.pendingCount, {
- worker_name: this.options.name,
- });
+ for (const [workerId, limiter] of Object.entries(this.limiters)) {
+ observableResult.observe(limiter.pendingCount, {
+ worker_name: this.options.name,
+ worker_id: workerId,
+ });
+ }
}

public start() {
@@ -417,6 +420,9 @@
workerIndex: number,
totalWorkers: number
): Promise<void> {
+ const limiter = pLimit(this.concurrency.limit);
+ this.limiters[workerId] = limiter;

const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;

@@ -438,35 +444,42 @@

while (!this.isShuttingDown) {
// Check overall load. If at capacity, wait a bit before trying to dequeue more.
- if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
+ if (limiter.activeCount + limiter.pendingCount >= this.concurrency.limit) {
this.logger.debug("Worker at capacity, waiting", {
workerId,
concurrencyOptions: this.concurrency,
- activeCount: this.limiter.activeCount,
- pendingCount: this.limiter.pendingCount,
+ activeCount: limiter.activeCount,
+ pendingCount: limiter.pendingCount,
});

await Worker.delay(pollIntervalMs);

continue;
}

+ // If taskCount is 10, the concurrency limit is 100, and there are 98 tasks already in flight, we should dequeue at most 2 items.
+ // If taskCount is 10, the concurrency limit is 100, and there are 12 tasks in flight, we should dequeue at most 10 items.
+ const $taskCount = Math.min(
+ taskCount,
+ this.concurrency.limit - limiter.activeCount - limiter.pendingCount
+ );

try {
const items = await this.withHistogram(
this.metrics.dequeueDuration,
- this.queue.dequeue(taskCount),
+ this.queue.dequeue($taskCount),
{
worker_id: workerId,
- task_count: taskCount,
+ task_count: $taskCount,
}
);

if (items.length === 0) {
this.logger.debug("No items to dequeue", {
workerId,
concurrencyOptions: this.concurrency,
- activeCount: this.limiter.activeCount,
- pendingCount: this.limiter.pendingCount,
+ activeCount: limiter.activeCount,
+ pendingCount: limiter.pendingCount,
});

await Worker.delay(pollIntervalMs);
Expand All @@ -477,17 +490,17 @@ class Worker<TCatalog extends WorkerCatalog> {
workerId,
itemCount: items.length,
concurrencyOptions: this.concurrency,
- activeCount: this.limiter.activeCount,
- pendingCount: this.limiter.pendingCount,
+ activeCount: limiter.activeCount,
+ pendingCount: limiter.pendingCount,
});

// Schedule each item using the limiter.
for (const item of items) {
- this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
- (err) => {
- this.logger.error("Unhandled error in processItem:", { error: err, workerId, item });
- }
- );
+ limiter(() =>
+ this.processItem(item as AnyQueueItem, items.length, workerId, limiter)
+ ).catch((err) => {
+ this.logger.error("Unhandled error in processItem:", { error: err, workerId, item });
+ });
}
} catch (error) {
this.logger.error("Error dequeuing items:", { name: this.options.name, error });
Expand All @@ -508,7 +521,8 @@ class Worker<TCatalog extends WorkerCatalog> {
private async processItem(
{ id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey }: AnyQueueItem,
batchSize: number,
- workerId: string
+ workerId: string,
+ limiter: ReturnType<typeof pLimit>
): Promise<void> {
const catalogItem = this.options.catalog[job as any];
const handler = this.jobs[job as any];
@@ -553,9 +567,9 @@
job_timestamp: timestamp.getTime(),
job_age_in_ms: Date.now() - timestamp.getTime(),
worker_id: workerId,
- worker_limit_concurrency: this.limiter.concurrency,
- worker_limit_active: this.limiter.activeCount,
- worker_limit_pending: this.limiter.pendingCount,
+ worker_limit_concurrency: limiter.concurrency,
+ worker_limit_active: limiter.activeCount,
+ worker_limit_pending: limiter.pendingCount,
worker_name: this.options.name,
batch_size: batchSize,
},
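
One consequence worth keeping in mind when tuning the env defaults above: `pLimit(this.concurrency.limit)` is now created inside each worker loop, so `limit` caps each loop independently rather than the Worker instance as a whole. A rough sketch of the resulting worst-case arithmetic, assuming only the `workers`, `tasksPerWorker`, and `limit` fields destructured in the constructor (the concrete numbers are illustrative):

```ts
// Illustrative arithmetic only.
const concurrency = { workers: 2, tasksPerWorker: 10, limit: 50 };

// Each of the `workers` loops owns its own pLimit(limit) limiter, so the
// worst-case number of jobs in flight for one Worker instance is:
const maxInFlight = concurrency.workers * concurrency.limit; // 2 * 50 = 100

// And each poll dequeues at most min(tasksPerWorker, remaining capacity)
// items, mirroring the `$taskCount` calculation added in this PR:
const activeCount = 45; // example: jobs currently running on this loop
const pendingCount = 3; // example: jobs queued behind this loop's limiter
const taskCount = Math.min(
  concurrency.tasksPerWorker,
  concurrency.limit - activeCount - pendingCount
); // min(10, 50 - 45 - 3) = 2
```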