Skip to content

Commit 21c70ad

Browse files
committed
Serialize metadata to prevent invalid data from breaking run completions
1 parent 0c2af6d commit 21c70ad

File tree

6 files changed

+107
-24
lines changed

6 files changed

+107
-24
lines changed

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { startSpan } from "@internal/tracing";
22
import {
33
CompleteRunAttemptResult,
44
ExecutionResult,
5+
FlushedRunMetadata,
56
GitMeta,
67
StartRunAttemptResult,
78
TaskRunError,
@@ -35,6 +36,7 @@ import {
3536
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
3637
import { SystemResources } from "./systems.js";
3738
import { WaitpointSystem } from "./waitpointSystem.js";
39+
import { tryCatch } from "@trigger.dev/core/utils";
3840

3941
export type RunAttemptSystemOptions = {
4042
resources: SystemResources;
@@ -386,15 +388,7 @@ export class RunAttemptSystem {
386388
workerId?: string;
387389
runnerId?: string;
388390
}): Promise<CompleteRunAttemptResult> {
389-
if (completion.metadata) {
390-
this.$.eventBus.emit("runMetadataUpdated", {
391-
time: new Date(),
392-
run: {
393-
id: runId,
394-
metadata: completion.metadata,
395-
},
396-
});
397-
}
391+
await this.#notifyMetadataUpdated(runId, completion);
398392

399393
switch (completion.ok) {
400394
case true: {
@@ -1314,4 +1308,56 @@ export class RunAttemptSystem {
13141308

13151309
return taskRun?.runtimeEnvironment;
13161310
}
1311+
1312+
async #notifyMetadataUpdated(runId: string, completion: TaskRunExecutionResult) {
1313+
if (completion.metadata) {
1314+
this.$.eventBus.emit("runMetadataUpdated", {
1315+
time: new Date(),
1316+
run: {
1317+
id: runId,
1318+
metadata: completion.metadata,
1319+
},
1320+
});
1321+
1322+
return;
1323+
}
1324+
1325+
if (completion.flushedMetadata) {
1326+
const [packetError, packet] = await tryCatch(parsePacket(completion.flushedMetadata));
1327+
1328+
if (!packet) {
1329+
return;
1330+
}
1331+
1332+
if (packetError) {
1333+
this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", {
1334+
runId,
1335+
flushedMetadata: completion.flushedMetadata,
1336+
error: packetError,
1337+
});
1338+
1339+
return;
1340+
}
1341+
1342+
const metadata = FlushedRunMetadata.safeParse(packet);
1343+
1344+
if (!metadata.success) {
1345+
this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", {
1346+
runId,
1347+
flushedMetadata: completion.flushedMetadata,
1348+
error: metadata.error,
1349+
});
1350+
1351+
return;
1352+
}
1353+
1354+
this.$.eventBus.emit("runMetadataUpdated", {
1355+
time: new Date(),
1356+
run: {
1357+
id: runId,
1358+
metadata: metadata.data,
1359+
},
1360+
});
1361+
}
1362+
}
13171363
}

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ const zodIpc = new ZodIpcConnection({
351351
usage: {
352352
durationMs: 0,
353353
},
354-
metadata: runMetadataManager.stopAndReturnLastFlush(),
354+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
355355
},
356356
});
357357

@@ -382,7 +382,7 @@ const zodIpc = new ZodIpcConnection({
382382
usage: {
383383
durationMs: 0,
384384
},
385-
metadata: runMetadataManager.stopAndReturnLastFlush(),
385+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
386386
},
387387
});
388388

@@ -447,7 +447,7 @@ const zodIpc = new ZodIpcConnection({
447447
usage: {
448448
durationMs: 0,
449449
},
450-
metadata: runMetadataManager.stopAndReturnLastFlush(),
450+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
451451
},
452452
});
453453

@@ -473,7 +473,7 @@ const zodIpc = new ZodIpcConnection({
473473
usage: {
474474
durationMs: 0,
475475
},
476-
metadata: runMetadataManager.stopAndReturnLastFlush(),
476+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
477477
},
478478
});
479479

@@ -518,7 +518,7 @@ const zodIpc = new ZodIpcConnection({
518518
usage: {
519519
durationMs: usageSample.cpuTime,
520520
},
521-
metadata: runMetadataManager.stopAndReturnLastFlush(),
521+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
522522
},
523523
});
524524
}
@@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({
544544
usage: {
545545
durationMs: 0,
546546
},
547-
metadata: runMetadataManager.stopAndReturnLastFlush(),
547+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
548548
},
549549
});
550550
}

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ const zodIpc = new ZodIpcConnection({
350350
usage: {
351351
durationMs: 0,
352352
},
353-
metadata: runMetadataManager.stopAndReturnLastFlush(),
353+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
354354
},
355355
});
356356

