Skip to content

Commit 03c564a

Browse files
committed
use compact insert strategy for runs
1 parent b1e6dd0 commit 03c564a

File tree

9 files changed

+558
-121
lines changed

9 files changed

+558
-121
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 85 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
2+
import { TASK_RUN_COLUMNS, PAYLOAD_COLUMNS } from "@internal/clickhouse";
23
import { type RedisOptions } from "@internal/redis";
34
import {
45
LogicalReplicationClient,
@@ -80,9 +81,7 @@ type TaskRunInsert = {
8081

8182
export type RunsReplicationServiceEvents = {
8283
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
83-
batchFlushed: [
84-
{ flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
85-
];
84+
batchFlushed: [{ flushId: string; taskRunInserts: any[][]; payloadInserts: any[][] }];
8685
};
8786

8887
export class RunsReplicationService {
@@ -171,12 +170,9 @@ export class RunsReplicationService {
171170
description: "Insert retry attempts",
172171
});
173172

174-
this._eventsProcessedCounter = this._meter.createCounter(
175-
"runs_replication.events_processed",
176-
{
177-
description: "Replication events processed (inserts, updates, deletes)",
178-
}
179-
);
173+
this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", {
174+
description: "Replication events processed (inserts, updates, deletes)",
175+
});
180176

181177
this._flushDurationHistogram = this._meter.createHistogram(
182178
"runs_replication.flush_duration_ms",
@@ -581,29 +577,35 @@ export class RunsReplicationService {
581577
.filter(Boolean)
582578
// batch inserts in clickhouse are more performant if the items
583579
// are pre-sorted by the primary key
580+
// Array indices: [0]=environment_id, [1]=organization_id, [2]=project_id, [3]=run_id, [5]=created_at
584581
.sort((a, b) => {
585-
if (a.organization_id !== b.organization_id) {
586-
return a.organization_id < b.organization_id ? -1 : 1;
582+
if (a[1] !== b[1]) {
583+
// organization_id
584+
return a[1] < b[1] ? -1 : 1;
587585
}
588-
if (a.project_id !== b.project_id) {
589-
return a.project_id < b.project_id ? -1 : 1;
586+
if (a[2] !== b[2]) {
587+
// project_id
588+
return a[2] < b[2] ? -1 : 1;
590589
}
591-
if (a.environment_id !== b.environment_id) {
592-
return a.environment_id < b.environment_id ? -1 : 1;
590+
if (a[0] !== b[0]) {
591+
// environment_id
592+
return a[0] < b[0] ? -1 : 1;
593593
}
594-
if (a.created_at !== b.created_at) {
595-
return a.created_at - b.created_at;
594+
if (a[5] !== b[5]) {
595+
// created_at
596+
return a[5] - b[5];
596597
}
597-
return a.run_id < b.run_id ? -1 : 1;
598+
return a[3] < b[3] ? -1 : 1; // run_id
598599
});
599600

600601
const payloadInserts = preparedInserts
601602
.map(({ payloadInsert }) => payloadInsert)
602603
.filter(Boolean)
603604
// batch inserts in clickhouse are more performant if the items
604605
// are pre-sorted by the primary key
606+
// Array indices: [0]=run_id
605607
.sort((a, b) => {
606-
return a.run_id < b.run_id ? -1 : 1;
608+
return a[0] < b[0] ? -1 : 1; // run_id
607609
});
608610

609611
span.setAttribute("task_run_inserts", taskRunInserts.length);
@@ -633,7 +635,6 @@ export class RunsReplicationService {
633635
this.logger.error("Error inserting task run inserts", {
634636
error: taskRunError,
635637
flushId,
636-
runIds: taskRunInserts.map((r) => r.run_id),
637638
});
638639
recordSpanError(span, taskRunError);
639640
}
@@ -642,7 +643,6 @@ export class RunsReplicationService {
642643
this.logger.error("Error inserting payload inserts", {
643644
error: payloadError,
644645
flushId,
645-
runIds: payloadInserts.map((r) => r.run_id),
646646
});
647647
recordSpanError(span, payloadError);
648648
}
@@ -770,16 +770,14 @@ export class RunsReplicationService {
770770
}
771771
}
772772

773-
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
773+
async #insertTaskRunInserts(taskRunInserts: any[][], attempt: number) {
774774
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
775-
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertUnsafe(
776-
taskRunInserts,
777-
{
775+
const [insertError, insertResult] =
776+
await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
778777
params: {
779778
clickhouse_settings: this.#getClickhouseInsertSettings(),
780779
},
781-
}
782-
);
780+
});
783781

784782
if (insertError) {
785783
this.logger.error("Error inserting task run inserts attempt", {
@@ -795,16 +793,14 @@ export class RunsReplicationService {
795793
});
796794
}
797795

798-
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
796+
async #insertPayloadInserts(payloadInserts: any[][], attempt: number) {
799797
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
800-
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloadsUnsafe(
801-
payloadInserts,
802-
{
798+
const [insertError, insertResult] =
799+
await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
803800
params: {
804801
clickhouse_settings: this.#getClickhouseInsertSettings(),
805802
},
806-
}
807-
);
803+
});
808804

809805
if (insertError) {
810806
this.logger.error("Error inserting payload inserts attempt", {
@@ -822,7 +818,7 @@ export class RunsReplicationService {
822818

823819
async #prepareRunInserts(
824820
batchedRun: TaskRunInsert
825-
): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
821+
): Promise<{ taskRunInsert?: any[]; payloadInsert?: any[] }> {
826822
this.logger.debug("Preparing run", {
827823
batchedRun,
828824
});
@@ -875,66 +871,67 @@ export class RunsReplicationService {
875871
environmentType: string,
876872
event: "insert" | "update" | "delete",
877873
_version: bigint
878-
): Promise<TaskRunV2> {
874+
): Promise<any[]> {
879875
const output = await this.#prepareJson(run.output, run.outputType);
880876

881-
return {
882-
environment_id: run.runtimeEnvironmentId,
883-
organization_id: organizationId,
884-
project_id: run.projectId,
885-
run_id: run.id,
886-
updated_at: run.updatedAt.getTime(),
887-
created_at: run.createdAt.getTime(),
888-
status: run.status,
889-
environment_type: environmentType,
890-
friendly_id: run.friendlyId,
891-
engine: run.engine,
892-
task_identifier: run.taskIdentifier,
893-
queue: run.queue,
894-
span_id: run.spanId,
895-
trace_id: run.traceId,
896-
error: { data: run.error },
897-
attempt: run.attemptNumber ?? 1,
898-
schedule_id: run.scheduleId ?? "",
899-
batch_id: run.batchId ?? "",
900-
completed_at: run.completedAt?.getTime(),
901-
started_at: run.startedAt?.getTime(),
902-
executed_at: run.executedAt?.getTime(),
903-
delay_until: run.delayUntil?.getTime(),
904-
queued_at: run.queuedAt?.getTime(),
905-
expired_at: run.expiredAt?.getTime(),
906-
usage_duration_ms: run.usageDurationMs,
907-
cost_in_cents: run.costInCents,
908-
base_cost_in_cents: run.baseCostInCents,
909-
tags: run.runTags ?? [],
910-
task_version: run.taskVersion ?? "",
911-
sdk_version: run.sdkVersion ?? "",
912-
cli_version: run.cliVersion ?? "",
913-
machine_preset: run.machinePreset ?? "",
914-
root_run_id: run.rootTaskRunId ?? "",
915-
parent_run_id: run.parentTaskRunId ?? "",
916-
depth: run.depth,
917-
is_test: run.isTest,
918-
idempotency_key: run.idempotencyKey ?? "",
919-
expiration_ttl: run.ttl ?? "",
920-
output,
921-
concurrency_key: run.concurrencyKey ?? "",
922-
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
923-
worker_queue: run.masterQueue,
924-
max_duration_in_seconds: run.maxDurationInSeconds ?? undefined,
925-
_version: _version.toString(),
926-
_is_deleted: event === "delete" ? 1 : 0,
927-
};
877+
// Return array matching TASK_RUN_COLUMNS order
878+
return [
879+
run.runtimeEnvironmentId, // environment_id
880+
organizationId, // organization_id
881+
run.projectId, // project_id
882+
run.id, // run_id
883+
run.updatedAt.getTime(), // updated_at
884+
run.createdAt.getTime(), // created_at
885+
run.status, // status
886+
environmentType, // environment_type
887+
run.friendlyId, // friendly_id
888+
run.attemptNumber ?? 1, // attempt
889+
run.engine, // engine
890+
run.taskIdentifier, // task_identifier
891+
run.queue, // queue
892+
run.scheduleId ?? "", // schedule_id
893+
run.batchId ?? "", // batch_id
894+
run.completedAt?.getTime() ?? null, // completed_at
895+
run.startedAt?.getTime() ?? null, // started_at
896+
run.executedAt?.getTime() ?? null, // executed_at
897+
run.delayUntil?.getTime() ?? null, // delay_until
898+
run.queuedAt?.getTime() ?? null, // queued_at
899+
run.expiredAt?.getTime() ?? null, // expired_at
900+
run.usageDurationMs ?? 0, // usage_duration_ms
901+
run.costInCents ?? 0, // cost_in_cents
902+
run.baseCostInCents ?? 0, // base_cost_in_cents
903+
output, // output
904+
{ data: run.error }, // error
905+
run.runTags ?? [], // tags
906+
run.taskVersion ?? "", // task_version
907+
run.sdkVersion ?? "", // sdk_version
908+
run.cliVersion ?? "", // cli_version
909+
run.machinePreset ?? "", // machine_preset
910+
run.rootTaskRunId ?? "", // root_run_id
911+
run.parentTaskRunId ?? "", // parent_run_id
912+
run.depth ?? 0, // depth
913+
run.spanId, // span_id
914+
run.traceId, // trace_id
915+
run.idempotencyKey ?? "", // idempotency_key
916+
run.ttl ?? "", // expiration_ttl
917+
run.isTest ?? false, // is_test
918+
run.concurrencyKey ?? "", // concurrency_key
919+
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
920+
run.masterQueue ?? "", // worker_queue
921+
_version.toString(), // _version
922+
event === "delete" ? 1 : 0, // _is_deleted
923+
];
928924
}
929925

930-
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<RawTaskRunPayloadV1> {
926+
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<any[]> {
931927
const payload = await this.#prepareJson(run.payload, run.payloadType);
932928

933-
return {
934-
run_id: run.id,
935-
created_at: run.createdAt.getTime(),
936-
payload,
937-
};
929+
// Return array matching PAYLOAD_COLUMNS order
930+
return [
931+
run.id, // run_id
932+
run.createdAt.getTime(), // created_at
933+
payload, // payload
934+
];
938935
}
939936

940937
async #prepareJson(

apps/webapp/test/performance/harness.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ export class RunsReplicationHarness {
511511
const clickhouse = new ClickHouse({
512512
url: this.config.infrastructure.clickhouseUrl!,
513513
name: "profiling-migrator",
514+
logLevel: "error", // Suppress migration spam - we handle errors ourselves
514515
});
515516

516517
try {
@@ -523,10 +524,11 @@ export class RunsReplicationHarness {
523524
.filter((f) => f.endsWith(".sql"))
524525
.sort(); // Ensure numeric ordering
525526

526-
console.log(`Found ${sqlFiles.length} migration files`);
527+
// Suppress verbose output - only show errors
528+
let successCount = 0;
529+
let skippedCount = 0;
527530

528531
for (const file of sqlFiles) {
529-
console.log(` Running migration: ${file}`);
530532
const sql = await fs.readFile(path.join(migrationsPath, file), "utf-8");
531533

532534
// Parse goose migration file - only execute "up" section
@@ -567,21 +569,28 @@ export class RunsReplicationHarness {
567569

568570
for (const statement of statements) {
569571
try {
570-
console.log(` Executing: ${statement.substring(0, 80)}...`);
571572
// Use the underlying client's command method
572573
await (clickhouse.writer as any).client.command({ query: statement });
573-
console.log(` ✓ Success`);
574+
successCount++;
574575
} catch (error: any) {
575-
// Ignore "already exists" errors
576-
if (error.message?.includes("already exists") || error.message?.includes("ALREADY_EXISTS")) {
577-
console.log(` ⊘ Skipped (already exists)`);
576+
// Ignore "already exists" errors silently
577+
if (
578+
error.message?.includes("already exists") ||
579+
error.message?.includes("ALREADY_EXISTS") ||
580+
error.code === "57"
581+
) {
582+
skippedCount++;
578583
} else {
579-
console.log(` ✗ Error: ${error.message}`);
584+
console.error(`✗ Migration error in ${file}: ${error.message}`);
580585
throw error;
581586
}
582587
}
583588
}
584589
}
590+
591+
if (successCount > 0 || skippedCount > 0) {
592+
console.log(`Migrations: ${successCount} applied, ${skippedCount} skipped`);
593+
}
585594
} finally {
586595
await clickhouse.close();
587596
}

apps/webapp/test/runsReplicationService.part2.test.ts

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -889,18 +889,12 @@ describe("RunsReplicationService (part 2/2)", () => {
889889
await setTimeout(1000);
890890

891891
expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2);
892-
expect(batchFlushedEvents?.[0].taskRunInserts[0]).toEqual(
893-
expect.objectContaining({
894-
run_id: run.id,
895-
status: "PENDING_VERSION",
896-
})
897-
);
898-
expect(batchFlushedEvents?.[0].taskRunInserts[1]).toEqual(
899-
expect.objectContaining({
900-
run_id: run.id,
901-
status: "COMPLETED_SUCCESSFULLY",
902-
})
903-
);
892+
// taskRunInserts are now arrays, not objects
893+
// Index 3 is run_id, Index 6 is status
894+
expect(batchFlushedEvents?.[0].taskRunInserts[0][3]).toEqual(run.id); // run_id
895+
expect(batchFlushedEvents?.[0].taskRunInserts[0][6]).toEqual("PENDING_VERSION"); // status
896+
expect(batchFlushedEvents?.[0].taskRunInserts[1][3]).toEqual(run.id); // run_id
897+
expect(batchFlushedEvents?.[0].taskRunInserts[1][6]).toEqual("COMPLETED_SUCCESSFULLY"); // status
904898

905899
await runsReplicationService.stop();
906900
}
@@ -1065,23 +1059,24 @@ describe("RunsReplicationService (part 2/2)", () => {
10651059
expect(batchFlushedEvents[0]?.payloadInserts.length).toBeGreaterThan(1);
10661060

10671061
// Verify sorting order: organization_id, project_id, environment_id, created_at, run_id
1062+
// taskRunInserts are now arrays: [0]=environment_id, [1]=organization_id, [2]=project_id, [3]=run_id, [5]=created_at
10681063
for (let i = 1; i < batchFlushedEvents[0]?.taskRunInserts.length; i++) {
10691064
const prev = batchFlushedEvents[0]?.taskRunInserts[i - 1];
10701065
const curr = batchFlushedEvents[0]?.taskRunInserts[i];
10711066

10721067
const prevKey = [
1073-
prev.organization_id,
1074-
prev.project_id,
1075-
prev.environment_id,
1076-
prev.created_at,
1077-
prev.run_id,
1068+
prev[1], // organization_id
1069+
prev[2], // project_id
1070+
prev[0], // environment_id
1071+
prev[5], // created_at
1072+
prev[3], // run_id
10781073
];
10791074
const currKey = [
1080-
curr.organization_id,
1081-
curr.project_id,
1082-
curr.environment_id,
1083-
curr.created_at,
1084-
curr.run_id,
1075+
curr[1], // organization_id
1076+
curr[2], // project_id
1077+
curr[0], // environment_id
1078+
curr[5], // created_at
1079+
curr[3], // run_id
10851080
];
10861081

10871082
const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]);
@@ -1111,7 +1106,9 @@ describe("RunsReplicationService (part 2/2)", () => {
11111106
for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) {
11121107
const prev = batchFlushedEvents[0]?.payloadInserts[i - 1];
11131108
const curr = batchFlushedEvents[0]?.payloadInserts[i];
1114-
expect(prev.run_id <= curr.run_id).toBeTruthy();
1109+
// payloadInserts are now arrays, not objects
1110+
// Index 0 is run_id
1111+
expect(prev[0] <= curr[0]).toBeTruthy();
11151112
}
11161113

11171114
await runsReplicationService.stop();

0 commit comments

Comments
 (0)