Skip to content

Commit dafbf67

Browse files
committed
Add env vars and add a marqs shutdown on SIGTERM/INT
1 parent 88326b3 commit dafbf67

File tree

2 files changed

+31
-14
lines changed

2 files changed

+31
-14
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,8 @@ const EnvironmentSchema = z.object({
421421
MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
422422
MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
423423
MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
424+
MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
425+
MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(5_000),
424426

425427
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
426428

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { type RedisOptions } from "@internal/redis";
12
import {
23
context,
34
propagation,
@@ -8,21 +9,32 @@ import {
89
trace,
910
Tracer,
1011
} from "@opentelemetry/api";
11-
import { type RedisOptions } from "@internal/redis";
1212
import {
1313
SEMATTRS_MESSAGE_ID,
14-
SEMATTRS_MESSAGING_SYSTEM,
1514
SEMATTRS_MESSAGING_OPERATION,
15+
SEMATTRS_MESSAGING_SYSTEM,
1616
} from "@opentelemetry/semantic-conventions";
17+
import { Logger } from "@trigger.dev/core/logger";
18+
import { tryCatch } from "@trigger.dev/core/utils";
1719
import { flattenAttributes } from "@trigger.dev/core/v3";
20+
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
1821
import Redis, { type Callback, type Result } from "ioredis";
22+
import { setInterval as setIntervalAsync } from "node:timers/promises";
23+
import z from "zod";
1924
import { env } from "~/env.server";
2025
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2126
import { logger } from "~/services/logger.server";
2227
import { singleton } from "~/utils/singleton";
28+
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
2329
import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server";
2430
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2531
import { AsyncWorker } from "./asyncWorker.server";
32+
import {
33+
MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
34+
MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
35+
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
36+
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
37+
} from "./constants.server";
2638
import { FairDequeuingStrategy } from "./fairDequeuingStrategy.server";
2739
import { MarQSShortKeyProducer } from "./marqsKeyProducer";
2840
import {
@@ -36,18 +48,6 @@ import {
3648
VisibilityTimeoutStrategy,
3749
} from "./types";
3850
import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";
39-
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
40-
import {
41-
MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
42-
MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
43-
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
44-
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
45-
} from "./constants.server";
46-
import { setInterval as setIntervalAsync } from "node:timers/promises";
47-
import { tryCatch } from "@trigger.dev/core/utils";
48-
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
49-
import z from "zod";
50-
import { Logger } from "@trigger.dev/core/logger";
5151

5252
const KEY_PREFIX = "marqs:";
5353

@@ -146,6 +146,19 @@ export class MarQS {
146146
if (options.workerOptions?.enabled) {
147147
this.worker.start();
148148
}
149+
150+
this.#setupShutdownHandlers();
151+
}
152+
153+
#setupShutdownHandlers() {
154+
process.on("SIGTERM", () => this.shutdown("SIGTERM"));
155+
process.on("SIGINT", () => this.shutdown("SIGINT"));
156+
}
157+
158+
async shutdown(signal: NodeJS.Signals) {
159+
console.log("👇 Shutting down marqs", this.name, signal);
160+
clearInterval(this.clearCooloffPeriodInterval);
161+
this.#rebalanceWorkers.forEach((worker) => worker.stop());
149162
}
150163

151164
get name() {
@@ -2663,6 +2676,8 @@ function getMarQSClient() {
26632676
sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
26642677
sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
26652678
eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1",
2679+
sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
2680+
sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
26662681
workerOptions: {
26672682
enabled: env.MARQS_WORKER_ENABLED === "1",
26682683
pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS,

0 commit comments

Comments
 (0)