5 changes: 3 additions & 2 deletions apps/webapp/app/v3/replayTask.ts
@@ -9,11 +9,12 @@ export const ReplayRunData = z
   .optional()
   .transform((val, ctx) => {
     if (!val) {
-      return {};
+      return "{}";
     }

     try {
-      return JSON.parse(val);
+      JSON.parse(val);
+      return val;
     } catch {
       ctx.addIssue({
         code: z.ZodIssueCode.custom,
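Taken together, this hunk turns the payload field into a validate-but-pass-through transform: malformed JSON is still rejected, but valid input now comes out as the raw string (with missing input defaulting to `"{}"`) rather than a parsed object. A minimal standalone sketch of that pattern; the schema name and error message are hypothetical, not from this PR:

```ts
import { z } from "zod";

// Hypothetical standalone version of the pattern used by ReplayRunData above.
const jsonString = z
  .string()
  .optional()
  .transform((val, ctx) => {
    if (!val) {
      return "{}"; // missing input normalizes to an empty JSON object, kept as a string
    }

    try {
      JSON.parse(val); // parse only to validate; throws on malformed JSON
      return val; // pass the original string through untouched
    } catch {
      ctx.addIssue({ code: z.ZodIssueCode.custom, message: "Payload must be valid JSON" });
      return z.NEVER;
    }
  });

// jsonString.parse('{"a":1}') === '{"a":1}'; jsonString.parse(undefined) === "{}"
```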
40 changes: 29 additions & 11 deletions apps/webapp/app/v3/services/replayTaskRun.server.ts
@@ -2,18 +2,19 @@ import {
   type MachinePresetName,
   conditionallyImportPacket,
   parsePacket,
+  stringifyIO,
 } from "@trigger.dev/core/v3";
 import { type TaskRun } from "@trigger.dev/database";
 import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
 import { getTagsForRunId } from "~/models/taskRunTag.server";
 import { logger } from "~/services/logger.server";
 import { BaseService } from "./baseService.server";
 import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
 import { type RunOptionsData } from "../testTask";
+import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization";

 type OverrideOptions = {
   environmentId?: string;
-  payload?: unknown;
+  payload?: string;
   metadata?: unknown;
   bulkActionId?: string;
 } & RunOptionsData;
@@ -36,7 +36,15 @@ export class ReplayTaskRunService extends BaseService {
       taskRunFriendlyId: existingTaskRun.friendlyId,
     });

-    const payload = overrideOptions.payload ?? (await this.getExistingPayload(existingTaskRun));
+    const payloadPacket = await this.overrideExistingPayloadPacket(
+      existingTaskRun,
+      overrideOptions.payload
+    );
+    const parsedPayload =
+      payloadPacket.dataType === "application/json"
+        ? await parsePacket(payloadPacket)
+        : payloadPacket.data;
+    const payloadType = payloadPacket.dataType;
     const metadata = overrideOptions.metadata ?? (await this.getExistingMetadata(existingTaskRun));
     const tags = overrideOptions.tags ?? existingTaskRun.runTags;

@@ -53,8 +62,9 @@
         existingTaskRun.taskIdentifier,
         authenticatedEnvironment,
         {
-          payload,
+          payload: parsedPayload,
           options: {
+            payloadType,
             queue: taskQueue
               ? {
                   name: taskQueue.name,
@@ -108,15 +118,23 @@
     }
   }

-  private async getExistingPayload(existingTaskRun: TaskRun) {
-    const existingPayloadPacket = await conditionallyImportPacket({
-      data: existingTaskRun.payload,
+  private async overrideExistingPayloadPacket(
+    existingTaskRun: TaskRun,
+    stringifiedPayloadOverride: string | undefined
+  ) {
+    if (stringifiedPayloadOverride && existingTaskRun.payloadType === "application/super+json") {
+      const newPayload = await replaceSuperJsonPayload(
+        existingTaskRun.payload,
+        stringifiedPayloadOverride
+      );
+
+      return stringifyIO(newPayload);
+    }
+
+    return conditionallyImportPacket({
+      data: stringifiedPayloadOverride ?? existingTaskRun.payload,
       dataType: existingTaskRun.payloadType,
     });
-
-    return existingPayloadPacket.dataType === "application/json"
-      ? await parsePacket(existingPayloadPacket)
-      : existingPayloadPacket.data;
   }

   private async getExistingMetadata(existingTaskRun: TaskRun) {
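The replacement method above boils down to a two-branch decision: a string override on a SuperJSON payload is grafted onto the original type metadata and re-stringified, while every other case simply imports whichever string is available. A standalone sketch of that flow, assuming `stringifyIO` and `conditionallyImportPacket` both resolve to a `{ data, dataType }` packet; the function name and parameters here are illustrative, not from the PR:

```ts
import { conditionallyImportPacket, stringifyIO } from "@trigger.dev/core/v3";
import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization";

// Illustrative standalone version of overrideExistingPayloadPacket, with the
// TaskRun row replaced by plain strings.
async function buildReplayPacket(
  originalPayload: string,
  originalPayloadType: string,
  stringifiedOverride?: string
) {
  if (stringifiedOverride && originalPayloadType === "application/super+json") {
    // SuperJSON run: graft the new data onto the original type metadata,
    // then re-stringify so the packet keeps its SuperJSON data type.
    const merged = await replaceSuperJsonPayload(originalPayload, stringifiedOverride);
    return stringifyIO(merged);
  }

  // Anything else: take the override verbatim if present, else the stored payload.
  return conditionallyImportPacket({
    data: stringifiedOverride ?? originalPayload,
    dataType: originalPayloadType,
  });
}
```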
2 changes: 2 additions & 0 deletions packages/core/package.json
@@ -188,6 +188,7 @@
     "execa": "^8.0.1",
     "humanize-duration": "^3.27.3",
     "jose": "^5.4.0",
+    "lodash.get": "^4.4.2",
     "nanoid": "3.3.8",
     "prom-client": "^15.1.0",
     "socket.io": "4.7.4",
@@ -206,6 +207,7 @@
     "@epic-web/test-server": "^0.1.0",
     "@trigger.dev/database": "workspace:*",
     "@types/humanize-duration": "^3.27.1",
+    "@types/lodash.get": "^4.4.9",
     "@types/readable-stream": "^4.0.14",
     "ai": "^3.4.33",
     "defu": "^6.1.4",
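`lodash.get` is brought in because SuperJSON's `meta.values` keys nested fields by dot-separated path (for example `"settings.theme"`, as exercised in the tests below), so checking whether the override supplies a value for such a key needs a deep lookup rather than a plain property access. A quick illustration:

```ts
import get from "lodash.get";

// A plain index (payload["settings.theme"]) would miss the nested value;
// get() walks the path segment by segment.
const payload = { settings: { theme: "dark" }, tags: ["a", "b"] };

console.log(get(payload, "settings.theme")); // "dark"
console.log(get(payload, "tags.1"));         // "b"
console.log(get(payload, "missing.path"));   // undefined → key was not overridden
```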
36 changes: 35 additions & 1 deletion packages/core/src/v3/utils/ioSerialization.ts
@@ -12,6 +12,7 @@ import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
 import { TriggerTracer } from "../tracer.js";
 import { zodfetch } from "../zodfetch.js";
 import { flattenAttributes } from "./flattenAttributes.js";
+import get from "lodash.get";

 export type IOPacket = {
   data?: string | undefined;
@@ -505,13 +506,46 @@ function safeJsonParse(value: string): any {
   }
 }

+/**
+ * Replaces the data in a SuperJSON-serialized string with new payload data while preserving
+ * the original type metadata (Dates, BigInts, Sets, Maps, etc.).
+ *
+ * It is primarily useful for our run replay functionality, where we want to preserve the
+ * original type metadata for the new payload.
+ *
+ * Note that `undefined` type metadata is ignored when the corresponding field is overridden in
+ * the new payload, i.e., fields that were previously `undefined` in the original payload take
+ * the primitive type they have in the new payload instead of `undefined`.
+ * This is a workaround for https://github.com/triggerdotdev/trigger.dev/issues/1968.
+ *
+ * @param original - A SuperJSON-serialized string containing the original data with type metadata
+ * @param newPayload - A JSON string containing the new data to replace the original payload
+ * @returns The deserialized object with new data but original type metadata preserved
+ *
+ * @throws {Error} If the newPayload is not valid JSON
+ */
 export async function replaceSuperJsonPayload(original: string, newPayload: string) {
   const superjson = await loadSuperJSON();
   const originalObject = superjson.parse(original);
+  const newPayloadObject = JSON.parse(newPayload);
   const { meta } = superjson.serialize(originalObject);

+  if (meta?.values) {
+    const originalUndefinedKeys = Object.entries(meta.values)
+      .filter(([, value]) => Array.isArray(value) && value.at(0) === "undefined")
+      .map(([key]) => key);
+
+    const overriddenUndefinedKeys = originalUndefinedKeys.filter(
+      (key) => get(newPayloadObject, key) !== undefined
+    );
+
+    overriddenUndefinedKeys.forEach((key) => {
+      delete (meta.values as Record<string, any>)[key];
+    });
+  }
+
   const newSuperJson = {
-    json: JSON.parse(newPayload) as any,
+    json: newPayloadObject,
     meta,
   };

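A compact worked example of the undefined-key workaround described in the doc comment, in the style of the tests below and using the same deep import path as replayTaskRun.server.ts above; the concrete field names and values are illustrative:

```ts
import superjson from "superjson";
import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization";

async function demo() {
  // "country" is undefined in the original run, so SuperJSON annotates it as
  // ["undefined"] in meta.values; "when" is annotated as ["Date"].
  const original = superjson.stringify({
    country: undefined,
    when: new Date("2023-01-01"),
  });

  const result = (await replaceSuperJsonPayload(
    original,
    JSON.stringify({ country: "US", when: "2024-05-01T00:00:00.000Z" })
  )) as any;

  console.log(result.country);              // "US" — the stale ["undefined"] annotation was dropped
  console.log(result.when instanceof Date); // true — the ["Date"] annotation still applies
}
```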
191 changes: 191 additions & 0 deletions packages/core/test/ioSerialization.test.ts
@@ -0,0 +1,191 @@
import { replaceSuperJsonPayload } from "../src/v3/utils/ioSerialization.js";

describe("ioSerialization", () => {
  describe("replaceSuperJsonPayload", () => {
    it("should replace simple JSON payload while preserving SuperJSON metadata", async () => {
      const originalData = {
        name: "John",
        age: 30,
        date: new Date("2023-01-01"),
      };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);

      const newPayloadJson = JSON.stringify({
        name: "Jane",
        surname: "Doe",
        age: 25,
        date: "2023-02-01T00:00:00.000Z",
      });

      const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any;

      expect(result.name).toBe("Jane");
      expect(result.surname).toBe("Doe");
      expect(result.age).toBe(25);
      expect(result.date).toBeInstanceOf(Date);
      expect(result.date.toISOString()).toBe("2023-02-01T00:00:00.000Z");
    });

    // related to issue https://github.com/triggerdotdev/trigger.dev/issues/1968
    it("should ignore original undefined type metadata for overridden fields", async () => {
      const originalData = {
        name: "John",
        age: 30,
        date: new Date("2023-01-01"),
        country: undefined,
        settings: {
          theme: undefined,
        },
      };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);

      const newPayloadJson = JSON.stringify({
        name: "Jane",
        surname: "Doe",
        age: 25,
        date: "2023-02-01T00:00:00.000Z",
        country: "US",
        settings: {
          theme: "dark",
        },
      });

      const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any;

      expect(result.name).toBe("Jane");
      expect(result.surname).toBe("Doe");
      expect(result.country).toBe("US");
      expect(result.settings.theme).toBe("dark");
      expect(result.age).toBe(25);
      expect(result.date).toBeInstanceOf(Date);
      expect(result.date.toISOString()).toBe("2023-02-01T00:00:00.000Z");
    });

    it("should preserve BigInt type metadata", async () => {
      const originalData = {
        id: BigInt(123456789),
        count: 42,
      };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);

      const newPayloadJson = JSON.stringify({
        id: "987654321",
        count: 100,
      });

      const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any;

      expect(result.id).toBe(BigInt(987654321));
      expect(typeof result.id).toBe("bigint");
      expect(result.count).toBe(100);
    });

    it("should preserve nested type metadata", async () => {
      const originalData = {
        user: {
          id: BigInt(123),
          createdAt: new Date("2023-01-01"),
          settings: {
            theme: "dark",
            updatedAt: new Date("2023-01-01"),
          },
        },
        metadata: {
          version: 1,
        },
      };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);

      const newPayloadJson = JSON.stringify({
        user: {
          id: "456",
          createdAt: "2023-06-01T00:00:00.000Z",
          settings: {
            theme: "light",
            updatedAt: "2023-06-01T00:00:00.000Z",
          },
        },
        metadata: {
          version: 2,
        },
      });

      const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any;

      expect(result.user.id).toBe(BigInt(456));
      expect(result.user.createdAt).toBeInstanceOf(Date);
      expect(result.user.createdAt.toISOString()).toBe("2023-06-01T00:00:00.000Z");
      expect(result.user.settings.theme).toBe("light");
      expect(result.user.settings.updatedAt).toBeInstanceOf(Date);
      expect(result.user.settings.updatedAt.toISOString()).toBe("2023-06-01T00:00:00.000Z");
      expect(result.metadata.version).toBe(2);
    });

    it("should preserve Set type metadata", async () => {
      const originalData = {
        tags: new Set(["tag1", "tag2"]),
        name: "test",
      };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);

      const newPayloadJson = JSON.stringify({
        tags: ["tag3", "tag4", "tag5"],
        name: "updated",
      });

      const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any;

      expect(result.tags).toBeInstanceOf(Set);
      expect(Array.from(result.tags)).toEqual(["tag3", "tag4", "tag5"]);
      expect(result.name).toBe("updated");
    });

    it("should preserve Map type metadata", async () => {
      const originalData = {
        mapping: new Map([
          ["key1", "value1"],
          ["key2", "value2"],
        ]),
        name: "test",
      };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);

      const newPayloadJson = JSON.stringify({
        mapping: [
          ["key3", "value3"],
          ["key4", "value4"],
        ],
        name: "updated",
      });

      const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any;

      expect(result.mapping).toBeInstanceOf(Map);
      expect(result.mapping.get("key3")).toBe("value3");
      expect(result.mapping.get("key4")).toBe("value4");
      expect(result.name).toBe("updated");
    });

    it("should throw error for invalid JSON payload", async () => {
      const originalData = { name: "test" };

      const superjson = await import("superjson");
      const originalSerialized = superjson.stringify(originalData);
      const invalidPayload = "{ invalid json }";

      await expect(replaceSuperJsonPayload(originalSerialized, invalidPayload)).rejects.toThrow();
    });
  });
});