diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 8800686236..65a7059e23 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -911,6 +911,7 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3), RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100), RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), + RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 86b17601a7..45b7b7a971 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -65,6 +65,7 @@ function initializeRunsReplicationInstance() { insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES, insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS, insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS, + insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, }); if (env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 3acd04e412..aeaea7a046 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -53,6 +53,7 @@ export type RunsReplicationServiceOptions = { logLevel?: LogLevel; tracer?: Tracer; waitForAsyncInsert?: boolean; + insertStrategy?: "insert" | "insert_async"; // Retry configuration for insert operations insertMaxRetries?: number; insertBaseDelayMs?: number; @@ -90,6 +91,7 @@ export class RunsReplicationService { private _insertMaxRetries: number; private _insertBaseDelayMs: number; private _insertMaxDelayMs: number; + private _insertStrategy: "insert" | "insert_async"; public readonly events: EventEmitter; @@ -101,6 +103,8 @@ export class RunsReplicationService { this._acknowledgeTimeoutMs = options.acknowledgeTimeoutMs ?? 1_000; + this._insertStrategy = options.insertStrategy ?? "insert"; + this._replicationClient = new LogicalReplicationClient({ pgConfig: { connectionString: options.pgConnectionUrl, @@ -598,15 +602,26 @@ export class RunsReplicationService { return delay + jitter; } + #getClickhouseInsertSettings() { + if (this._insertStrategy === "insert") { + return {}; + } else if (this._insertStrategy === "insert_async") { + return { + async_insert: 1 as const, + async_insert_max_data_size: "1000000", + async_insert_busy_timeout_ms: 1000, + wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const), + }; + } + } + async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) { return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert( taskRunInserts, { params: { - clickhouse_settings: { - wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0, - }, + clickhouse_settings: this.#getClickhouseInsertSettings(), }, } ); @@ -631,9 +646,7 @@ export class RunsReplicationService { payloadInserts, { params: { - clickhouse_settings: { - wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0, - }, + clickhouse_settings: this.#getClickhouseInsertSettings(), }, } ); diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index e30affaf84..1d11477208 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -56,10 +56,6 @@ export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettin table: "trigger_dev.task_runs_v2", schema: TaskRunV2, settings: { - async_insert: 1, - wait_for_async_insert: 0, - async_insert_max_data_size: "1000000", - async_insert_busy_timeout_ms: 1000, enable_json_type: 1, type_json_skip_duplicated_paths: 1, ...settings,