4 changes: 2 additions & 2 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -10,7 +10,7 @@ import {
import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import { tryCatch } from "@trigger.dev/core/utils";
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { TaskRun } from "@trigger.dev/database";
import { nanoid } from "nanoid";
import EventEmitter from "node:events";
@@ -636,7 +636,7 @@ export class RunsReplicationService {
dataType,
};

const [parseError, parsedData] = await tryCatch(parsePacket(packet));
const [parseError, parsedData] = await tryCatch(parsePacketAsJson(packet));

if (parseError) {
this.logger.error("Error parsing packet", {
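For context, the swap from `parsePacket` to `parsePacketAsJson` matters on the ClickHouse insert path: for `application/super+json` payloads, the original helper presumably revives rich values (BigInt, Date, Map) that plain JSON serialization cannot represent, while the new helper keeps the JSON-safe form. A minimal sketch of the difference, relying only on superjson's documented behaviour (not part of the PR):

import superjson from "superjson";

// A payload as a task run would store it (dataType "application/super+json").
const payload = superjson.stringify({ bigint: BigInt(1234), date: new Date() });

// Reviving gives back rich types: bigint is 1234n, date is a Date instance.
// JSON.stringify(revived) would throw on the BigInt.
const revived = superjson.parse<{ bigint: bigint; date: Date }>(payload);

// Re-serializing and keeping only `json` yields plain JSON-safe values,
// e.g. { bigint: "1234", date: "2024-…T…Z" } — the shape the new helper returns.
const { json } = superjson.serialize(revived);
console.log(json, revived);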
142 changes: 142 additions & 0 deletions apps/webapp/test/runsReplicationService.test.ts
@@ -6,6 +6,7 @@ import { z } from "zod";
import { TaskRunStatus } from "~/database-types";
import { RunsReplicationService } from "~/services/runsReplicationService.server";
import { createInMemoryTracing } from "./utils/tracing";
import superjson from "superjson";

vi.setConfig({ testTimeout: 60_000 });

@@ -133,6 +134,147 @@ describe("RunsReplicationService", () => {
}
);

containerTest(
"should replicate runs with super json payloads to clickhouse",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);

const clickhouse = new ClickHouse({
url: clickhouseContainer.getConnectionUrl(),
name: "runs-replication",
compression: {
request: true,
},
});

const { tracer, exporter } = createInMemoryTracing();

const runsReplicationService = new RunsReplicationService({
clickhouse,
pgConnectionUrl: postgresContainer.getConnectionUri(),
serviceName: "runs-replication",
slotName: "task_runs_to_clickhouse_v1",
publicationName: "task_runs_to_clickhouse_v1_publication",
redisOptions,
maxFlushConcurrency: 1,
flushIntervalMs: 100,
flushBatchSize: 1,
leaderLockTimeoutMs: 5000,
leaderLockExtendIntervalMs: 1000,
ackIntervalSeconds: 5,
tracer,
});

await runsReplicationService.start();

const organization = await prisma.organization.create({
data: {
title: "test",
slug: "test",
},
});

const project = await prisma.project.create({
data: {
name: "test",
slug: "test",
organizationId: organization.id,
externalRef: "test",
},
});

const runtimeEnvironment = await prisma.runtimeEnvironment.create({
data: {
slug: "test",
type: "DEVELOPMENT",
projectId: project.id,
organizationId: organization.id,
apiKey: "test",
pkApiKey: "test",
shortcode: "test",
},
});

const date = new Date();

// Now we insert a row into the table
const taskRun = await prisma.taskRun.create({
data: {
friendlyId: "run_1234",
taskIdentifier: "my-task",
payload: superjson.stringify({
foo: "bar",
bigint: BigInt(1234),
date,
map: new Map([["foo", "bar"]]),
}),
payloadType: "application/super+json",
traceId: "1234",
spanId: "1234",
queue: "test",
runtimeEnvironmentId: runtimeEnvironment.id,
projectId: project.id,
organizationId: organization.id,
environmentType: "DEVELOPMENT",
engine: "V2",
},
});

await setTimeout(1000);

// Check that the row was replicated to clickhouse
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2",
schema: z.any(),
});

const [queryError, result] = await queryRuns({});

expect(queryError).toBeNull();
expect(result?.length).toBe(1);
expect(result?.[0]).toEqual(
expect.objectContaining({
run_id: taskRun.id,
friendly_id: taskRun.friendlyId,
task_identifier: taskRun.taskIdentifier,
environment_id: runtimeEnvironment.id,
project_id: project.id,
organization_id: organization.id,
environment_type: "DEVELOPMENT",
engine: "V2",
})
);

const queryPayloads = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}",
schema: z.any(),
params: z.object({ run_id: z.string() }),
});

const [payloadQueryError, payloadResult] = await queryPayloads({ run_id: taskRun.id });

expect(payloadQueryError).toBeNull();
expect(payloadResult?.length).toBe(1);
expect(payloadResult?.[0]).toEqual(
expect.objectContaining({
run_id: taskRun.id,
payload: {
data: expect.objectContaining({
foo: "bar",
bigint: "1234",
date: date.toISOString(),
map: [["foo", "bar"]],
}),
},
})
);

await runsReplicationService.stop();
}
);

containerTest(
"should not produce any handle_transaction spans when no TaskRun events are produced",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
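For reference, the payload assertions in the new test follow directly from superjson's default serialization: the `json` half of `superjson.serialize(...)` represents a BigInt as a string, a Date as an ISO string, and a Map as an array of entry pairs. A minimal sketch (not part of the PR) of the shape the test expects under `payload.data`:

import superjson from "superjson";

const date = new Date();

// The `json` half of the superjson envelope — the JSON-safe representation.
const { json } = superjson.serialize({
  foo: "bar",
  bigint: BigInt(1234),
  date,
  map: new Map([["foo", "bar"]]),
});

// json === { foo: "bar", bigint: "1234", date: date.toISOString(), map: [["foo", "bar"]] }
console.log(json);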
30 changes: 30 additions & 0 deletions packages/core/src/v3/utils/ioSerialization.ts
@@ -41,6 +41,36 @@ export async function parsePacket(value: IOPacket, options?: ParsePacketOptions)
}
}

export async function parsePacketAsJson(
value: IOPacket,
options?: ParsePacketOptions
): Promise<any> {
if (!value.data) {
return undefined;
}

switch (value.dataType) {
case "application/json":
return JSON.parse(value.data, makeSafeReviver(options));
case "application/super+json":
const { parse, serialize } = await loadSuperJSON();

const superJsonResult = parse(value.data);

const { json } = serialize(superJsonResult);

return json;
case "text/plain":
return value.data;
case "application/store":
throw new Error(
`Cannot parse an application/store packet (${value.data}). Needs to be imported first.`
);
default:
return value.data;
}
}

export async function conditionallyImportAndParsePacket(
value: IOPacket,
client?: ApiClient
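A hypothetical usage sketch of the new helper (the packet literal is simplified for illustration; the import path is the one used in the webapp change above):

import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import superjson from "superjson";

async function example() {
  const packet = {
    data: superjson.stringify({ count: BigInt(42), at: new Date() }),
    dataType: "application/super+json" as const,
  };

  // Returns the plain JSON shape, e.g. { count: "42", at: "2024-…Z" } —
  // no revived BigInt or Date instances, so the result can be stored as JSON.
  return await parsePacketAsJson(packet);
}

example().then((json) => console.log(json));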