Skip to content

Commit bdaa2ed

Browse files
authored
Move batch trigger processing jobs to a dedicated redis worker (#2233)
1 parent afc9a6a commit bdaa2ed

File tree

5 files changed

+139
-4
lines changed

5 files changed

+139
-4
lines changed

apps/webapp/app/env.server.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,47 @@ const EnvironmentSchema = z.object({
717717
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
718718
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
719719

720+
BATCH_TRIGGER_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
721+
BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
722+
BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
723+
BATCH_TRIGGER_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
724+
BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
725+
BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
726+
BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
727+
BATCH_TRIGGER_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
728+
729+
BATCH_TRIGGER_WORKER_REDIS_HOST: z
730+
.string()
731+
.optional()
732+
.transform((v) => v ?? process.env.REDIS_HOST),
733+
BATCH_TRIGGER_WORKER_REDIS_READER_HOST: z
734+
.string()
735+
.optional()
736+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
737+
BATCH_TRIGGER_WORKER_REDIS_READER_PORT: z.coerce
738+
.number()
739+
.optional()
740+
.transform(
741+
(v) =>
742+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
743+
),
744+
BATCH_TRIGGER_WORKER_REDIS_PORT: z.coerce
745+
.number()
746+
.optional()
747+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
748+
BATCH_TRIGGER_WORKER_REDIS_USERNAME: z
749+
.string()
750+
.optional()
751+
.transform((v) => v ?? process.env.REDIS_USERNAME),
752+
BATCH_TRIGGER_WORKER_REDIS_PASSWORD: z
753+
.string()
754+
.optional()
755+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
756+
BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED: z
757+
.string()
758+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
759+
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
760+
720761
ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
721762
ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
722763
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { env } from "~/env.server";
1414
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1515
import { logger } from "~/services/logger.server";
1616
import { getEntitlement } from "~/services/platform.v3.server";
17-
import { commonWorker } from "~/v3/commonWorker.server";
17+
import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server";
1818
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
1919
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
2020
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";
@@ -314,7 +314,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
314314
}
315315

316316
async #enqueueBatchTaskRun(options: BatchProcessingOptions) {
317-
await commonWorker.enqueue({
317+
await batchTriggerWorker.enqueue({
318318
id: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
319319
job: "runengine.processBatchTaskRun",
320320
payload: options,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { Logger } from "@trigger.dev/core/logger";
2+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
6+
import { logger } from "~/services/logger.server";
7+
import { singleton } from "~/utils/singleton";
8+
import { BatchTriggerV3Service } from "./services/batchTriggerV3.server";
9+
10+
function initializeWorker() {
11+
const redisOptions = {
12+
keyPrefix: "batch-trigger:worker:",
13+
host: env.BATCH_TRIGGER_WORKER_REDIS_HOST,
14+
port: env.BATCH_TRIGGER_WORKER_REDIS_PORT,
15+
username: env.BATCH_TRIGGER_WORKER_REDIS_USERNAME,
16+
password: env.BATCH_TRIGGER_WORKER_REDIS_PASSWORD,
17+
enableAutoPipelining: true,
18+
...(env.BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
19+
};
20+
21+
logger.debug(
22+
`👨‍🏭 Initializing batch trigger worker at host ${env.BATCH_TRIGGER_WORKER_REDIS_HOST}`
23+
);
24+
25+
const worker = new RedisWorker({
26+
name: "batch-trigger-worker",
27+
redisOptions,
28+
catalog: {
29+
"v3.processBatchTaskRun": {
30+
schema: z.object({
31+
batchId: z.string(),
32+
processingId: z.string(),
33+
range: z.object({ start: z.number().int(), count: z.number().int() }),
34+
attemptCount: z.number().int(),
35+
strategy: z.enum(["sequential", "parallel"]),
36+
}),
37+
visibilityTimeoutMs: 60_000,
38+
retry: {
39+
maxAttempts: 5,
40+
},
41+
},
42+
"runengine.processBatchTaskRun": {
43+
schema: z.object({
44+
batchId: z.string(),
45+
processingId: z.string(),
46+
range: z.object({ start: z.number().int(), count: z.number().int() }),
47+
attemptCount: z.number().int(),
48+
strategy: z.enum(["sequential", "parallel"]),
49+
parentRunId: z.string().optional(),
50+
resumeParentOnCompletion: z.boolean().optional(),
51+
}),
52+
visibilityTimeoutMs: 60_000,
53+
retry: {
54+
maxAttempts: 5,
55+
},
56+
},
57+
},
58+
concurrency: {
59+
workers: env.BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS,
60+
tasksPerWorker: env.BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER,
61+
limit: env.BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT,
62+
},
63+
pollIntervalMs: env.BATCH_TRIGGER_WORKER_POLL_INTERVAL,
64+
immediatePollIntervalMs: env.BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL,
65+
shutdownTimeoutMs: env.BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS,
66+
logger: new Logger("BatchTriggerWorker", env.BATCH_TRIGGER_WORKER_LOG_LEVEL),
67+
jobs: {
68+
"v3.processBatchTaskRun": async ({ payload }) => {
69+
const service = new BatchTriggerV3Service(payload.strategy);
70+
await service.processBatchTaskRun(payload);
71+
},
72+
"runengine.processBatchTaskRun": async ({ payload }) => {
73+
const service = new RunEngineBatchTriggerService(payload.strategy);
74+
await service.processBatchTaskRun(payload);
75+
},
76+
},
77+
});
78+
79+
if (env.BATCH_TRIGGER_WORKER_ENABLED === "true") {
80+
logger.debug(
81+
`👨‍🏭 Starting batch trigger worker at host ${env.BATCH_TRIGGER_WORKER_REDIS_HOST}, pollInterval = ${env.BATCH_TRIGGER_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT}`
82+
);
83+
84+
worker.start();
85+
}
86+
87+
return worker;
88+
}
89+
90+
export const batchTriggerWorker = singleton("batchTriggerWorker", initializeWorker);

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function initializeWorker() {
114114
maxAttempts: 5,
115115
},
116116
},
117+
// @deprecated, moved to batchTriggerWorker.server.ts
117118
"v3.processBatchTaskRun": {
118119
schema: z.object({
119120
batchId: z.string(),
@@ -127,6 +128,7 @@ function initializeWorker() {
127128
maxAttempts: 5,
128129
},
129130
},
131+
// @deprecated, moved to batchTriggerWorker.server.ts
130132
"runengine.processBatchTaskRun": {
131133
schema: z.object({
132134
batchId: z.string(),
@@ -229,10 +231,12 @@ function initializeWorker() {
229231
const service = new CancelDevSessionRunsService();
230232
await service.call(payload);
231233
},
234+
// @deprecated, moved to batchTriggerWorker.server.ts
232235
"v3.processBatchTaskRun": async ({ payload }) => {
233236
const service = new BatchTriggerV3Service(payload.strategy);
234237
await service.processBatchTaskRun(payload);
235238
},
239+
// @deprecated, moved to batchTriggerWorker.server.ts
236240
"runengine.processBatchTaskRun": async ({ payload }) => {
237241
const service = new RunEngineBatchTriggerService(payload.strategy);
238242
await service.processBatchTaskRun(payload);

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
2020
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2121
import { logger } from "~/services/logger.server";
2222
import { getEntitlement } from "~/services/platform.v3.server";
23-
import { commonWorker } from "../commonWorker.server";
23+
import { batchTriggerWorker } from "../batchTriggerWorker.server";
2424
import { generateFriendlyId } from "../friendlyIdentifiers";
2525
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
2626
import { marqs } from "../marqs/index.server";
@@ -892,7 +892,7 @@ export class BatchTriggerV3Service extends BaseService {
892892
}
893893

894894
async #enqueueBatchTaskRun(options: BatchProcessingOptions) {
895-
await commonWorker.enqueue({
895+
await batchTriggerWorker.enqueue({
896896
id: `BatchTriggerV2Service.process:${options.batchId}:${options.processingId}`,
897897
job: "v3.processBatchTaskRun",
898898
payload: options,

0 commit comments

Comments
 (0)