From 71aa4a0de78dce215004be75f62cbc8b7b6b6e18 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 14 May 2025 11:28:28 +0100 Subject: [PATCH] Improvements to the runs replication service to prevent buffer exhaustion --- apps/webapp/app/env.server.ts | 1 + .../runsReplicationInstance.server.ts | 1 + .../services/runsReplicationService.server.ts | 90 +++++++++--------- .../test/runsReplicationService.test.ts | 20 ++-- .../schema/004_create_task_runs_v2.sql | 94 +++++++++++++++++++ .../clickhouse/src/taskRuns.test.ts | 6 +- internal-packages/clickhouse/src/taskRuns.ts | 8 +- 7 files changed, 161 insertions(+), 59 deletions(-) create mode 100644 internal-packages/clickhouse/schema/004_create_task_runs_v2.sql diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 31e2381dc7..beceac9a5f 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -768,6 +768,7 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240), RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500), + RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"), }); export type Environment = z.infer; diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index e956c2b93d..4fcb734379 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -49,6 +49,7 @@ function initializeRunsReplicationInstance() { leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS, ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS, logLevel: env.RUN_REPLICATION_LOG_LEVEL, + waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1", tracer: provider.getTracer("runs-replication-service"), }); diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index c7fe512474..bb81cd5dc4 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -1,4 +1,4 @@ -import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV1 } from "@internal/clickhouse"; +import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse"; import { RedisOptions } from "@internal/redis"; import { LogicalReplicationClient, @@ -7,7 +7,7 @@ import { type MessageUpdate, type PgoutputMessage, } from "@internal/replication"; -import { startSpan, trace, type Tracer } from "@internal/tracing"; +import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { tryCatch } from "@trigger.dev/core/utils"; import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; @@ -50,6 +50,7 @@ export type RunsReplicationServiceOptions = { logger?: Logger; logLevel?: LogLevel; tracer?: Tracer; + waitForAsyncInsert?: boolean; }; type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" }; @@ -107,6 +108,7 @@ export class RunsReplicationService { ackIntervalSeconds: options.ackIntervalSeconds ?? 10, leaderLockRetryCount: options.leaderLockRetryCount ?? 240, leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500, + tracer: options.tracer, }); this._concurrentFlushScheduler = new ConcurrentFlushScheduler({ @@ -115,6 +117,7 @@ export class RunsReplicationService { maxConcurrency: options.maxFlushConcurrency ?? 100, callback: this.#flushBatch.bind(this), logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"), + tracer: options.tracer, }); this._replicationClient.events.on("data", async ({ lsn, log, parseDuration }) => { @@ -404,9 +407,6 @@ export class RunsReplicationService { async #flushBatch(flushId: string, batch: Array) { if (batch.length === 0) { - this.logger.debug("No runs to flush", { - flushId, - }); return; } @@ -437,10 +437,8 @@ export class RunsReplicationService { payloadInserts: payloadInserts.length, }); - await Promise.all([ - this.#insertTaskRunInserts(taskRunInserts), - this.#insertPayloadInserts(payloadInserts), - ]); + await this.#insertTaskRunInserts(taskRunInserts); + await this.#insertPayloadInserts(payloadInserts); this.logger.debug("Flushed inserts", { flushId, @@ -450,51 +448,59 @@ export class RunsReplicationService { }); } - async #insertTaskRunInserts(taskRunInserts: TaskRunV1[]) { - const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert( - taskRunInserts, - { - params: { - clickhouse_settings: { - wait_for_async_insert: 1, + async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) { + return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { + const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert( + taskRunInserts, + { + params: { + clickhouse_settings: { + wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0, + }, }, - }, - } - ); + } + ); - if (insertError) { - this.logger.error("Error inserting task run inserts", { - error: insertError, - }); - } + if (insertError) { + this.logger.error("Error inserting task run inserts", { + error: insertError, + }); + + recordSpanError(span, insertError); + } - return insertResult; + return insertResult; + }); } async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) { - const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads( - payloadInserts, - { - params: { - clickhouse_settings: { - wait_for_async_insert: 1, + return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { + const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads( + payloadInserts, + { + params: { + clickhouse_settings: { + wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0, + }, }, - }, - } - ); + } + ); - if (insertError) { - this.logger.error("Error inserting payload inserts", { - error: insertError, - }); - } + if (insertError) { + this.logger.error("Error inserting payload inserts", { + error: insertError, + }); - return insertResult; + recordSpanError(span, insertError); + } + + return insertResult; + }); } async #prepareRunInserts( batchedRun: TaskRunInsert - ): Promise<{ taskRunInsert?: TaskRunV1; payloadInsert?: RawTaskRunPayloadV1 }> { + ): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> { this.logger.debug("Preparing run", { batchedRun, }); @@ -547,7 +553,7 @@ export class RunsReplicationService { environmentType: string, event: "insert" | "update" | "delete", _version: bigint - ): Promise { + ): Promise { const output = await this.#prepareJson(run.output, run.outputType); return { diff --git a/apps/webapp/test/runsReplicationService.test.ts b/apps/webapp/test/runsReplicationService.test.ts index 90802da273..9edf0a66d0 100644 --- a/apps/webapp/test/runsReplicationService.test.ts +++ b/apps/webapp/test/runsReplicationService.test.ts @@ -90,7 +90,7 @@ describe("RunsReplicationService", () => { // Check that the row was replicated to clickhouse const queryRuns = clickhouse.reader.query({ name: "runs-replication", - query: "SELECT * FROM trigger_dev.task_runs_v1", + query: "SELECT * FROM trigger_dev.task_runs_v2", schema: z.any(), }); @@ -279,7 +279,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for the replicated run const queryRuns = clickhouse.reader.query({ name: "runs-replication-batching", - query: "SELECT * FROM trigger_dev.task_runs_v1 WHERE run_id = {run_id:String}", + query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}", schema: z.any(), params: z.object({ run_id: z.string() }), }); @@ -604,7 +604,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for the replicated run const queryRuns = clickhouse.reader.query({ name: "runs-replication-update", - query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", schema: z.any(), params: z.object({ run_id: z.string() }), }); @@ -713,7 +713,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for the replicated run using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-delete", - query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", schema: z.any(), params: z.object({ run_id: z.string() }), }); @@ -838,7 +838,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for both runs using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-shutdown-handover", - query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL ORDER BY created_at ASC", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL ORDER BY created_at ASC", schema: z.any(), }); @@ -966,7 +966,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for the run using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-shutdown-after-processed", - query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", schema: z.any(), params: z.object({ run_id: z.string() }), }); @@ -1117,7 +1117,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for all runs using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-stress-bulk-insert", - query: `SELECT run_id, friendly_id, trace_id, task_identifier FROM trigger_dev.task_runs_v1 FINAL`, + query: `SELECT run_id, friendly_id, trace_id, task_identifier FROM trigger_dev.task_runs_v2 FINAL`, schema: z.any(), }); @@ -1236,7 +1236,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for all runs using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-stress-bulk-insert", - query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL`, + query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`, schema: z.any(), }); @@ -1375,7 +1375,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for both runs using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-multi-event-tx", - query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id IN ({run_id_1:String}, {run_id_2:String})`, + query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id IN ({run_id_1:String}, {run_id_2:String})`, schema: z.any(), params: z.object({ run_id_1: z.string(), run_id_2: z.string() }), }); @@ -1488,7 +1488,7 @@ describe("RunsReplicationService", () => { // Query ClickHouse for all runs using FINAL const queryRuns = clickhouse.reader.query({ name: "runs-replication-long-tx", - query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL`, + query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`, schema: z.any(), }); diff --git a/internal-packages/clickhouse/schema/004_create_task_runs_v2.sql b/internal-packages/clickhouse/schema/004_create_task_runs_v2.sql new file mode 100644 index 0000000000..cf52a97168 --- /dev/null +++ b/internal-packages/clickhouse/schema/004_create_task_runs_v2.sql @@ -0,0 +1,94 @@ +-- +goose Up + +/* + This is the second version of the task runs table. + The main change is we've added organization_id and project_id to the sort key, and removed the toDate(created_at) and task_identifier columns from the sort key. + We will add a skip index for the task_identifier column in a future migration. +*/ +CREATE TABLE trigger_dev.task_runs_v2 +( + /* ─── ids & hierarchy ─────────────────────────────────────── */ + environment_id String, + organization_id String, + project_id String, + run_id String, + + environment_type LowCardinality(String), + friendly_id String, + attempt UInt8 DEFAULT 1, + + /* ─── enums / status ──────────────────────────────────────── */ + engine LowCardinality(String), + status LowCardinality(String), + + /* ─── queue / concurrency / schedule ─────────────────────── */ + task_identifier String, + queue String, + + schedule_id String, + batch_id String, + + /* ─── related runs ─────────────────────────────────────────────── */ + root_run_id String, + parent_run_id String, + depth UInt8 DEFAULT 0, + + /* ─── telemetry ─────────────────────────────────────────────── */ + span_id String, + trace_id String, + idempotency_key String, + + /* ─── timing ─────────────────────────────────────────────── */ + created_at DateTime64(3), + updated_at DateTime64(3), + started_at Nullable(DateTime64(3)), + executed_at Nullable(DateTime64(3)), + completed_at Nullable(DateTime64(3)), + delay_until Nullable(DateTime64(3)), + queued_at Nullable(DateTime64(3)), + expired_at Nullable(DateTime64(3)), + expiration_ttl String, + + /* ─── cost / usage ───────────────────────────────────────── */ + usage_duration_ms UInt32 DEFAULT 0, + cost_in_cents Float64 DEFAULT 0, + base_cost_in_cents Float64 DEFAULT 0, + + /* ─── payload & context ──────────────────────────────────── */ + output JSON(max_dynamic_paths = 1024), + error JSON(max_dynamic_paths = 64), + + /* ─── tagging / versions ─────────────────────────────────── */ + tags Array(String) CODEC(ZSTD(1)), + task_version String CODEC(LZ4), + sdk_version String CODEC(LZ4), + cli_version String CODEC(LZ4), + machine_preset LowCardinality(String) CODEC(LZ4), + + is_test UInt8 DEFAULT 0, + + /* ─── commit lsn ─────────────────────────────────────────────── */ + _version UInt64, + _is_deleted UInt8 DEFAULT 0 +) +ENGINE = ReplacingMergeTree(_version, _is_deleted) +PARTITION BY toYYYYMM(created_at) +ORDER BY (organization_id, project_id, environment_id, created_at, run_id) +SETTINGS enable_json_type = 1; + +/* Fast tag filtering */ +ALTER TABLE trigger_dev.task_runs_v2 + ADD INDEX idx_tags tags TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4; + +CREATE VIEW trigger_dev.tmp_eric_task_runs_full_v2 AS +SELECT + s.*, + p.payload as payload +FROM trigger_dev.task_runs_v2 AS s FINAL +LEFT JOIN trigger_dev.raw_task_runs_payload_v1 AS p ON s.run_id = p.run_id +SETTINGS enable_json_type = 1; + + +-- +goose Down +DROP TABLE IF EXISTS trigger_dev.task_runs_v2; +DROP VIEW IF EXISTS trigger_dev.tmp_eric_task_runs_full_v2 \ No newline at end of file diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index 2374d5de19..8f7145d1dd 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -3,7 +3,7 @@ import { z } from "zod"; import { ClickhouseClient } from "./client/client.js"; import { insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js"; -describe("Task Runs V1", () => { +describe("Task Runs V2", () => { clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => { const client = new ClickhouseClient({ name: "test", @@ -70,7 +70,7 @@ describe("Task Runs V1", () => { const query = client.query({ name: "query-task-runs", - query: "SELECT * FROM trigger_dev.task_runs_v1", + query: "SELECT * FROM trigger_dev.task_runs_v2", schema: z.object({ environment_id: z.string(), run_id: z.string(), @@ -226,7 +226,7 @@ describe("Task Runs V1", () => { const query = client.query({ name: "query-task-runs", - query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL", schema: z.object({ environment_id: z.string(), run_id: z.string(), diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 9a478f523d..ca983b1b60 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -2,7 +2,7 @@ import { ClickHouseSettings } from "@clickhouse/client"; import { z } from "zod"; import { ClickhouseWriter } from "./client/types.js"; -export const TaskRunV1 = z.object({ +export const TaskRunV2 = z.object({ environment_id: z.string(), organization_id: z.string(), project_id: z.string(), @@ -46,13 +46,13 @@ export const TaskRunV1 = z.object({ _is_deleted: z.number().int().default(0), }); -export type TaskRunV1 = z.input; +export type TaskRunV2 = z.input; export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettings) { return ch.insert({ name: "insertTaskRuns", - table: "trigger_dev.task_runs_v1", - schema: TaskRunV1, + table: "trigger_dev.task_runs_v2", + schema: TaskRunV2, settings: { async_insert: 1, wait_for_async_insert: 0,