Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 108 additions & 116 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse";
import { TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse";
import { type RedisOptions } from "@internal/redis";
import {
LogicalReplicationClient,
Expand Down Expand Up @@ -81,7 +82,7 @@ type TaskRunInsert = {
export type RunsReplicationServiceEvents = {
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
batchFlushed: [
{ flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
{ flushId: string; taskRunInserts: TaskRunInsertArray[]; payloadInserts: PayloadInsertArray[] },
];
};

Expand Down Expand Up @@ -171,12 +172,9 @@ export class RunsReplicationService {
description: "Insert retry attempts",
});

this._eventsProcessedCounter = this._meter.createCounter(
"runs_replication.events_processed",
{
description: "Replication events processed (inserts, updates, deletes)",
}
);
this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", {
description: "Replication events processed (inserts, updates, deletes)",
});

this._flushDurationHistogram = this._meter.createHistogram(
"runs_replication.flush_duration_ms",
Expand Down Expand Up @@ -578,32 +576,46 @@ export class RunsReplicationService {

const taskRunInserts = preparedInserts
.map(({ taskRunInsert }) => taskRunInsert)
.filter(Boolean)
.filter((x): x is TaskRunInsertArray => Boolean(x))
// batch inserts in clickhouse are more performant if the items
// are pre-sorted by the primary key
.sort((a, b) => {
if (a.organization_id !== b.organization_id) {
return a.organization_id < b.organization_id ? -1 : 1;
const aOrgId = a[TASK_RUN_INDEX.organization_id] as string;
const bOrgId = b[TASK_RUN_INDEX.organization_id] as string;
if (aOrgId !== bOrgId) {
return aOrgId < bOrgId ? -1 : 1;
}
if (a.project_id !== b.project_id) {
return a.project_id < b.project_id ? -1 : 1;
const aProjId = a[TASK_RUN_INDEX.project_id] as string;
const bProjId = b[TASK_RUN_INDEX.project_id] as string;
if (aProjId !== bProjId) {
return aProjId < bProjId ? -1 : 1;
}
if (a.environment_id !== b.environment_id) {
return a.environment_id < b.environment_id ? -1 : 1;
const aEnvId = a[TASK_RUN_INDEX.environment_id] as string;
const bEnvId = b[TASK_RUN_INDEX.environment_id] as string;
if (aEnvId !== bEnvId) {
return aEnvId < bEnvId ? -1 : 1;
}
if (a.created_at !== b.created_at) {
return a.created_at - b.created_at;
const aCreatedAt = a[TASK_RUN_INDEX.created_at] as number;
const bCreatedAt = b[TASK_RUN_INDEX.created_at] as number;
if (aCreatedAt !== bCreatedAt) {
return aCreatedAt - bCreatedAt;
}
return a.run_id < b.run_id ? -1 : 1;
const aRunId = a[TASK_RUN_INDEX.run_id] as string;
const bRunId = b[TASK_RUN_INDEX.run_id] as string;
if (aRunId === bRunId) return 0;
return aRunId < bRunId ? -1 : 1;
});

const payloadInserts = preparedInserts
.map(({ payloadInsert }) => payloadInsert)
.filter(Boolean)
.filter((x): x is PayloadInsertArray => Boolean(x))
// batch inserts in clickhouse are more performant if the items
// are pre-sorted by the primary key
.sort((a, b) => {
return a.run_id < b.run_id ? -1 : 1;
const aRunId = a[PAYLOAD_INDEX.run_id] as string;
const bRunId = b[PAYLOAD_INDEX.run_id] as string;
if (aRunId === bRunId) return 0;
return aRunId < bRunId ? -1 : 1;
});

span.setAttribute("task_run_inserts", taskRunInserts.length);
Expand Down Expand Up @@ -633,7 +645,6 @@ export class RunsReplicationService {
this.logger.error("Error inserting task run inserts", {
error: taskRunError,
flushId,
runIds: taskRunInserts.map((r) => r.run_id),
});
recordSpanError(span, taskRunError);
}
Expand All @@ -642,7 +653,6 @@ export class RunsReplicationService {
this.logger.error("Error inserting payload inserts", {
error: payloadError,
flushId,
runIds: payloadInserts.map((r) => r.run_id),
});
recordSpanError(span, payloadError);
}
Expand Down Expand Up @@ -760,26 +770,24 @@ export class RunsReplicationService {
#getClickhouseInsertSettings() {
if (this._insertStrategy === "insert") {
return {};
} else if (this._insertStrategy === "insert_async") {
return {
async_insert: 1 as const,
async_insert_max_data_size: "1000000",
async_insert_busy_timeout_ms: 1000,
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
};
}

return {
async_insert: 1 as const,
async_insert_max_data_size: "1000000",
async_insert_busy_timeout_ms: 1000,
wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const),
};
}

async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) {
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
taskRunInserts,
{
const [insertError, insertResult] =
await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
}
);
});

if (insertError) {
this.logger.error("Error inserting task run inserts attempt", {
Expand All @@ -795,16 +803,14 @@ export class RunsReplicationService {
});
}

async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) {
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
payloadInserts,
{
const [insertError, insertResult] =
await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
}
);
});

if (insertError) {
this.logger.error("Error inserting payload inserts attempt", {
Expand All @@ -822,25 +828,15 @@ export class RunsReplicationService {

async #prepareRunInserts(
batchedRun: TaskRunInsert
): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
): Promise<{ taskRunInsert?: TaskRunInsertArray; payloadInsert?: PayloadInsertArray }> {
this.logger.debug("Preparing run", {
batchedRun,
});

const { run, _version, event } = batchedRun;

if (!run.environmentType) {
return {
taskRunInsert: undefined,
payloadInsert: undefined,
};
}

if (!run.organizationId) {
return {
taskRunInsert: undefined,
payloadInsert: undefined,
};
if (!run.environmentType || !run.organizationId) {
return {};
}

if (event === "update" || event === "delete" || this._disablePayloadInsert) {
Expand All @@ -852,21 +848,15 @@ export class RunsReplicationService {
_version
);

return {
taskRunInsert,
payloadInsert: undefined,
};
return { taskRunInsert };
}

const [taskRunInsert, payloadInsert] = await Promise.all([
this.#prepareTaskRunInsert(run, run.organizationId, run.environmentType, event, _version),
this.#preparePayloadInsert(run, _version),
]);

return {
taskRunInsert,
payloadInsert,
};
return { taskRunInsert, payloadInsert };
}

async #prepareTaskRunInsert(
Expand All @@ -875,66 +865,68 @@ export class RunsReplicationService {
environmentType: string,
event: "insert" | "update" | "delete",
_version: bigint
): Promise<TaskRunV2> {
): Promise<TaskRunInsertArray> {
const output = await this.#prepareJson(run.output, run.outputType);

return {
environment_id: run.runtimeEnvironmentId,
organization_id: organizationId,
project_id: run.projectId,
run_id: run.id,
updated_at: run.updatedAt.getTime(),
created_at: run.createdAt.getTime(),
status: run.status,
environment_type: environmentType,
friendly_id: run.friendlyId,
engine: run.engine,
task_identifier: run.taskIdentifier,
queue: run.queue,
span_id: run.spanId,
trace_id: run.traceId,
error: { data: run.error },
attempt: run.attemptNumber ?? 1,
schedule_id: run.scheduleId ?? "",
batch_id: run.batchId ?? "",
completed_at: run.completedAt?.getTime(),
started_at: run.startedAt?.getTime(),
executed_at: run.executedAt?.getTime(),
delay_until: run.delayUntil?.getTime(),
queued_at: run.queuedAt?.getTime(),
expired_at: run.expiredAt?.getTime(),
usage_duration_ms: run.usageDurationMs,
cost_in_cents: run.costInCents,
base_cost_in_cents: run.baseCostInCents,
tags: run.runTags ?? [],
task_version: run.taskVersion ?? "",
sdk_version: run.sdkVersion ?? "",
cli_version: run.cliVersion ?? "",
machine_preset: run.machinePreset ?? "",
root_run_id: run.rootTaskRunId ?? "",
parent_run_id: run.parentTaskRunId ?? "",
depth: run.depth,
is_test: run.isTest,
idempotency_key: run.idempotencyKey ?? "",
expiration_ttl: run.ttl ?? "",
output,
concurrency_key: run.concurrencyKey ?? "",
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
worker_queue: run.masterQueue,
max_duration_in_seconds: run.maxDurationInSeconds ?? undefined,
_version: _version.toString(),
_is_deleted: event === "delete" ? 1 : 0,
};
// Return array matching TASK_RUN_COLUMNS order
return [
run.runtimeEnvironmentId, // environment_id
organizationId, // organization_id
run.projectId, // project_id
run.id, // run_id
run.updatedAt.getTime(), // updated_at
run.createdAt.getTime(), // created_at
run.status, // status
environmentType, // environment_type
run.friendlyId, // friendly_id
run.attemptNumber ?? 1, // attempt
run.engine, // engine
run.taskIdentifier, // task_identifier
run.queue, // queue
run.scheduleId ?? "", // schedule_id
run.batchId ?? "", // batch_id
run.completedAt?.getTime() ?? null, // completed_at
run.startedAt?.getTime() ?? null, // started_at
run.executedAt?.getTime() ?? null, // executed_at
run.delayUntil?.getTime() ?? null, // delay_until
run.queuedAt?.getTime() ?? null, // queued_at
run.expiredAt?.getTime() ?? null, // expired_at
run.usageDurationMs ?? 0, // usage_duration_ms
run.costInCents ?? 0, // cost_in_cents
run.baseCostInCents ?? 0, // base_cost_in_cents
output, // output
{ data: run.error }, // error
run.runTags ?? [], // tags
run.taskVersion ?? "", // task_version
run.sdkVersion ?? "", // sdk_version
run.cliVersion ?? "", // cli_version
run.machinePreset ?? "", // machine_preset
run.rootTaskRunId ?? "", // root_run_id
run.parentTaskRunId ?? "", // parent_run_id
run.depth ?? 0, // depth
run.spanId, // span_id
run.traceId, // trace_id
run.idempotencyKey ?? "", // idempotency_key
run.ttl ?? "", // expiration_ttl
run.isTest ?? false, // is_test
_version.toString(), // _version
event === "delete" ? 1 : 0, // _is_deleted
run.concurrencyKey ?? "", // concurrency_key
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
run.masterQueue ?? "", // worker_queue
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
];
}

async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<RawTaskRunPayloadV1> {
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
const payload = await this.#prepareJson(run.payload, run.payloadType);

return {
run_id: run.id,
created_at: run.createdAt.getTime(),
payload,
};
// Return array matching PAYLOAD_COLUMNS order
return [
run.id, // run_id
run.createdAt.getTime(), // created_at
payload, // payload
];
}

async #prepareJson(
Expand Down
Loading
Loading