From 21c70ad67333cdab8cfa079b73facc60041957e5 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 2 Jul 2025 10:05:13 +0100 Subject: [PATCH 1/2] Serialize metadata to prevent invalid data from breaking run completions --- .../src/engine/systems/runAttemptSystem.ts | 64 ++++++++++++++++--- .../cli-v3/src/entryPoints/dev-run-worker.ts | 12 ++-- .../src/entryPoints/managed-run-worker.ts | 12 ++-- packages/core/src/v3/runMetadata/manager.ts | 11 +++- packages/core/src/v3/schemas/common.ts | 16 +++++ .../hello-world/src/trigger/metadata.ts | 16 +++++ 6 files changed, 107 insertions(+), 24 deletions(-) create mode 100644 references/hello-world/src/trigger/metadata.ts diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 1a45c4108f..11a42121e1 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -2,6 +2,7 @@ import { startSpan } from "@internal/tracing"; import { CompleteRunAttemptResult, ExecutionResult, + FlushedRunMetadata, GitMeta, StartRunAttemptResult, TaskRunError, @@ -35,6 +36,7 @@ import { import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js"; import { SystemResources } from "./systems.js"; import { WaitpointSystem } from "./waitpointSystem.js"; +import { tryCatch } from "@trigger.dev/core/utils"; export type RunAttemptSystemOptions = { resources: SystemResources; @@ -386,15 +388,7 @@ export class RunAttemptSystem { workerId?: string; runnerId?: string; }): Promise { - if (completion.metadata) { - this.$.eventBus.emit("runMetadataUpdated", { - time: new Date(), - run: { - id: runId, - metadata: completion.metadata, - }, - }); - } + await this.#notifyMetadataUpdated(runId, completion); switch (completion.ok) { case true: { @@ -1314,4 +1308,56 @@ export class RunAttemptSystem { return taskRun?.runtimeEnvironment; } + + async #notifyMetadataUpdated(runId: string, completion: TaskRunExecutionResult) { + if (completion.metadata) { + this.$.eventBus.emit("runMetadataUpdated", { + time: new Date(), + run: { + id: runId, + metadata: completion.metadata, + }, + }); + + return; + } + + if (completion.flushedMetadata) { + const [packetError, packet] = await tryCatch(parsePacket(completion.flushedMetadata)); + + if (!packet) { + return; + } + + if (packetError) { + this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", { + runId, + flushedMetadata: completion.flushedMetadata, + error: packetError, + }); + + return; + } + + const metadata = FlushedRunMetadata.safeParse(packet); + + if (!metadata.success) { + this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", { + runId, + flushedMetadata: completion.flushedMetadata, + error: metadata.error, + }); + + return; + } + + this.$.eventBus.emit("runMetadataUpdated", { + time: new Date(), + run: { + id: runId, + metadata: metadata.data, + }, + }); + } + } } diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 224a782221..edc1e0ed26 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -351,7 +351,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -382,7 +382,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -447,7 +447,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -473,7 +473,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -518,7 +518,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: usageSample.cpuTime, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); } @@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); } diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 20ea1582fd..820754c05b 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -350,7 +350,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -381,7 +381,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -444,7 +444,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -472,7 +472,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); @@ -517,7 +517,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: usageSample.cpuTime, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); } @@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({ usage: { durationMs: 0, }, - metadata: runMetadataManager.stopAndReturnLastFlush(), + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, }); } diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts index 7916492cd3..75cdd23500 100644 --- a/packages/core/src/v3/runMetadata/manager.ts +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -7,6 +7,7 @@ import { MetadataStream } from "./metadataStream.js"; import { applyMetadataOperations, collapseOperations } from "./operations.js"; import { RunMetadataManager, RunMetadataUpdater } from "./types.js"; import { AsyncIterableStream } from "../streams/asyncIterableStream.js"; +import { IOPacket, stringifyIO } from "../utils/ioSerialization.js"; const MAXIMUM_ACTIVE_STREAMS = 5; const MAXIMUM_TOTAL_STREAMS = 10; @@ -422,23 +423,27 @@ export class StandardMetadataManager implements RunMetadataManager { } } - stopAndReturnLastFlush(): FlushedRunMetadata | undefined { + async stopAndReturnLastFlush(): Promise { this.stopPeriodicFlush(); this.isFlushing = true; if (!this.#needsFlush()) { - return; + return { dataType: "application/json" }; } const operations = Array.from(this.queuedOperations); const parentOperations = Array.from(this.queuedParentOperations); const rootOperations = Array.from(this.queuedRootOperations); - return { + const data = { operations: collapseOperations(operations), parentOperations: collapseOperations(parentOperations), rootOperations: collapseOperations(rootOperations), }; + + const packet = await stringifyIO(data); + + return packet; } #needsFlush(): boolean { diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 99e5bc5f0c..e56ec8da6d 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -376,7 +376,15 @@ export const TaskRunFailedExecutionResult = z.object({ usage: TaskRunExecutionUsage.optional(), // Optional for now for backwards compatibility taskIdentifier: z.string().optional(), + // This is deprecated, use flushedMetadata instead metadata: FlushedRunMetadata.optional(), + // This is the new way to flush metadata + flushedMetadata: z + .object({ + data: z.string().optional(), + dataType: z.string(), + }) + .optional(), }); export type TaskRunFailedExecutionResult = z.infer; @@ -389,7 +397,15 @@ export const TaskRunSuccessfulExecutionResult = z.object({ usage: TaskRunExecutionUsage.optional(), // Optional for now for backwards compatibility taskIdentifier: z.string().optional(), + // This is deprecated, use flushedMetadata instead metadata: FlushedRunMetadata.optional(), + // This is the new way to flush metadata + flushedMetadata: z + .object({ + data: z.string().optional(), + dataType: z.string(), + }) + .optional(), }); export type TaskRunSuccessfulExecutionResult = z.infer; diff --git a/references/hello-world/src/trigger/metadata.ts b/references/hello-world/src/trigger/metadata.ts new file mode 100644 index 0000000000..af024c9f46 --- /dev/null +++ b/references/hello-world/src/trigger/metadata.ts @@ -0,0 +1,16 @@ +import { metadata, task } from "@trigger.dev/sdk"; + +export const metadataTestTask = task({ + id: "metadata-tester", + retry: { + maxAttempts: 3, + minTimeoutInMs: 500, + maxTimeoutInMs: 1000, + factor: 1.5, + }, + run: async (payload: any, { ctx }) => { + metadata.set("test-key", "test-value"); + metadata.append("test-keys", "test-value"); + metadata.increment("test-counter", 1); + }, +}); From 38a94b3b70354664661ad56f55027626094ebecf Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 2 Jul 2025 10:16:55 +0100 Subject: [PATCH 2/2] Add changeset --- .changeset/wet-dragons-boil.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/wet-dragons-boil.md diff --git a/.changeset/wet-dragons-boil.md b/.changeset/wet-dragons-boil.md new file mode 100644 index 0000000000..becd48dd8c --- /dev/null +++ b/.changeset/wet-dragons-boil.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Serialize metadata to prevent invalid metadata from breaking run completions