diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 0b6d5c1d1d..3fc4d0883c 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -212,7 +212,7 @@ export class RunQueue { scanConcurrencySets: { ...workerCatalog.scanConcurrencySets, cron: options.concurrencySweeper?.scanSchedule ?? workerCatalog.scanConcurrencySets.cron, - jitter: + jitterInMs: options.concurrencySweeper?.scanJitterInMs ?? workerCatalog.scanConcurrencySets.jitterInMs, }, diff --git a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts index 8a1399911f..342cba674a 100644 --- a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts @@ -13,7 +13,7 @@ const testOptions = { tracer: trace.getTracer("rq"), workers: 1, defaultEnvConcurrency: 25, - logger: new Logger("RunQueue", "warn"), + logger: new Logger("RunQueue", "debug"), retryOptions: { maxAttempts: 5, factor: 1.1, @@ -59,6 +59,7 @@ describe("RunQueue Concurrency Sweeper", () => { const queue = new RunQueue({ ...testOptions, + logLevel: "debug", queueSelectionStrategy: new FairQueueSelectionStrategy({ redis: { keyPrefix: "runqueue:test:", @@ -67,6 +68,10 @@ describe("RunQueue Concurrency Sweeper", () => { }, keys: testOptions.keys, }), + workerOptions: { + pollIntervalMs: 100, + immediatePollIntervalMs: 100, + }, redis: { keyPrefix: "runqueue:test:", host: redisContainer.getHost(), diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 3542f305fe..96c41ed900 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -693,9 +693,8 @@ class Worker { const scheduledAt = this.calculateNextScheduledAt(cron, lastTimestamp); const identifier = [job, this.timestampIdentifier(scheduledAt)].join(":"); // Calculate the availableAt date by calculating a random number between -jitter/2 and jitter/2 and adding it to the scheduledAt - const availableAt = jitter - ? new Date(scheduledAt.getTime() + Math.random() * jitter - jitter / 2) - : scheduledAt; + const appliedJitter = typeof jitter === "number" ? Math.random() * jitter - jitter / 2 : 0; + const availableAt = new Date(scheduledAt.getTime() + appliedJitter); const enqueued = await this.enqueueOnce({ id: identifier, @@ -715,6 +714,8 @@ class Worker { scheduledAt, enqueued, availableAt, + appliedJitter, + jitter, }); return {