Skip to content

Commit 799c47a

Browse files
committed
Ensure only a single instance performs concurrency sweeping by using redis-worker cron jobs
1 parent 502918b commit 799c47a

File tree

10 files changed

+508
-390
lines changed

10 files changed

+508
-390
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,11 +428,10 @@ const EnvironmentSchema = z.object({
428428
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
429429
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
430430
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
431-
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_INTERVAL_MS: z.coerce.number().int().default(60_000),
432-
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_INTERVAL_MS: z.coerce.number().int().default(5_000),
433-
RUN_ENGINE_CONCURRENCY_SWEEPER_LOG_LEVEL: z
434-
.enum(["log", "error", "warn", "info", "debug"])
435-
.default("info"),
431+
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(),
432+
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(),
433+
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER: z.coerce.number().int().optional(),
434+
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER: z.coerce.number().int().optional(),
436435

437436
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
438437
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ function createRunEngine() {
6767
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
6868
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
6969
concurrencySweeper: {
70-
scanIntervalMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_INTERVAL_MS,
71-
processMarkedIntervalMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_INTERVAL_MS,
72-
logLevel: env.RUN_ENGINE_CONCURRENCY_SWEEPER_LOG_LEVEL,
70+
scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE,
71+
processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE,
72+
scanJitter: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER,
73+
processMarkedJitter: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER,
7374
},
7475
},
7576
runLock: {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,10 @@ export class RunEngine {
138138
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
139139
},
140140
concurrencySweeper: {
141-
enabled: !options.worker.disabled,
142-
scanIntervalMs: options.queue?.concurrencySweeper?.scanIntervalMs ?? 60_000,
143-
processMarkedIntervalMs:
144-
options.queue?.concurrencySweeper?.processMarkedIntervalMs ?? 5_000,
145-
logLevel: options.queue?.concurrencySweeper?.logLevel ?? options.queue?.logLevel,
141+
scanSchedule: options.queue?.concurrencySweeper?.scanSchedule,
142+
processMarkedSchedule: options.queue?.concurrencySweeper?.processMarkedSchedule,
143+
scanJitter: options.queue?.concurrencySweeper?.scanJitter,
144+
processMarkedJitter: options.queue?.concurrencySweeper?.processMarkedJitter,
146145
callback: this.#concurrencySweeperCallback.bind(this),
147146
},
148147
shardCount: options.queue?.shardCount,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ export type RunEngineOptions = {
4646
>;
4747
dequeueBlockingTimeoutSeconds?: number;
4848
concurrencySweeper?: {
49-
scanIntervalMs?: number;
50-
processMarkedIntervalMs?: number;
51-
logLevel?: LogLevel;
49+
scanSchedule?: string;
50+
processMarkedSchedule?: string;
51+
scanJitter?: number;
52+
processMarkedJitter?: number;
5253
};
5354
};
5455
runLock: {

0 commit comments

Comments
 (0)