Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d05720c
Create performance test harness for runs replication service
ericallam Jan 9, 2026
57861d3
improve test harness producer throughput and better organize run outputs
ericallam Jan 9, 2026
0bf8cfa
use a less CPU-intensive way of inserting task runs
ericallam Jan 9, 2026
be64766
use compact insert strategy for runs
ericallam Jan 10, 2026
32f758a
added back in max duration in seconds
ericallam Jan 10, 2026
4f946a4
cleanup the types
ericallam Jan 10, 2026
df12287
simplify
ericallam Jan 10, 2026
55ed118
much better type safety
ericallam Jan 10, 2026
44c5267
fixed types
ericallam Jan 11, 2026
284c196
fix clickhouse tests
ericallam Jan 11, 2026
a2fa855
really fix clickhouse tests
ericallam Jan 11, 2026
f8640f6
Add object-based insert functions and fix index generation
ericallam Jan 11, 2026
5f40360
Fix TypeScript errors in sort functions
ericallam Jan 11, 2026
ca37d7e
Fix sort comparators to return 0 for equal values
ericallam Jan 11, 2026
d1129d7
Remove performance test harness
ericallam Jan 11, 2026
df3ea39
Remove remaining performance harness artifacts
ericallam Jan 11, 2026
b9db398
Update pnpm-lock.yaml
ericallam Jan 11, 2026
4de8899
Stop dynamically loading superjson
ericallam Jan 12, 2026
777db7c
made accessing run and payload fields more type safe
ericallam Jan 13, 2026
8778403
speed up deduplicating of runs by making it more efficient
ericallam Jan 13, 2026
ec6ec2f
Fixed tests and made them less loggy
ericallam Jan 13, 2026
5ddabad
Fixed tests for realz
ericallam Jan 13, 2026
443f63b
added back in null check in getKey
ericallam Jan 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 108 additions & 116 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse";
import { TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse";
import { type RedisOptions } from "@internal/redis";
import {
LogicalReplicationClient,
Expand Down Expand Up @@ -81,7 +82,7 @@ type TaskRunInsert = {
export type RunsReplicationServiceEvents = {
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
batchFlushed: [
{ flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
{ flushId: string; taskRunInserts: TaskRunInsertArray[]; payloadInserts: PayloadInsertArray[] },
];
};

Expand Down Expand Up @@ -171,12 +172,9 @@ export class RunsReplicationService {
description: "Insert retry attempts",
});

this._eventsProcessedCounter = this._meter.createCounter(
"runs_replication.events_processed",
{
description: "Replication events processed (inserts, updates, deletes)",
}
);
this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", {
description: "Replication events processed (inserts, updates, deletes)",
});

this._flushDurationHistogram = this._meter.createHistogram(
"runs_replication.flush_duration_ms",
Expand Down Expand Up @@ -578,32 +576,46 @@ export class RunsReplicationService {

const taskRunInserts = preparedInserts
.map(({ taskRunInsert }) => taskRunInsert)
.filter(Boolean)
.filter((x): x is TaskRunInsertArray => Boolean(x))
// batch inserts in clickhouse are more performant if the items
// are pre-sorted by the primary key
.sort((a, b) => {
if (a.organization_id !== b.organization_id) {
return a.organization_id < b.organization_id ? -1 : 1;
const aOrgId = a[TASK_RUN_INDEX.organization_id] as string;
const bOrgId = b[TASK_RUN_INDEX.organization_id] as string;
if (aOrgId !== bOrgId) {
return aOrgId < bOrgId ? -1 : 1;
}
if (a.project_id !== b.project_id) {
return a.project_id < b.project_id ? -1 : 1;
const aProjId = a[TASK_RUN_INDEX.project_id] as string;
const bProjId = b[TASK_RUN_INDEX.project_id] as string;
if (aProjId !== bProjId) {
return aProjId < bProjId ? -1 : 1;
}
if (a.environment_id !== b.environment_id) {
return a.environment_id < b.environment_id ? -1 : 1;
const aEnvId = a[TASK_RUN_INDEX.environment_id] as string;
const bEnvId = b[TASK_RUN_INDEX.environment_id] as string;
if (aEnvId !== bEnvId) {
return aEnvId < bEnvId ? -1 : 1;
}
if (a.created_at !== b.created_at) {
return a.created_at - b.created_at;
const aCreatedAt = a[TASK_RUN_INDEX.created_at] as number;
const bCreatedAt = b[TASK_RUN_INDEX.created_at] as number;
if (aCreatedAt !== bCreatedAt) {
return aCreatedAt - bCreatedAt;
}
return a.run_id < b.run_id ? -1 : 1;
const aRunId = a[TASK_RUN_INDEX.run_id] as string;
const bRunId = b[TASK_RUN_INDEX.run_id] as string;
if (aRunId === bRunId) return 0;
return aRunId < bRunId ? -1 : 1;
});

const payloadInserts = preparedInserts
.map(({ payloadInsert }) => payloadInsert)
.filter(Boolean)
.filter((x): x is PayloadInsertArray => Boolean(x))
// batch inserts in clickhouse are more performant if the items
// are pre-sorted by the primary key
.sort((a, b) => {
return a.run_id < b.run_id ? -1 : 1;
const aRunId = a[PAYLOAD_INDEX.run_id] as string;
const bRunId = b[PAYLOAD_INDEX.run_id] as string;
if (aRunId === bRunId) return 0;
return aRunId < bRunId ? -1 : 1;
});

span.setAttribute("task_run_inserts", taskRunInserts.length);
Expand Down Expand Up @@ -633,7 +645,6 @@ export class RunsReplicationService {
this.logger.error("Error inserting task run inserts", {
error: taskRunError,
flushId,
runIds: taskRunInserts.map((r) => r.run_id),
});
recordSpanError(span, taskRunError);
}
Expand All @@ -642,7 +653,6 @@ export class RunsReplicationService {
this.logger.error("Error inserting payload inserts", {
error: payloadError,
flushId,
runIds: payloadInserts.map((r) => r.run_id),
});
recordSpanError(span, payloadError);
}
Expand Down Expand Up @@ -760,26 +770,24 @@ export class RunsReplicationService {
#getClickhouseInsertSettings() {
if (this._insertStrategy === "insert") {
return {};
} else if (this._insertStrategy === "insert_async") {
return {
async_insert: 1 as const,
async_insert_max_data_size: "1000000",
async_insert_busy_timeout_ms: 1000,
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
};
}

return {
async_insert: 1 as const,
async_insert_max_data_size: "1000000",
async_insert_busy_timeout_ms: 1000,
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
};
}

async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) {
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
taskRunInserts,
{
const [insertError, insertResult] =
await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
}
);
});

if (insertError) {
this.logger.error("Error inserting task run inserts attempt", {
Expand All @@ -795,16 +803,14 @@ export class RunsReplicationService {
});
}

async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) {
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
payloadInserts,
{
const [insertError, insertResult] =
await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
}
);
});

if (insertError) {
this.logger.error("Error inserting payload inserts attempt", {
Expand All @@ -822,25 +828,15 @@ export class RunsReplicationService {

async #prepareRunInserts(
batchedRun: TaskRunInsert
): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
): Promise<{ taskRunInsert?: TaskRunInsertArray; payloadInsert?: PayloadInsertArray }> {
this.logger.debug("Preparing run", {
batchedRun,
});

const { run, _version, event } = batchedRun;

if (!run.environmentType) {
return {
taskRunInsert: undefined,
payloadInsert: undefined,
};
}

if (!run.organizationId) {
return {
taskRunInsert: undefined,
payloadInsert: undefined,
};
if (!run.environmentType || !run.organizationId) {
return {};
}

if (event === "update" || event === "delete" || this._disablePayloadInsert) {
Expand All @@ -852,21 +848,15 @@ export class RunsReplicationService {
_version
);

return {
taskRunInsert,
payloadInsert: undefined,
};
return { taskRunInsert };
}

const [taskRunInsert, payloadInsert] = await Promise.all([
this.#prepareTaskRunInsert(run, run.organizationId, run.environmentType, event, _version),
this.#preparePayloadInsert(run, _version),
]);

return {
taskRunInsert,
payloadInsert,
};
return { taskRunInsert, payloadInsert };
}

async #prepareTaskRunInsert(
Expand All @@ -875,66 +865,68 @@ export class RunsReplicationService {
environmentType: string,
event: "insert" | "update" | "delete",
_version: bigint
): Promise<TaskRunV2> {
): Promise<TaskRunInsertArray> {
const output = await this.#prepareJson(run.output, run.outputType);

return {
environment_id: run.runtimeEnvironmentId,
organization_id: organizationId,
project_id: run.projectId,
run_id: run.id,
updated_at: run.updatedAt.getTime(),
created_at: run.createdAt.getTime(),
status: run.status,
environment_type: environmentType,
friendly_id: run.friendlyId,
engine: run.engine,
task_identifier: run.taskIdentifier,
queue: run.queue,
span_id: run.spanId,
trace_id: run.traceId,
error: { data: run.error },
attempt: run.attemptNumber ?? 1,
schedule_id: run.scheduleId ?? "",
batch_id: run.batchId ?? "",
completed_at: run.completedAt?.getTime(),
started_at: run.startedAt?.getTime(),
executed_at: run.executedAt?.getTime(),
delay_until: run.delayUntil?.getTime(),
queued_at: run.queuedAt?.getTime(),
expired_at: run.expiredAt?.getTime(),
usage_duration_ms: run.usageDurationMs,
cost_in_cents: run.costInCents,
base_cost_in_cents: run.baseCostInCents,
tags: run.runTags ?? [],
task_version: run.taskVersion ?? "",
sdk_version: run.sdkVersion ?? "",
cli_version: run.cliVersion ?? "",
machine_preset: run.machinePreset ?? "",
root_run_id: run.rootTaskRunId ?? "",
parent_run_id: run.parentTaskRunId ?? "",
depth: run.depth,
is_test: run.isTest,
idempotency_key: run.idempotencyKey ?? "",
expiration_ttl: run.ttl ?? "",
output,
concurrency_key: run.concurrencyKey ?? "",
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
worker_queue: run.masterQueue,
max_duration_in_seconds: run.maxDurationInSeconds ?? undefined,
_version: _version.toString(),
_is_deleted: event === "delete" ? 1 : 0,
};
// Return array matching TASK_RUN_COLUMNS order
return [
run.runtimeEnvironmentId, // environment_id
organizationId, // organization_id
run.projectId, // project_id
run.id, // run_id
run.updatedAt.getTime(), // updated_at
run.createdAt.getTime(), // created_at
run.status, // status
environmentType, // environment_type
run.friendlyId, // friendly_id
run.attemptNumber ?? 1, // attempt
run.engine, // engine
run.taskIdentifier, // task_identifier
run.queue, // queue
run.scheduleId ?? "", // schedule_id
run.batchId ?? "", // batch_id
run.completedAt?.getTime() ?? null, // completed_at
run.startedAt?.getTime() ?? null, // started_at
run.executedAt?.getTime() ?? null, // executed_at
run.delayUntil?.getTime() ?? null, // delay_until
run.queuedAt?.getTime() ?? null, // queued_at
run.expiredAt?.getTime() ?? null, // expired_at
run.usageDurationMs ?? 0, // usage_duration_ms
run.costInCents ?? 0, // cost_in_cents
run.baseCostInCents ?? 0, // base_cost_in_cents
output, // output
{ data: run.error }, // error
run.runTags ?? [], // tags
run.taskVersion ?? "", // task_version
run.sdkVersion ?? "", // sdk_version
run.cliVersion ?? "", // cli_version
run.machinePreset ?? "", // machine_preset
run.rootTaskRunId ?? "", // root_run_id
run.parentTaskRunId ?? "", // parent_run_id
run.depth ?? 0, // depth
run.spanId, // span_id
run.traceId, // trace_id
run.idempotencyKey ?? "", // idempotency_key
run.ttl ?? "", // expiration_ttl
run.isTest ?? false, // is_test
_version.toString(), // _version
event === "delete" ? 1 : 0, // _is_deleted
run.concurrencyKey ?? "", // concurrency_key
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
run.masterQueue ?? "", // worker_queue
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
];
}

async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<RawTaskRunPayloadV1> {
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
const payload = await this.#prepareJson(run.payload, run.payloadType);

return {
run_id: run.id,
created_at: run.createdAt.getTime(),
payload,
};
// Return array matching PAYLOAD_COLUMNS order
return [
run.id, // run_id
run.createdAt.getTime(), // created_at
payload, // payload
];
}

async #prepareJson(
Expand Down
Loading