From 32271ce3b956bb2c7c062270be3172c6a4b5e590 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 14 May 2025 14:36:23 +0100
Subject: [PATCH 1/4] runs replication leader lock expiration fix

---
 apps/webapp/app/env.server.ts                 |   2 +-
 .../runsReplicationInstance.server.ts         |   2 +-
 .../services/runsReplicationService.server.ts |   6 +-
 .../test/runsReplicationService.test.ts       | 129 ++++++++++++++++++
 internal-packages/replication/src/client.ts   | 100 ++++++++++----
 5 files changed, 207 insertions(+), 32 deletions(-)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index beceac9a5f..2952f7eca9 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -766,7 +766,7 @@ const EnvironmentSchema = z.object({
   RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
   RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
-  RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
+  RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
   RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
 });
diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts
index 4fcb734379..d71498c459 100644
--- a/apps/webapp/app/services/runsReplicationInstance.server.ts
+++ b/apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -45,7 +45,7 @@ function initializeRunsReplicationInstance() {
     flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE,
     leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS,
     leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS,
-    leaderLockRetryCount: env.RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT,
+    leaderLockAcquireAdditionalTimeMs: env.RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS,
     leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
     ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
     logLevel: env.RUN_REPLICATION_LOG_LEVEL,
diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index bb81cd5dc4..e768922b39 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -43,7 +43,7 @@ export type RunsReplicationServiceOptions = {
   flushBatchSize?: number;
   leaderLockTimeoutMs?: number;
   leaderLockExtendIntervalMs?: number;
-  leaderLockRetryCount?: number;
+  leaderLockAcquireAdditionalTimeMs?: number;
   leaderLockRetryIntervalMs?: number;
   ackIntervalSeconds?: number;
   acknowledgeTimeoutMs?: number;
@@ -102,11 +102,11 @@ export class RunsReplicationService {
       redisOptions: options.redisOptions,
       autoAcknowledge: false,
       publicationActions: ["insert", "update", "delete"],
-      logger: new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
+      logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
       leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
       leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
       ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
-      leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
+      leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000,
       leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
       tracer: options.tracer,
     });
diff --git a/apps/webapp/test/runsReplicationService.test.ts b/apps/webapp/test/runsReplicationService.test.ts
index 9edf0a66d0..4fa173563a 100644
--- a/apps/webapp/test/runsReplicationService.test.ts
+++ b/apps/webapp/test/runsReplicationService.test.ts
@@ -1030,6 +1030,135 @@ describe("RunsReplicationService", () => {
     }
   );
 
+  containerTest(
+    "should hand over leadership to a second service, and the second service should be able to extend the leader lock",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
+
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication-shutdown-handover",
+      });
+
+      // Service A
+      const runsReplicationServiceA = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-shutdown-handover",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 1,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        leaderLockAcquireAdditionalTimeMs: 10_000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-shutdown-handover-a", "debug"),
+      });
+
+      await runsReplicationServiceA.start();
+
+      // Service B
+      const runsReplicationServiceB = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-shutdown-handover",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 1,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        leaderLockAcquireAdditionalTimeMs: 10_000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-shutdown-handover-b", "debug"),
+      });
+
+      // Start the second service, and after 6 seconds shut down the first service
+      await Promise.all([
+        setTimeout(6000).then(() => runsReplicationServiceA.stop()),
+        runsReplicationServiceB.start(),
+      ]);
+
+      const organization = await prisma.organization.create({
+        data: {
+          title: "test",
+          slug: "test",
+        },
+      });
+
+      const project = await prisma.project.create({
+        data: {
+          name: "test",
+          slug: "test",
+          organizationId: organization.id,
+          externalRef: "test",
+        },
+      });
+
+      const runtimeEnvironment = await prisma.runtimeEnvironment.create({
+        data: {
+          slug: "test",
+          type: "DEVELOPMENT",
+          projectId: project.id,
+          organizationId: organization.id,
+          apiKey: "test",
+          pkApiKey: "test",
+          shortcode: "test",
+        },
+      });
+
+      // Now we insert a row into the table
+      const taskRun = await prisma.taskRun.create({
+        data: {
+          friendlyId: "run_1234",
+          taskIdentifier: "my-task",
+          payload: JSON.stringify({ foo: "bar" }),
+          traceId: "1234",
+          spanId: "1234",
+          queue: "test",
+          runtimeEnvironmentId: runtimeEnvironment.id,
+          projectId: project.id,
+          organizationId: organization.id,
+          environmentType: "DEVELOPMENT",
+          engine: "V2",
+        },
+      });
+
+      await setTimeout(10_000);
+
+      // Check that the row was replicated to clickhouse
+      const queryRuns = clickhouse.reader.query({
+        name: "runs-replication",
+        query: "SELECT * FROM trigger_dev.task_runs_v2",
+        schema: z.any(),
+      });
+
+      const [queryError, result] = await queryRuns({});
+
+      expect(queryError).toBeNull();
+      expect(result?.length).toBe(1);
+      expect(result?.[0]).toEqual(
+        expect.objectContaining({
+          run_id: taskRun.id,
+          friendly_id: taskRun.friendlyId,
+          task_identifier: taskRun.taskIdentifier,
+          environment_id: runtimeEnvironment.id,
+          project_id: project.id,
+          organization_id: organization.id,
+          environment_type: "DEVELOPMENT",
+          engine: "V2",
+        })
+      );
+
+      await runsReplicationServiceB.stop();
+    }
+  );
+
   containerTest(
     "should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse",
     async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts
index d5c56cdfae..d49f753358 100644
--- a/internal-packages/replication/src/client.ts
+++ b/internal-packages/replication/src/client.ts
@@ -53,14 +53,14 @@ export interface LogicalReplicationClientOptions {
   leaderLockExtendIntervalMs?: number;
 
   /**
-   * The number of times to retry acquiring the leader lock (default: 120)
+   * The interval in ms to retry acquiring the leader lock (default: 500)
    */
-  leaderLockRetryCount?: number;
+  leaderLockRetryIntervalMs?: number;
 
   /**
-   * The interval in ms to retry acquiring the leader lock (default: 500)
+   * The additional time in ms, on top of the lock timeout, to keep retrying to acquire the leader lock (default: 1000)
    */
-  leaderLockRetryIntervalMs?: number;
+  leaderLockAcquireAdditionalTimeMs?: number;
 
   /**
    * The interval in seconds to automatically acknowledge the last LSN if no ack has been sent (default: 10)
@@ -97,7 +97,7 @@ export class LogicalReplicationClient {
   private lastAcknowledgedLsn: string | null = null;
   private leaderLockTimeoutMs: number;
   private leaderLockExtendIntervalMs: number;
-  private leaderLockRetryCount: number;
+  private leaderLockAcquireAdditionalTimeMs: number;
   private leaderLockRetryIntervalMs: number;
   private leaderLockHeartbeatTimer: NodeJS.Timeout | null = null;
   private ackIntervalSeconds: number;
@@ -124,7 +124,7 @@ export class LogicalReplicationClient {
     this.leaderLockTimeoutMs = options.leaderLockTimeoutMs ?? 30000;
     this.leaderLockExtendIntervalMs = options.leaderLockExtendIntervalMs ?? 10000;
-    this.leaderLockRetryCount = options.leaderLockRetryCount ?? 120;
+    this.leaderLockAcquireAdditionalTimeMs = options.leaderLockAcquireAdditionalTimeMs ?? 1000;
     this.leaderLockRetryIntervalMs = options.leaderLockRetryIntervalMs ?? 500;
     this.ackIntervalSeconds = options.ackIntervalSeconds ?? 10;
@@ -578,34 +578,74 @@ export class LogicalReplicationClient {
   }
 
   async #acquireLeaderLock(): Promise<boolean> {
-    try {
-      this.leaderLock = await this.redlock.acquire(
-        [`logical-replication-client:${this.options.name}`],
-        this.leaderLockTimeoutMs,
-        {
-          retryCount: this.leaderLockRetryCount,
-          retryDelay: this.leaderLockRetryIntervalMs,
-        }
-      );
-    } catch (err) {
-      this.logger.error("Leader election failed", {
-        name: this.options.name,
-        table: this.options.table,
-        slotName: this.options.slotName,
-        publicationName: this.options.publicationName,
-        retryCount: this.leaderLockRetryCount,
-        retryIntervalMs: this.leaderLockRetryIntervalMs,
-        error: err,
-      });
+    const startTime = Date.now();
+    const maxWaitTime = this.leaderLockTimeoutMs + this.leaderLockAcquireAdditionalTimeMs;
 
-      return false;
+    this.logger.debug("Acquiring leader lock", {
+      name: this.options.name,
+      slotName: this.options.slotName,
+      publicationName: this.options.publicationName,
+      maxWaitTime,
+    });
+
+    let attempt = 0;
+
+    while (Date.now() - startTime < maxWaitTime) {
+      try {
+        this.leaderLock = await this.redlock.acquire(
+          [`logical-replication-client:${this.options.name}`],
+          this.leaderLockTimeoutMs
+        );
+
+        this.logger.debug("Acquired leader lock", {
+          name: this.options.name,
+          slotName: this.options.slotName,
+          publicationName: this.options.publicationName,
+          lockTimeoutMs: this.leaderLockTimeoutMs,
+          lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+          lock: this.leaderLock,
+          attempt,
+        });
+        return true;
+      } catch (err) {
+        attempt++;
+
+        this.logger.debug("Failed to acquire leader lock, retrying", {
+          name: this.options.name,
+          slotName: this.options.slotName,
+          publicationName: this.options.publicationName,
+          attempt,
+          retryIntervalMs: this.leaderLockRetryIntervalMs,
+          error: err,
+        });
+
+        await new Promise((resolve) => setTimeout(resolve, this.leaderLockRetryIntervalMs));
+      }
     }
 
-    return true;
+    this.logger.error("Leader election failed after retries", {
+      name: this.options.name,
+      table: this.options.table,
+      slotName: this.options.slotName,
+      publicationName: this.options.publicationName,
+      totalAttempts: attempt,
+      totalWaitTimeMs: Date.now() - startTime,
+    });
+    return false;
   }
 
   async #releaseLeaderLock() {
     if (!this.leaderLock) return;
+
+    this.logger.debug("Releasing leader lock", {
+      name: this.options.name,
+      slotName: this.options.slotName,
+      publicationName: this.options.publicationName,
+      lockTimeoutMs: this.leaderLockTimeoutMs,
+      lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+      lock: this.leaderLock,
+    });
+
     const [releaseError] = await tryCatch(this.leaderLock.release());
 
     this.leaderLock = null;
@@ -631,6 +671,9 @@ export class LogicalReplicationClient {
         name: this.options.name,
         slotName: this.options.slotName,
         publicationName: this.options.publicationName,
+        lockTimeoutMs: this.leaderLockTimeoutMs,
+        lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+        lock: this.leaderLock,
       });
     } catch (err) {
       this.logger.error("Failed to extend leader lock", {
@@ -638,6 +681,9 @@
         name: this.options.name,
         slotName: this.options.slotName,
         publicationName: this.options.publicationName,
         error: err,
+        lockTimeoutMs: this.leaderLockTimeoutMs,
+        lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+        lock: this.leaderLock,
       });
       // Optionally emit an error or handle loss of leadership
       this.events.emit("error", err instanceof Error ? err : new Error(String(err)));

From ffb56956b7ec3a998340ec5b94dde914d9d0af86 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 14 May 2025 15:04:16 +0100
Subject: [PATCH 2/4] Allow configuring the container image --max-old-space-size using NODE_MAX_OLD_SPACE_SIZE

---
 docker/scripts/entrypoint.sh | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/docker/scripts/entrypoint.sh b/docker/scripts/entrypoint.sh
index 3c7b3165ab..3c88fff952 100755
--- a/docker/scripts/entrypoint.sh
+++ b/docker/scripts/entrypoint.sh
@@ -27,6 +27,13 @@ cp internal-packages/database/prisma/schema.prisma apps/webapp/prisma/
 cp node_modules/@prisma/engines/*.node apps/webapp/prisma/
 
 cd /triggerdotdev/apps/webapp
-# exec dumb-init pnpm run start:local
-NODE_PATH='/triggerdotdev/node_modules/.pnpm/node_modules' exec dumb-init node --max-old-space-size=8192 ./build/server.js
+
+
+# Decide how much old-space memory Node should get.
+# Use $NODE_MAX_OLD_SPACE_SIZE if it's set; otherwise fall back to 8192.
+MAX_OLD_SPACE_SIZE="${NODE_MAX_OLD_SPACE_SIZE:-8192}"
+
+echo "Setting max old space size to ${MAX_OLD_SPACE_SIZE}"
+
+NODE_PATH='/triggerdotdev/node_modules/.pnpm/node_modules' exec dumb-init node --max-old-space-size=${MAX_OLD_SPACE_SIZE} ./build/server.js

From a98e58c3ee4a2097548e638cd3eb7d87ad307cf0 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 14 May 2025 15:14:42 +0100
Subject: [PATCH 3/4] Ability to configure the clickhouse keep alive settings

---
 apps/webapp/app/env.server.ts             |  2 ++
 .../runsReplicationInstance.server.ts     |  5 +++
 .../clickhouse/src/client/client.ts       | 15 ++++++--
 internal-packages/clickhouse/src/index.ts | 36 ++++++++++++++-----
 4 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 2952f7eca9..a74d5d64f0 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -769,6 +769,8 @@ const EnvironmentSchema = z.object({
   RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
   RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
+  RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
+  RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().default(9_000),
 });
 
 export type Environment = z.infer<typeof EnvironmentSchema>;
diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts
index d71498c459..c51defe214 100644
--- a/apps/webapp/app/services/runsReplicationInstance.server.ts
+++ b/apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -23,6 +23,11 @@ function initializeRunsReplicationInstance() {
   const clickhouse = new ClickHouse({
     url: env.RUN_REPLICATION_CLICKHOUSE_URL,
     name: "runs-replication",
+    keepAlive: {
+      enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
+      idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
+    },
+    logLevel: env.RUN_REPLICATION_LOG_LEVEL,
   });
 
   const service = new RunsReplicationService({
diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts
index eab28fec16..96f04d547e 100644
--- a/internal-packages/clickhouse/src/client/client.ts
+++ b/internal-packages/clickhouse/src/client/client.ts
@@ -15,14 +15,22 @@ import type {
   ClickhouseWriter,
 } from "./types.js";
 import { generateErrorMessage } from "zod-error";
-import { Logger } from "@trigger.dev/core/logger";
+import { Logger, type LogLevel } from "@trigger.dev/core/logger";
+import type { Agent as HttpAgent } from "http";
+import type { Agent as HttpsAgent } from "https";
 
 export type ClickhouseConfig = {
   name: string;
   url: string;
   tracer?: Tracer;
+  keepAlive?: {
+    enabled?: boolean;
+    idleSocketTtl?: number;
+  };
+  httpAgent?: HttpAgent | HttpsAgent;
   clickhouseSettings?: ClickHouseSettings;
   logger?: Logger;
+  logLevel?: LogLevel;
 };
 
 export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
@@ -33,11 +41,12 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
 
   constructor(config: ClickhouseConfig) {
     this.name = config.name;
-    this.logger = config.logger ?? new Logger("ClickhouseClient", "debug");
+    this.logger = config.logger ?? new Logger("ClickhouseClient", config.logLevel ?? "info");
 
     this.client = createClient({
       url: config.url,
-
+      keep_alive: config.keepAlive,
+      http_agent: config.httpAgent,
       clickhouse_settings: {
         ...config.clickhouseSettings,
         output_format_json_quote_64bit_integers: 0,
diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts
index 5d33b45410..51066ffed8 100644
--- a/internal-packages/clickhouse/src/index.ts
+++ b/internal-packages/clickhouse/src/index.ts
@@ -3,29 +3,38 @@ import { ClickhouseClient } from "./client/client.js";
 import { ClickhouseReader, ClickhouseWriter } from "./client/types.js";
 import { NoopClient } from "./client/noop.js";
 import { insertTaskRuns, insertRawTaskRunPayloads } from "./taskRuns.js";
-import { Logger } from "@trigger.dev/core/logger";
+import { Logger, type LogLevel } from "@trigger.dev/core/logger";
+import type { Agent as HttpAgent } from "http";
+import type { Agent as HttpsAgent } from "https";
 
 export type * from "./taskRuns.js";
 
+export type ClickhouseCommonConfig = {
+  keepAlive?: {
+    enabled?: boolean;
+    idleSocketTtl?: number;
+  };
+  httpAgent?: HttpAgent | HttpsAgent;
+  clickhouseSettings?: ClickHouseSettings;
+  logger?: Logger;
+  logLevel?: LogLevel;
+};
+
 export type ClickHouseConfig =
-  | {
+  | ({
       name?: string;
      url?: string;
       writerUrl?: never;
       readerUrl?: never;
-      clickhouseSettings?: ClickHouseSettings;
-      logger?: Logger;
-    }
-  | {
+    } & ClickhouseCommonConfig)
+  | ({
       name?: never;
       url?: never;
       writerName?: string;
       writerUrl: string;
       readerName?: string;
       readerUrl: string;
-      clickhouseSettings?: ClickHouseSettings;
-      logger?: Logger;
-    };
+    } & ClickhouseCommonConfig);
 
 export class ClickHouse {
   public readonly reader: ClickhouseReader;
   public readonly writer: ClickhouseWriter;
@@ -47,6 +56,9 @@ export class ClickHouse {
         url: config.url,
         clickhouseSettings: config.clickhouseSettings,
         logger: this.logger,
+        logLevel: config.logLevel,
+        keepAlive: config.keepAlive,
+        httpAgent: config.httpAgent,
       });
       this.reader = client;
       this.writer = client;
@@ -58,12 +70,18 @@ export class ClickHouse {
         url: config.readerUrl,
         clickhouseSettings: config.clickhouseSettings,
         logger: this.logger,
+        logLevel: config.logLevel,
+        keepAlive: config.keepAlive,
+        httpAgent: config.httpAgent,
       });
       this.writer = new ClickhouseClient({
         name: config.writerName ?? "clickhouse-writer",
         url: config.writerUrl,
         clickhouseSettings: config.clickhouseSettings,
         logger: this.logger,
+        logLevel: config.logLevel,
+        keepAlive: config.keepAlive,
+        httpAgent: config.httpAgent,
       });
 
       this._splitClients = true;

From 55ddfeed0551f51c74135671c9f0ca75d33b6355 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 14 May 2025 15:19:36 +0100
Subject: [PATCH 4/4] Add some logging because we might not be able to do telemetry

---
 .../services/runsReplicationService.server.ts | 83 +++++++++++++------
 1 file changed, 56 insertions(+), 27 deletions(-)

diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index e768922b39..90af5768fd 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -330,10 +330,6 @@ export class RunsReplicationService {
       return;
     }
 
-    this.logger.debug("Handling transaction", {
-      transaction,
-    });
-
     const lsnToUInt64Start = process.hrtime.bigint();
 
     // If there are events, we need to handle them
@@ -349,20 +345,32 @@ export class RunsReplicationService {
       }))
     );
 
-    const currentSpan = this._tracer.startSpan("handle_transaction", {
-      attributes: {
-        "transaction.xid": transaction.xid,
-        "transaction.replication_lag_ms": transaction.replicationLagMs,
-        "transaction.events": transaction.events.length,
-        "transaction.commit_end_lsn": transaction.commitEndLsn,
-        "transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
-        "transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
-        "transaction.version": _version.toString(),
+    this._tracer
+      .startSpan("handle_transaction", {
+        attributes: {
+          "transaction.xid": transaction.xid,
+          "transaction.replication_lag_ms": transaction.replicationLagMs,
+          "transaction.events": transaction.events.length,
+          "transaction.commit_end_lsn": transaction.commitEndLsn,
+          "transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
+          "transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
+          "transaction.version": _version.toString(),
+        },
+        startTime: transaction.beginStartTimestamp,
+      })
+      .end();
+
+    this.logger.debug("handle_transaction", {
+      transaction: {
+        xid: transaction.xid,
+        commitLsn: transaction.commitLsn,
+        commitEndLsn: transaction.commitEndLsn,
+        events: transaction.events.length,
+        parseDurationMs: this._currentParseDurationMs,
+        lsnToUInt64DurationMs,
+        version: _version.toString(),
       },
-      startTime: transaction.beginStartTimestamp,
     });
-
-    currentSpan.end();
   }
 
   async #acknowledgeLatestTransaction() {
@@ -387,7 +395,7 @@
     this._lastAcknowledgedAt = now;
     this._lastAcknowledgedLsn = this._latestCommitEndLsn;
 
-    this.logger.debug("Acknowledging transaction", {
+    this.logger.debug("acknowledge_latest_transaction", {
       commitEndLsn: this._latestCommitEndLsn,
       lastAcknowledgedAt: this._lastAcknowledgedAt,
     });
@@ -747,7 +755,7 @@ export class ConcurrentFlushScheduler {
     const callback = this.config.callback;
 
     const promise = this.concurrencyLimiter(async () => {
-      await startSpan(this._tracer, "flushNextBatch", async (span) => {
+      return await startSpan(this._tracer, "flushNextBatch", async (span) => {
         const batchId = nanoid();
 
         span.setAttribute("batch_id", batchId);
@@ -756,26 +764,47 @@ export class ConcurrentFlushScheduler {
         span.setAttribute("concurrency_pending_count", this.concurrencyLimiter.pendingCount);
         span.setAttribute("concurrency_concurrency", this.concurrencyLimiter.concurrency);
 
+        this.logger.debug("flush_next_batch", {
+          batchId,
+          batchSize: batch.length,
+          concurrencyActiveCount: this.concurrencyLimiter.activeCount,
+          concurrencyPendingCount: this.concurrencyLimiter.pendingCount,
+          concurrencyConcurrency: this.concurrencyLimiter.concurrency,
+        });
+
+        const start = performance.now();
+
         await callback(batchId, batch);
+
+        const end = performance.now();
+
+        const duration = end - start;
+
+        return {
+          batchId,
+          duration,
+        };
       });
     });
 
-    const [error] = await tryCatch(promise);
+    const [error, result] = await tryCatch(promise);
 
     if (error) {
-      this.logger.error("Error flushing batch", {
+      this.logger.error("flush_batch_error", {
         error,
       });
 
       this.failedBatchCount++;
+    } else {
+      this.logger.debug("flush_batch_complete", {
+        totalBatches: 1,
+        successfulBatches: 1,
+        failedBatches: 0,
+        totalFailedBatches: this.failedBatchCount,
+        duration: result?.duration,
+        batchId: result?.batchId,
+      });
     }
-
-    this.logger.debug("Batch flush complete", {
-      totalBatches: 1,
-      successfulBatches: 1,
-      failedBatches: 0,
-      totalFailedBatches: this.failedBatchCount,
-    });
   }
 }
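
For reference, the sketch below illustrates how the configuration surface introduced in PATCH 1 and PATCH 3 might be wired together in application code. It is an illustration only and not part of the patch series: the import paths and the connection values are assumptions, while the option names (keepAlive, logLevel, leaderLockAcquireAdditionalTimeMs, and so on) are taken from the diffs above.

// Illustrative sketch only - not part of the patches.
// Import paths and connection values are assumptions; option names come from the diffs above.
import { ClickHouse } from "@internal/clickhouse"; // hypothetical path to internal-packages/clickhouse
import { RunsReplicationService } from "./runsReplicationService.server"; // hypothetical path

const clickhouse = new ClickHouse({
  name: "runs-replication",
  url: process.env.RUN_REPLICATION_CLICKHOUSE_URL!,
  // PATCH 3: keep-alive behaviour and log level are now configurable.
  keepAlive: {
    enabled: process.env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
    idleSocketTtl: Number(process.env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS ?? 9_000),
  },
  logLevel: "info",
});

const service = new RunsReplicationService({
  clickhouse,
  pgConnectionUrl: process.env.DATABASE_URL!, // assumed name of the Postgres connection string variable
  serviceName: "runs-replication",
  slotName: "task_runs_to_clickhouse_v1",
  publicationName: "task_runs_to_clickhouse_v1_publication",
  redisOptions: { host: "localhost", port: 6379 }, // illustrative Redis options
  // PATCH 1: the retry budget is time-based. Acquisition keeps retrying every
  // leaderLockRetryIntervalMs until leaderLockTimeoutMs + leaderLockAcquireAdditionalTimeMs elapses.
  leaderLockTimeoutMs: 30_000,
  leaderLockExtendIntervalMs: 10_000,
  leaderLockAcquireAdditionalTimeMs: 10_000,
  leaderLockRetryIntervalMs: 500,
  ackIntervalSeconds: 10,
});

await service.start(); // top-level await assumes an ESM entry point

The knob from PATCH 2 is set on the container environment instead, for example: docker run -e NODE_MAX_OLD_SPACE_SIZE=4096 ... The entrypoint falls back to 8192 when the variable is unset.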