diff --git a/.changeset/spotty-pants-wink.md b/.changeset/spotty-pants-wink.md new file mode 100644 index 0000000000..7021ecc8fa --- /dev/null +++ b/.changeset/spotty-pants-wink.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Prevent large outputs from overwriting each other diff --git a/packages/cli-v3/src/entryPoints/dev-run-controller.ts b/packages/cli-v3/src/entryPoints/dev-run-controller.ts index f851bc07aa..e9c91012ed 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-controller.ts @@ -623,7 +623,7 @@ export class DevRunController { }).initialize(); logger.debug("executing task run process", { - attemptId: execution.attempt.id, + attemptNumber: execution.attempt.number, runId: execution.run.id, }); diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index d821621ebd..fed66e6dc6 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -7,6 +7,7 @@ import { AnyOnStartHookFunction, AnyOnSuccessHookFunction, apiClientManager, + attemptKey, clock, ExecutorToWorkerMessageCatalog, type HandleErrorFunction, @@ -546,7 +547,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10); for await (const _ of setInterval(heartbeatInterval)) { if (_isRunning && _execution) { try { - await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id }); + await zodIpc.send("TASK_HEARTBEAT", { id: attemptKey(_execution) }); } catch (err) { logError("Failed to send HEARTBEAT message", err); } diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 96f68f0f42..cb24dac9f6 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -1,4 +1,5 @@ import { + attemptKey, CompletedWaitpoint, ExecutorToWorkerMessageCatalog, MachinePresetResources, @@ -164,15 +165,17 @@ export class TaskRunProcess { TASK_RUN_COMPLETED: async (message) => { const { result, execution } = message; - const promiseStatus = this._attemptStatuses.get(execution.attempt.id); + const key = attemptKey(execution); + + const promiseStatus = this._attemptStatuses.get(key); if (promiseStatus !== "PENDING") { return; } - this._attemptStatuses.set(execution.attempt.id, "RESOLVED"); + this._attemptStatuses.set(key, "RESOLVED"); - const attemptPromise = this._attemptPromises.get(execution.attempt.id); + const attemptPromise = this._attemptPromises.get(key); if (!attemptPromise) { return; @@ -229,10 +232,12 @@ export class TaskRunProcess { rejecter = reject; }); - this._attemptStatuses.set(params.payload.execution.attempt.id, "PENDING"); + const key = attemptKey(params.payload.execution); + + this._attemptStatuses.set(key, "PENDING"); // @ts-expect-error - We know that the resolver and rejecter are defined - this._attemptPromises.set(params.payload.execution.attempt.id, { resolver, rejecter }); + this._attemptPromises.set(key, { resolver, rejecter }); const { execution, traceContext, metrics } = params.payload; diff --git a/packages/core/src/v3/idempotencyKeys.ts b/packages/core/src/v3/idempotencyKeys.ts index cde6d7ed0b..c3778d42a7 100644 --- a/packages/core/src/v3/idempotencyKeys.ts +++ b/packages/core/src/v3/idempotencyKeys.ts @@ -105,7 +105,7 @@ function injectScope(scope: "run" | "attempt" | "global"): string[] { } case "attempt": { if (taskContext?.ctx) { - return [taskContext.ctx.attempt.id]; + return [taskContext.ctx.run.id, taskContext.ctx.attempt.number.toString()]; } break; } @@ -125,3 +125,17 @@ async function generateIdempotencyKey(keyMaterial: string[]) { .map((byte) => byte.toString(16).padStart(2, "0")) .join(""); } + +type AttemptKeyMaterial = { + run: { + id: string; + }; + attempt: { + number: number; + }; +}; + +/** Creates a unique key for each attempt. */ +export function attemptKey(ctx: AttemptKeyMaterial): string { + return `${ctx.run.id}-${ctx.attempt.number}`; +} diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 1f77fd6d3f..d11827d4ca 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -12,6 +12,7 @@ import { } from "../errors.js"; import { accessoryAttributes, + attemptKey, flattenAttributes, lifecycleHooks, runMetadata, @@ -235,7 +236,7 @@ export class TaskExecutor { const [exportError, finalOutput] = await tryCatch( conditionallyExportPacket( stringifiedOutput, - `${execution.attempt.id}/output`, + `${attemptKey(ctx)}/output`, this._tracer ) );