@@ -381,7 +381,7 @@ const zodIpc = new ZodIpcConnection({
381381
usage: {
382382
durationMs: 0,
383383
},
384-
metadata: runMetadataManager.stopAndReturnLastFlush(),
384+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
385385
},
386386
});
387387

@@ -444,7 +444,7 @@ const zodIpc = new ZodIpcConnection({
444444
usage: {
445445
durationMs: 0,
446446
},
447-
metadata: runMetadataManager.stopAndReturnLastFlush(),
447+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
448448
},
449449
});
450450

@@ -472,7 +472,7 @@ const zodIpc = new ZodIpcConnection({
472472
usage: {
473473
durationMs: 0,
474474
},
475-
metadata: runMetadataManager.stopAndReturnLastFlush(),
475+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
476476
},
477477
});
478478

@@ -517,7 +517,7 @@ const zodIpc = new ZodIpcConnection({
517517
usage: {
518518
durationMs: usageSample.cpuTime,
519519
},
520-
metadata: runMetadataManager.stopAndReturnLastFlush(),
520+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
521521
},
522522
});
523523
}
@@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({
544544
usage: {
545545
durationMs: 0,
546546
},
547-
metadata: runMetadataManager.stopAndReturnLastFlush(),
547+
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
548548
},
549549
});
550550
}

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { MetadataStream } from "./metadataStream.js";
77
import { applyMetadataOperations, collapseOperations } from "./operations.js";
88
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
99
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
10+
import { IOPacket, stringifyIO } from "../utils/ioSerialization.js";
1011

1112
const MAXIMUM_ACTIVE_STREAMS = 5;
1213
const MAXIMUM_TOTAL_STREAMS = 10;
@@ -422,23 +423,27 @@ export class StandardMetadataManager implements RunMetadataManager {
422423
}
423424
}
424425

425-
stopAndReturnLastFlush(): FlushedRunMetadata | undefined {
426+
async stopAndReturnLastFlush(): Promise<IOPacket> {
426427
this.stopPeriodicFlush();
427428
this.isFlushing = true;
428429

429430
if (!this.#needsFlush()) {
430-
return;
431+
return { dataType: "application/json" };
431432
}
432433

433434
const operations = Array.from(this.queuedOperations);
434435
const parentOperations = Array.from(this.queuedParentOperations);
435436
const rootOperations = Array.from(this.queuedRootOperations);
436437

437-
return {
438+
const data = {
438439
operations: collapseOperations(operations),
439440
parentOperations: collapseOperations(parentOperations),
440441
rootOperations: collapseOperations(rootOperations),
441442
};
443+
444+
const packet = await stringifyIO(data);
445+
446+
return packet;
442447
}
443448

444449
#needsFlush(): boolean {

packages/core/src/v3/schemas/common.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,15 @@ export const TaskRunFailedExecutionResult = z.object({
376376
usage: TaskRunExecutionUsage.optional(),
377377
// Optional for now for backwards compatibility
378378
taskIdentifier: z.string().optional(),
379+
// This is deprecated, use flushedMetadata instead
379380
metadata: FlushedRunMetadata.optional(),
381+
// This is the new way to flush metadata
382+
flushedMetadata: z
383+
.object({
384+
data: z.string().optional(),
385+
dataType: z.string(),
386+
})
387+
.optional(),
380388
});
381389

382390
export type TaskRunFailedExecutionResult = z.infer<typeof TaskRunFailedExecutionResult>;
@@ -389,7 +397,15 @@ export const TaskRunSuccessfulExecutionResult = z.object({
389397
usage: TaskRunExecutionUsage.optional(),
390398
// Optional for now for backwards compatibility
391399
taskIdentifier: z.string().optional(),
400+
// This is deprecated, use flushedMetadata instead
392401
metadata: FlushedRunMetadata.optional(),
402+
// This is the new way to flush metadata
403+
flushedMetadata: z
404+
.object({
405+
data: z.string().optional(),
406+
dataType: z.string(),
407+
})
408+
.optional(),
393409
});
394410

395411
export type TaskRunSuccessfulExecutionResult = z.infer<typeof TaskRunSuccessfulExecutionResult>;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { metadata, task } from "@trigger.dev/sdk";
2+
3+
export const metadataTestTask = task({
4+
id: "metadata-tester",
5+
retry: {
6+
maxAttempts: 3,
7+
minTimeoutInMs: 500,
8+
maxTimeoutInMs: 1000,
9+
factor: 1.5,
10+
},
11+
run: async (payload: any, { ctx }) => {
12+
metadata.set("test-key", "test-value");
13+
metadata.append("test-keys", "test-value");
14+
metadata.increment("test-counter", 1);
15+
},
16+
});

0 commit comments

Comments
 (0)