diff --git a/.changeset/eight-turtles-itch.md b/.changeset/eight-turtles-itch.md new file mode 100644 index 0000000000..967232a474 --- /dev/null +++ b/.changeset/eight-turtles-itch.md @@ -0,0 +1,9 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +- Include retries.default in task retry config when indexing +- New helpers for internal error retry mechanics +- Detection for segfaults and ffmpeg OOM errors +- Retries for packet import and export \ No newline at end of file diff --git a/apps/webapp/app/components/runs/v3/RunInspector.tsx b/apps/webapp/app/components/runs/v3/RunInspector.tsx index 1f3f6f8fa0..4ff84dfdb6 100644 --- a/apps/webapp/app/components/runs/v3/RunInspector.tsx +++ b/apps/webapp/app/components/runs/v3/RunInspector.tsx @@ -40,7 +40,7 @@ import { } from "~/utils/pathBuilder"; import { TraceSpan } from "~/utils/taskEvent"; import { SpanLink } from "~/v3/eventRepository.server"; -import { isFinalRunStatus } from "~/v3/taskStatus"; +import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline"; import { RunTag } from "./RunTag"; import { TaskRunStatusCombo } from "./TaskRunStatus"; @@ -479,6 +479,7 @@ function RunTimeline({ run }: { run: RawRun }) { const updatedAt = new Date(run.updatedAt); const isFinished = isFinalRunStatus(run.status); + const isError = isFailedRunStatus(run.status); return (
@@ -535,7 +536,7 @@ function RunTimeline({ run }: { run: RawRun }) { } - state="complete" + state={isError ? "error" : "complete"} /> ) : ( diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 6af0f09de1..0e11e15d96 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -7,7 +7,7 @@ import { import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; import { eventRepository } from "~/v3/eventRepository.server"; import { machinePresetFromName } from "~/v3/machinePresets.server"; -import { FINAL_ATTEMPT_STATUSES, isFinalRunStatus } from "~/v3/taskStatus"; +import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; import { BasePresenter } from "./basePresenter.server"; import { getMaxDuration } from "~/v3/utils/maxDuration"; @@ -294,6 +294,7 @@ export class SpanPresenter extends BasePresenter { usageDurationMs: run.usageDurationMs, isFinished, isRunning: RUNNING_STATUSES.includes(run.status), + isError: isFailedRunStatus(run.status), payload, payloadType: run.payloadType, output, diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index 9c2845f6a5..33894f8493 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -29,7 +29,10 @@ export async function action({ request, params }: ActionFunctionArgs) { const service = new CreateTaskRunAttemptService(); try { - const { execution } = await service.call(runParam, authenticationResult.environment); + const { execution } = await service.call({ + runId: runParam, + authenticatedEnv: authenticationResult.environment, + }); return json(execution, { status: 200 }); } catch (error) { diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx index b250a88017..a871672684 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx @@ -857,7 +857,7 @@ function RunTimeline({ run }: { run: SpanRun }) { } - state="complete" + state={run.isError ? 
"error" : "complete"} /> ) : ( diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index c322241df7..9ebe9233c9 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -1,15 +1,41 @@ -import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3"; +import { + calculateNextRetryDelay, + RetryOptions, + TaskRunExecution, + TaskRunExecutionRetry, + TaskRunFailedExecutionResult, +} from "@trigger.dev/core/v3"; import { logger } from "~/services/logger.server"; -import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server"; import { BaseService } from "./services/baseService.server"; -import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server"; -import { FAILABLE_RUN_STATUSES } from "./taskStatus"; +import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus"; +import type { Prisma, TaskRun } from "@trigger.dev/database"; +import { CompleteAttemptService } from "./services/completeAttempt.server"; +import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server"; +import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server"; +import * as semver from "semver"; + +const includeAttempts = { + attempts: { + orderBy: { + createdAt: "desc", + }, + take: 1, + }, + lockedBy: true, // task + lockedToVersion: true, // worker +} satisfies Prisma.TaskRunInclude; + +type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{ + include: typeof includeAttempts; +}>; export class FailedTaskRunService extends BaseService { public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) { + logger.debug("[FailedTaskRunService] Handling failed task run", { anyRunId, completion }); + const isFriendlyId = anyRunId.startsWith("run_"); - const taskRun = await this._prisma.taskRun.findUnique({ + const taskRun = await this._prisma.taskRun.findFirst({ where: { friendlyId: isFriendlyId ? anyRunId : undefined, id: !isFriendlyId ? 
anyRunId : undefined, @@ -25,7 +51,7 @@ export class FailedTaskRunService extends BaseService { return; } - if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) { + if (!isFailableRunStatus(taskRun.status)) { logger.error("[FailedTaskRunService] Task run is not in a failable state", { taskRun, completion, @@ -34,33 +60,226 @@ export class FailedTaskRunService extends BaseService { return; } - // No more retries, we need to fail the task run - logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion }); + const retryHelper = new FailedTaskRunRetryHelper(this._prisma); + const retryResult = await retryHelper.call({ + runId: taskRun.id, + completion, + }); + + logger.debug("[FailedTaskRunService] Completion result", { + runId: taskRun.id, + result: retryResult, + }); + } +} + +interface TaskRunWithWorker extends TaskRun { + lockedBy: { retryConfig: Prisma.JsonValue } | null; + lockedToVersion: { sdkVersion: string } | null; +} - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRun.id, - status: "SYSTEM_FAILURE", - completedAt: new Date(), - attemptStatus: "FAILED", - error: sanitizeError(completion.error), +export class FailedTaskRunRetryHelper extends BaseService { + async call({ + runId, + completion, + isCrash, + }: { + runId: string; + completion: TaskRunFailedExecutionResult; + isCrash?: boolean; + }) { + const taskRun = await this._prisma.taskRun.findFirst({ + where: { + id: runId, + }, + include: includeAttempts, }); - // Now we need to "complete" the task run event/span - await eventRepository.completeEvent(taskRun.spanId, { - endTime: new Date(), - attributes: { - isError: true, + if (!taskRun) { + logger.error("[FailedTaskRunRetryHelper] Task run not found", { + runId, + completion, + }); + + return "NO_TASK_RUN"; + } + + const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion); + + if (!retriableExecution) { + return "NO_EXECUTION"; + } + + logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion }); + + const executionRetry = + completion.retry ?? + (await FailedTaskRunRetryHelper.getExecutionRetry({ + run: taskRun, + execution: retriableExecution, + })); + + const completeAttempt = new CompleteAttemptService(this._prisma); + const completeResult = await completeAttempt.call({ + completion: { + ...completion, + retry: executionRetry, }, - events: [ - { - name: "exception", - time: new Date(), - properties: { - exception: createExceptionPropertiesFromError(completion.error), - }, - }, - ], + execution: retriableExecution, + isSystemFailure: !isCrash, + isCrash, }); + + return completeResult; + } + + async #getRetriableAttemptExecution( + run: TaskRunWithAttempts, + completion: TaskRunFailedExecutionResult + ): Promise { + let attempt = run.attempts[0]; + + // We need to create an attempt if: + // - None exists yet + // - The last attempt has a final status, e.g. 
we failed between attempts + if (!attempt || isFinalAttemptStatus(attempt.status)) { + logger.debug("[FailedTaskRunRetryHelper] No attempts found", { + run, + completion, + }); + + const createAttempt = new CreateTaskRunAttemptService(this._prisma); + + try { + const { execution } = await createAttempt.call({ + runId: run.id, + // This ensures we correctly respect `maxAttempts = 1` when failing before the first attempt was created + startAtZero: true, + }); + return execution; + } catch (error) { + logger.error("[FailedTaskRunRetryHelper] Failed to create attempt", { + run, + completion, + error, + }); + + return; + } + } + + // We already have an attempt with non-final status, let's use it + try { + const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ + id: attempt.id, + skipStatusChecks: true, + }); + + return executionPayload?.execution; + } catch (error) { + logger.error("[FailedTaskRunRetryHelper] Failed to get execution payload", { + run, + completion, + error, + }); + + return; + } + } + + static async getExecutionRetry({ + run, + execution, + }: { + run: TaskRunWithWorker; + execution: TaskRunExecution; + }): Promise { + try { + const retryConfig = run.lockedBy?.retryConfig; + + if (!retryConfig) { + if (!run.lockedToVersion) { + logger.error("[FailedTaskRunRetryHelper] Run not locked to version", { + run, + execution, + }); + + return; + } + + const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0"; + const isValid = semver.valid(sdkVersion); + + if (!isValid) { + logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", { + run, + execution, + }); + + return; + } + + // With older SDK versions, tasks only have a retry config stored in the DB if it's explicitly defined on the task itself + // It won't get populated with retry.default in trigger.config.ts + if (semver.lt(sdkVersion, FailedTaskRunRetryHelper.DEFAULT_RETRY_CONFIG_SINCE_VERSION)) { + logger.warn( + "[FailedTaskRunRetryHelper] SDK version not recent enough to determine retry config", + { + run, + execution, + } + ); + + return; + } + } + + const parsedRetryConfig = RetryOptions.nullable().safeParse(retryConfig); + + if (!parsedRetryConfig.success) { + logger.error("[FailedTaskRunRetryHelper] Invalid retry config", { + run, + execution, + }); + + return; + } + + if (!parsedRetryConfig.data) { + logger.debug("[FailedTaskRunRetryHelper] No retry config", { + run, + execution, + }); + + return; + } + + const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number); + + if (!delay) { + logger.debug("[FailedTaskRunRetryHelper] No more retries", { + run, + execution, + }); + + return; + } + + return { + timestamp: Date.now() + delay, + delay, + }; + } catch (error) { + logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", { + run, + execution, + error, + }); + + return; + } } + + // TODO: update this to the correct version + static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.14"; } diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index dd12d69bb4..b40b5b5673 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -193,7 +193,11 @@ function createCoordinatorNamespace(io: Server) { } const service = new CreateTaskRunAttemptService(); - const { attempt } = await service.call(message.runId, environment, false); + const { attempt } = await service.call({ + runId: message.runId, + authenticatedEnv: environment, + setToExecuting: false, + }); const 
payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ id: attempt.id, diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 19628db616..eed8d7f5ae 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -509,7 +509,10 @@ export class SharedQueueConsumer { if (!deployment.worker.supportsLazyAttempts) { try { const service = new CreateTaskRunAttemptService(); - await service.call(lockedTaskRun.friendlyId, undefined, false); + await service.call({ + runId: lockedTaskRun.id, + setToExecuting: false, + }); } catch (error) { logger.error("Failed to create task run attempt for outdate worker", { error, diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 3df51c5cf9..9695ee9c7f 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -8,6 +8,7 @@ import { TaskRunSuccessfulExecutionResult, flattenAttributes, sanitizeError, + shouldRetryError, } from "@trigger.dev/core/v3"; import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -21,9 +22,10 @@ import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; import { CreateCheckpointService } from "./createCheckpoint.server"; import { TaskRun } from "@trigger.dev/database"; import { RetryAttemptService } from "./retryAttempt.server"; -import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; +import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { env } from "~/env.server"; +import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; type FoundAttempt = Awaited>; @@ -39,12 +41,16 @@ export class CompleteAttemptService extends BaseService { env, checkpoint, supportsRetryCheckpoints, + isSystemFailure, + isCrash, }: { completion: TaskRunExecutionResult; execution: TaskRunExecution; env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; supportsRetryCheckpoints?: boolean; + isSystemFailure?: boolean; + isCrash?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { const taskRunAttempt = await findAttempt(this._prisma, execution.attempt.id); @@ -110,6 +116,8 @@ export class CompleteAttemptService extends BaseService { env, checkpoint, supportsRetryCheckpoints, + isSystemFailure, + isCrash, }); } } @@ -170,6 +178,8 @@ export class CompleteAttemptService extends BaseService { env, checkpoint, supportsRetryCheckpoints, + isSystemFailure, + isCrash, }: { completion: TaskRunFailedExecutionResult; execution: TaskRunExecution; @@ -177,6 +187,8 @@ export class CompleteAttemptService extends BaseService { env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; supportsRetryCheckpoints?: boolean; + isSystemFailure?: boolean; + isCrash?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { if ( completion.error.type === "INTERNAL_ERROR" && @@ -194,18 +206,17 @@ export class CompleteAttemptService extends BaseService { env ); - // The cancel service handles ACK - return "COMPLETED"; } + const failedAt = new Date(); const sanitizedError = sanitizeError(completion.error); await this._prisma.taskRunAttempt.update({ where: { id: taskRunAttempt.id }, data: { status: "FAILED", - completedAt: new Date(), + completedAt: failedAt, error: sanitizedError, 
usageDurationMs: completion.usage?.durationMs, }, @@ -213,183 +224,152 @@ export class CompleteAttemptService extends BaseService { const environment = env ?? (await this.#getEnvironment(execution.environment.id)); - if (completion.retry !== undefined && taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS) { - const retryAt = new Date(completion.retry.timestamp); - - // Retry the task run - await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, { - taskSlug: taskRunAttempt.taskRun.taskIdentifier, - environment, - attributes: { - metadata: this.#generateMetadataAttributesForNextAttempt(execution), - properties: { - retryAt: retryAt.toISOString(), - }, - runId: taskRunAttempt.taskRun.friendlyId, - style: { - icon: "schedule-attempt", - }, - queueId: taskRunAttempt.queueId, - queueName: taskRunAttempt.taskRun.queue, + const executionRetry = + completion.retry ?? + (await FailedTaskRunRetryHelper.getExecutionRetry({ + run: { + ...taskRunAttempt.taskRun, + lockedBy: taskRunAttempt.backgroundWorkerTask, + lockedToVersion: taskRunAttempt.backgroundWorker, }, - context: taskRunAttempt.taskRun.traceContext as Record, - spanIdSeed: `retry-${taskRunAttempt.number + 1}`, - endTime: retryAt, - }); - - logger.debug("Retrying", { - taskRun: taskRunAttempt.taskRun.friendlyId, - retry: completion.retry, - }); + execution, + })); - await this._prisma.taskRun.update({ - where: { - id: taskRunAttempt.taskRunId, - }, - data: { - status: "RETRYING_AFTER_FAILURE", - }, + if ( + shouldRetryError(completion.error) && + executionRetry !== undefined && + taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS + ) { + return await this.#retryAttempt({ + execution, + executionRetry, + taskRunAttempt, + environment, + checkpoint, + supportsRetryCheckpoints, }); + } - if (environment.type === "DEVELOPMENT") { - // This is already an EXECUTE message so we can just NACK - await marqs?.nackMessage(taskRunAttempt.taskRunId, completion.retry.timestamp); - return "RETRIED"; - } - - if (!checkpoint) { - await this.#retryAttempt({ - run: taskRunAttempt.taskRun, - retry: completion.retry, - supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, - supportsRetryCheckpoints, - }); - - return "RETRIED"; - } + // The attempt has failed and we won't retry - const createCheckpoint = new CreateCheckpointService(this._prisma); - const checkpointCreateResult = await createCheckpoint.call({ - attemptFriendlyId: execution.attempt.id, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "RETRYING_AFTER_FAILURE", - attemptNumber: execution.attempt.number, + // Now we need to "complete" the task run event/span + await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, { + endTime: failedAt, + attributes: { + isError: true, + }, + events: [ + { + name: "exception", + time: failedAt, + properties: { + exception: createExceptionPropertiesFromError(sanitizedError), + }, }, - }); - - if (!checkpointCreateResult.success) { - logger.error("Failed to create checkpoint", { checkpoint, execution: execution.run.id }); - - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status: "SYSTEM_FAILURE", - completedAt: new Date(), - }); + ], + }); - return "COMPLETED"; - } + await this._prisma.taskRun.update({ + where: { + id: taskRunAttempt.taskRunId, + }, + data: { + error: sanitizedError, + }, + }); - await this.#retryAttempt({ - run: taskRunAttempt.taskRun, - retry: completion.retry, - checkpointEventId: 
checkpointCreateResult.event.id, - supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, - supportsRetryCheckpoints, - }); + let status: FAILED_RUN_STATUSES; - return "RETRIED"; + // Set the correct task run status + if (isSystemFailure) { + status = "SYSTEM_FAILURE"; + } else if (isCrash) { + status = "CRASHED"; + } else if ( + sanitizedError.type === "INTERNAL_ERROR" && + sanitizedError.code === "MAX_DURATION_EXCEEDED" + ) { + status = "TIMED_OUT"; + // TODO: check we want these all to be crashes by default + } else if (sanitizedError.type === "INTERNAL_ERROR") { + status = "CRASHED"; } else { - // Now we need to "complete" the task run event/span - await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, { - endTime: new Date(), - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: new Date(), - properties: { - exception: createExceptionPropertiesFromError(sanitizedError), - }, - }, - ], - }); + status = "COMPLETED_WITH_ERRORS"; + } - if ( - sanitizedError.type === "INTERNAL_ERROR" && - sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT" - ) { - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status: "SYSTEM_FAILURE", - completedAt: new Date(), - }); + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRunAttempt.taskRunId, + status, + completedAt: failedAt, + }); - // We need to fail all incomplete spans - const inProgressEvents = await eventRepository.queryIncompleteEvents({ - attemptId: execution.attempt.id, - }); + if (status !== "CRASHED" && status !== "SYSTEM_FAILURE") { + return "COMPLETED"; + } + + const inProgressEvents = await eventRepository.queryIncompleteEvents({ + runId: taskRunAttempt.taskRun.friendlyId, + }); - logger.debug("Failing in-progress events", { + // Handle in-progress events + switch (status) { + case "CRASHED": { + logger.debug("[CompleteAttemptService] Crashing in-progress events", { inProgressEvents: inProgressEvents.map((event) => event.id), }); - const exception = { - type: "Graceful exit timeout", - message: sanitizedError.message, - }; - await Promise.all( inProgressEvents.map((event) => { return eventRepository.crashEvent({ - event: event, - crashedAt: new Date(), - exception, + event, + crashedAt: failedAt, + exception: createExceptionPropertiesFromError(sanitizedError), }); }) ); - } else { - await this._prisma.taskRun.update({ - where: { - id: taskRunAttempt.taskRunId, - }, - data: { - error: sanitizedError, - }, - }); - const status = - sanitizedError.type === "INTERNAL_ERROR" && - sanitizedError.code === "MAX_DURATION_EXCEEDED" - ? 
"TIMED_OUT" - : "COMPLETED_WITH_ERRORS"; - - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status, - completedAt: new Date(), - }); + break; } + case "SYSTEM_FAILURE": { + logger.debug("[CompleteAttemptService] Failing in-progress events", { + inProgressEvents: inProgressEvents.map((event) => event.id), + }); - return "COMPLETED"; + await Promise.all( + inProgressEvents.map((event) => { + return eventRepository.completeEvent(event.spanId, { + endTime: failedAt, + attributes: { + isError: true, + }, + events: [ + { + name: "exception", + time: failedAt, + properties: { + exception: createExceptionPropertiesFromError(sanitizedError), + }, + }, + ], + }); + }) + ); + } } + + return "COMPLETED"; } - async #retryAttempt({ + async #enqueueReattempt({ run, - retry, + executionRetry, checkpointEventId, supportsLazyAttempts, supportsRetryCheckpoints, }: { run: TaskRun; - retry: TaskRunExecutionRetry; + executionRetry: TaskRunExecutionRetry; checkpointEventId?: string; supportsLazyAttempts: boolean; supportsRetryCheckpoints?: boolean; @@ -404,12 +384,12 @@ export class CompleteAttemptService extends BaseService { checkpointEventId: supportsRetryCheckpoints ? checkpointEventId : undefined, retryCheckpointsDisabled: !supportsRetryCheckpoints, }, - retry.timestamp + executionRetry.timestamp ); }; const retryDirectly = () => { - return RetryAttemptService.enqueue(run.id, this._prisma, new Date(retry.timestamp)); + return RetryAttemptService.enqueue(run.id, this._prisma, new Date(executionRetry.timestamp)); }; // There's a checkpoint, so we need to go through the queue @@ -432,7 +412,7 @@ export class CompleteAttemptService extends BaseService { } // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold - if (!supportsRetryCheckpoints && retry.delay >= env.CHECKPOINT_THRESHOLD_IN_MS) { + if (!supportsRetryCheckpoints && executionRetry.delay >= env.CHECKPOINT_THRESHOLD_IN_MS) { await retryViaQueue(); return; } @@ -441,6 +421,141 @@ export class CompleteAttemptService extends BaseService { await retryDirectly(); } + async #retryAttempt({ + execution, + executionRetry, + taskRunAttempt, + environment, + checkpoint, + supportsRetryCheckpoints, + }: { + execution: TaskRunExecution; + executionRetry: TaskRunExecutionRetry; + taskRunAttempt: NonNullable; + environment: AuthenticatedEnvironment; + checkpoint?: CheckpointData; + supportsRetryCheckpoints?: boolean; + }) { + const retryAt = new Date(executionRetry.timestamp); + + // Retry the task run + await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, { + taskSlug: taskRunAttempt.taskRun.taskIdentifier, + environment, + attributes: { + metadata: this.#generateMetadataAttributesForNextAttempt(execution), + properties: { + retryAt: retryAt.toISOString(), + }, + runId: taskRunAttempt.taskRun.friendlyId, + style: { + icon: "schedule-attempt", + }, + queueId: taskRunAttempt.queueId, + queueName: taskRunAttempt.taskRun.queue, + }, + context: taskRunAttempt.taskRun.traceContext as Record, + spanIdSeed: `retry-${taskRunAttempt.number + 1}`, + endTime: retryAt, + }); + + logger.debug("Retrying", { + taskRun: taskRunAttempt.taskRun.friendlyId, + retry: executionRetry, + }); + + await this._prisma.taskRun.update({ + where: { + id: taskRunAttempt.taskRunId, + }, + data: { + status: "RETRYING_AFTER_FAILURE", + }, + }); + + if (environment.type === "DEVELOPMENT") { + // This is already an EXECUTE 
message so we can just NACK + await marqs?.nackMessage(taskRunAttempt.taskRunId, executionRetry.timestamp); + return "RETRIED"; + } + + if (checkpoint) { + // This is only here for backwards compat - we don't checkpoint between attempts anymore + return await this.#retryAttemptWithCheckpoint({ + execution, + taskRunAttempt, + executionRetry, + checkpoint, + supportsRetryCheckpoints, + }); + } + + await this.#enqueueReattempt({ + run: taskRunAttempt.taskRun, + executionRetry, + supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, + supportsRetryCheckpoints, + }); + + return "RETRIED"; + } + + async #retryAttemptWithCheckpoint({ + execution, + taskRunAttempt, + executionRetry, + checkpoint, + supportsRetryCheckpoints, + }: { + execution: TaskRunExecution; + taskRunAttempt: NonNullable; + executionRetry: TaskRunExecutionRetry; + checkpoint: CheckpointData; + supportsRetryCheckpoints?: boolean; + }) { + const createCheckpoint = new CreateCheckpointService(this._prisma); + const checkpointCreateResult = await createCheckpoint.call({ + attemptFriendlyId: execution.attempt.id, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "RETRYING_AFTER_FAILURE", + attemptNumber: execution.attempt.number, + }, + }); + + if (!checkpointCreateResult.success) { + logger.error("Failed to create reattempt checkpoint", { + checkpoint, + runId: execution.run.id, + attemptId: execution.attempt.id, + }); + + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRunAttempt.taskRunId, + status: "SYSTEM_FAILURE", + completedAt: new Date(), + error: { + type: "STRING_ERROR", + raw: "Failed to create reattempt checkpoint", + }, + }); + + return "COMPLETED" as const; + } + + await this.#enqueueReattempt({ + run: taskRunAttempt.taskRun, + executionRetry, + checkpointEventId: checkpointCreateResult.event.id, + supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, + supportsRetryCheckpoints, + }); + + return "RETRIED" as const; + } + #generateMetadataAttributesForNextAttempt(execution: TaskRunExecution) { const context = TaskRunContext.parse(execution); @@ -475,6 +590,7 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId: select: { id: true, supportsLazyAttempts: true, + sdkVersion: true, }, }, }, diff --git a/apps/webapp/app/v3/services/crashTaskRun.server.ts b/apps/webapp/app/v3/services/crashTaskRun.server.ts index 6a337c000e..cac467c097 100644 --- a/apps/webapp/app/v3/services/crashTaskRun.server.ts +++ b/apps/webapp/app/v3/services/crashTaskRun.server.ts @@ -1,12 +1,12 @@ import { TaskRun, TaskRunAttempt } from "@trigger.dev/database"; import { eventRepository } from "../eventRepository.server"; -import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus"; import { sanitizeError, TaskRunInternalError } from "@trigger.dev/core/v3"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; export type CrashTaskRunServiceOptions = { reason?: string; @@ -36,16 +36,50 @@ export class CrashTaskRunService extends BaseService { }); if (!taskRun) { - logger.error("Task run not found", { runId }); + logger.error("[CrashTaskRunService] Task run not found", { runId }); 
return; } // Make sure the task run is in a crashable state if (!opts.overrideCompletion && !isCrashableRunStatus(taskRun.status)) { - logger.error("Task run is not in a crashable state", { runId, status: taskRun.status }); + logger.error("[CrashTaskRunService] Task run is not in a crashable state", { + runId, + status: taskRun.status, + }); return; } + logger.debug("[CrashTaskRunService] Completing attempt", { runId, options }); + + const retryHelper = new FailedTaskRunRetryHelper(this._prisma); + const retryResult = await retryHelper.call({ + runId, + completion: { + ok: false, + id: runId, + error: { + type: "INTERNAL_ERROR", + code: opts.errorCode ?? "TASK_RUN_CRASHED", + message: opts.reason, + stackTrace: opts.logs, + }, + }, + isCrash: true, + }); + + logger.debug("[CrashTaskRunService] Completion result", { runId, retryResult }); + + if (retryResult === "RETRIED") { + logger.debug("[CrashTaskRunService] Retried task run", { runId }); + return; + } + + if (!opts.overrideCompletion) { + return; + } + + logger.debug("[CrashTaskRunService] Overriding completion", { runId, options }); + const finalizeService = new FinalizeTaskRunService(); const crashedTaskRun = await finalizeService.call({ id: taskRun.id, @@ -87,7 +121,7 @@ export class CrashTaskRunService extends BaseService { options?.overrideCompletion ); - logger.debug("Crashing in-progress events", { + logger.debug("[CrashTaskRunService] Crashing in-progress events", { inProgressEvents: inProgressEvents.map((event) => event.id), }); @@ -136,27 +170,29 @@ export class CrashTaskRunService extends BaseService { code?: TaskRunInternalError["code"]; } ) { - return await this.traceWithEnv("failAttempt()", environment, async (span) => { - span.setAttribute("taskRunId", run.id); - span.setAttribute("attemptId", attempt.id); - - await marqs?.acknowledgeMessage(run.id); + return await this.traceWithEnv( + "[CrashTaskRunService] failAttempt()", + environment, + async (span) => { + span.setAttribute("taskRunId", run.id); + span.setAttribute("attemptId", attempt.id); - await this._prisma.taskRunAttempt.update({ - where: { - id: attempt.id, - }, - data: { - status: "FAILED", - completedAt: failedAt, - error: sanitizeError({ - type: "INTERNAL_ERROR", - code: error.code ?? "TASK_RUN_CRASHED", - message: error.reason, - stackTrace: error.logs, - }), - }, - }); - }); + await this._prisma.taskRunAttempt.update({ + where: { + id: attempt.id, + }, + data: { + status: "FAILED", + completedAt: failedAt, + error: sanitizeError({ + type: "INTERNAL_ERROR", + code: error.code ?? 
"TASK_RUN_CRASHED", + message: error.reason, + stackTrace: error.logs, + }), + }, + }); + } + ); } } diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index 32f400d85a..0e9cc29336 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -12,11 +12,17 @@ import { CrashTaskRunService } from "./crashTaskRun.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; export class CreateTaskRunAttemptService extends BaseService { - public async call( - runId: string, - authenticatedEnv?: AuthenticatedEnvironment, - setToExecuting = true - ): Promise<{ + public async call({ + runId, + authenticatedEnv, + setToExecuting = true, + startAtZero = false, + }: { + runId: string; + authenticatedEnv?: AuthenticatedEnvironment; + setToExecuting?: boolean; + startAtZero?: boolean; + }): Promise<{ execution: TaskRunExecution; run: TaskRun; attempt: TaskRunAttempt; @@ -102,7 +108,11 @@ export class CreateTaskRunAttemptService extends BaseService { throw new ServiceValidationError("Queue not found", 404); } - const nextAttemptNumber = taskRun.attempts[0] ? taskRun.attempts[0].number + 1 : 1; + const nextAttemptNumber = taskRun.attempts[0] + ? taskRun.attempts[0].number + 1 + : startAtZero + ? 0 + : 1; if (nextAttemptNumber > MAX_TASK_RUN_ATTEMPTS) { const service = new CrashTaskRunService(this._prisma); diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index 797e802382..e92beaaa50 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -3,11 +3,17 @@ import { type Prisma, type TaskRun } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; -import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, type FINAL_RUN_STATUSES } from "../taskStatus"; +import { + FINAL_ATTEMPT_STATUSES, + isFailedRunStatus, + isFatalRunStatus, + type FINAL_RUN_STATUSES, +} from "../taskStatus"; import { PerformTaskRunAlertsService } from "./alerts/performTaskRunAlerts.server"; import { BaseService } from "./baseService.server"; import { ResumeDependentParentsService } from "./resumeDependentParents.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; +import { socketIo } from "../handleSocketIo.server"; type BaseInput = { id: string; @@ -90,6 +96,42 @@ export class FinalizeTaskRunService extends BaseService { await PerformTaskRunAlertsService.enqueue(run.id, this._prisma); } + if (isFatalRunStatus(run.status)) { + logger.error("FinalizeTaskRunService: Fatal status", { runId: run.id, status: run.status }); + + const extendedRun = await this._prisma.taskRun.findFirst({ + where: { id: run.id }, + select: { + id: true, + lockedToVersion: { + select: { + supportsLazyAttempts: true, + }, + }, + runtimeEnvironment: { + select: { + type: true, + }, + }, + }, + }); + + if (extendedRun && extendedRun.runtimeEnvironment.type !== "DEVELOPMENT") { + logger.error("FinalizeTaskRunService: Fatal status, requesting worker exit", { + runId: run.id, + status: run.status, + }); + + // Signal to exit any leftover containers + socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { + version: "v1", + runId: run.id, + // Give the run a few 
seconds to exit to complete any flushing etc + delayInMs: extendedRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined, + }); + } + } + return run as Output; } @@ -111,83 +153,90 @@ export class FinalizeTaskRunService extends BaseService { error?: TaskRunError; run: TaskRun; }) { - if (attemptStatus || error) { - const latestAttempt = await this._prisma.taskRunAttempt.findFirst({ - where: { taskRunId: run.id }, - orderBy: { id: "desc" }, - take: 1, + if (!attemptStatus && !error) { + logger.error("FinalizeTaskRunService: No attemptStatus or error provided", { runId: run.id }); + return; + } + + const latestAttempt = await this._prisma.taskRunAttempt.findFirst({ + where: { taskRunId: run.id }, + orderBy: { id: "desc" }, + take: 1, + }); + + if (latestAttempt) { + logger.debug("Finalizing run attempt", { + id: latestAttempt.id, + status: attemptStatus, + error, }); - if (latestAttempt) { - logger.debug("Finalizing run attempt", { - id: latestAttempt.id, - status: attemptStatus, - error, - }); + await this._prisma.taskRunAttempt.update({ + where: { id: latestAttempt.id }, + data: { status: attemptStatus, error: error ? sanitizeError(error) : undefined }, + }); - await this._prisma.taskRunAttempt.update({ - where: { id: latestAttempt.id }, - data: { status: attemptStatus, error: error ? sanitizeError(error) : undefined }, - }); - } else { - logger.debug("Finalizing run no attempt found", { - runId: run.id, - attemptStatus, - error, - }); + return; + } - if (!run.lockedById) { - logger.error( - "FinalizeTaskRunService: No lockedById, so can't get the BackgroundWorkerTask. Not creating an attempt.", - { runId: run.id } - ); - return; - } - - const workerTask = await this._prisma.backgroundWorkerTask.findFirst({ - select: { - id: true, - workerId: true, - runtimeEnvironmentId: true, - }, - where: { - id: run.lockedById, - }, - }); + // There's no attempt, so create one - if (!workerTask) { - logger.error("FinalizeTaskRunService: No worker task found", { runId: run.id }); - return; - } + logger.debug("Finalizing run no attempt found", { + runId: run.id, + attemptStatus, + error, + }); - const queue = await this._prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: workerTask.runtimeEnvironmentId, - name: sanitizeQueueName(run.queue), - }, - }, - }); + if (!run.lockedById) { + logger.error( + "FinalizeTaskRunService: No lockedById, so can't get the BackgroundWorkerTask. Not creating an attempt.", + { runId: run.id } + ); + return; + } - if (!queue) { - logger.error("FinalizeTaskRunService: No queue found", { runId: run.id }); - return; - } - - await this._prisma.taskRunAttempt.create({ - data: { - number: 1, - friendlyId: generateFriendlyId("attempt"), - taskRunId: run.id, - backgroundWorkerId: workerTask?.workerId, - backgroundWorkerTaskId: workerTask?.id, - queueId: queue.id, - runtimeEnvironmentId: workerTask.runtimeEnvironmentId, - status: attemptStatus, - error: error ? 
sanitizeError(error) : undefined, - }, - }); - } + const workerTask = await this._prisma.backgroundWorkerTask.findFirst({ + select: { + id: true, + workerId: true, + runtimeEnvironmentId: true, + }, + where: { + id: run.lockedById, + }, + }); + + if (!workerTask) { + logger.error("FinalizeTaskRunService: No worker task found", { runId: run.id }); + return; + } + + const queue = await this._prisma.taskQueue.findUnique({ + where: { + runtimeEnvironmentId_name: { + runtimeEnvironmentId: workerTask.runtimeEnvironmentId, + name: sanitizeQueueName(run.queue), + }, + }, + }); + + if (!queue) { + logger.error("FinalizeTaskRunService: No queue found", { runId: run.id }); + return; } + + await this._prisma.taskRunAttempt.create({ + data: { + number: 1, + friendlyId: generateFriendlyId("attempt"), + taskRunId: run.id, + backgroundWorkerId: workerTask?.workerId, + backgroundWorkerTaskId: workerTask?.id, + queueId: queue.id, + runtimeEnvironmentId: workerTask.runtimeEnvironmentId, + status: attemptStatus, + error: error ? sanitizeError(error) : undefined, + }, + }); } } diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index 498df8a267..a360bde091 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -1,72 +1,73 @@ import type { TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database"; -export const CANCELLABLE_RUN_STATUSES: TaskRunStatus[] = [ +export const FINAL_RUN_STATUSES = [ + "CANCELED", + "INTERRUPTED", + "COMPLETED_SUCCESSFULLY", + "COMPLETED_WITH_ERRORS", + "SYSTEM_FAILURE", + "CRASHED", + "EXPIRED", + "TIMED_OUT", +] satisfies TaskRunStatus[]; + +export type FINAL_RUN_STATUSES = (typeof FINAL_RUN_STATUSES)[number]; + +export const NON_FINAL_RUN_STATUSES = [ "DELAYED", "PENDING", "WAITING_FOR_DEPLOY", "EXECUTING", - "PAUSED", "WAITING_TO_RESUME", - "PAUSED", "RETRYING_AFTER_FAILURE", -]; -export const CANCELLABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = [ - "EXECUTING", "PAUSED", - "PENDING", -]; +] satisfies TaskRunStatus[]; -export function isCancellableRunStatus(status: TaskRunStatus): boolean { - return CANCELLABLE_RUN_STATUSES.includes(status); -} -export function isCancellableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return CANCELLABLE_ATTEMPT_STATUSES.includes(status); -} +export type NON_FINAL_RUN_STATUSES = (typeof NON_FINAL_RUN_STATUSES)[number]; -export const CRASHABLE_RUN_STATUSES: TaskRunStatus[] = CANCELLABLE_RUN_STATUSES; -export const CRASHABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = CANCELLABLE_ATTEMPT_STATUSES; +export const FINAL_ATTEMPT_STATUSES = [ + "FAILED", + "CANCELED", + "COMPLETED", +] satisfies TaskRunAttemptStatus[]; -export function isCrashableRunStatus(status: TaskRunStatus): boolean { - return CRASHABLE_RUN_STATUSES.includes(status); -} -export function isCrashableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return CRASHABLE_ATTEMPT_STATUSES.includes(status); -} +export type FINAL_ATTEMPT_STATUSES = (typeof FINAL_ATTEMPT_STATUSES)[number]; -export const FINAL_RUN_STATUSES = [ - "CANCELED", - "COMPLETED_SUCCESSFULLY", - "COMPLETED_WITH_ERRORS", +export const NON_FINAL_ATTEMPT_STATUSES = [ + "PENDING", + "EXECUTING", + "PAUSED", +] satisfies TaskRunAttemptStatus[]; + +export type NON_FINAL_ATTEMPT_STATUSES = (typeof NON_FINAL_ATTEMPT_STATUSES)[number]; + +export const FAILED_RUN_STATUSES = [ "INTERRUPTED", + "COMPLETED_WITH_ERRORS", "SYSTEM_FAILURE", - "EXPIRED", "CRASHED", "TIMED_OUT", ] satisfies TaskRunStatus[]; -export type FINAL_RUN_STATUSES = (typeof 
FINAL_RUN_STATUSES)[number]; +export type FAILED_RUN_STATUSES = (typeof FAILED_RUN_STATUSES)[number]; -export const FINAL_ATTEMPT_STATUSES = [ - "CANCELED", - "COMPLETED", - "FAILED", -] satisfies TaskRunAttemptStatus[]; +export const FATAL_RUN_STATUSES = ["SYSTEM_FAILURE", "CRASHED"] satisfies TaskRunStatus[]; -export type FINAL_ATTEMPT_STATUSES = (typeof FINAL_ATTEMPT_STATUSES)[number]; +export type FATAL_RUN_STATUSES = (typeof FAILED_RUN_STATUSES)[number]; + +export const CANCELLABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; +export const CANCELLABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; -export const FAILED_ATTEMPT_STATUSES = ["FAILED", "CANCELED"] satisfies TaskRunAttemptStatus[]; +export const CRASHABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; +export const CRASHABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; -export type FAILED_ATTEMPT_STATUSES = (typeof FAILED_ATTEMPT_STATUSES)[number]; +export const FAILABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; export const FREEZABLE_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "RETRYING_AFTER_FAILURE"]; export const FREEZABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["EXECUTING", "FAILED"]; -export function isFreezableRunStatus(status: TaskRunStatus): boolean { - return FREEZABLE_RUN_STATUSES.includes(status); -} -export function isFreezableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return FREEZABLE_ATTEMPT_STATUSES.includes(status); -} +export const RESTORABLE_RUN_STATUSES: TaskRunStatus[] = ["WAITING_TO_RESUME"]; +export const RESTORABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["PAUSED"]; export function isFinalRunStatus(status: TaskRunStatus): boolean { return FINAL_RUN_STATUSES.includes(status); @@ -75,31 +76,42 @@ export function isFinalAttemptStatus(status: TaskRunAttemptStatus): boolean { return FINAL_ATTEMPT_STATUSES.includes(status); } -export const RESTORABLE_RUN_STATUSES: TaskRunStatus[] = ["WAITING_TO_RESUME"]; -export const RESTORABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["PAUSED"]; +export function isFailedRunStatus(status: TaskRunStatus): boolean { + return FAILED_RUN_STATUSES.includes(status); +} -export function isRestorableRunStatus(status: TaskRunStatus): boolean { - return RESTORABLE_RUN_STATUSES.includes(status); +export function isFatalRunStatus(status: TaskRunStatus): boolean { + return FATAL_RUN_STATUSES.includes(status); } -export function isRestorableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return RESTORABLE_ATTEMPT_STATUSES.includes(status); + +export function isCancellableRunStatus(status: TaskRunStatus): boolean { + return CANCELLABLE_RUN_STATUSES.includes(status); +} +export function isCancellableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return CANCELLABLE_ATTEMPT_STATUSES.includes(status); } -export const FAILABLE_RUN_STATUSES = [ - "EXECUTING", - "PENDING", - "WAITING_FOR_DEPLOY", - "RETRYING_AFTER_FAILURE", -] satisfies TaskRunStatus[]; +export function isCrashableRunStatus(status: TaskRunStatus): boolean { + return CRASHABLE_RUN_STATUSES.includes(status); +} +export function isCrashableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return CRASHABLE_ATTEMPT_STATUSES.includes(status); +} -export const FAILED_RUN_STATUSES = [ - "INTERRUPTED", - "COMPLETED_WITH_ERRORS", - "SYSTEM_FAILURE", - "CRASHED", - "TIMED_OUT", -] satisfies TaskRunStatus[]; +export function isFailableRunStatus(status: TaskRunStatus): boolean { + return FAILABLE_RUN_STATUSES.includes(status); +} -export function isFailedRunStatus(status: TaskRunStatus): boolean { - 
return FAILED_RUN_STATUSES.includes(status); +export function isFreezableRunStatus(status: TaskRunStatus): boolean { + return FREEZABLE_RUN_STATUSES.includes(status); +} +export function isFreezableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return FREEZABLE_ATTEMPT_STATUSES.includes(status); +} + +export function isRestorableRunStatus(status: TaskRunStatus): boolean { + return RESTORABLE_RUN_STATUSES.includes(status); +} +export function isRestorableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return RESTORABLE_ATTEMPT_STATUSES.includes(status); } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index b58071ed71..9ebe739ca1 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -164,6 +164,7 @@ "remix-typedjson": "0.3.1", "remix-utils": "^7.1.0", "seedrandom": "^3.0.5", + "semver": "^7.5.0", "simple-oauth2": "^5.0.0", "simplur": "^3.0.1", "slug": "^6.0.0", @@ -213,6 +214,7 @@ "@types/react-dom": "18.2.7", "@types/regression": "^2.0.6", "@types/seedrandom": "^3.0.8", + "@types/semver": "^7.5.0", "@types/simple-oauth2": "^5.0.4", "@types/slug": "^5.0.3", "@types/supertest": "^6.0.2", diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 92f255ebc5..8e040191db 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1802,6 +1802,10 @@ model TaskRun { } enum TaskRunStatus { + /// + /// NON-FINAL STATUSES + /// + /// Task has been scheduled to run in the future DELAYED /// Task is waiting to be executed by a worker @@ -1822,6 +1826,10 @@ enum TaskRunStatus { /// Task has been paused by the user, and can be resumed by the user PAUSED + /// + /// FINAL STATUSES + /// + /// Task has been canceled by the user CANCELED @@ -1956,9 +1964,11 @@ model TaskRunAttempt { } enum TaskRunAttemptStatus { + /// NON-FINAL PENDING EXECUTING PAUSED + /// FINAL FAILED CANCELED COMPLETED diff --git a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts index dfaacb5d73..73de86535d 100644 --- a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts @@ -3,6 +3,7 @@ import { type HandleErrorFunction, indexerToWorkerMessages, taskCatalog, + type TaskManifest, TriggerConfig, } from "@trigger.dev/core/v3"; import { @@ -99,6 +100,20 @@ const { buildManifest, importErrors, config } = await bootstrap(); let tasks = taskCatalog.listTaskManifests(); +// If the config has retry defaults, we need to apply them to all tasks that don't have any retry settings +if (config.retries?.default) { + tasks = tasks.map((task) => { + if (!task.retry) { + return { + ...task, + retry: config.retries?.default, + } satisfies TaskManifest; + } + + return task; + }); +} + // If the config has a machine preset, we need to apply it to all tasks that don't have a machine preset if (typeof config.machine === "string") { tasks = tasks.map((task) => { @@ -108,7 +123,7 @@ if (typeof config.machine === "string") { machine: { preset: config.machine, }, - }; + } satisfies TaskManifest; } return task; @@ -122,7 +137,7 @@ if (typeof config.maxDuration === "number") { return { ...task, maxDuration: config.maxDuration, - }; + } satisfies TaskManifest; } return task; diff --git a/packages/cli-v3/src/entryPoints/dev-index-worker.ts b/packages/cli-v3/src/entryPoints/dev-index-worker.ts index 0420665907..9e6e8e05e9 100644 --- 
a/packages/cli-v3/src/entryPoints/dev-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-index-worker.ts @@ -3,6 +3,7 @@ import { type HandleErrorFunction, indexerToWorkerMessages, taskCatalog, + type TaskManifest, TriggerConfig, } from "@trigger.dev/core/v3"; import { @@ -99,6 +100,20 @@ const { buildManifest, importErrors, config } = await bootstrap(); let tasks = taskCatalog.listTaskManifests(); +// If the config has retry defaults, we need to apply them to all tasks that don't have any retry settings +if (config.retries?.default) { + tasks = tasks.map((task) => { + if (!task.retry) { + return { + ...task, + retry: config.retries?.default, + } satisfies TaskManifest; + } + + return task; + }); +} + // If the config has a maxDuration, we need to apply it to all tasks that don't have a maxDuration if (typeof config.maxDuration === "number") { tasks = tasks.map((task) => { @@ -106,7 +121,7 @@ if (typeof config.maxDuration === "number") { return { ...task, maxDuration: config.maxDuration, - }; + } satisfies TaskManifest; } return task; diff --git a/packages/core/src/v3/apiClient/core.ts b/packages/core/src/v3/apiClient/core.ts index c607f3ec4e..28ce105e3d 100644 --- a/packages/core/src/v3/apiClient/core.ts +++ b/packages/core/src/v3/apiClient/core.ts @@ -229,7 +229,7 @@ async function _doZodFetchWithRetries( } } - const jsonBody = await response.json(); + const jsonBody = await safeJsonFromResponse(response); const parsedResult = schema.safeParse(jsonBody); if (parsedResult.success) { @@ -269,6 +269,14 @@ async function _doZodFetchWithRetries( } } +async function safeJsonFromResponse(response: Response): Promise { + try { + return await response.clone().json(); + } catch (error) { + return; + } +} + function castToError(err: any): Error { if (err instanceof Error) return err; return new Error(err); diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 039942f3b2..e0b8e22ac7 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -10,6 +10,7 @@ import { import { TaskMetadataFailedToParseData } from "./schemas/messages.js"; import { links } from "./links.js"; import { ExceptionEventProperties } from "./schemas/openTelemetry.js"; +import { assertExhaustive } from "../utils.js"; export class AbortTaskRunError extends Error { constructor(message: string) { @@ -149,6 +150,57 @@ export function sanitizeError(error: TaskRunError): TaskRunError { } } +export function shouldRetryError(error: TaskRunError): boolean { + switch (error.type) { + case "INTERNAL_ERROR": { + switch (error.code) { + case "COULD_NOT_FIND_EXECUTOR": + case "COULD_NOT_FIND_TASK": + case "COULD_NOT_IMPORT_TASK": + case "CONFIGURED_INCORRECTLY": + case "TASK_ALREADY_RUNNING": + case "TASK_PROCESS_SIGKILL_TIMEOUT": + case "TASK_PROCESS_SIGSEGV": + case "TASK_PROCESS_SIGTERM": + case "TASK_PROCESS_OOM_KILLED": + case "TASK_PROCESS_MAYBE_OOM_KILLED": + case "TASK_RUN_CANCELLED": + case "MAX_DURATION_EXCEEDED": + case "DISK_SPACE_EXCEEDED": + return false; + + case "GRACEFUL_EXIT_TIMEOUT": + case "HANDLE_ERROR_ERROR": + case "TASK_INPUT_ERROR": + case "TASK_OUTPUT_ERROR": + case "POD_EVICTED": + case "POD_UNKNOWN_ERROR": + case "TASK_EXECUTION_ABORTED": + case "TASK_EXECUTION_FAILED": + case "TASK_RUN_CRASHED": + case "TASK_RUN_HEARTBEAT_TIMEOUT": + case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE": + return true; + + default: + assertExhaustive(error.code); + } + } + case "STRING_ERROR": { + return true; + } + case "BUILT_IN_ERROR": { + return true; + } + case 
"CUSTOM_ERROR": { + return true; + } + default: { + assertExhaustive(error); + } + } +} + export function correctErrorStackTrace( stackTrace: string, projectDir?: string, @@ -358,6 +410,15 @@ const prettyInternalErrors: Partial< href: links.docs.machines.home, }, }, + TASK_PROCESS_SIGSEGV: { + message: + "Your task crashed with a segmentation fault (SIGSEGV). Most likely there's a bug in a package or binary you're using. If this keeps happening and you're unsure why, please get in touch.", + link: { + name: "Contact us", + href: links.site.contact, + magic: "CONTACT_FORM", + }, + }, TASK_PROCESS_SIGTERM: { message: "Your task exited after receiving SIGTERM but we don't know why. If this keeps happening, please get in touch so we can investigate.", @@ -369,23 +430,73 @@ const prettyInternalErrors: Partial< }, }; +const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => { + return { + type: "INTERNAL_ERROR" as const, + code, + ...prettyInternalErrors[code], + }; +}; + +const getPrettyExceptionEvent = (code: TaskRunInternalError["code"]): ExceptionEventProperties => { + return { + type: code, + ...prettyInternalErrors[code], + }; +}; + +const findSignalInMessage = (message?: string, truncateLength = 100) => { + if (!message) { + return; + } + + const trunc = truncateLength ? message.slice(0, truncateLength) : message; + + if (trunc.includes("SIGTERM")) { + return "SIGTERM"; + } else if (trunc.includes("SIGSEGV")) { + return "SIGSEGV"; + } else if (trunc.includes("SIGKILL")) { + return "SIGKILL"; + } else { + return; + } +}; + export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError { switch (error.type) { case "BUILT_IN_ERROR": { if (error.name === "UnexpectedExitError") { if (error.message.startsWith("Unexpected exit with code -1")) { - if (error.message.includes("SIGTERM")) { - return { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_PROCESS_SIGTERM, - ...prettyInternalErrors.TASK_PROCESS_SIGTERM, - }; + const signal = findSignalInMessage(error.stackTrace); + + switch (signal) { + case "SIGTERM": + return { + ...getPrettyTaskRunError("TASK_PROCESS_SIGTERM"), + }; + case "SIGSEGV": + return { + ...getPrettyTaskRunError("TASK_PROCESS_SIGSEGV"), + }; + case "SIGKILL": + return { + ...getPrettyTaskRunError("TASK_PROCESS_MAYBE_OOM_KILLED"), + }; + default: + return { + ...getPrettyTaskRunError("TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE"), + message: error.message, + stackTrace: error.stackTrace, + }; } + } + } + if (error.name === "Error") { + if (error.message === "ffmpeg was killed with signal SIGKILL") { return { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_PROCESS_MAYBE_OOM_KILLED, - ...prettyInternalErrors.TASK_PROCESS_MAYBE_OOM_KILLED, + ...getPrettyTaskRunError("TASK_PROCESS_OOM_KILLED"), }; } } @@ -399,31 +510,35 @@ export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError { // Offload the output const filename = `${pathPrefix}.${getPacketExtension(packet.dataType)}`; const presignedResponse = await apiClientManager.client!.createUploadPayloadUrl(filename); - const uploadResponse = await fetch(presignedResponse.presignedUrl, { - method: "PUT", - headers: { - "Content-Type": packet.dataType, + const uploadResponse = await zodfetch( + z.any(), + presignedResponse.presignedUrl, + { + method: "PUT", + headers: { + "Content-Type": packet.dataType, + }, + body: packet.data, }, - body: packet.data, - }); + { + retry: ioRetryOptions, + } + ).asResponse(); if (!uploadResponse.ok) { throw new Error( @@ -202,7 +220,9 
@@ async function importPacket(packet: IOPacket, span?: Span): Promise { const presignedResponse = await apiClientManager.client.getPayloadUrl(packet.data); - const response = await fetch(presignedResponse.presignedUrl); + const response = await zodfetch(z.any(), presignedResponse.presignedUrl, undefined, { + retry: ioRetryOptions, + }).asResponse(); if (!response.ok) { throw new Error( diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 017c15c395..98c6861c03 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -92,9 +92,28 @@ export class TaskExecutor { try { const payloadPacket = await conditionallyImportPacket(originalPacket, this._tracer); - parsedPayload = await parsePacket(payloadPacket); + } catch (inputError) { + recordSpanException(span, inputError); + + return { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_INPUT_ERROR, + message: + inputError instanceof Error + ? `${inputError.name}: ${inputError.message}` + : typeof inputError === "string" + ? inputError + : undefined, + stackTrace: inputError instanceof Error ? inputError.stack : undefined, + }, + } satisfies TaskRunExecutionResult; + } + try { parsedPayload = await this.#parsePayload(parsedPayload); if (execution.attempt.number === 1) { @@ -132,8 +151,8 @@ export class TaskExecutor { output: finalOutput.data, outputType: finalOutput.dataType, } satisfies TaskRunExecutionResult; - } catch (stringifyError) { - recordSpanException(span, stringifyError); + } catch (outputError) { + recordSpanException(span, outputError); return { ok: false, @@ -142,10 +161,10 @@ export class TaskExecutor { type: "INTERNAL_ERROR", code: TaskRunErrorCodes.TASK_OUTPUT_ERROR, message: - stringifyError instanceof Error - ? stringifyError.message - : typeof stringifyError === "string" - ? stringifyError + outputError instanceof Error + ? outputError.message + : typeof outputError === "string" + ? outputError : undefined, }, } satisfies TaskRunExecutionResult; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ac3cf1d433..4133fd69a6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -585,6 +585,9 @@ importers: seedrandom: specifier: ^3.0.5 version: 3.0.5 + semver: + specifier: ^7.5.0 + version: 7.6.3 simple-oauth2: specifier: ^5.0.0 version: 5.0.0 @@ -727,6 +730,9 @@ importers: '@types/seedrandom': specifier: ^3.0.8 version: 3.0.8 + '@types/semver': + specifier: ^7.5.0 + version: 7.5.1 '@types/simple-oauth2': specifier: ^5.0.4 version: 5.0.4
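Example (not part of this patch): the retries.default indexing change above means a task that defines no retry options of its own now has the config-level default folded into its manifest at index time, as the deploy-index-worker.ts and dev-index-worker.ts hunks show, and its failures can then be retried by the new FailedTaskRunRetryHelper using that config. Below is a minimal TypeScript sketch of the shapes this targets; the project ref, task id, import paths, and retry values are illustrative assumptions based on the public SDK docs, not taken from this diff.

// trigger.config.ts — hypothetical project config; the retry fields follow the
// RetryOptions shape this PR parses (maxAttempts, factor, min/max timeouts, randomize).
import type { TriggerConfig } from "@trigger.dev/sdk/v3";

export const config: TriggerConfig = {
  project: "proj_example", // placeholder project ref
  retries: {
    enabledInDev: false,
    default: {
      maxAttempts: 3,
      factor: 2,
      minTimeoutInMs: 1_000,
      maxTimeoutInMs: 30_000,
      randomize: true,
    },
  },
};

// tasks/convert-video.ts — hypothetical task that defines no retry options of its own,
// so after this change its indexed manifest carries the config-level default, and
// internal errors classified as retriable (e.g. the new ffmpeg OOM detection) can be
// retried with delays computed from that config.
import { task } from "@trigger.dev/sdk/v3";

export const convertVideo = task({
  id: "convert-video",
  run: async (payload: { url: string }) => {
    // ffmpeg work that may OOM or segfault goes here
  },
});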