Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,44 @@ const EnvironmentSchema = z.object({
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
ALERTS_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),

ALERTS_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
ALERTS_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
ALERTS_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
ALERTS_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
ALERTS_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
ALERTS_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
ALERTS_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
ALERTS_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { logger } from "./logger.server";
const workerCatalog = {
// @deprecated, moved to commonWorker.server.ts
scheduleEmail: DeliverEmailSchema,
// @deprecated, moved to commonWorker.server.ts
// @deprecated, but still used when resuming batch runs in a transaction
"v3.resumeBatchRun": z.object({
batchRunId: z.string(),
}),
Expand Down Expand Up @@ -164,7 +164,7 @@ function getWorkerQueue() {
await sendEmail(payload);
},
},
// @deprecated, moved to commonWorker.server.ts
// @deprecated, moved to commonWorker.server.ts but still used when resuming batch runs in a transaction
"v3.resumeBatchRun": {
priority: 0,
maxAttempts: 5,
Expand Down Expand Up @@ -215,7 +215,7 @@ function getWorkerQueue() {
});
},
},
// @deprecated, moved to commonWorker.server.ts
// @deprecated, moved to alertsWorker.server.ts
"v3.performTaskRunAlerts": {
priority: 0,
maxAttempts: 3,
Expand All @@ -224,7 +224,7 @@ function getWorkerQueue() {
return await service.call(payload.runId);
},
},
// @deprecated, moved to commonWorker.server.ts
// @deprecated, moved to alertsWorker.server.ts
"v3.deliverAlert": {
priority: 0,
maxAttempts: 8,
Expand All @@ -234,7 +234,7 @@ function getWorkerQueue() {
return await service.call(payload.alertId);
},
},
// @deprecated, moved to commonWorker.server.ts
// @deprecated, moved to alertsWorker.server.ts
"v3.performDeploymentAlerts": {
priority: 0,
maxAttempts: 3,
Expand Down
94 changes: 94 additions & 0 deletions apps/webapp/app/v3/alertsWorker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { Logger } from "@trigger.dev/core/logger";
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";

function initializeWorker() {
const redisOptions = {
keyPrefix: "alerts:worker:",
host: env.ALERTS_WORKER_REDIS_HOST,
port: env.ALERTS_WORKER_REDIS_PORT,
username: env.ALERTS_WORKER_REDIS_USERNAME,
password: env.ALERTS_WORKER_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.ALERTS_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
};

logger.debug(`👨‍🏭 Initializing alerts worker at host ${env.ALERTS_WORKER_REDIS_HOST}`);

const worker = new RedisWorker({
name: "alerts-worker",
redisOptions,
catalog: {
"v3.performTaskRunAlerts": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
"v3.performDeploymentAlerts": {
schema: z.object({
deploymentId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
"v3.deliverAlert": {
schema: z.object({
alertId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
},
concurrency: {
workers: env.ALERTS_WORKER_CONCURRENCY_WORKERS,
tasksPerWorker: env.ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER,
limit: env.ALERTS_WORKER_CONCURRENCY_LIMIT,
},
pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL,
shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS,
logger: new Logger("AlertsWorker", "debug"),
jobs: {
"v3.deliverAlert": async ({ payload }) => {
const service = new DeliverAlertService();

await service.call(payload.alertId);
},
"v3.performDeploymentAlerts": async ({ payload }) => {
const service = new PerformDeploymentAlertsService();

await service.call(payload.deploymentId);
},
"v3.performTaskRunAlerts": async ({ payload }) => {
const service = new PerformTaskRunAlertsService();
await service.call(payload.runId);
},
},
});

if (env.ALERTS_WORKER_ENABLED === "true") {
logger.debug(
`👨‍🏭 Starting alerts worker at host ${env.ALERTS_WORKER_REDIS_HOST}, pollInterval = ${env.ALERTS_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ALERTS_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ALERTS_WORKER_CONCURRENCY_LIMIT}`
);

worker.start();
}

return worker;
}

export const alertsWorker = singleton("alertsWorker", initializeWorker);
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,19 @@ function initializeWorker() {
const service = new RunEngineBatchTriggerService(payload.strategy);
await service.processBatchTaskRun(payload);
},
// @deprecated, moved to alertsWorker.server.ts
"v3.deliverAlert": async ({ payload }) => {
const service = new DeliverAlertService();

await service.call(payload.alertId);
},
// @deprecated, moved to alertsWorker.server.ts
"v3.performDeploymentAlerts": async ({ payload }) => {
const service = new PerformDeploymentAlertsService();

await service.call(payload.deploymentId);
},
// @deprecated, moved to alertsWorker.server.ts
"v3.performTaskRunAlerts": async ({ payload }) => {
const service = new PerformTaskRunAlertsService();
await service.call(payload.runId);
Expand Down
23 changes: 11 additions & 12 deletions apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,39 @@ import {
type WebAPIRequestError,
} from "@slack/web-api";
import {
Webhook,
TaskRunError,
createJsonErrorObject,
type RunFailedWebhook,
type DeploymentFailedWebhook,
type DeploymentSuccessWebhook,
isOOMRunError,
type RunFailedWebhook,
TaskRunError,
} from "@trigger.dev/core/v3";
import { type ProjectAlertChannelType, type ProjectAlertType } from "@trigger.dev/database";
import assertNever from "assert-never";
import { subtle } from "crypto";
import { environmentTitle } from "~/components/environments/EnvironmentLabel";
import { type Prisma, type prisma, type PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import {
OrgIntegrationRepository,
type OrganizationIntegrationForService,
OrgIntegrationRepository,
} from "~/models/orgIntegration.server";
import {
ProjectAlertEmailProperties,
ProjectAlertSlackProperties,
ProjectAlertSlackStorage,
ProjectAlertWebhookProperties,
} from "~/models/projectAlert.server";
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
import { DeploymentPresenter } from "~/presenters/v3/DeploymentPresenter.server";
import { sendAlertEmail } from "~/services/email.server";
import { logger } from "~/services/logger.server";
import { decryptSecret } from "~/services/secrets/secretStore.server";
import { commonWorker } from "~/v3/commonWorker.server";
import { BaseService } from "../baseService.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
import { type ProjectAlertChannelType, type ProjectAlertType } from "@trigger.dev/database";
import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server";
import { v3RunPath } from "~/utils/pathBuilder";
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
import { environmentTitle } from "~/components/environments/EnvironmentLabel";
import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server";
import { alertsWorker } from "~/v3/alertsWorker.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
import { BaseService } from "../baseService.server";

type FoundAlert = Prisma.Result<
typeof prisma.projectAlert,
Expand Down Expand Up @@ -1044,7 +1043,7 @@ export class DeliverAlertService extends BaseService {
}

static async enqueue(alertId: string, runAt?: Date) {
return await commonWorker.enqueue({
return await alertsWorker.enqueue({
id: `alert:${alertId}`,
job: "v3.deliverAlert",
payload: { alertId },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { ProjectAlertChannel, ProjectAlertType, WorkerDeployment } from "@trigger.dev/database";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
import { alertsWorker } from "~/v3/alertsWorker.server";
import { BaseService } from "../baseService.server";
import { DeliverAlertService } from "./deliverAlert.server";
import { commonWorker } from "~/v3/commonWorker.server";

export class PerformDeploymentAlertsService extends BaseService {
public async call(deploymentId: string) {
Expand Down Expand Up @@ -60,7 +57,7 @@ export class PerformDeploymentAlertsService extends BaseService {
}

static async enqueue(deploymentId: string, runAt?: Date) {
return await commonWorker.enqueue({
return await alertsWorker.enqueue({
id: `performDeploymentAlerts:${deploymentId}`,
job: "v3.performDeploymentAlerts",
payload: { deploymentId },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type Prisma, type ProjectAlertChannel } from "@trigger.dev/database";
import { type prisma } from "~/db.server";
import { commonWorker } from "~/v3/commonWorker.server";
import { alertsWorker } from "~/v3/alertsWorker.server";
import { BaseService } from "../baseService.server";
import { DeliverAlertService } from "./deliverAlert.server";

Expand Down Expand Up @@ -62,7 +62,7 @@ export class PerformTaskRunAlertsService extends BaseService {
}

static async enqueue(runId: string, runAt?: Date) {
return await commonWorker.enqueue({
return await alertsWorker.enqueue({
id: `performTaskRunAlerts:${runId}`,
job: "v3.performTaskRunAlerts",
payload: { runId },
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,15 @@ export async function completeBatchTaskRunItemV3(
) {
const isRetry = retryAttempt !== undefined;

logger.debug("completeBatchTaskRunItemV3", {
itemId,
batchTaskRunId,
scheduleResumeOnComplete,
taskRunAttemptId,
retryAttempt,
isRetry,
});

if (isRetry) {
logger.debug("completeBatchTaskRunItemV3 retrying", {
itemId,
Expand Down
6 changes: 1 addition & 5 deletions apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,7 @@ export class CreateCheckpointService extends BaseService {
checkpointEventId: checkpointEvent.id,
});

await ResumeBatchRunService.enqueue(
batchRun.id,
batchRun.batchVersion === "v3",
this._prisma
);
await ResumeBatchRunService.enqueue(batchRun.id, batchRun.batchVersion === "v3");

return {
success: true,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export class FinalizeTaskRunService extends BaseService {

// This won't resume because this batch does not have a dependent task attempt ID
// or is in development, but this service will mark the batch as completed
await ResumeBatchRunService.enqueue(item.batchTaskRunId, false, this._prisma);
await ResumeBatchRunService.enqueue(item.batchTaskRunId, false);
}
}
}
Expand Down
Loading