diff --git a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts index 689e7a5736..ac7099357d 100644 --- a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts +++ b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts @@ -3,6 +3,7 @@ import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import type { RunEngine } from "~/v3/runEngine.server"; +import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus"; import type { TraceEventConcern, TriggerTaskRequest } from "../types"; export type IdempotencyKeyConcernResult = @@ -41,6 +42,7 @@ export class IdempotencyKeyConcern { : undefined; if (existingRun) { + // The idempotency key has expired if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) { logger.debug("[TriggerTaskService][call] Idempotency key has expired", { idempotencyKey: request.options?.idempotencyKey, @@ -52,42 +54,62 @@ export class IdempotencyKeyConcern { where: { id: existingRun.id, idempotencyKey }, data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, }); - } else { - const associatedWaitpoint = existingRun.associatedWaitpoint; - const parentRunId = request.body.options?.parentRunId; - const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion; - //We're using `andWait` so we need to block the parent run with a waitpoint - if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) { - await this.traceEventConcern.traceIdempotentRun( - request, - { - existingRun, - idempotencyKey, - incomplete: associatedWaitpoint.status === "PENDING", - isError: associatedWaitpoint.outputIsError, - }, - async (event) => { - //block run with waitpoint - await this.engine.blockRunWithWaitpoint({ - runId: RunId.fromFriendlyId(parentRunId), - waitpoints: associatedWaitpoint.id, - spanIdToComplete: event.spanId, - batch: request.options?.batchId - ? { - id: request.options.batchId, - index: request.options.batchIndex ?? 0, - } - : undefined, - projectId: request.environment.projectId, - organizationId: request.environment.organizationId, - tx: this.prisma, - }); - } - ); - } - return { isCached: true, run: existingRun }; + return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; } + + // If the existing run failed or was expired, we clear the key and do a new run + if (shouldIdempotencyKeyBeCleared(existingRun.status)) { + logger.debug("[TriggerTaskService][call] Idempotency key should be cleared", { + idempotencyKey: request.options?.idempotencyKey, + runStatus: existingRun.status, + runId: existingRun.id, + }); + + // Update the existing run to remove the idempotency key + await this.prisma.taskRun.updateMany({ + where: { id: existingRun.id, idempotencyKey }, + data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, + }); + + return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; + } + + // We have an idempotent run, so we return it + const associatedWaitpoint = existingRun.associatedWaitpoint; + const parentRunId = request.body.options?.parentRunId; + const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion; + //We're using `andWait` so we need to block the parent run with a waitpoint + if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) { + await this.traceEventConcern.traceIdempotentRun( + request, + { + existingRun, + idempotencyKey, + incomplete: associatedWaitpoint.status === "PENDING", + isError: associatedWaitpoint.outputIsError, + }, + async (event) => { + //block run with waitpoint + await this.engine.blockRunWithWaitpoint({ + runId: RunId.fromFriendlyId(parentRunId), + waitpoints: associatedWaitpoint.id, + spanIdToComplete: event.spanId, + batch: request.options?.batchId + ? { + id: request.options.batchId, + index: request.options.batchIndex ?? 0, + } + : undefined, + projectId: request.environment.projectId, + organizationId: request.environment.organizationId, + tx: this.prisma, + }); + } + ); + } + + return { isCached: true, run: existingRun }; } return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index d70deb3ef4..687fbe9e76 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -1097,7 +1097,7 @@ export class EventRepository { tracestate: typeof tracestate === "string" ? tracestate : undefined, duration: options.incomplete ? 0 : duration, isPartial: failedWithError ? false : options.incomplete, - isError: !!failedWithError, + isError: options.isError === true || !!failedWithError, message: message, serviceName: "api server", serviceNamespace: "trigger.dev", diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index 6b276bfc00..b5e1d915cf 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -128,3 +128,7 @@ export function isRestorableRunStatus(status: TaskRunStatus): boolean { export function isRestorableAttemptStatus(status: TaskRunAttemptStatus): boolean { return RESTORABLE_ATTEMPT_STATUSES.includes(status); } + +export function shouldIdempotencyKeyBeCleared(status: TaskRunStatus): boolean { + return isFailedRunStatus(status) || status === "EXPIRED"; +}