Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 56 additions & 34 deletions apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import type { RunEngine } from "~/v3/runEngine.server";
import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";

export type IdempotencyKeyConcernResult =
Expand Down Expand Up @@ -41,6 +42,7 @@ export class IdempotencyKeyConcern {
: undefined;

if (existingRun) {
// The idempotency key has expired
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
logger.debug("[TriggerTaskService][call] Idempotency key has expired", {
idempotencyKey: request.options?.idempotencyKey,
Expand All @@ -52,42 +54,62 @@ export class IdempotencyKeyConcern {
where: { id: existingRun.id, idempotencyKey },
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
});
} else {
const associatedWaitpoint = existingRun.associatedWaitpoint;
const parentRunId = request.body.options?.parentRunId;
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;
//We're using `andWait` so we need to block the parent run with a waitpoint
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
await this.traceEventConcern.traceIdempotentRun(
request,
{
existingRun,
idempotencyKey,
incomplete: associatedWaitpoint.status === "PENDING",
isError: associatedWaitpoint.outputIsError,
},
async (event) => {
//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint.id,
spanIdToComplete: event.spanId,
batch: request.options?.batchId
? {
id: request.options.batchId,
index: request.options.batchIndex ?? 0,
}
: undefined,
projectId: request.environment.projectId,
organizationId: request.environment.organizationId,
tx: this.prisma,
});
}
);
}

return { isCached: true, run: existingRun };
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}

// If the existing run failed or was expired, we clear the key and do a new run
if (shouldIdempotencyKeyBeCleared(existingRun.status)) {
logger.debug("[TriggerTaskService][call] Idempotency key should be cleared", {
idempotencyKey: request.options?.idempotencyKey,
runStatus: existingRun.status,
runId: existingRun.id,
});

// Update the existing run to remove the idempotency key
await this.prisma.taskRun.updateMany({
where: { id: existingRun.id, idempotencyKey },
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
});

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}

// We have an idempotent run, so we return it
const associatedWaitpoint = existingRun.associatedWaitpoint;
const parentRunId = request.body.options?.parentRunId;
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;
//We're using `andWait` so we need to block the parent run with a waitpoint
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
await this.traceEventConcern.traceIdempotentRun(
request,
{
existingRun,
idempotencyKey,
incomplete: associatedWaitpoint.status === "PENDING",
isError: associatedWaitpoint.outputIsError,
},
async (event) => {
//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint.id,
spanIdToComplete: event.spanId,
batch: request.options?.batchId
? {
id: request.options.batchId,
index: request.options.batchIndex ?? 0,
}
: undefined,
projectId: request.environment.projectId,
organizationId: request.environment.organizationId,
tx: this.prisma,
});
}
);
}

return { isCached: true, run: existingRun };
}

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ export class EventRepository {
tracestate: typeof tracestate === "string" ? tracestate : undefined,
duration: options.incomplete ? 0 : duration,
isPartial: failedWithError ? false : options.incomplete,
isError: !!failedWithError,
isError: options.isError === true || !!failedWithError,
message: message,
serviceName: "api server",
serviceNamespace: "trigger.dev",
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/taskStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,7 @@ export function isRestorableRunStatus(status: TaskRunStatus): boolean {
export function isRestorableAttemptStatus(status: TaskRunAttemptStatus): boolean {
return RESTORABLE_ATTEMPT_STATUSES.includes(status);
}

export function shouldIdempotencyKeyBeCleared(status: TaskRunStatus): boolean {
return isFailedRunStatus(status) || status === "EXPIRED";
}
Loading