diff --git a/apps/webapp/app/v3/replayTask.ts b/apps/webapp/app/v3/replayTask.ts index b7283c3c3f..90897cf7c2 100644 --- a/apps/webapp/app/v3/replayTask.ts +++ b/apps/webapp/app/v3/replayTask.ts @@ -9,11 +9,12 @@ export const ReplayRunData = z .optional() .transform((val, ctx) => { if (!val) { - return {}; + return "{}"; } try { - return JSON.parse(val); + JSON.parse(val); + return val; } catch { ctx.addIssue({ code: z.ZodIssueCode.custom, diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 42b3aac804..dbcd26af90 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -2,18 +2,19 @@ import { type MachinePresetName, conditionallyImportPacket, parsePacket, + stringifyIO, } from "@trigger.dev/core/v3"; import { type TaskRun } from "@trigger.dev/database"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; -import { getTagsForRunId } from "~/models/taskRunTag.server"; import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; import { type RunOptionsData } from "../testTask"; +import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization"; type OverrideOptions = { environmentId?: string; - payload?: unknown; + payload?: string; metadata?: unknown; bulkActionId?: string; } & RunOptionsData; @@ -36,7 +37,15 @@ export class ReplayTaskRunService extends BaseService { taskRunFriendlyId: existingTaskRun.friendlyId, }); - const payload = overrideOptions.payload ?? (await this.getExistingPayload(existingTaskRun)); + const payloadPacket = await this.overrideExistingPayloadPacket( + existingTaskRun, + overrideOptions.payload + ); + const parsedPayload = + payloadPacket.dataType === "application/json" + ? await parsePacket(payloadPacket) + : payloadPacket.data; + const payloadType = payloadPacket.dataType; const metadata = overrideOptions.metadata ?? (await this.getExistingMetadata(existingTaskRun)); const tags = overrideOptions.tags ?? existingTaskRun.runTags; @@ -53,8 +62,9 @@ export class ReplayTaskRunService extends BaseService { existingTaskRun.taskIdentifier, authenticatedEnvironment, { - payload, + payload: parsedPayload, options: { + payloadType, queue: taskQueue ? { name: taskQueue.name, @@ -108,15 +118,23 @@ export class ReplayTaskRunService extends BaseService { } } - private async getExistingPayload(existingTaskRun: TaskRun) { - const existingPayloadPacket = await conditionallyImportPacket({ - data: existingTaskRun.payload, + private async overrideExistingPayloadPacket( + existingTaskRun: TaskRun, + stringifiedPayloadOverride: string | undefined + ) { + if (stringifiedPayloadOverride && existingTaskRun.payloadType === "application/super+json") { + const newPayload = await replaceSuperJsonPayload( + existingTaskRun.payload, + stringifiedPayloadOverride + ); + + return stringifyIO(newPayload); + } + + return conditionallyImportPacket({ + data: stringifiedPayloadOverride ?? existingTaskRun.payload, dataType: existingTaskRun.payloadType, }); - - return existingPayloadPacket.dataType === "application/json" - ? await parsePacket(existingPayloadPacket) - : existingPayloadPacket.data; } private async getExistingMetadata(existingTaskRun: TaskRun) { diff --git a/packages/core/package.json b/packages/core/package.json index b58dcb4982..4d16c555b6 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -188,6 +188,7 @@ "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", + "lodash.get": "^4.4.2", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", @@ -206,6 +207,7 @@ "@epic-web/test-server": "^0.1.0", "@trigger.dev/database": "workspace:*", "@types/humanize-duration": "^3.27.1", + "@types/lodash.get": "^4.4.9", "@types/readable-stream": "^4.0.14", "ai": "^3.4.33", "defu": "^6.1.4", diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts index 10f4edb2c4..b3049187e8 100644 --- a/packages/core/src/v3/utils/ioSerialization.ts +++ b/packages/core/src/v3/utils/ioSerialization.ts @@ -12,6 +12,7 @@ import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { TriggerTracer } from "../tracer.js"; import { zodfetch } from "../zodfetch.js"; import { flattenAttributes } from "./flattenAttributes.js"; +import get from "lodash.get"; export type IOPacket = { data?: string | undefined; @@ -505,13 +506,46 @@ function safeJsonParse(value: string): any { } } +/** + * Replaces the data in a SuperJSON-serialized string with new payload data while preserving + * the original type metadata (Dates, BigInts, Sets, Maps, etc.). + * + * It is primarily useful for our run replay functionality where we want to preserve the original + * type metadata for the new payload. + * + * Note that `undefined` type metadata is ignored when the corresponding field is overriden in the + * new payload, i.e., fields which were previously undefined in the original payload are restored into + * the primitive type they have in the new payload, instead of `undefined`. + * This is a workaround for https://github.com/triggerdotdev/trigger.dev/issues/1968. + * + * @param original - A SuperJSON-serialized string containing the original data with type metadata + * @param newPayload - A JSON string containing the new data to replace the original payload + * @returns The deserialized object with new data but original type metadata preserved + * + * @throws {Error} If the newPayload is not valid JSON + */ export async function replaceSuperJsonPayload(original: string, newPayload: string) { const superjson = await loadSuperJSON(); const originalObject = superjson.parse(original); + const newPayloadObject = JSON.parse(newPayload); const { meta } = superjson.serialize(originalObject); + if (meta?.values) { + const originalUndefinedKeys = Object.entries(meta.values) + .filter(([, value]) => Array.isArray(value) && value.at(0) === "undefined") + .map(([key]) => key); + + const overridenUndefinedKeys = originalUndefinedKeys.filter( + (key) => get(newPayloadObject, key) !== undefined + ); + + overridenUndefinedKeys.forEach((key) => { + delete (meta.values as Record)[key]; + }); + } + const newSuperJson = { - json: JSON.parse(newPayload) as any, + json: newPayloadObject, meta, }; diff --git a/packages/core/test/ioSerialization.test.ts b/packages/core/test/ioSerialization.test.ts new file mode 100644 index 0000000000..ffb9b30753 --- /dev/null +++ b/packages/core/test/ioSerialization.test.ts @@ -0,0 +1,191 @@ +import { replaceSuperJsonPayload } from "../src/v3/utils/ioSerialization.js"; + +describe("ioSerialization", () => { + describe("replaceSuperJsonPayload", () => { + it("should replace simple JSON payload while preserving SuperJSON metadata", async () => { + const originalData = { + name: "John", + age: 30, + date: new Date("2023-01-01"), + }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + + const newPayloadJson = JSON.stringify({ + name: "Jane", + surname: "Doe", + age: 25, + date: "2023-02-01T00:00:00.000Z", + }); + + const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any; + + expect(result.name).toBe("Jane"); + expect(result.surname).toBe("Doe"); + expect(result.age).toBe(25); + expect(result.date).toBeInstanceOf(Date); + expect(result.date.toISOString()).toBe("2023-02-01T00:00:00.000Z"); + }); + + // related to issue https://github.com/triggerdotdev/trigger.dev/issues/1968 + it("should ignore original undefined type metadata for overriden fields", async () => { + const originalData = { + name: "John", + age: 30, + date: new Date("2023-01-01"), + country: undefined, + settings: { + theme: undefined, + }, + }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + + const newPayloadJson = JSON.stringify({ + name: "Jane", + surname: "Doe", + age: 25, + date: "2023-02-01T00:00:00.000Z", + country: "US", + settings: { + theme: "dark", + }, + }); + + const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any; + + expect(result.name).toBe("Jane"); + expect(result.surname).toBe("Doe"); + expect(result.country).toBe("US"); + expect(result.settings.theme).toBe("dark"); + expect(result.age).toBe(25); + expect(result.date).toBeInstanceOf(Date); + expect(result.date.toISOString()).toBe("2023-02-01T00:00:00.000Z"); + }); + + it("should preserve BigInt type metadata", async () => { + const originalData = { + id: BigInt(123456789), + count: 42, + }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + + const newPayloadJson = JSON.stringify({ + id: "987654321", + count: 100, + }); + + const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any; + + expect(result.id).toBe(BigInt(987654321)); + expect(typeof result.id).toBe("bigint"); + expect(result.count).toBe(100); + }); + + it("should preserve nested type metadata", async () => { + const originalData = { + user: { + id: BigInt(123), + createdAt: new Date("2023-01-01"), + settings: { + theme: "dark", + updatedAt: new Date("2023-01-01"), + }, + }, + metadata: { + version: 1, + }, + }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + + const newPayloadJson = JSON.stringify({ + user: { + id: "456", + createdAt: "2023-06-01T00:00:00.000Z", + settings: { + theme: "light", + updatedAt: "2023-06-01T00:00:00.000Z", + }, + }, + metadata: { + version: 2, + }, + }); + + const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any; + + expect(result.user.id).toBe(BigInt(456)); + expect(result.user.createdAt).toBeInstanceOf(Date); + expect(result.user.createdAt.toISOString()).toBe("2023-06-01T00:00:00.000Z"); + expect(result.user.settings.theme).toBe("light"); + expect(result.user.settings.updatedAt).toBeInstanceOf(Date); + expect(result.user.settings.updatedAt.toISOString()).toBe("2023-06-01T00:00:00.000Z"); + expect(result.metadata.version).toBe(2); + }); + + it("should preserve Set type metadata", async () => { + const originalData = { + tags: new Set(["tag1", "tag2"]), + name: "test", + }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + + const newPayloadJson = JSON.stringify({ + tags: ["tag3", "tag4", "tag5"], + name: "updated", + }); + + const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any; + + expect(result.tags).toBeInstanceOf(Set); + expect(Array.from(result.tags)).toEqual(["tag3", "tag4", "tag5"]); + expect(result.name).toBe("updated"); + }); + + it("should preserve Map type metadata", async () => { + const originalData = { + mapping: new Map([ + ["key1", "value1"], + ["key2", "value2"], + ]), + name: "test", + }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + + const newPayloadJson = JSON.stringify({ + mapping: [ + ["key3", "value3"], + ["key4", "value4"], + ], + name: "updated", + }); + + const result = (await replaceSuperJsonPayload(originalSerialized, newPayloadJson)) as any; + + expect(result.mapping).toBeInstanceOf(Map); + expect(result.mapping.get("key3")).toBe("value3"); + expect(result.mapping.get("key4")).toBe("value4"); + expect(result.name).toBe("updated"); + }); + + it("should throw error for invalid JSON payload", async () => { + const originalData = { name: "test" }; + + const superjson = await import("superjson"); + const originalSerialized = superjson.stringify(originalData); + const invalidPayload = "{ invalid json }"; + + await expect(replaceSuperJsonPayload(originalSerialized, invalidPayload)).rejects.toThrow(); + }); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a5cdef7b3e..01f186038b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1534,6 +1534,9 @@ importers: jose: specifier: ^5.4.0 version: 5.4.0 + lodash.get: + specifier: ^4.4.2 + version: 4.4.2 nanoid: specifier: 3.3.8 version: 3.3.8 @@ -1583,6 +1586,9 @@ importers: '@types/humanize-duration': specifier: ^3.27.1 version: 3.27.1 + '@types/lodash.get': + specifier: ^4.4.9 + version: 4.4.9 '@types/readable-stream': specifier: ^4.0.14 version: 4.0.14 @@ -19069,6 +19075,12 @@ packages: '@types/node': 20.14.14 dev: true + /@types/lodash.get@4.4.9: + resolution: {integrity: sha512-J5dvW98sxmGnamqf+/aLP87PYXyrha9xIgc2ZlHl6OHMFR2Ejdxep50QfU0abO1+CH6+ugx+8wEUN1toImAinA==} + dependencies: + '@types/lodash': 4.14.191 + dev: true + /@types/lodash.omit@4.5.7: resolution: {integrity: sha512-6q6cNg0tQ6oTWjSM+BcYMBhan54P/gLqBldG4AuXd3nKr0oeVekWNS4VrNEu3BhCSDXtGapi7zjhnna0s03KpA==} dependencies: @@ -27238,6 +27250,11 @@ packages: resolution: {integrity: sha512-C5N2Z3DgnnKr0LOpv/hKCgKdb7ZZwafIrsesve6lmzvZIRZRGaZ/l6Q8+2W7NaT+ZwO3fFlSCzCzrDCFdJfZ4g==} dev: true + /lodash.get@4.4.2: + resolution: {integrity: sha512-z+Uw/vLuy6gQe8cfaFWD7p0wVv8fJl3mbzXh33RS+0oW2wvUqiRXiQ69gLWSLpgB5/6sU+r6BlQR0MBILadqTQ==} + deprecated: This package is deprecated. Use the optional chaining (?.) operator instead. + dev: false + /lodash.groupby@4.6.0: resolution: {integrity: sha512-5dcWxm23+VAoz+awKmBaiBvzox8+RqMgFhi7UvX9DHZr2HdxHXM/Wrf8cfKpsW37RNrvtPn6hSwNqurSILbmJw==} dev: false