diff --git a/.github/workflows/pr_checks.yml b/.github/workflows/pr_checks.yml
index c8eee52a34..b00475ebfa 100644
--- a/.github/workflows/pr_checks.yml
+++ b/.github/workflows/pr_checks.yml
@@ -28,34 +28,3 @@ jobs:
     with:
       package: cli-v3
     secrets: inherit
-
-  preview-release:
-    name: Preview Release
-    needs: [typecheck, units, e2e]
-    if: github.repository == 'triggerdotdev/trigger.dev'
-    runs-on: ubuntu-latest
-    steps:
-      - name: ⬇️ Checkout repo
-        uses: actions/checkout@v4
-        with:
-          fetch-depth: 0
-
-      - name: ⎔ Setup pnpm
-        uses: pnpm/action-setup@v4
-        with:
-          version: 8.15.5
-
-      - name: ⎔ Setup node
-        uses: buildjet/setup-node@v4
-        with:
-          node-version: 20.11.1
-          cache: "pnpm"
-
-      - name: 📥 Download deps
-        run: pnpm install --frozen-lockfile
-
-      - name: 🏗️ Build
-        run: pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"
-
-      - name: ⚡ Publish preview release
-        run: npx pkg-pr-new publish --no-template $(ls -d ./packages/*)
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 3105c36d04..c41b4ea19f 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -368,6 +368,82 @@ const EnvironmentSchema = z.object({
   BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
   BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"),
   BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"),
+
+  LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
+  LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
+  LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
+  LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
+  LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
+  LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
+
+  LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_HOST),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_READER_HOST: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_READER_HOST),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_READER_PORT: z.coerce
+    .number()
+    .optional()
+    .transform(
+      (v) =>
+        v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
+    ),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_PORT: z.coerce
+    .number()
+    .optional()
+    .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_USERNAME),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_PASSWORD),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED: z
+    .string()
+    .default(process.env.REDIS_TLS_DISABLED ?? "false"),
+  LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
+
+  COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
+  COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
+  COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
+  COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
+  COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
+  COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
+
+  COMMON_WORKER_REDIS_HOST: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_HOST),
+  COMMON_WORKER_REDIS_READER_HOST: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_READER_HOST),
+  COMMON_WORKER_REDIS_READER_PORT: z.coerce
+    .number()
+    .optional()
+    .transform(
+      (v) =>
+        v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
+    ),
+  COMMON_WORKER_REDIS_PORT: z.coerce
+    .number()
+    .optional()
+    .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
+  COMMON_WORKER_REDIS_USERNAME: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_USERNAME),
+  COMMON_WORKER_REDIS_PASSWORD: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_PASSWORD),
+  COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
+  COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
 });

 export type Environment = z.infer<typeof EnvironmentSchema>;
diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts
index 6c86b37f47..3c76907aa7 100644
--- a/apps/webapp/app/services/worker.server.ts
+++ b/apps/webapp/app/services/worker.server.ts
@@ -6,10 +6,8 @@ import { $replica, prisma } from "~/db.server";
 import { env } from "~/env.server";
 import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
 import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
-import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
 import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
 import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
-import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
 import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
 import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
 import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
@@ -157,9 +155,6 @@ const workerCatalog = {
   "v3.performTaskRunAlerts": z.object({
     runId: z.string(),
   }),
-  "v3.performTaskAttemptAlerts": z.object({
-    attemptId: z.string(),
-  }),
   "v3.deliverAlert": z.object({
     alertId: z.string(),
   }),
@@ -610,15 +605,6 @@ function getWorkerQueue() {
         return await service.call(payload.runId);
       },
     },
-    "v3.performTaskAttemptAlerts": {
-      priority: 0,
-      maxAttempts: 3,
-      handler: async (payload, job) => {
-        const service = new PerformTaskAttemptAlertsService();
-
-        return await service.call(payload.attemptId);
-      },
-    },
     "v3.deliverAlert": {
       priority: 0,
       maxAttempts: 8,
@@ -658,11 +644,7 @@ function getWorkerQueue() {
     "v3.requeueTaskRun": {
       priority: 0,
       maxAttempts: 3,
-      handler: async (payload, job) => {
-        const service = new RequeueTaskRunService();
-
-        await service.call(payload.runId);
-      },
+      handler: async (payload, job) => {}, // This is now handled by redisWorker
     },
     "v3.retryAttempt": {
       priority: 0,
diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts
new file mode 100644
index 0000000000..5e43d3c90a
--- /dev/null
+++ b/apps/webapp/app/v3/commonWorker.server.ts
@@ -0,0 +1,93 @@
+import { Worker as RedisWorker } from "@internal/redis-worker";
+import { Logger } from "@trigger.dev/core/logger";
+import { z } from "zod";
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.server";
+import { singleton } from "~/utils/singleton";
+import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
+import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
+import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
+
+function initializeWorker() {
+  const redisOptions = {
+    keyPrefix: "common:worker:",
+    host: env.COMMON_WORKER_REDIS_HOST,
+    port: env.COMMON_WORKER_REDIS_PORT,
+    username: env.COMMON_WORKER_REDIS_USERNAME,
+    password: env.COMMON_WORKER_REDIS_PASSWORD,
+    enableAutoPipelining: true,
+    ...(env.COMMON_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+  };
+
+  logger.debug(`👨‍🏭 Initializing common worker at host ${env.COMMON_WORKER_REDIS_HOST}`);
+
+  const worker = new RedisWorker({
+    name: "common-worker",
+    redisOptions,
+    catalog: {
+      "v3.performTaskRunAlerts": {
+        schema: z.object({
+          runId: z.string(),
+        }),
+        visibilityTimeoutMs: 60_000,
+        retry: {
+          maxAttempts: 3,
+        },
+      },
+      "v3.performDeploymentAlerts": {
+        schema: z.object({
+          deploymentId: z.string(),
+        }),
+        visibilityTimeoutMs: 60_000,
+        retry: {
+          maxAttempts: 3,
+        },
+      },
+      "v3.deliverAlert": {
+        schema: z.object({
+          alertId: z.string(),
+        }),
+        visibilityTimeoutMs: 60_000,
+        retry: {
+          maxAttempts: 3,
+        },
+      },
+    },
+    concurrency: {
+      workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
+      tasksPerWorker: env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER,
+      limit: env.COMMON_WORKER_CONCURRENCY_LIMIT,
+    },
+    pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
+    immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
+    logger: new Logger("CommonWorker", "debug"),
+    jobs: {
+      "v3.deliverAlert": async ({ payload }) => {
+        const service = new DeliverAlertService();
+
+        return await service.call(payload.alertId);
+      },
+      "v3.performDeploymentAlerts": async ({ payload }) => {
+        const service = new PerformDeploymentAlertsService();
+
+        return await service.call(payload.deploymentId);
+      },
+      "v3.performTaskRunAlerts": async ({ payload }) => {
+        const service = new PerformTaskRunAlertsService();
+        return await service.call(payload.runId);
+      },
+    },
+  });
+
+  if (env.COMMON_WORKER_ENABLED === "true") {
+    logger.debug(
+      `👨‍🏭 Starting common worker at host ${env.COMMON_WORKER_REDIS_HOST}, pollInterval = ${env.COMMON_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.COMMON_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.COMMON_WORKER_CONCURRENCY_LIMIT}`
+    );
+
+    worker.start();
+  }
+
+  return worker;
+}
+
+export const commonWorker = singleton("commonWorker", initializeWorker);
diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts
new file mode 100644
index 0000000000..52a978fffc
--- /dev/null
+++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts
@@ -0,0 +1,66 @@
+import { Worker as RedisWorker } from "@internal/redis-worker";
+import { Logger } from "@trigger.dev/core/logger";
+import { z } from "zod";
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.server";
+import { singleton } from "~/utils/singleton";
+import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
+
+function initializeWorker() {
+  const redisOptions = {
+    keyPrefix: "legacy-run-engine:worker:",
+    host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST,
+    port: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT,
+    username: env.LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME,
+    password: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD,
+    enableAutoPipelining: true,
+    ...(env.LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+  };
+
+  logger.debug(
+    `👨‍🏭 Initializing legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}`
+  );
+
+  const worker = new RedisWorker({
+    name: "legacy-run-engine-worker",
+    redisOptions,
+    catalog: {
+      runHeartbeat: {
+        schema: z.object({
+          runId: z.string(),
+        }),
+        visibilityTimeoutMs: 60_000,
+        retry: {
+          maxAttempts: 3,
+        },
+      },
+    },
+    concurrency: {
+      workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
+      tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
+      limit: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT,
+    },
+    pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
+    immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
+    logger: new Logger("LegacyRunEngineWorker", "debug"),
+    jobs: {
+      runHeartbeat: async ({ payload }) => {
+        const service = new TaskRunHeartbeatFailedService();
+
+        await service.call(payload.runId);
+      },
+    },
+  });
+
+  if (env.LEGACY_RUN_ENGINE_WORKER_ENABLED === "true") {
+    logger.debug(
+      `👨‍🏭 Starting legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}, pollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT}`
+    );
+
+    worker.start();
+  }
+
+  return worker;
+}
+
+export const legacyRunEngineWorker = singleton("legacyRunEngineWorker", initializeWorker);
diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts
index a6d60bab8e..9ec891b7e3 100644
--- a/apps/webapp/app/v3/marqs/index.server.ts
+++ b/apps/webapp/app/v3/marqs/index.server.ts
@@ -30,7 +30,7 @@ import {
   MessageQueueSubscriber,
   VisibilityTimeoutStrategy,
 } from "./types";
-import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server";
+import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";

 const KEY_PREFIX = "marqs:";

@@ -1611,7 +1611,7 @@ function getMarQSClient() {
       name: "marqs",
       tracer: trace.getTracer("marqs"),
       keysProducer,
-      visibilityTimeoutStrategy: new V3VisibilityTimeout(),
+      visibilityTimeoutStrategy: new V3LegacyRunEngineWorkerVisibilityTimeout(),
       queuePriorityStrategy: new FairDequeuingStrategy({
         tracer: tracer,
         redis,
diff --git a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts
index 88494d0159..7611ee2bce 100644
--- a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts
+++ b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts
@@ -1,12 +1,28 @@
-import { RequeueTaskRunService } from "../requeueTaskRun.server";
+import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
+import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server";
 import { VisibilityTimeoutStrategy } from "./types";

-export class V3VisibilityTimeout implements VisibilityTimeoutStrategy {
+export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
   async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
-    await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
+    await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
   }

   async cancelHeartbeat(messageId: string): Promise<void> {
-    await RequeueTaskRunService.dequeue(messageId);
+    await TaskRunHeartbeatFailedService.dequeue(messageId);
+  }
+}
+
+export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy {
+  async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
+    await legacyRunEngineWorker.enqueue({
+      id: `heartbeat:${messageId}`,
+      job: "runHeartbeat",
+      payload: { runId: messageId },
+      availableAt: new Date(Date.now() + timeoutInMs),
+    });
+  }
+
+  async cancelHeartbeat(messageId: string): Promise<void> {
+    await legacyRunEngineWorker.ack(`heartbeat:${messageId}`);
   }
 }
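The new strategy turns heartbeats into a debounced failure timer: every heartbeat re-enqueues a `runHeartbeat` job under the stable id `heartbeat:${messageId}` with `availableAt` pushed out by the timeout, and `cancelHeartbeat` acks that same id. The job therefore only ever fires when heartbeats stop. A condensed sketch of the idea, with a hypothetical `DelayedQueue` interface standing in for the redis worker:

```ts
// Hypothetical interface; the real worker's enqueue/ack have richer signatures.
interface DelayedQueue {
  enqueue(opts: { id: string; job: string; payload: unknown; availableAt?: Date }): Promise<void>;
  ack(id: string): Promise<void>;
}

async function onHeartbeat(queue: DelayedQueue, runId: string, timeoutMs: number) {
  await queue.enqueue({
    id: `heartbeat:${runId}`, // stable id => re-enqueueing replaces the pending job
    job: "runHeartbeat",
    payload: { runId },
    availableAt: new Date(Date.now() + timeoutMs), // only fires if heartbeats stop
  });
}

async function onComplete(queue: DelayedQueue, runId: string) {
  await queue.ack(`heartbeat:${runId}`); // run finished: cancel the pending failure check
}
```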
diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
index a84388c583..282a839bff 100644
--- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
+++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
@@ -28,6 +28,7 @@ import { decryptSecret } from "~/services/secrets/secretStore.server";
 import { workerQueue } from "~/services/worker.server";
 import { BaseService } from "../baseService.server";
 import { FINAL_ATTEMPT_STATUSES } from "~/v3/taskStatus";
+import { commonWorker } from "~/v3/commonWorker.server";

 type FoundAlert = Prisma.Result<
   typeof prisma.projectAlert,
@@ -1092,22 +1093,13 @@ export class DeliverAlertService extends BaseService {
     return text;
   }

-  static async enqueue(
-    alertId: string,
-    tx: PrismaClientOrTransaction,
-    options?: { runAt?: Date; queueName?: string }
-  ) {
-    return await workerQueue.enqueue(
-      "v3.deliverAlert",
-      {
-        alertId,
-      },
-      {
-        tx,
-        runAt: options?.runAt,
-        jobKey: `deliverAlert:${alertId}`,
-      }
-    );
+  static async enqueue(alertId: string, runAt?: Date) {
+    return await commonWorker.enqueue({
+      id: `alert:${alertId}`,
+      job: "v3.deliverAlert",
+      payload: { alertId },
+      availableAt: runAt,
+    });
   }
 }
diff --git a/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts
index cd4d20052d..4bbe7b50cf 100644
--- a/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts
+++ b/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts
@@ -4,6 +4,7 @@ import { workerQueue } from "~/services/worker.server";
 import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
 import { BaseService } from "../baseService.server";
 import { DeliverAlertService } from "./deliverAlert.server";
+import { commonWorker } from "~/v3/commonWorker.server";

 export class PerformDeploymentAlertsService extends BaseService {
   public async call(deploymentId: string) {
@@ -45,34 +46,27 @@ export class PerformDeploymentAlertsService extends BaseService {
     deployment: WorkerDeployment,
     alertType: ProjectAlertType
   ) {
-    await $transaction(this._prisma, "create and send deploy alert", async (tx) => {
-      const alert = await this._prisma.projectAlert.create({
-        data: {
-          friendlyId: generateFriendlyId("alert"),
-          channelId: alertChannel.id,
-          projectId: deployment.projectId,
-          environmentId: deployment.environmentId,
-          status: "PENDING",
-          type: alertType,
-          workerDeploymentId: deployment.id,
-        },
-      });
-
-      await DeliverAlertService.enqueue(alert.id, tx);
+    const alert = await this._prisma.projectAlert.create({
+      data: {
+        friendlyId: generateFriendlyId("alert"),
+        channelId: alertChannel.id,
+        projectId: deployment.projectId,
+        environmentId: deployment.environmentId,
+        status: "PENDING",
+        type: alertType,
+        workerDeploymentId: deployment.id,
+      },
     });
+
+    await DeliverAlertService.enqueue(alert.id);
   }

-  static async enqueue(deploymentId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
-    return await workerQueue.enqueue(
-      "v3.performDeploymentAlerts",
-      {
-        deploymentId,
-      },
-      {
-        tx,
-        runAt,
-        jobKey: `performDeploymentAlerts:${deploymentId}`,
-      }
-    );
+  static async enqueue(deploymentId: string, runAt?: Date) {
+    return await commonWorker.enqueue({
+      id: `performDeploymentAlerts:${deploymentId}`,
+      job: "v3.performDeploymentAlerts",
+      payload: { deploymentId },
+      availableAt: runAt,
+    });
   }
 }
diff --git a/apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts
deleted file mode 100644
index dab6539aeb..0000000000
--- a/apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts
+++ /dev/null
@@ -1,79 +0,0 @@
-import { Prisma, ProjectAlertChannel } from "@trigger.dev/database";
-import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
-import { workerQueue } from "~/services/worker.server";
-import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
-import { BaseService } from "../baseService.server";
-import { DeliverAlertService } from "./deliverAlert.server";
-
-type FoundTaskAttempt = Prisma.Result<
-  typeof prisma.taskRunAttempt,
-  { include: { taskRun: true; backgroundWorkerTask: true; runtimeEnvironment: true } },
-  "findUniqueOrThrow"
->;
-
-export class PerformTaskAttemptAlertsService extends BaseService {
-  public async call(attemptId: string) {
-    const taskAttempt = await this._prisma.taskRunAttempt.findFirst({
-      where: { id: attemptId },
-      include: {
-        taskRun: true,
-        backgroundWorkerTask: true,
-        runtimeEnvironment: true,
-      },
-    });
-
-    if (!taskAttempt) {
-      return;
-    }
-
-    // Find all the alert channels
-    const alertChannels = await this._prisma.projectAlertChannel.findMany({
-      where: {
-        projectId: taskAttempt.taskRun.projectId,
-        alertTypes: {
-          has: "TASK_RUN_ATTEMPT",
-        },
-        environmentTypes: {
-          has: taskAttempt.runtimeEnvironment.type,
-        },
-        enabled: true,
-      },
-    });
-
-    for (const alertChannel of alertChannels) {
-      await this.#createAndSendAlert(alertChannel, taskAttempt);
-    }
-  }
-
-  async #createAndSendAlert(alertChannel: ProjectAlertChannel, taskAttempt: FoundTaskAttempt) {
-    await $transaction(this._prisma, "create and send attempt alert", async (tx) => {
-      const alert = await this._prisma.projectAlert.create({
-        data: {
-          friendlyId: generateFriendlyId("alert"),
-          channelId: alertChannel.id,
-          projectId: taskAttempt.taskRun.projectId,
-          environmentId: taskAttempt.runtimeEnvironmentId,
-          status: "PENDING",
-          type: "TASK_RUN_ATTEMPT",
-          taskRunAttemptId: taskAttempt.id,
-        },
-      });
-
-      await DeliverAlertService.enqueue(alert.id, tx);
-    });
-  }
-
-  static async enqueue(attemptId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
-    return await workerQueue.enqueue(
-      "v3.performTaskAttemptAlerts",
-      {
-        attemptId,
-      },
-      {
-        tx,
-        runAt,
-        jobKey: `performTaskAttemptAlerts:${attemptId}`,
-      }
-    );
-  }
-}
diff --git a/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts
index 1d3b6d62f9..0706bd0192 100644
--- a/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts
+++ b/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts
@@ -4,6 +4,7 @@ import { workerQueue } from "~/services/worker.server";
 import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
 import { BaseService } from "../baseService.server";
 import { DeliverAlertService } from "./deliverAlert.server";
+import { commonWorker } from "~/v3/commonWorker.server";

 type FoundRun = Prisma.Result<
   typeof prisma.taskRun,
@@ -45,34 +46,27 @@ export class PerformTaskRunAlertsService extends BaseService {
   }

   async #createAndSendAlert(alertChannel: ProjectAlertChannel, run: FoundRun) {
-    await $transaction(this._prisma, "create and send run alert", async (tx) => {
-      const alert = await this._prisma.projectAlert.create({
-        data: {
-          friendlyId: generateFriendlyId("alert"),
-          channelId: alertChannel.id,
-          projectId: run.projectId,
-          environmentId: run.runtimeEnvironmentId,
-          status: "PENDING",
-          type: "TASK_RUN",
-          taskRunId: run.id,
-        },
-      });
-
-      await DeliverAlertService.enqueue(alert.id, tx);
+    const alert = await this._prisma.projectAlert.create({
+      data: {
+        friendlyId: generateFriendlyId("alert"),
+        channelId: alertChannel.id,
+        projectId: run.projectId,
+        environmentId: run.runtimeEnvironmentId,
+        status: "PENDING",
+        type: "TASK_RUN",
+        taskRunId: run.id,
+      },
     });
+
+    await DeliverAlertService.enqueue(alert.id);
   }

-  static async enqueue(runId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
-    return await workerQueue.enqueue(
-      "v3.performTaskRunAlerts",
-      {
-        runId,
-      },
-      {
-        tx,
-        runAt,
-        jobKey: `performTaskRunAlerts:${runId}`,
-      }
-    );
+  static async enqueue(runId: string, runAt?: Date) {
+    return await commonWorker.enqueue({
+      id: `performTaskRunAlerts:${runId}`,
+      job: "v3.performTaskRunAlerts",
+      payload: { runId },
+      availableAt: runAt,
+    });
   }
 }
diff --git a/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts
index 11396acaa5..c46f5b0266 100644
--- a/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts
+++ b/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts
@@ -148,7 +148,7 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
       }

       await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);
-      await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
+      await PerformDeploymentAlertsService.enqueue(deployment.id);
       await TimeoutDeploymentService.dequeue(deployment.id, this._prisma);

       return backgroundWorker;
diff --git a/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts b/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts
index 44b5f55eeb..fffab0af03 100644
--- a/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts
+++ b/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts
@@ -66,7 +66,7 @@ export class DeploymentIndexFailed extends BaseService {
       },
     });

-    await PerformDeploymentAlertsService.enqueue(failedDeployment.id, this._prisma);
+    await PerformDeploymentAlertsService.enqueue(failedDeployment.id);

     return failedDeployment;
   }
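One pattern runs through all of these service changes: enqueueing used to join the Prisma transaction (`tx`, `jobKey`) via Graphile, but a Redis-backed queue cannot participate in a Postgres transaction, so the row is committed first and the job is enqueued afterwards, with the stable `id` taking over the deduplication role of `jobKey`. A hedged, condensed sketch of the resulting ordering — `prisma` and `commonWorker` are declared stand-ins, not the app's real instances:

```ts
type AlertRow = { id: string };
declare const prisma: { projectAlert: { create(args: unknown): Promise<AlertRow> } };
declare const commonWorker: {
  enqueue(opts: { id: string; job: string; payload: unknown; availableAt?: Date }): Promise<unknown>;
};

async function createAndSendAlert(channelId: string, projectId: string) {
  // 1. Commit the row first; a Redis enqueue cannot join a Postgres transaction.
  const alert = await prisma.projectAlert.create({
    data: { channelId, projectId, status: "PENDING" },
  });

  // 2. Enqueue after the write. The stable id keeps a retried enqueue idempotent,
  //    playing the role Graphile's jobKey used to.
  await commonWorker.enqueue({
    id: `alert:${alert.id}`,
    job: "v3.deliverAlert",
    payload: { alertId: alert.id },
  });
}
```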
diff --git a/apps/webapp/app/v3/services/failDeployment.server.ts b/apps/webapp/app/v3/services/failDeployment.server.ts
index a5d7fc9e86..dbb9194050 100644
--- a/apps/webapp/app/v3/services/failDeployment.server.ts
+++ b/apps/webapp/app/v3/services/failDeployment.server.ts
@@ -49,7 +49,7 @@ export class FailDeploymentService extends BaseService {
       },
     });

-    await PerformDeploymentAlertsService.enqueue(failedDeployment.id, this._prisma);
+    await PerformDeploymentAlertsService.enqueue(failedDeployment.id);

     return failedDeployment;
   }
diff --git a/apps/webapp/app/v3/services/finalizeDeployment.server.ts b/apps/webapp/app/v3/services/finalizeDeployment.server.ts
index 58ad8b03dd..042afa4a66 100644
--- a/apps/webapp/app/v3/services/finalizeDeployment.server.ts
+++ b/apps/webapp/app/v3/services/finalizeDeployment.server.ts
@@ -124,7 +124,7 @@ export class FinalizeDeploymentService extends BaseService {
     }

     await ExecuteTasksWaitingForDeployService.enqueue(deployment.worker.id, this._prisma);
-    await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
+    await PerformDeploymentAlertsService.enqueue(deployment.id);

     return finalizedDeployment;
   }
diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts
index fd34cec64a..81df8461ec 100644
--- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts
+++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts
@@ -129,7 +129,7 @@ export class FinalizeTaskRunService extends BaseService {
     //enqueue alert
     if (isFailedRunStatus(run.status)) {
-      await PerformTaskRunAlertsService.enqueue(run.id, this._prisma);
+      await PerformTaskRunAlertsService.enqueue(run.id);
     }

     if (isFatalRunStatus(run.status)) {
diff --git a/apps/webapp/app/v3/services/timeoutDeployment.server.ts b/apps/webapp/app/v3/services/timeoutDeployment.server.ts
index f86f513b18..1bb9493a89 100644
--- a/apps/webapp/app/v3/services/timeoutDeployment.server.ts
+++ b/apps/webapp/app/v3/services/timeoutDeployment.server.ts
@@ -39,7 +39,7 @@ export class TimeoutDeploymentService extends BaseService {
       },
     });

-    await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
+    await PerformDeploymentAlertsService.enqueue(deployment.id);
   }

   static async enqueue(
diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts
similarity index 98%
rename from apps/webapp/app/v3/requeueTaskRun.server.ts
rename to apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts
index 5e7732ed35..d722a6f80b 100644
--- a/apps/webapp/app/v3/requeueTaskRun.server.ts
+++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts
@@ -9,7 +9,7 @@ import { workerQueue } from "~/services/worker.server";
 import { socketIo } from "./handleSocketIo.server";
 import { TaskRunErrorCodes } from "@trigger.dev/core/v3";

-export class RequeueTaskRunService extends BaseService {
+export class TaskRunHeartbeatFailedService extends BaseService {
   public async call(runId: string) {
     const taskRun = await this._prisma.taskRun.findFirst({
       where: {
diff --git a/apps/webapp/package.json b/apps/webapp/package.json
index 96970164c1..e6f11259f9 100644
--- a/apps/webapp/package.json
+++ b/apps/webapp/package.json
@@ -51,6 +51,7 @@
     "@headlessui/react": "^1.7.8",
     "@heroicons/react": "^2.0.12",
     "@internal/zod-worker": "workspace:*",
+    "@internal/redis-worker": "workspace:*",
    "@internationalized/date": "^3.5.1",
     "@lezer/highlight": "^1.1.6",
     "@opentelemetry/api": "1.9.0",
diff --git a/apps/webapp/remix.config.js b/apps/webapp/remix.config.js
index d2417a3eb5..ffa62d14af 100644
--- a/apps/webapp/remix.config.js
+++ b/apps/webapp/remix.config.js
@@ -11,6 +11,8 @@ module.exports = {
     /^remix-utils.*/,
     "marked",
     "axios",
+    "p-limit",
+    "yocto-queue",
     "@trigger.dev/core",
     "@trigger.dev/sdk",
     "@trigger.dev/platform",
diff --git a/apps/webapp/tsconfig.json b/apps/webapp/tsconfig.json
index af3d25eb48..af02ef016b 100644
--- a/apps/webapp/tsconfig.json
+++ b/apps/webapp/tsconfig.json
@@ -34,7 +34,9 @@
       "emails": ["../../internal-packages/emails/src/index"],
       "emails/*": ["../../internal-packages/emails/src/*"],
       "@internal/zod-worker": ["../../internal-packages/zod-worker/src/index"],
-      "@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"]
+      "@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"],
+      "@internal/redis-worker": ["../../internal-packages/redis-worker/src/index"],
+      "@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"]
     },
     "noEmit": true
   }
diff --git a/internal-packages/redis-worker/package.json b/internal-packages/redis-worker/package.json
index 33e7bbea42..95922af921 100644
--- a/internal-packages/redis-worker/package.json
+++ b/internal-packages/redis-worker/package.json
@@ -11,7 +11,7 @@
     "ioredis": "^5.3.2",
     "lodash.omit": "^4.5.0",
     "nanoid": "^5.0.7",
-    "typescript": "^5.5.4",
+    "p-limit": "^6.2.0",
     "zod": "3.23.8"
   },
   "devDependencies": {
@@ -21,6 +21,6 @@
   },
   "scripts": {
     "typecheck": "tsc --noEmit",
-    "test": "vitest"
+    "test": "vitest --no-file-parallelism"
   }
 }
\ No newline at end of file
diff --git a/internal-packages/redis-worker/src/index.ts b/internal-packages/redis-worker/src/index.ts
new file mode 100644
index 0000000000..a5893efc83
--- /dev/null
+++ b/internal-packages/redis-worker/src/index.ts
@@ -0,0 +1,2 @@
+export * from "./queue";
+export * from "./worker";
diff --git a/internal-packages/redis-worker/src/queue.test.ts b/internal-packages/redis-worker/src/queue.test.ts
index 075961eba5..023a9564a8 100644
--- a/internal-packages/redis-worker/src/queue.test.ts
+++ b/internal-packages/redis-worker/src/queue.test.ts
@@ -30,13 +30,16 @@ describe("SimpleQueue", () => {
       expect(await queue.size()).toBe(2);

       const [first] = await queue.dequeue(1);
-      expect(first).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(first).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );
       expect(await queue.size()).toBe(1);
       expect(await queue.size({ includeFuture: true })).toBe(2);

@@ -44,13 +47,16 @@ describe("SimpleQueue", () => {
       expect(await queue.size({ includeFuture: true })).toBe(1);

       const [second] = await queue.dequeue(1);
-      expect(second).toEqual({
-        id: "2",
-        job: "test",
-        item: { value: 2 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(second).toEqual(
+        expect.objectContaining({
+          id: "2",
+          job: "test",
+          item: { value: 2 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );

       await queue.ack(second.id);
       expect(await queue.size({ includeFuture: true })).toBe(0);
@@ -81,13 +87,16 @@ describe("SimpleQueue", () => {
       await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });

       const [hitOne] = await queue.dequeue(1);
-      expect(hitOne).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(hitOne).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );

       const missTwo = await queue.dequeue(1);
       expect(missTwo).toEqual([]);
@@ -128,13 +137,16 @@ describe("SimpleQueue", () => {
       await new Promise((resolve) => setTimeout(resolve, 50));

       const [first] = await queue.dequeue();
-      expect(first).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(first).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );
     } finally {
       await queue.close();
     }
@@ -160,13 +172,16 @@ describe("SimpleQueue", () => {
       await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 1_000 });

       const [first] = await queue.dequeue();
-      expect(first).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 1_000,
-        attempt: 0,
-      });
+      expect(first).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 1_000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );

       const missImmediate = await queue.dequeue(1);
       expect(missImmediate).toEqual([]);
@@ -174,13 +189,16 @@ describe("SimpleQueue", () => {

       await new Promise((resolve) => setTimeout(resolve, 1_000));

       const [second] = await queue.dequeue();
-      expect(second).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 1_000,
-        attempt: 0,
-      });
+      expect(second).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 1_000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );
     } finally {
       await queue.close();
     }
@@ -211,20 +229,26 @@ describe("SimpleQueue", () => {

       const dequeued = await queue.dequeue(2);
       expect(dequeued).toHaveLength(2);
-      expect(dequeued[0]).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
-      expect(dequeued[1]).toEqual({
-        id: "2",
-        job: "test",
-        item: { value: 2 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(dequeued[0]).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );
+      expect(dequeued[1]).toEqual(
+        expect.objectContaining({
+          id: "2",
+          job: "test",
+          item: { value: 2 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );

       expect(await queue.size()).toBe(1);
       expect(await queue.size({ includeFuture: true })).toBe(3);
@@ -235,13 +259,16 @@ describe("SimpleQueue", () => {
       expect(await queue.size({ includeFuture: true })).toBe(1);

       const [last] = await queue.dequeue(1);
-      expect(last).toEqual({
-        id: "3",
-        job: "test",
-        item: { value: 3 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(last).toEqual(
+        expect.objectContaining({
+          id: "3",
+          job: "test",
+          item: { value: 3 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );

       await queue.ack(last.id);
       expect(await queue.size({ includeFuture: true })).toBe(0);
@@ -288,13 +315,16 @@ describe("SimpleQueue", () => {

       // Dequeue the redriven item
       const [redrivenItem] = await queue.dequeue(1);
-      expect(redrivenItem).toEqual({
-        id: "1",
-        job: "test",
-        item: { value: 1 },
-        visibilityTimeoutMs: 2000,
-        attempt: 0,
-      });
+      expect(redrivenItem).toEqual(
+        expect.objectContaining({
+          id: "1",
+          job: "test",
+          item: { value: 1 },
+          visibilityTimeoutMs: 2000,
+          attempt: 0,
+          timestamp: expect.any(Date),
+        })
+      );

       // Acknowledge the item
       await queue.ack(redrivenItem.id);
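The assertion rewrite above is the tolerant-matcher pattern: `expect.objectContaining` plus `expect.any(Date)` keeps the tests stable now that dequeued items carry a `timestamp` whose exact value cannot be pinned. A minimal vitest sketch of the same pattern:

```ts
import { expect, it } from "vitest";

it("matches items that gained a timestamp field", () => {
  const dequeued = { id: "1", job: "test", attempt: 0, timestamp: new Date() };
  // Asserts the named fields and that timestamp is *a* Date, nothing more.
  expect(dequeued).toEqual(
    expect.objectContaining({ id: "1", job: "test", timestamp: expect.any(Date) })
  );
});
```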
diff --git a/internal-packages/redis-worker/src/queue.ts b/internal-packages/redis-worker/src/queue.ts
index 04c08a30d2..94309aaaa7 100644
--- a/internal-packages/redis-worker/src/queue.ts
+++ b/internal-packages/redis-worker/src/queue.ts
@@ -13,6 +13,25 @@ export type MessageCatalogValue<
   TKey extends MessageCatalogKey<TMessageCatalog>,
 > = z.infer<TMessageCatalog[TKey]>;

+export type AnyMessageCatalog = MessageCatalogSchema;
+export type QueueItem<TMessageCatalog extends MessageCatalogSchema> = {
+  id: string;
+  job: MessageCatalogKey<TMessageCatalog>;
+  item: MessageCatalogValue<TMessageCatalog, MessageCatalogKey<TMessageCatalog>>;
+  visibilityTimeoutMs: number;
+  attempt: number;
+  timestamp: Date;
+};
+
+export type AnyQueueItem = {
+  id: string;
+  job: string;
+  item: any;
+  visibilityTimeoutMs: number;
+  attempt: number;
+  timestamp: Date;
+};
+
 export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
   name: string;
   private redis: Redis;
@@ -33,7 +52,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
     this.name = name;
     this.redis = new Redis({
       ...redisOptions,
-      keyPrefix: `{queue:${name}:}`,
+      keyPrefix: `${redisOptions.keyPrefix ?? ""}{queue:${name}:}`,
       retryStrategy(times) {
         const delay = Math.min(times * 50, 1000);
         return delay;
@@ -107,15 +126,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
       throw e;
     }
   }

-  async dequeue(count: number = 1): Promise<
-    Array<{
-      id: string;
-      job: MessageCatalogKey<TMessageCatalog>;
-      item: MessageCatalogValue<TMessageCatalog, MessageCatalogKey<TMessageCatalog>>;
-      visibilityTimeoutMs: number;
-      attempt: number;
-    }>
-  > {
+  async dequeue(count: number = 1): Promise<Array<QueueItem<TMessageCatalog>>> {
     const now = Date.now();

     try {
@@ -127,13 +138,15 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {

       const dequeuedItems = [];

-      for (const [id, serializedItem] of results) {
-        const parsedItem = JSON.parse(serializedItem);
+      for (const [id, serializedItem, score] of results) {
+        const parsedItem = JSON.parse(serializedItem) as any;
         if (typeof parsedItem.job !== "string") {
           this.logger.error(`Invalid item in queue`, { queue: this.name, id, item: parsedItem });
           continue;
         }

+        const timestamp = new Date(Number(score));
+
         const schema = this.schema[parsedItem.job];

         if (!schema) {
@@ -142,6 +155,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
             id,
             item: parsedItem,
             job: parsedItem.job,
+            timestamp,
           });
           continue;
         }
@@ -155,6 +169,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
             item: parsedItem,
             errors: validatedItem.error,
             attempt: parsedItem.attempt,
+            timestamp,
           });
           continue;
         }
@@ -170,6 +185,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
           item: validatedItem.data,
           visibilityTimeoutMs,
           attempt: parsedItem.attempt ?? 0,
+          timestamp,
         });
       }

@@ -336,7 +352,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
        local invisibleUntil = now + visibilityTimeoutMs

        redis.call('ZADD', queue, invisibleUntil, id)
-       table.insert(dequeued, {id, serializedItem})
+       table.insert(dequeued, {id, serializedItem, score})
       end
     end

@@ -376,10 +392,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
     local parsedItem = cjson.decode(item)
     parsedItem.errorMessage = errorMessage

+    local time = redis.call('TIME')
+    local now = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
+
     redis.call('ZREM', queue, id)
     redis.call('HDEL', items, id)

-    redis.call('ZADD', dlq, redis.call('TIME')[1], id)
+    redis.call('ZADD', dlq, now, id)
     redis.call('HSET', dlqItems, id, cjson.encode(parsedItem))

     return 1
@@ -403,10 +422,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
     local parsedItem = cjson.decode(item)
     parsedItem.errorMessage = nil

+    local time = redis.call('TIME')
+    local now = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
+
     redis.call('ZREM', dlq, id)
     redis.call('HDEL', dlqItems, id)

-    redis.call('ZADD', queue, redis.call('TIME')[1], id)
+    redis.call('ZADD', queue, now, id)
     redis.call('HSET', items, id, cjson.encode(parsedItem))

     return 1
@@ -435,8 +457,8 @@ declare module "ioredis" {
       //args
       now: number,
       count: number,
-      callback?: Callback<Array<[string, string]>>
-    ): Result<Array<[string, string]>, Context>;
+      callback?: Callback<Array<[string, string, string]>>
+    ): Result<Array<[string, string, string]>, Context>;

     ackItem(
       queue: string,
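The Lua change above fixes a unit mismatch: `redis.call('TIME')[1]` is whole seconds, while the dequeue script compares sorted-set scores against a millisecond `Date.now()` value, so DLQ'd and redriven items were scored ~1000x too small. The same conversion from the TypeScript side, for reference — ioredis exposes TIME as a `[seconds, microseconds]` string pair:

```ts
import Redis from "ioredis";

// Millisecond "now" derived from the Redis server clock, matching the Lua:
// tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
async function redisNowMs(redis: Redis): Promise<number> {
  const [seconds, microseconds] = await redis.time();
  return Number(seconds) * 1000 + Math.floor(Number(microseconds) / 1000);
}
```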
diff --git a/internal-packages/redis-worker/src/telemetry.ts b/internal-packages/redis-worker/src/telemetry.ts
new file mode 100644
index 0000000000..d52c437204
--- /dev/null
+++ b/internal-packages/redis-worker/src/telemetry.ts
@@ -0,0 +1,31 @@
+import { SpanOptions, SpanStatusCode, Span, Tracer } from "@opentelemetry/api";
+
+export async function startSpan<T>(
+  tracer: Tracer,
+  name: string,
+  fn: (span: Span) => Promise<T>,
+  options?: SpanOptions
+): Promise<T> {
+  return tracer.startActiveSpan(name, options ?? {}, async (span) => {
+    try {
+      return await fn(span);
+    } catch (error) {
+      if (error instanceof Error) {
+        span.recordException(error);
+      } else if (typeof error === "string") {
+        span.recordException(new Error(error));
+      } else {
+        span.recordException(new Error(String(error)));
+      }
+
+      span.setStatus({
+        code: SpanStatusCode.ERROR,
+        message: error instanceof Error ? error.message : String(error),
+      });
+
+      throw error;
+    } finally {
+      span.end();
+    }
+  });
+}
diff --git a/internal-packages/redis-worker/src/worker.test.ts b/internal-packages/redis-worker/src/worker.test.ts
index a55a653887..3eb760cf9c 100644
--- a/internal-packages/redis-worker/src/worker.test.ts
+++ b/internal-packages/redis-worker/src/worker.test.ts
@@ -34,7 +34,7 @@ describe("Worker", () => {
           tasksPerWorker: 3,
         },
         logger: new Logger("test", "log"),
-      });
+      }).start();

       try {
         // Enqueue 10 items
@@ -47,10 +47,8 @@ describe("Worker", () => {
           });
         }

-        worker.start();
-
         // Wait for items to be processed
-        await new Promise((resolve) => setTimeout(resolve, 600));
+        await new Promise((resolve) => setTimeout(resolve, 2000));

         expect(processedItems.length).toBe(10);
         expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
@@ -97,7 +95,7 @@ describe("Worker", () => {
         },
         pollIntervalMs: 50,
         logger: new Logger("test", "error"),
-      });
+      }).start();

       try {
         // Enqueue 10 items
@@ -110,8 +108,6 @@ describe("Worker", () => {
           });
         }

-        worker.start();
-
         // Wait for items to be processed
         await new Promise((resolve) => setTimeout(resolve, 500));

@@ -158,7 +154,7 @@ describe("Worker", () => {
         },
         pollIntervalMs: 50,
         logger: new Logger("test", "error"),
-      });
+      }).start();

       try {
         // Enqueue the item that will permanently fail
@@ -175,8 +171,6 @@ describe("Worker", () => {
           payload: { value: 1 },
         });

-        worker.start();
-
         // Wait for items to be processed and retried
         await new Promise((resolve) => setTimeout(resolve, 1000));

@@ -229,7 +223,7 @@ describe("Worker", () => {
         },
         pollIntervalMs: 50,
         logger: new Logger("test", "error"),
-      });
+      }).start();

       try {
         // Enqueue the item that will fail 3 times
@@ -239,8 +233,6 @@ describe("Worker", () => {
           payload: { value: 999 },
         });

-        worker.start();
-
         // Wait for the item to be processed and moved to DLQ
         await new Promise((resolve) => setTimeout(resolve, 1000));

@@ -274,11 +266,4 @@ describe("Worker", () => {
       }
     }
   );
-
-  //todo test that throwing an error doesn't screw up the other items
-  //todo process more items when finished
-
-  //todo add a Dead Letter Queue when items are failed, with the error
-  //todo add a function on the worker to redrive them
-  //todo add an API endpoint to redrive with an ID
 });
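The test changes fold `worker.start()` into construction, which works because `start()` now returns `this` (see the worker.ts diff below). The chainable-start pattern in isolation, as a small sketch:

```ts
// Minimal illustration of the pattern the tests now rely on.
class Service {
  private running = false;

  start(): this {
    this.running = true;
    return this; // returning `this` lets construction and startup compose
  }

  get isRunning() {
    return this.running;
  }
}

const service = new Service().start();
console.log(service.isRunning); // true
```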
diff --git a/internal-packages/redis-worker/src/worker.ts b/internal-packages/redis-worker/src/worker.ts
index 601f5e1708..e80b63bc25 100644
--- a/internal-packages/redis-worker/src/worker.ts
+++ b/internal-packages/redis-worker/src/worker.ts
@@ -1,19 +1,20 @@
+import { SpanKind, trace, Tracer } from "@opentelemetry/api";
 import { Logger } from "@trigger.dev/core/logger";
-import { type RetryOptions } from "@trigger.dev/core/v3/schemas";
 import { calculateNextRetryDelay } from "@trigger.dev/core/v3";
+import { type RetryOptions } from "@trigger.dev/core/v3/schemas";
 import { type RedisOptions } from "ioredis";
-import os from "os";
-import { Worker as NodeWorker } from "worker_threads";
 import { z } from "zod";
-import { SimpleQueue } from "./queue.js";
-
+import { AnyQueueItem, SimpleQueue } from "./queue.js";
 import Redis from "ioredis";
+import { nanoid } from "nanoid";
+import { startSpan } from "./telemetry.js";
+import pLimit from "p-limit";

-type WorkerCatalog = {
+export type WorkerCatalog = {
   [key: string]: {
     schema: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<string, any>;
     visibilityTimeoutMs: number;
-    retry: RetryOptions;
+    retry?: RetryOptions;
   };
 };
@@ -28,6 +29,12 @@ type JobHandler<TCatalog extends WorkerCatalog, K extends keyof TCatalog> = (param
   attempt: number;
 }) => Promise<void>;

+export type WorkerConcurrencyOptions = {
+  workers?: number;
+  tasksPerWorker?: number;
+  limit?: number;
+};
+
 type WorkerOptions<TCatalog extends WorkerCatalog> = {
   name: string;
   redisOptions: RedisOptions;
@@ -35,31 +42,46 @@ type WorkerOptions<TCatalog extends WorkerCatalog> = {
   jobs: {
     [K in keyof TCatalog]: JobHandler<TCatalog, K>;
   };
-  concurrency?: {
-    workers?: number;
-    tasksPerWorker?: number;
-  };
+  concurrency?: WorkerConcurrencyOptions;
   pollIntervalMs?: number;
+  immediatePollIntervalMs?: number;
   logger?: Logger;
+  tracer?: Tracer;
+};
+
+// This results in attempt 12 being a delay of 1 hour
+const defaultRetrySettings = {
+  maxAttempts: 12,
+  factor: 2,
+  //one second
+  minTimeoutInMs: 1_000,
+  //one hour
+  maxTimeoutInMs: 3_600_000,
+  randomize: true,
 };

 class Worker<TCatalog extends WorkerCatalog> {
-  private subscriber: Redis;
+  private subscriber: Redis | undefined;
+  private tracer: Tracer;

   queue: SimpleQueue<QueueCatalogFromWorkerCatalog<TCatalog>>;
   private jobs: WorkerOptions<TCatalog>["jobs"];
   private logger: Logger;
-  private workers: NodeWorker[] = [];
+  private workerLoops: Promise<void>[] = [];
   private isShuttingDown = false;
   private concurrency: Required<NonNullable<WorkerOptions<TCatalog>["concurrency"]>>;

+  // The p-limit limiter to control overall concurrency.
+  private limiter: ReturnType<typeof pLimit>;
+
   constructor(private options: WorkerOptions<TCatalog>) {
     this.logger = options.logger ?? new Logger("Worker", "debug");
+    this.tracer = options.tracer ?? trace.getTracer(options.name);

     const schema: QueueCatalogFromWorkerCatalog<TCatalog> = Object.fromEntries(
       Object.entries(this.options.catalog).map(([key, value]) => [key, value.schema])
     ) as QueueCatalogFromWorkerCatalog<TCatalog>;

-    //
+
     this.queue = new SimpleQueue({
       name: options.name,
       redisOptions: options.redisOptions,
@@ -69,187 +91,251 @@ class Worker<TCatalog extends WorkerCatalog> {

     this.jobs = options.jobs;

-    const { workers = os.cpus().length, tasksPerWorker = 1 } = options.concurrency ?? {};
-    this.concurrency = { workers, tasksPerWorker };
+    const { workers = 1, tasksPerWorker = 1, limit = 10 } = options.concurrency ?? {};
+    this.concurrency = { workers, tasksPerWorker, limit };
+
+    // Create a p-limit instance using this limit.
+    this.limiter = pLimit(this.concurrency.limit);
+  }
+
+  public start() {
+    const { workers, tasksPerWorker } = this.concurrency;

-    // Initialize worker threads
+    // Launch a number of "worker loops" on the main thread.
     for (let i = 0; i < workers; i++) {
-      this.createWorker(tasksPerWorker);
+      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker));
     }

     this.setupShutdownHandlers();

-    this.subscriber = new Redis(options.redisOptions);
+    this.subscriber = new Redis(this.options.redisOptions);
     this.setupSubscriber();
+
+    return this;
   }

+  /**
+   * Enqueues a job for processing.
+   * @param options - The enqueue options.
+   * @param options.id - Optional unique identifier for the job. If not provided, one will be generated. It prevents duplication.
+   * @param options.job - The job type from the worker catalog.
+   * @param options.payload - The job payload that matches the schema defined in the catalog.
+   * @param options.visibilityTimeoutMs - Optional visibility timeout in milliseconds. Defaults to value from catalog.
+   * @param options.availableAt - Optional date when the job should become available for processing. Defaults to now.
+   * @returns A promise that resolves when the job is enqueued.
+   */
   enqueue<K extends keyof TCatalog>({
     id,
     job,
     payload,
     visibilityTimeoutMs,
+    availableAt,
   }: {
     id?: string;
     job: K;
     payload: z.infer<TCatalog[K]["schema"]>;
     visibilityTimeoutMs?: number;
+    availableAt?: Date;
   }) {
-    const timeout = visibilityTimeoutMs ?? this.options.catalog[job].visibilityTimeoutMs;
-    return this.queue.enqueue({
-      id,
-      job,
-      item: payload,
-      visibilityTimeoutMs: timeout,
-    });
+    return startSpan(
+      this.tracer,
+      "enqueue",
+      async (span) => {
+        const timeout = visibilityTimeoutMs ?? this.options.catalog[job].visibilityTimeoutMs;
+
+        span.setAttribute("job_visibility_timeout_ms", timeout);
+
+        return this.queue.enqueue({
+          id,
+          job,
+          item: payload,
+          visibilityTimeoutMs: timeout,
+          availableAt,
+        });
+      },
+      {
+        kind: SpanKind.PRODUCER,
+        attributes: {
+          job_type: job as string,
+          job_id: id,
+        },
+      }
+    );
   }

-  private createWorker(tasksPerWorker: number) {
-    const worker = new NodeWorker(
-      `
-      const { parentPort } = require('worker_threads');
-
-      parentPort.on('message', async (message) => {
-        if (message.type === 'process') {
-          // Process items here
-          parentPort.postMessage({ type: 'done' });
-        }
-      });
-    `,
-      { eval: true }
+  ack(id: string) {
+    return startSpan(
+      this.tracer,
+      "ack",
+      () => {
+        return this.queue.ack(id);
+      },
+      {
+        attributes: {
+          job_id: id,
+        },
+      }
     );
+  }
+
+  /**
+   * The main loop that each worker runs. It repeatedly polls for items,
+   * processes them, and then waits before the next iteration.
+   */
+  private async runWorkerLoop(workerId: string, taskCount: number): Promise<void> {
+    const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
+    const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;

-    worker.on("message", (message) => {
-      if (message.type === "done") {
-        this.processItems(worker, tasksPerWorker);
+    while (!this.isShuttingDown) {
+      // Check overall load. If at capacity, wait a bit before trying to dequeue more.
+      if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
+        await Worker.delay(pollIntervalMs);
+        continue;
       }
-    });

-    worker.on("error", (error) => {
-      this.logger.error("Worker error:", { error });
-    });
+      try {
+        const items = await this.queue.dequeue(taskCount);

-    worker.on("exit", (code) => {
-      if (code !== 0) {
-        this.logger.warn(`Worker stopped with exit code ${code}`);
-      }
-      if (!this.isShuttingDown) {
-        this.createWorker(tasksPerWorker);
+        if (items.length === 0) {
+          await Worker.delay(pollIntervalMs);
+          continue;
+        }
+
+        // Schedule each item using the limiter.
+        for (const item of items) {
+          this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
+            (err) => {
+              this.logger.error("Unhandled error in processItem:", { error: err, workerId, item });
+            }
+          );
+        }
+      } catch (error) {
+        this.logger.error("Error dequeuing items:", { name: this.options.name, error });
+        await Worker.delay(pollIntervalMs);
+        continue;
       }
-    });

-    this.workers.push(worker);
-    this.processItems(worker, tasksPerWorker);
+      // Wait briefly before immediately polling again since we processed items
+      await Worker.delay(immediatePollIntervalMs);
+    }
   }

-  private async processItems(worker: NodeWorker, count: number) {
-    if (this.isShuttingDown) return;
-
-    const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
+  /**
+   * Processes a single item.
+   */
+  private async processItem(
+    { id, job, item, visibilityTimeoutMs, attempt, timestamp }: AnyQueueItem,
+    batchSize: number,
+    workerId: string
+  ): Promise<void> {
+    const catalogItem = this.options.catalog[job as any];
+    const handler = this.jobs[job as any];
+    if (!handler) {
+      this.logger.error(`No handler found for job type: ${job}`);
+      return;
+    }

-    try {
-      const items = await this.queue.dequeue(count);
-      if (items.length === 0) {
-        setTimeout(() => this.processItems(worker, count), pollIntervalMs);
-        return;
+    await startSpan(
+      this.tracer,
+      "processItem",
+      async () => {
+        await handler({ id, payload: item, visibilityTimeoutMs, attempt });
+        // On success, acknowledge the item.
+        await this.queue.ack(id);
+      },
+      {
+        kind: SpanKind.CONSUMER,
+        attributes: {
+          job_id: id,
+          job_type: job,
+          attempt,
+          job_timestamp: timestamp.getTime(),
+          job_age_in_ms: Date.now() - timestamp.getTime(),
+          worker_id: workerId,
+          worker_limit_concurrency: this.limiter.concurrency,
+          worker_limit_active: this.limiter.activeCount,
+          worker_limit_pending: this.limiter.pendingCount,
+          worker_name: this.options.name,
+          batch_size: batchSize,
+        },
       }
+    ).catch(async (error) => {
+      const errorMessage = error instanceof Error ? error.message : String(error);
+      this.logger.error(`Error processing item:`, {
+        name: this.options.name,
+        id,
+        job,
+        item,
+        visibilityTimeoutMs,
+        error,
+        errorMessage,
+      });
+      // Attempt requeue logic.
+      try {
+        const newAttempt = attempt + 1;
+        const retrySettings = {
+          ...defaultRetrySettings,
+          ...catalogItem.retry,
+        };
+        const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt);

-      await Promise.all(
-        items.map(async ({ id, job, item, visibilityTimeoutMs, attempt }) => {
-          const catalogItem = this.options.catalog[job as any];
-          const handler = this.jobs[job as any];
-          if (!handler) {
-            this.logger.error(`No handler found for job type: ${job as string}`);
-            return;
-          }
-
-          try {
-            await handler({ id, payload: item, visibilityTimeoutMs, attempt });
-
-            //succeeded, acking the item
-            await this.queue.ack(id);
-          } catch (error) {
-            const errorMessage = error instanceof Error ? error.message : String(error);
-            this.logger.error(`Error processing item, it threw an error:`, {
-              name: this.options.name,
-              id,
-              job,
-              item,
-              visibilityTimeoutMs,
-              error,
-              errorMessage,
-            });
-            // Requeue the failed item with a delay
-            try {
-              attempt = attempt + 1;
-
-              const retryDelay = calculateNextRetryDelay(catalogItem.retry, attempt);
-
-              if (!retryDelay) {
-                this.logger.error(
-                  `Failed item ${id} has reached max attempts, moving to the DLQ.`,
-                  {
-                    name: this.options.name,
-                    id,
-                    job,
-                    item,
-                    visibilityTimeoutMs,
-                    attempt,
-                    errorMessage,
-                  }
-                );
-
-                await this.queue.moveToDeadLetterQueue(id, errorMessage);
-                return;
-              }
-
-              const retryDate = new Date(Date.now() + retryDelay);
-              this.logger.info(`Requeued failed item ${id} with delay`, {
-                name: this.options.name,
-                id,
-                job,
-                item,
-                retryDate,
-                retryDelay,
-                visibilityTimeoutMs,
-                attempt,
-              });
-              await this.queue.enqueue({
-                id,
-                job,
-                item,
-                availableAt: retryDate,
-                attempt,
-                visibilityTimeoutMs,
-              });
-            } catch (requeueError) {
-              this.logger.error(
-                `Failed to requeue item, threw error. Will automatically get rescheduled after the visilibity timeout.`,
-                {
-                  name: this.options.name,
-                  id,
-                  job,
-                  item,
-                  visibilityTimeoutMs,
-                  error: requeueError,
-                }
-              );
-            }
+        if (!retryDelay) {
+          this.logger.error(`Item ${id} reached max attempts. Moving to DLQ.`, {
+            name: this.options.name,
+            id,
+            job,
+            item,
+            visibilityTimeoutMs,
+            attempt: newAttempt,
+            errorMessage,
+          });
+          await this.queue.moveToDeadLetterQueue(id, errorMessage);
+          return;
+        }

-          }
-        })
-      );
-    } catch (error) {
-      this.logger.error("Error dequeuing items:", { name: this.options.name, error });
-      setTimeout(() => this.processItems(worker, count), pollIntervalMs);
-      return;
-    }
+        const retryDate = new Date(Date.now() + retryDelay);
+        this.logger.info(`Requeuing failed item ${id} with delay`, {
+          name: this.options.name,
+          id,
+          job,
+          item,
+          retryDate,
+          retryDelay,
+          visibilityTimeoutMs,
+          attempt: newAttempt,
+        });
+        await this.queue.enqueue({
+          id,
+          job,
+          item,
+          availableAt: retryDate,
+          attempt: newAttempt,
+          visibilityTimeoutMs,
+        });
+      } catch (requeueError) {
+        this.logger.error(
+          `Failed to requeue item ${id}. It will be retried after the visibility timeout.`,
+          {
+            name: this.options.name,
+            id,
+            job,
+            item,
+            visibilityTimeoutMs,
+            error: requeueError,
+          }
+        );
+      }
+    });
+  }

-    // Immediately process next batch because there were items in the queue
-    this.processItems(worker, count);
+  // A simple helper to delay for a given number of milliseconds.
+  private static delay(ms: number): Promise<void> {
+    return new Promise((resolve) => setTimeout(resolve, ms));
   }

   private setupSubscriber() {
     const channel = `${this.options.name}:redrive`;
-    this.subscriber.subscribe(channel, (err) => {
+    this.subscriber?.subscribe(channel, (err) => {
       if (err) {
         this.logger.error(`Failed to subscribe to ${channel}`, { error: err });
       } else {
@@ -257,12 +343,12 @@ class Worker<TCatalog extends WorkerCatalog> {
       }
     });

-    this.subscriber.on("message", this.handleRedriveMessage.bind(this));
+    this.subscriber?.on("message", this.handleRedriveMessage.bind(this));
   }

   private async handleRedriveMessage(channel: string, message: string) {
     try {
-      const { id } = JSON.parse(message);
+      const { id } = JSON.parse(message) as any;
       if (typeof id !== "string") {
         throw new Error("Invalid message format: id must be a string");
       }
@@ -281,28 +367,19 @@ class Worker<TCatalog extends WorkerCatalog> {
   private async shutdown() {
     if (this.isShuttingDown) return;
     this.isShuttingDown = true;
-    this.logger.log("Shutting down workers...");
+    this.logger.log("Shutting down worker loops...");

-    for (const worker of this.workers) {
-      worker.terminate();
-    }
+    // Wait for all worker loops to finish.
+    await Promise.all(this.workerLoops);

-    await this.subscriber.unsubscribe();
-    await this.subscriber.quit();
+    await this.subscriber?.unsubscribe();
+    await this.subscriber?.quit();
     await this.queue.close();
     this.logger.log("All workers and subscribers shut down.");
   }

-  public start() {
-    this.logger.log("Starting workers...");
-    this.isShuttingDown = false;
-    for (const worker of this.workers) {
-      this.processItems(worker, this.concurrency.tasksPerWorker);
-    }
-  }
-
-  public stop() {
-    this.shutdown();
+  public async stop() {
+    await this.shutdown();
   }
 }
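The rewritten worker drops worker_threads for cooperative loops on the main thread, all sharing one p-limit limiter. The backpressure check is the core trick: a loop only dequeues when `activeCount + pendingCount` is below the limit, so slow handlers throttle polling instead of piling work up. A reduced sketch of one loop iteration, with a stand-in dequeue function:

```ts
import pLimit from "p-limit";

const CONCURRENCY_LIMIT = 10;
const limit = pLimit(CONCURRENCY_LIMIT);

async function loopOnce(dequeue: (count: number) => Promise<string[]>) {
  // At capacity: back off for a poll interval instead of dequeuing more.
  if (limit.activeCount + limit.pendingCount >= CONCURRENCY_LIMIT) {
    await new Promise((resolve) => setTimeout(resolve, 1_000));
    return;
  }

  const items = await dequeue(3);

  // Fire-and-forget under the limiter; each scheduled task must catch its own
  // errors, otherwise an unhandled rejection escapes the loop.
  for (const item of items) {
    limit(async () => console.log("processing", item)).catch(console.error);
  }
}
```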
diff --git a/internal-packages/redis-worker/tsconfig.json b/internal-packages/redis-worker/tsconfig.json
index 766df37eae..ff096d3e9f 100644
--- a/internal-packages/redis-worker/tsconfig.json
+++ b/internal-packages/redis-worker/tsconfig.json
@@ -1,7 +1,7 @@
 {
   "compilerOptions": {
     "target": "ES2019",
-    "lib": ["ES2019", "DOM", "DOM.Iterable"],
+    "lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
     "module": "CommonJS",
     "moduleResolution": "Node",
     "moduleDetection": "force",
diff --git a/internal-packages/redis-worker/vitest.config.ts b/internal-packages/redis-worker/vitest.config.ts
index 4afd926425..dfe0df2746 100644
--- a/internal-packages/redis-worker/vitest.config.ts
+++ b/internal-packages/redis-worker/vitest.config.ts
@@ -4,5 +4,11 @@ export default defineConfig({
   test: {
     include: ["**/*.test.ts"],
     globals: true,
+    fileParallelism: false,
+    poolOptions: {
+      threads: {
+        singleThread: true,
+      },
+    },
   },
 });
diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts
index 15de844759..bd73937442 100644
--- a/packages/core/src/v3/apiClient/runStream.ts
+++ b/packages/core/src/v3/apiClient/runStream.ts
@@ -509,7 +509,7 @@ const isSafari = () => {
  */

 if (isSafari()) {
-  // @ts-expect-error
+  // @ts-ignore-error
   ReadableStream.prototype.values ??= function ({ preventCancel = false } = {}) {
     const reader = this.getReader();
     return {
@@ -541,6 +541,6 @@ if (isSafari()) {
     };
   };

-  // @ts-expect-error
+  // @ts-ignore-error
   ReadableStream.prototype[Symbol.asyncIterator] ??= ReadableStream.prototype.values;
 }
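For context, the polyfill those hunks touch exists because Safari lacks async iteration on ReadableStream; with `values`/`Symbol.asyncIterator` patched in (and `DOM.AsyncIterable` added to `lib`, as the tsconfig change above does), consumer code can simply `for await` over a stream. A small sketch, assuming the polyfill has run or the runtime supports it natively:

```ts
async function drain(stream: ReadableStream<string>): Promise<string[]> {
  const chunks: string[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk); // each chunk as the stream produces it
  }
  return chunks;
}
```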
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 322482b8a7..604337f9da 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -243,6 +243,9 @@ importers:
       '@heroicons/react':
         specifier: ^2.0.12
         version: 2.0.13(react@18.2.0)
+      '@internal/redis-worker':
+        specifier: workspace:*
+        version: link:../../internal-packages/redis-worker
       '@internal/zod-worker':
         specifier: workspace:*
         version: link:../../internal-packages/zod-worker
@@ -948,9 +951,9 @@ importers:
       nanoid:
         specifier: ^5.0.7
         version: 5.0.7
-      typescript:
-        specifier: ^5.5.4
-        version: 5.5.4
+      p-limit:
+        specifier: ^6.2.0
+        version: 6.2.0
       zod:
         specifier: 3.23.8
         version: 3.23.8
@@ -26272,6 +26275,13 @@ packages:
       yocto-queue: 1.0.0
     dev: true

+  /p-limit@6.2.0:
+    resolution: {integrity: sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA==}
+    engines: {node: '>=18'}
+    dependencies:
+      yocto-queue: 1.1.1
+    dev: false
+
   /p-locate@4.1.0:
     resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
     engines: {node: '>=8'}
@@ -32852,6 +32862,11 @@ packages:
     engines: {node: '>=12.20'}
     dev: true

+  /yocto-queue@1.1.1:
+    resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==}
+    engines: {node: '>=12.20'}
+    dev: false
+
   /youch@3.3.3:
     resolution: {integrity: sha512-qSFXUk3UZBLfggAW3dJKg0BMblG5biqSF8M34E06o5CSsZtH92u9Hqmj2RzGiHDi64fhe83+4tENFP2DB6t6ZA==}
     dependencies:
diff --git a/references/v3-catalog/src/trigger/simple.ts b/references/v3-catalog/src/trigger/simple.ts
index 2152622b2f..8dcf96f46e 100644
--- a/references/v3-catalog/src/trigger/simple.ts
+++ b/references/v3-catalog/src/trigger/simple.ts
@@ -106,15 +106,33 @@ export const immediateReturn = task({
     console.info("some");
     console.warn("random");
     console.error("logs");
+
+    await new Promise((resolve) => setTimeout(resolve, 20000));
+  },
+});
+
+export const simulateErrorTester = task({
+  id: "simulateErrorTester",
+  run: async (payload: { message: string }) => {
+    await simulateError.batchTrigger([
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+      { payload: { message: payload.message }, options: { maxAttempts: 1 } },
+    ]);
   },
 });

 export const simulateError = task({
   id: "simulateError",
+  retry: {
+    maxAttempts: 1,
+  },
   run: async (payload: { message: string }) => {
-    // Sleep for 1 second
-    await new Promise((resolve) => setTimeout(resolve, 1000));
-
     thisFunctionWillThrow();
   },
 });
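The eight-item batch in simulateErrorTester is written out literally; the same fan-out can be generated, which is handy when varying the batch size while load-testing alerts. A hypothetical variant, assuming the simulateError task from the file above:

```ts
import { task } from "@trigger.dev/sdk/v3";

// Hypothetical variant of simulateErrorTester with a configurable batch size.
export const simulateErrorFanOut = task({
  id: "simulateErrorFanOut",
  run: async (payload: { message: string; count: number }) => {
    await simulateError.batchTrigger(
      Array.from({ length: payload.count }, () => ({
        payload: { message: payload.message },
        options: { maxAttempts: 1 }, // fail fast: no retries per run
      }))
    );
  },
});
```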