9 changes: 7 additions & 2 deletions apps/webapp/app/env.server.ts
@@ -618,6 +618,9 @@ const EnvironmentSchema = z.object({
   LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
   LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
   LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z
+    .enum(["log", "error", "warn", "info", "debug"])
+    .default("info"),
 
   LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
     .string()
@@ -661,6 +664,7 @@ const EnvironmentSchema = z.object({
   COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
   COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
   COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
 
   COMMON_WORKER_REDIS_HOST: z
     .string()
@@ -699,6 +703,7 @@ const EnvironmentSchema = z.object({
   ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
   ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
   ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
 
   ALERTS_WORKER_REDIS_HOST: z
     .string()
@@ -732,8 +737,8 @@ const EnvironmentSchema = z.object({
 
   SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
   SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
-  SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
-  SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
+  SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
+  SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
   SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
   SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
   SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
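All four new `*_LOG_LEVEL` variables share one zod pattern: an enum of accepted levels with an "info" fallback. A standalone sketch of how that pattern behaves (illustrative, not part of the PR):

```ts
import { z } from "zod";

// Mirrors the new *_LOG_LEVEL entries above.
const LogLevel = z.enum(["log", "error", "warn", "info", "debug"]).default("info");

LogLevel.parse("debug");   // "debug": explicit overrides are accepted
LogLevel.parse(undefined); // "info": an unset env var falls back to the default
LogLevel.parse("trace");   // throws a ZodError: not one of the allowed levels
```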
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/alertsWorker.server.ts
@@ -61,7 +61,7 @@ function initializeWorker() {
     pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL,
     immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL,
     shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS,
-    logger: new Logger("AlertsWorker", "debug"),
+    logger: new Logger("AlertsWorker", env.ALERTS_WORKER_LOG_LEVEL),
     jobs: {
       "v3.deliverAlert": async ({ payload }) => {
         const service = new DeliverAlertService();
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/commonWorker.server.ts
@@ -196,7 +196,7 @@ function initializeWorker() {
     pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
     immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
     shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
-    logger: new Logger("CommonWorker", "debug"),
+    logger: new Logger("CommonWorker", env.COMMON_WORKER_LOG_LEVEL),
     jobs: {
       scheduleEmail: async ({ payload }) => {
         await sendEmail(payload);
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/legacyRunEngineWorker.server.ts
@@ -68,7 +68,7 @@ function initializeWorker() {
     pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
     immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
     shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
-    logger: new Logger("LegacyRunEngineWorker", "debug"),
+    logger: new Logger("LegacyRunEngineWorker", env.LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL),
     jobs: {
       runHeartbeat: async ({ payload }) => {
         const service = new TaskRunHeartbeatFailedService();
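All three workers above, and the RunEngine further down, previously hard-coded their Logger level to "debug"; they now read it from configuration and default to "info". A minimal sketch of the level filtering this presumably relies on, assuming (not confirmed from the Logger source) that a message is emitted only when its level index does not exceed the configured one:

```ts
const levels = ["log", "error", "warn", "info", "debug"] as const;
type LogLevel = (typeof levels)[number];

// Assumed semantics: "info" still emits errors and warnings but drops debug chatter,
// which is what made the old hard-coded "debug" noisy in production.
function shouldEmit(configured: LogLevel, message: LogLevel): boolean {
  return levels.indexOf(message) <= levels.indexOf(configured);
}

shouldEmit("info", "warn");  // true
shouldEmit("info", "debug"); // false
```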
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/scheduleEngine.server.ts
@@ -61,6 +61,8 @@ function createScheduleEngine() {
     },
     worker: {
       concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
+      workers: env.SCHEDULE_WORKER_CONCURRENCY_WORKERS,
+      tasksPerWorker: env.SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
       pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
       shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
       disabled: env.SCHEDULE_WORKER_ENABLED === "0",
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/engine/index.ts
@@ -152,7 +152,7 @@ export class RunEngine {
       pollIntervalMs: options.worker.pollIntervalMs,
       immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
       shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
-      logger: new Logger("RunEngineWorker", "debug"),
+      logger: new Logger("RunEngineWorker", options.logLevel ?? "info"),
       jobs: {
         finishWaitpoint: async ({ payload }) => {
           await this.waitpointSystem.completeWaitpoint({
@@ -5,21 +5,27 @@
  */
 export function calculateDistributedExecutionTime(
   exactScheduleTime: Date,
-  distributionWindowSeconds: number = 30
+  distributionWindowSeconds: number = 30,
+  instanceId?: string
 ): Date {
-  // Use the ISO string of the exact schedule time as the seed for consistency
-  const seed = exactScheduleTime.toISOString();
+  // Create seed by combining ISO timestamp with optional instanceId
+  // This ensures different instances get different distributions even with same schedule time
+  const timeSeed = exactScheduleTime.toISOString();
+  const seed = instanceId ? `${timeSeed}:${instanceId}` : timeSeed;
 
-  // Create a simple hash from the seed string
-  let hash = 0;
+  // Use a better hash function (FNV-1a variant) for more uniform distribution
+  let hash = 2166136261; // FNV offset basis (32-bit)
+
   for (let i = 0; i < seed.length; i++) {
-    const char = seed.charCodeAt(i);
-    hash = (hash << 5) - hash + char;
-    hash = hash & hash; // Convert to 32-bit integer
+    hash ^= seed.charCodeAt(i);
+    hash *= 16777619; // FNV prime (32-bit)
+    // Keep it as 32-bit unsigned integer
+    hash = hash >>> 0;
   }
 
-  // Convert hash to a value between 0 and 1
-  const normalized = Math.abs(hash) / Math.pow(2, 31);
+  // Convert hash to a value between 0 and 1 using better normalization
+  // Use the full 32-bit range for better distribution
+  const normalized = hash / 0xffffffff;
 
   // Calculate offset in milliseconds (0 to distributionWindowSeconds * 1000)
   const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000);
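Assembled from the diff for readability, the updated function now looks like this. The trailing return statement is truncated above; subtracting the offset is inferred from the `distributionOffsetMs` computation at the call site in the next file, so treat that line as an assumption:

```ts
export function calculateDistributedExecutionTime(
  exactScheduleTime: Date,
  distributionWindowSeconds: number = 30,
  instanceId?: string
): Date {
  // Distinct instances get distinct distributions for the same schedule time.
  const timeSeed = exactScheduleTime.toISOString();
  const seed = instanceId ? `${timeSeed}:${instanceId}` : timeSeed;

  // FNV-1a: xor in each code unit, multiply by the FNV prime, keep 32-bit unsigned.
  let hash = 2166136261;
  for (let i = 0; i < seed.length; i++) {
    hash ^= seed.charCodeAt(i);
    hash *= 16777619;
    hash = hash >>> 0;
  }

  // Normalize over the full 32-bit range, then map onto the window in ms.
  const normalized = hash / 0xffffffff;
  const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000);

  return new Date(exactScheduleTime.getTime() - offsetMs); // inferred, see note above
}

// Two instances sharing a schedule time land at different points in the window:
const at = new Date("2025-01-01T00:00:00.000Z");
calculateDistributedExecutionTime(at, 30, "instance-a");
calculateDistributedExecutionTime(at, 30, "instance-b"); // different seed, different offset
```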
5 changes: 4 additions & 1 deletion internal-packages/schedule-engine/src/engine/index.ts
@@ -92,6 +92,8 @@ export class ScheduleEngine {
       catalog: scheduleWorkerCatalog,
       concurrency: {
         limit: options.worker.concurrency,
+        workers: options.worker.workers,
+        tasksPerWorker: options.worker.tasksPerWorker,
       },
       pollIntervalMs: options.worker.pollIntervalMs,
       shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
@@ -590,7 +592,8 @@
 
     const distributedExecutionTime = calculateDistributedExecutionTime(
       exactScheduleTime,
-      this.distributionWindowSeconds
+      this.distributionWindowSeconds,
+      instanceId
     );
 
     const distributionOffsetMs = exactScheduleTime.getTime() - distributedExecutionTime.getTime();
2 changes: 2 additions & 0 deletions internal-packages/schedule-engine/src/engine/types.ts
@@ -35,6 +35,8 @@ export interface ScheduleEngineOptions {
   redis: RedisOptions;
   worker: {
     concurrency: number;
+    workers?: number;
+    tasksPerWorker?: number;
     pollIntervalMs?: number;
     shutdownTimeoutMs?: number;
     disabled?: boolean;
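Taken together with the new defaults in env.server.ts, the schedule worker now runs 2 polling loops, each dequeuing up to 10 tasks per iteration, all bounded by the concurrency limit of 50. A rough sketch of how the three knobs interact, using the redis-worker loop semantics shown in the next file:

```ts
// Illustrative arithmetic only; names match the env vars above.
const workers = 2;         // SCHEDULE_WORKER_CONCURRENCY_WORKERS: parallel poll loops
const tasksPerWorker = 10; // SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: dequeue batch size
const limit = 50;          // SCHEDULE_WORKER_CONCURRENCY_LIMIT: cap on in-flight tasks

// At most workers * tasksPerWorker items are pulled per poll cycle...
const maxDequeuedPerCycle = workers * tasksPerWorker; // 20
// ...and each loop skips dequeuing while activeCount + pendingCount >= limit.
```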
50 changes: 48 additions & 2 deletions packages/redis-worker/src/worker.ts
@@ -206,7 +206,7 @@ class Worker<TCatalog extends WorkerCatalog> {
 
     // Launch a number of "worker loops" on the main thread.
     for (let i = 0; i < workers; i++) {
-      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker));
+      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker, i, workers));
     }
 
     this.setupShutdownHandlers();
@@ -390,14 +390,43 @@
   /**
    * The main loop that each worker runs. It repeatedly polls for items,
    * processes them, and then waits before the next iteration.
    */
-  private async runWorkerLoop(workerId: string, taskCount: number): Promise<void> {
+  private async runWorkerLoop(
+    workerId: string,
+    taskCount: number,
+    workerIndex: number,
+    totalWorkers: number
+  ): Promise<void> {
     const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
     const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;
 
+    // Calculate the delay between starting each worker loop so that they don't all start at the same time.
+    const delayBetweenWorkers = this.options.pollIntervalMs ?? 1000;
+    const delay = delayBetweenWorkers * (totalWorkers - workerIndex);
+    await Worker.delay(delay);
+
+    this.logger.info("Starting worker loop", {
+      workerIndex,
+      totalWorkers,
+      delay,
+      workerId,
+      taskCount,
+      pollIntervalMs,
+      immediatePollIntervalMs,
+      concurrencyOptions: this.concurrency,
+    });
+
     while (!this.isShuttingDown) {
       // Check overall load. If at capacity, wait a bit before trying to dequeue more.
       if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
+        this.logger.debug("Worker at capacity, waiting", {
+          workerId,
+          concurrencyOptions: this.concurrency,
+          activeCount: this.limiter.activeCount,
+          pendingCount: this.limiter.pendingCount,
+        });
+
         await Worker.delay(pollIntervalMs);
+
         continue;
       }
@@ -412,10 +441,25 @@
 
       );
 
       if (items.length === 0) {
+        this.logger.debug("No items to dequeue", {
+          workerId,
+          concurrencyOptions: this.concurrency,
+          activeCount: this.limiter.activeCount,
+          pendingCount: this.limiter.pendingCount,
+        });
+
         await Worker.delay(pollIntervalMs);
         continue;
       }
 
+      this.logger.info("Dequeued items", {
+        workerId,
+        itemCount: items.length,
+        concurrencyOptions: this.concurrency,
+        activeCount: this.limiter.activeCount,
+        pendingCount: this.limiter.pendingCount,
+      });
+
       // Schedule each item using the limiter.
       for (const item of items) {
         this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
@@ -433,6 +477,8 @@
       // Wait briefly before immediately polling again since we processed items
       await Worker.delay(immediatePollIntervalMs);
     }
+
+    this.logger.info("Worker loop finished", { workerId });
   }
 
   /**
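The staggered start is the main behavioral change in this file: each loop now sleeps `pollIntervalMs * (totalWorkers - workerIndex)` before its first poll, so loops come online one interval apart rather than all at once. A worked example of that arithmetic:

```ts
// Mirrors the delay computation added to runWorkerLoop above.
function startupDelayMs(workerIndex: number, totalWorkers: number, pollIntervalMs = 1000): number {
  return pollIntervalMs * (totalWorkers - workerIndex);
}

// With 3 workers and the default 1000 ms poll interval:
[0, 1, 2].map((i) => startupDelayMs(i, 3)); // [3000, 2000, 1000]
// Later-indexed loops start first, and every loop waits at least one full interval.
```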