From ea56e0ab7697bb84a8c605912e0cd7df74632588 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 19 Jun 2025 11:14:34 +0100 Subject: [PATCH 1/2] Add a new alerts worker that just handle alerts --- apps/webapp/app/env.server.ts | 38 ++++++++ apps/webapp/app/services/worker.server.ts | 6 +- apps/webapp/app/v3/alertsWorker.server.ts | 94 +++++++++++++++++++ apps/webapp/app/v3/commonWorker.server.ts | 3 + .../v3/services/alerts/deliverAlert.server.ts | 23 +++-- .../alerts/performDeploymentAlerts.server.ts | 7 +- .../alerts/performTaskRunAlerts.server.ts | 4 +- 7 files changed, 153 insertions(+), 22 deletions(-) create mode 100644 apps/webapp/app/v3/alertsWorker.server.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 9e2713cd90..ffdfacf164 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -692,6 +692,44 @@ const EnvironmentSchema = z.object({ COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), + ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), + ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), + ALERTS_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100), + ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), + + ALERTS_WORKER_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + ALERTS_WORKER_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + ALERTS_WORKER_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + ALERTS_WORKER_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + ALERTS_WORKER_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + ALERTS_WORKER_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + ALERTS_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + ALERTS_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1), diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 15de002d28..dade550cd9 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -215,7 +215,7 @@ function getWorkerQueue() { }); }, }, - // @deprecated, moved to commonWorker.server.ts + // @deprecated, moved to alertsWorker.server.ts "v3.performTaskRunAlerts": { priority: 0, maxAttempts: 3, @@ -224,7 +224,7 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, - // @deprecated, moved to commonWorker.server.ts + // @deprecated, moved to alertsWorker.server.ts "v3.deliverAlert": { priority: 0, maxAttempts: 8, @@ -234,7 +234,7 @@ function getWorkerQueue() { return await service.call(payload.alertId); }, }, - // @deprecated, moved to commonWorker.server.ts + // @deprecated, moved to alertsWorker.server.ts "v3.performDeploymentAlerts": { priority: 0, maxAttempts: 3, diff --git a/apps/webapp/app/v3/alertsWorker.server.ts b/apps/webapp/app/v3/alertsWorker.server.ts new file mode 100644 index 0000000000..24ebc2ce65 --- /dev/null +++ b/apps/webapp/app/v3/alertsWorker.server.ts @@ -0,0 +1,94 @@ +import { Logger } from "@trigger.dev/core/logger"; +import { Worker as RedisWorker } from "@trigger.dev/redis-worker"; +import { z } from "zod"; +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { DeliverAlertService } from "./services/alerts/deliverAlert.server"; +import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server"; +import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server"; + +function initializeWorker() { + const redisOptions = { + keyPrefix: "alerts:worker:", + host: env.ALERTS_WORKER_REDIS_HOST, + port: env.ALERTS_WORKER_REDIS_PORT, + username: env.ALERTS_WORKER_REDIS_USERNAME, + password: env.ALERTS_WORKER_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.ALERTS_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }; + + logger.debug(`👨‍🏭 Initializing alerts worker at host ${env.ALERTS_WORKER_REDIS_HOST}`); + + const worker = new RedisWorker({ + name: "alerts-worker", + redisOptions, + catalog: { + "v3.performTaskRunAlerts": { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + "v3.performDeploymentAlerts": { + schema: z.object({ + deploymentId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + "v3.deliverAlert": { + schema: z.object({ + alertId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + }, + concurrency: { + workers: env.ALERTS_WORKER_CONCURRENCY_WORKERS, + tasksPerWorker: env.ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER, + limit: env.ALERTS_WORKER_CONCURRENCY_LIMIT, + }, + pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL, + immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL, + shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS, + logger: new Logger("AlertsWorker", "debug"), + jobs: { + "v3.deliverAlert": async ({ payload }) => { + const service = new DeliverAlertService(); + + await service.call(payload.alertId); + }, + "v3.performDeploymentAlerts": async ({ payload }) => { + const service = new PerformDeploymentAlertsService(); + + await service.call(payload.deploymentId); + }, + "v3.performTaskRunAlerts": async ({ payload }) => { + const service = new PerformTaskRunAlertsService(); + await service.call(payload.runId); + }, + }, + }); + + if (env.ALERTS_WORKER_ENABLED === "true") { + logger.debug( + `👨‍🏭 Starting alerts worker at host ${env.ALERTS_WORKER_REDIS_HOST}, pollInterval = ${env.ALERTS_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ALERTS_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ALERTS_WORKER_CONCURRENCY_LIMIT}` + ); + + worker.start(); + } + + return worker; +} + +export const alertsWorker = singleton("alertsWorker", initializeWorker); diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index f735747ba6..2ba633e6d6 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -237,16 +237,19 @@ function initializeWorker() { const service = new RunEngineBatchTriggerService(payload.strategy); await service.processBatchTaskRun(payload); }, + // @deprecated, moved to alertsWorker.server.ts "v3.deliverAlert": async ({ payload }) => { const service = new DeliverAlertService(); await service.call(payload.alertId); }, + // @deprecated, moved to alertsWorker.server.ts "v3.performDeploymentAlerts": async ({ payload }) => { const service = new PerformDeploymentAlertsService(); await service.call(payload.deploymentId); }, + // @deprecated, moved to alertsWorker.server.ts "v3.performTaskRunAlerts": async ({ payload }) => { const service = new PerformTaskRunAlertsService(); await service.call(payload.runId); diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts index d9c6236d3f..2d1abe8910 100644 --- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts +++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts @@ -7,21 +7,22 @@ import { type WebAPIRequestError, } from "@slack/web-api"; import { - Webhook, - TaskRunError, createJsonErrorObject, - type RunFailedWebhook, type DeploymentFailedWebhook, type DeploymentSuccessWebhook, isOOMRunError, + type RunFailedWebhook, + TaskRunError, } from "@trigger.dev/core/v3"; +import { type ProjectAlertChannelType, type ProjectAlertType } from "@trigger.dev/database"; import assertNever from "assert-never"; import { subtle } from "crypto"; +import { environmentTitle } from "~/components/environments/EnvironmentLabel"; import { type Prisma, type prisma, type PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { - OrgIntegrationRepository, type OrganizationIntegrationForService, + OrgIntegrationRepository, } from "~/models/orgIntegration.server"; import { ProjectAlertEmailProperties, @@ -29,18 +30,16 @@ import { ProjectAlertSlackStorage, ProjectAlertWebhookProperties, } from "~/models/projectAlert.server"; +import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server"; import { DeploymentPresenter } from "~/presenters/v3/DeploymentPresenter.server"; import { sendAlertEmail } from "~/services/email.server"; import { logger } from "~/services/logger.server"; import { decryptSecret } from "~/services/secrets/secretStore.server"; -import { commonWorker } from "~/v3/commonWorker.server"; -import { BaseService } from "../baseService.server"; -import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; -import { type ProjectAlertChannelType, type ProjectAlertType } from "@trigger.dev/database"; -import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server"; import { v3RunPath } from "~/utils/pathBuilder"; -import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server"; -import { environmentTitle } from "~/components/environments/EnvironmentLabel"; +import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server"; +import { alertsWorker } from "~/v3/alertsWorker.server"; +import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; +import { BaseService } from "../baseService.server"; type FoundAlert = Prisma.Result< typeof prisma.projectAlert, @@ -1044,7 +1043,7 @@ export class DeliverAlertService extends BaseService { } static async enqueue(alertId: string, runAt?: Date) { - return await commonWorker.enqueue({ + return await alertsWorker.enqueue({ id: `alert:${alertId}`, job: "v3.deliverAlert", payload: { alertId }, diff --git a/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts index fd390477b6..537871c920 100644 --- a/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts +++ b/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts @@ -1,10 +1,7 @@ import { ProjectAlertChannel, ProjectAlertType, WorkerDeployment } from "@trigger.dev/database"; -import { $transaction, PrismaClientOrTransaction } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; -import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; +import { alertsWorker } from "~/v3/alertsWorker.server"; import { BaseService } from "../baseService.server"; import { DeliverAlertService } from "./deliverAlert.server"; -import { commonWorker } from "~/v3/commonWorker.server"; export class PerformDeploymentAlertsService extends BaseService { public async call(deploymentId: string) { @@ -60,7 +57,7 @@ export class PerformDeploymentAlertsService extends BaseService { } static async enqueue(deploymentId: string, runAt?: Date) { - return await commonWorker.enqueue({ + return await alertsWorker.enqueue({ id: `performDeploymentAlerts:${deploymentId}`, job: "v3.performDeploymentAlerts", payload: { deploymentId }, diff --git a/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts index 0f8f90d89d..9c05534623 100644 --- a/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts +++ b/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts @@ -1,6 +1,6 @@ import { type Prisma, type ProjectAlertChannel } from "@trigger.dev/database"; import { type prisma } from "~/db.server"; -import { commonWorker } from "~/v3/commonWorker.server"; +import { alertsWorker } from "~/v3/alertsWorker.server"; import { BaseService } from "../baseService.server"; import { DeliverAlertService } from "./deliverAlert.server"; @@ -62,7 +62,7 @@ export class PerformTaskRunAlertsService extends BaseService { } static async enqueue(runId: string, runAt?: Date) { - return await commonWorker.enqueue({ + return await alertsWorker.enqueue({ id: `performTaskRunAlerts:${runId}`, job: "v3.performTaskRunAlerts", payload: { runId }, From 9b9e73635039250fd062301d2a17932ce736fe26 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 19 Jun 2025 11:29:01 +0100 Subject: [PATCH 2/2] fallback to graphile worker when a tx is required --- apps/webapp/app/services/worker.server.ts | 4 +- .../app/v3/services/batchTriggerV3.server.ts | 9 ++++ .../v3/services/createCheckpoint.server.ts | 6 +-- .../app/v3/services/finalizeTaskRun.server.ts | 2 +- .../app/v3/services/resumeBatchRun.server.ts | 43 +++++++++++++++---- 5 files changed, 48 insertions(+), 16 deletions(-) diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index dade550cd9..83c9eab023 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -33,7 +33,7 @@ import { logger } from "./logger.server"; const workerCatalog = { // @deprecated, moved to commonWorker.server.ts scheduleEmail: DeliverEmailSchema, - // @deprecated, moved to commonWorker.server.ts + // @deprecated, but still used when resuming batch runs in a transaction "v3.resumeBatchRun": z.object({ batchRunId: z.string(), }), @@ -164,7 +164,7 @@ function getWorkerQueue() { await sendEmail(payload); }, }, - // @deprecated, moved to commonWorker.server.ts + // @deprecated, moved to commonWorker.server.ts but still used when resuming batch runs in a transaction "v3.resumeBatchRun": { priority: 0, maxAttempts: 5, diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 110ae492a1..29271f9cdf 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -942,6 +942,15 @@ export async function completeBatchTaskRunItemV3( ) { const isRetry = retryAttempt !== undefined; + logger.debug("completeBatchTaskRunItemV3", { + itemId, + batchTaskRunId, + scheduleResumeOnComplete, + taskRunAttemptId, + retryAttempt, + isRetry, + }); + if (isRetry) { logger.debug("completeBatchTaskRunItemV3 retrying", { itemId, diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 54419aac54..ec87d5bacd 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -392,11 +392,7 @@ export class CreateCheckpointService extends BaseService { checkpointEventId: checkpointEvent.id, }); - await ResumeBatchRunService.enqueue( - batchRun.id, - batchRun.batchVersion === "v3", - this._prisma - ); + await ResumeBatchRunService.enqueue(batchRun.id, batchRun.batchVersion === "v3"); return { success: true, diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index 1fe21eed47..2316fd25f4 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -244,7 +244,7 @@ export class FinalizeTaskRunService extends BaseService { // This won't resume because this batch does not have a dependent task attempt ID // or is in development, but this service will mark the batch as completed - await ResumeBatchRunService.enqueue(item.batchTaskRunId, false, this._prisma); + await ResumeBatchRunService.enqueue(item.batchTaskRunId, false); } } } diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index d07e38eda8..d9f5170fc2 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -4,6 +4,7 @@ import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { BatchTaskRun } from "@trigger.dev/database"; +import { workerQueue } from "~/services/worker.server"; const finishedBatchRunStatuses = ["COMPLETED", "FAILED", "CANCELED"]; @@ -331,17 +332,43 @@ export class ResumeBatchRunService extends BaseService { static async enqueue( batchRunId: string, skipJobKey: boolean, - tx: PrismaClientOrTransaction, + tx?: PrismaClientOrTransaction, runAt?: Date ) { - return await commonWorker.enqueue({ - id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, - job: "v3.resumeBatchRun", - payload: { + if (tx) { + logger.debug("ResumeBatchRunService: Enqueuing resume batch run using workerQueue", { batchRunId, - }, - availableAt: runAt, - }); + skipJobKey, + runAt, + }); + + return await workerQueue.enqueue( + "v3.resumeBatchRun", + { + batchRunId, + }, + { + jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, + runAt, + tx, + } + ); + } else { + logger.debug("ResumeBatchRunService: Enqueuing resume batch run using commonWorker", { + batchRunId, + skipJobKey, + runAt, + }); + + return await commonWorker.enqueue({ + id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, + job: "v3.resumeBatchRun", + payload: { + batchRunId, + }, + availableAt: runAt, + }); + } } }