diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index aaad687717..eed1d3c9f3 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -10,7 +10,7 @@ import { import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { tryCatch } from "@trigger.dev/core/utils"; -import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; +import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization"; import { TaskRun } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import EventEmitter from "node:events"; @@ -636,7 +636,7 @@ export class RunsReplicationService { dataType, }; - const [parseError, parsedData] = await tryCatch(parsePacket(packet)); + const [parseError, parsedData] = await tryCatch(parsePacketAsJson(packet)); if (parseError) { this.logger.error("Error parsing packet", { diff --git a/apps/webapp/test/runsReplicationService.test.ts b/apps/webapp/test/runsReplicationService.test.ts index 1494b8bd70..d11abb3248 100644 --- a/apps/webapp/test/runsReplicationService.test.ts +++ b/apps/webapp/test/runsReplicationService.test.ts @@ -6,6 +6,7 @@ import { z } from "zod"; import { TaskRunStatus } from "~/database-types"; import { RunsReplicationService } from "~/services/runsReplicationService.server"; import { createInMemoryTracing } from "./utils/tracing"; +import superjson from "superjson"; vi.setConfig({ testTimeout: 60_000 }); @@ -133,6 +134,147 @@ describe("RunsReplicationService", () => { } ); + containerTest( + "should replicate runs with super json payloads to clickhouse", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { + request: true, + }, + }); + + const { tracer, exporter } = createInMemoryTracing(); + + const runsReplicationService = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { + title: "test", + slug: "test", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const date = new Date(); + + // Now we insert a row into the table + const taskRun = await prisma.taskRun.create({ + data: { + friendlyId: "run_1234", + taskIdentifier: "my-task", + payload: superjson.stringify({ + foo: "bar", + bigint: BigInt(1234), + date, + map: new Map([["foo", "bar"]]), + }), + payloadType: "application/super+json", + traceId: "1234", + spanId: "1234", + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + + await setTimeout(1000); + + // Check that the row was replicated to clickhouse + const queryRuns = clickhouse.reader.query({ + name: "runs-replication", + query: "SELECT * FROM trigger_dev.task_runs_v2", + schema: z.any(), + }); + + const [queryError, result] = await queryRuns({}); + + expect(queryError).toBeNull(); + expect(result?.length).toBe(1); + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: taskRun.id, + friendly_id: taskRun.friendlyId, + task_identifier: taskRun.taskIdentifier, + environment_id: runtimeEnvironment.id, + project_id: project.id, + organization_id: organization.id, + environment_type: "DEVELOPMENT", + engine: "V2", + }) + ); + + const queryPayloads = clickhouse.reader.query({ + name: "runs-replication", + query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}", + schema: z.any(), + params: z.object({ run_id: z.string() }), + }); + + const [payloadQueryError, payloadResult] = await queryPayloads({ run_id: taskRun.id }); + + expect(payloadQueryError).toBeNull(); + expect(payloadResult?.length).toBe(1); + expect(payloadResult?.[0]).toEqual( + expect.objectContaining({ + run_id: taskRun.id, + payload: { + data: expect.objectContaining({ + foo: "bar", + bigint: "1234", + date: date.toISOString(), + map: [["foo", "bar"]], + }), + }, + }) + ); + + await runsReplicationService.stop(); + } + ); + containerTest( "should not produce any handle_transaction spans when no TaskRun events are produced", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts index 8fbfa2e537..cbea46671e 100644 --- a/packages/core/src/v3/utils/ioSerialization.ts +++ b/packages/core/src/v3/utils/ioSerialization.ts @@ -41,6 +41,36 @@ export async function parsePacket(value: IOPacket, options?: ParsePacketOptions) } } +export async function parsePacketAsJson( + value: IOPacket, + options?: ParsePacketOptions +): Promise { + if (!value.data) { + return undefined; + } + + switch (value.dataType) { + case "application/json": + return JSON.parse(value.data, makeSafeReviver(options)); + case "application/super+json": + const { parse, serialize } = await loadSuperJSON(); + + const superJsonResult = parse(value.data); + + const { json } = serialize(superJsonResult); + + return json; + case "text/plain": + return value.data; + case "application/store": + throw new Error( + `Cannot parse an application/store packet (${value.data}). Needs to be imported first.` + ); + default: + return value.data; + } +} + export async function conditionallyImportAndParsePacket( value: IOPacket, client?: ApiClient