Skip to content

Commit f2db1b8

Browse files
authored
Create dedicated alerts worker (#2184)
* Add a new alerts worker that just handle alerts * fallback to graphile worker when a tx is required
1 parent 3c93783 commit f2db1b8

11 files changed

+201
-38
lines changed

apps/webapp/app/env.server.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,44 @@ const EnvironmentSchema = z.object({
692692
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
693693
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
694694

695+
ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
696+
ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
697+
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
698+
ALERTS_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
699+
ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
700+
ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
701+
ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
702+
703+
ALERTS_WORKER_REDIS_HOST: z
704+
.string()
705+
.optional()
706+
.transform((v) => v ?? process.env.REDIS_HOST),
707+
ALERTS_WORKER_REDIS_READER_HOST: z
708+
.string()
709+
.optional()
710+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
711+
ALERTS_WORKER_REDIS_READER_PORT: z.coerce
712+
.number()
713+
.optional()
714+
.transform(
715+
(v) =>
716+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
717+
),
718+
ALERTS_WORKER_REDIS_PORT: z.coerce
719+
.number()
720+
.optional()
721+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
722+
ALERTS_WORKER_REDIS_USERNAME: z
723+
.string()
724+
.optional()
725+
.transform((v) => v ?? process.env.REDIS_USERNAME),
726+
ALERTS_WORKER_REDIS_PASSWORD: z
727+
.string()
728+
.optional()
729+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
730+
ALERTS_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
731+
ALERTS_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
732+
695733
SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
696734
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
697735
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),

apps/webapp/app/services/worker.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { logger } from "./logger.server";
3333
const workerCatalog = {
3434
// @deprecated, moved to commonWorker.server.ts
3535
scheduleEmail: DeliverEmailSchema,
36-
// @deprecated, moved to commonWorker.server.ts
36+
// @deprecated, but still used when resuming batch runs in a transaction
3737
"v3.resumeBatchRun": z.object({
3838
batchRunId: z.string(),
3939
}),
@@ -164,7 +164,7 @@ function getWorkerQueue() {
164164
await sendEmail(payload);
165165
},
166166
},
167-
// @deprecated, moved to commonWorker.server.ts
167+
// @deprecated, moved to commonWorker.server.ts but still used when resuming batch runs in a transaction
168168
"v3.resumeBatchRun": {
169169
priority: 0,
170170
maxAttempts: 5,
@@ -215,7 +215,7 @@ function getWorkerQueue() {
215215
});
216216
},
217217
},
218-
// @deprecated, moved to commonWorker.server.ts
218+
// @deprecated, moved to alertsWorker.server.ts
219219
"v3.performTaskRunAlerts": {
220220
priority: 0,
221221
maxAttempts: 3,
@@ -224,7 +224,7 @@ function getWorkerQueue() {
224224
return await service.call(payload.runId);
225225
},
226226
},
227-
// @deprecated, moved to commonWorker.server.ts
227+
// @deprecated, moved to alertsWorker.server.ts
228228
"v3.deliverAlert": {
229229
priority: 0,
230230
maxAttempts: 8,
@@ -234,7 +234,7 @@ function getWorkerQueue() {
234234
return await service.call(payload.alertId);
235235
},
236236
},
237-
// @deprecated, moved to commonWorker.server.ts
237+
// @deprecated, moved to alertsWorker.server.ts
238238
"v3.performDeploymentAlerts": {
239239
priority: 0,
240240
maxAttempts: 3,
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { Logger } from "@trigger.dev/core/logger";
2+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { singleton } from "~/utils/singleton";
7+
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
8+
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
9+
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
10+
11+
function initializeWorker() {
12+
const redisOptions = {
13+
keyPrefix: "alerts:worker:",
14+
host: env.ALERTS_WORKER_REDIS_HOST,
15+
port: env.ALERTS_WORKER_REDIS_PORT,
16+
username: env.ALERTS_WORKER_REDIS_USERNAME,
17+
password: env.ALERTS_WORKER_REDIS_PASSWORD,
18+
enableAutoPipelining: true,
19+
...(env.ALERTS_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
20+
};
21+
22+
logger.debug(`👨‍🏭 Initializing alerts worker at host ${env.ALERTS_WORKER_REDIS_HOST}`);
23+
24+
const worker = new RedisWorker({
25+
name: "alerts-worker",
26+
redisOptions,
27+
catalog: {
28+
"v3.performTaskRunAlerts": {
29+
schema: z.object({
30+
runId: z.string(),
31+
}),
32+
visibilityTimeoutMs: 60_000,
33+
retry: {
34+
maxAttempts: 3,
35+
},
36+
},
37+
"v3.performDeploymentAlerts": {
38+
schema: z.object({
39+
deploymentId: z.string(),
40+
}),
41+
visibilityTimeoutMs: 60_000,
42+
retry: {
43+
maxAttempts: 3,
44+
},
45+
},
46+
"v3.deliverAlert": {
47+
schema: z.object({
48+
alertId: z.string(),
49+
}),
50+
visibilityTimeoutMs: 60_000,
51+
retry: {
52+
maxAttempts: 3,
53+
},
54+
},
55+
},
56+
concurrency: {
57+
workers: env.ALERTS_WORKER_CONCURRENCY_WORKERS,
58+
tasksPerWorker: env.ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER,
59+
limit: env.ALERTS_WORKER_CONCURRENCY_LIMIT,
60+
},
61+
pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL,
62+
immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL,
63+
shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS,
64+
logger: new Logger("AlertsWorker", "debug"),
65+
jobs: {
66+
"v3.deliverAlert": async ({ payload }) => {
67+
const service = new DeliverAlertService();
68+
69+
await service.call(payload.alertId);
70+
},
71+
"v3.performDeploymentAlerts": async ({ payload }) => {
72+
const service = new PerformDeploymentAlertsService();
73+
74+
await service.call(payload.deploymentId);
75+
},
76+
"v3.performTaskRunAlerts": async ({ payload }) => {
77+
const service = new PerformTaskRunAlertsService();
78+
await service.call(payload.runId);
79+
},
80+
},
81+
});
82+
83+
if (env.ALERTS_WORKER_ENABLED === "true") {
84+
logger.debug(
85+
`👨‍🏭 Starting alerts worker at host ${env.ALERTS_WORKER_REDIS_HOST}, pollInterval = ${env.ALERTS_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ALERTS_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ALERTS_WORKER_CONCURRENCY_LIMIT}`
86+
);
87+
88+
worker.start();
89+
}
90+
91+
return worker;
92+
}
93+
94+
export const alertsWorker = singleton("alertsWorker", initializeWorker);

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,16 +237,19 @@ function initializeWorker() {
237237
const service = new RunEngineBatchTriggerService(payload.strategy);
238238
await service.processBatchTaskRun(payload);
239239
},
240+
// @deprecated, moved to alertsWorker.server.ts
240241
"v3.deliverAlert": async ({ payload }) => {
241242
const service = new DeliverAlertService();
242243

243244
await service.call(payload.alertId);
244245
},
246+
// @deprecated, moved to alertsWorker.server.ts
245247
"v3.performDeploymentAlerts": async ({ payload }) => {
246248
const service = new PerformDeploymentAlertsService();
247249

248250
await service.call(payload.deploymentId);
249251
},
252+
// @deprecated, moved to alertsWorker.server.ts
250253
"v3.performTaskRunAlerts": async ({ payload }) => {
251254
const service = new PerformTaskRunAlertsService();
252255
await service.call(payload.runId);

apps/webapp/app/v3/services/alerts/deliverAlert.server.ts

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,39 @@ import {
77
type WebAPIRequestError,
88
} from "@slack/web-api";
99
import {
10-
Webhook,
11-
TaskRunError,
1210
createJsonErrorObject,
13-
type RunFailedWebhook,
1411
type DeploymentFailedWebhook,
1512
type DeploymentSuccessWebhook,
1613
isOOMRunError,
14+
type RunFailedWebhook,
15+
TaskRunError,
1716
} from "@trigger.dev/core/v3";
17+
import { type ProjectAlertChannelType, type ProjectAlertType } from "@trigger.dev/database";
1818
import assertNever from "assert-never";
1919
import { subtle } from "crypto";
20+
import { environmentTitle } from "~/components/environments/EnvironmentLabel";
2021
import { type Prisma, type prisma, type PrismaClientOrTransaction } from "~/db.server";
2122
import { env } from "~/env.server";
2223
import {
23-
OrgIntegrationRepository,
2424
type OrganizationIntegrationForService,
25+
OrgIntegrationRepository,
2526
} from "~/models/orgIntegration.server";
2627
import {
2728
ProjectAlertEmailProperties,
2829
ProjectAlertSlackProperties,
2930
ProjectAlertSlackStorage,
3031
ProjectAlertWebhookProperties,
3132
} from "~/models/projectAlert.server";
33+
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
3234
import { DeploymentPresenter } from "~/presenters/v3/DeploymentPresenter.server";
3335
import { sendAlertEmail } from "~/services/email.server";
3436
import { logger } from "~/services/logger.server";
3537
import { decryptSecret } from "~/services/secrets/secretStore.server";
36-
import { commonWorker } from "~/v3/commonWorker.server";
37-
import { BaseService } from "../baseService.server";
38-
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
39-
import { type ProjectAlertChannelType, type ProjectAlertType } from "@trigger.dev/database";
40-
import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server";
4138
import { v3RunPath } from "~/utils/pathBuilder";
42-
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
43-
import { environmentTitle } from "~/components/environments/EnvironmentLabel";
39+
import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server";
40+
import { alertsWorker } from "~/v3/alertsWorker.server";
41+
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
42+
import { BaseService } from "../baseService.server";
4443

4544
type FoundAlert = Prisma.Result<
4645
typeof prisma.projectAlert,
@@ -1044,7 +1043,7 @@ export class DeliverAlertService extends BaseService {
10441043
}
10451044

10461045
static async enqueue(alertId: string, runAt?: Date) {
1047-
return await commonWorker.enqueue({
1046+
return await alertsWorker.enqueue({
10481047
id: `alert:${alertId}`,
10491048
job: "v3.deliverAlert",
10501049
payload: { alertId },

apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import { ProjectAlertChannel, ProjectAlertType, WorkerDeployment } from "@trigger.dev/database";
2-
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
3-
import { workerQueue } from "~/services/worker.server";
4-
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
2+
import { alertsWorker } from "~/v3/alertsWorker.server";
53
import { BaseService } from "../baseService.server";
64
import { DeliverAlertService } from "./deliverAlert.server";
7-
import { commonWorker } from "~/v3/commonWorker.server";
85

96
export class PerformDeploymentAlertsService extends BaseService {
107
public async call(deploymentId: string) {
@@ -60,7 +57,7 @@ export class PerformDeploymentAlertsService extends BaseService {
6057
}
6158

6259
static async enqueue(deploymentId: string, runAt?: Date) {
63-
return await commonWorker.enqueue({
60+
return await alertsWorker.enqueue({
6461
id: `performDeploymentAlerts:${deploymentId}`,
6562
job: "v3.performDeploymentAlerts",
6663
payload: { deploymentId },

apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { type Prisma, type ProjectAlertChannel } from "@trigger.dev/database";
22
import { type prisma } from "~/db.server";
3-
import { commonWorker } from "~/v3/commonWorker.server";
3+
import { alertsWorker } from "~/v3/alertsWorker.server";
44
import { BaseService } from "../baseService.server";
55
import { DeliverAlertService } from "./deliverAlert.server";
66

@@ -62,7 +62,7 @@ export class PerformTaskRunAlertsService extends BaseService {
6262
}
6363

6464
static async enqueue(runId: string, runAt?: Date) {
65-
return await commonWorker.enqueue({
65+
return await alertsWorker.enqueue({
6666
id: `performTaskRunAlerts:${runId}`,
6767
job: "v3.performTaskRunAlerts",
6868
payload: { runId },

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,15 @@ export async function completeBatchTaskRunItemV3(
942942
) {
943943
const isRetry = retryAttempt !== undefined;
944944

945+
logger.debug("completeBatchTaskRunItemV3", {
946+
itemId,
947+
batchTaskRunId,
948+
scheduleResumeOnComplete,
949+
taskRunAttemptId,
950+
retryAttempt,
951+
isRetry,
952+
});
953+
945954
if (isRetry) {
946955
logger.debug("completeBatchTaskRunItemV3 retrying", {
947956
itemId,

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -392,11 +392,7 @@ export class CreateCheckpointService extends BaseService {
392392
checkpointEventId: checkpointEvent.id,
393393
});
394394

395-
await ResumeBatchRunService.enqueue(
396-
batchRun.id,
397-
batchRun.batchVersion === "v3",
398-
this._prisma
399-
);
395+
await ResumeBatchRunService.enqueue(batchRun.id, batchRun.batchVersion === "v3");
400396

401397
return {
402398
success: true,

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ export class FinalizeTaskRunService extends BaseService {
244244

245245
// This won't resume because this batch does not have a dependent task attempt ID
246246
// or is in development, but this service will mark the batch as completed
247-
await ResumeBatchRunService.enqueue(item.batchTaskRunId, false, this._prisma);
247+
await ResumeBatchRunService.enqueue(item.batchTaskRunId, false);
248248
}
249249
}
250250
}

0 commit comments

Comments
 (0)