diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 2ebec63833..cd9bf5bead 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -6,6 +6,7 @@ import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; const Env = z.object({ // This will come from `spec.nodeName` in k8s TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), + TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), // Required settings TRIGGER_API_URL: z.string().url(), @@ -31,7 +32,8 @@ const Env = z.object({ // Dequeue settings (provider mode) TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), - TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), + TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), + TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10), TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index a7fc480c7f..b32e4b00ef 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -116,10 +116,12 @@ class ManagedSupervisor { instanceName: env.TRIGGER_WORKER_INSTANCE_NAME, managedWorkerSecret: env.MANAGED_WORKER_SECRET, dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS, + dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS, queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED, maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT, maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, + heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, preDequeue: async () => { if (this.isKubernetes) { // Not used in k8s for now diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index ed2dfca78f..273c7bbe0a 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -4,7 +4,8 @@ import { PreDequeueFn, PreSkipFn } from "./types.js"; type RunQueueConsumerOptions = { client: SupervisorHttpClient; - intervalMs?: number; + intervalMs: number; + idleIntervalMs: number; preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; @@ -19,11 +20,13 @@ export class RunQueueConsumer { private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; private intervalMs: number; + private idleIntervalMs: number; private isEnabled: boolean; constructor(opts: RunQueueConsumerOptions) { this.isEnabled = false; - this.intervalMs = opts.intervalMs ?? 5_000; + this.intervalMs = opts.intervalMs; + this.idleIntervalMs = opts.idleIntervalMs; this.preDequeue = opts.preDequeue; this.preSkip = opts.preSkip; this.maxRunCount = opts.maxRunCount; @@ -84,9 +87,11 @@ export class RunQueueConsumer { } } - return this.scheduleNextDequeue(); + return this.scheduleNextDequeue(this.idleIntervalMs); } + let nextIntervalMs = this.idleIntervalMs; + try { const response = await this.client.dequeue({ maxResources: preDequeueResult?.maxResources, @@ -98,6 +103,10 @@ export class RunQueueConsumer { } else { try { await this.onDequeue(response.data); + + if (response.data.length > 0) { + nextIntervalMs = this.intervalMs; + } } catch (handlerError) { console.error("[RunQueueConsumer] onDequeue error", { error: handlerError }); } @@ -106,10 +115,10 @@ export class RunQueueConsumer { console.error("[RunQueueConsumer] client.dequeue error", { error: clientError }); } - this.scheduleNextDequeue(); + this.scheduleNextDequeue(nextIntervalMs); } - scheduleNextDequeue(delay: number = this.intervalMs) { - setTimeout(this.dequeue.bind(this), delay); + scheduleNextDequeue(delayMs: number) { + setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 6cc2bfc8f1..3f19bb97de 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -13,8 +13,9 @@ import { IntervalService } from "../../utils/interval.js"; type SupervisorSessionOptions = SupervisorClientCommonOptions & { queueConsumerEnabled?: boolean; runNotificationsEnabled?: boolean; - heartbeatIntervalSeconds?: number; - dequeueIntervalMs?: number; + heartbeatIntervalSeconds: number; + dequeueIntervalMs: number; + dequeueIdleIntervalMs: number; preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; @@ -31,7 +32,6 @@ export class SupervisorSession extends EventEmitter { private readonly queueConsumers: RunQueueConsumer[]; private readonly heartbeat: IntervalService; - private readonly heartbeatIntervalSeconds: number; constructor(private opts: SupervisorSessionOptions) { super(); @@ -47,12 +47,11 @@ export class SupervisorSession extends EventEmitter { preSkip: opts.preSkip, onDequeue: this.onDequeue.bind(this), intervalMs: opts.dequeueIntervalMs, + idleIntervalMs: opts.dequeueIdleIntervalMs, maxRunCount: opts.maxRunCount, }); }); - // TODO: This should be dynamic and set by (or at least overridden by) the platform - this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30; this.heartbeat = new IntervalService({ onInterval: async () => { console.debug("[SupervisorSession] Sending heartbeat"); @@ -64,7 +63,7 @@ export class SupervisorSession extends EventEmitter { console.error("[SupervisorSession] Heartbeat failed", { error: response.error }); } }, - intervalMs: this.heartbeatIntervalSeconds * 1000, + intervalMs: opts.heartbeatIntervalSeconds * 1000, leadingEdge: false, onError: async (error) => { console.error("[SupervisorSession] Failed to send heartbeat", { error });