diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 3baf691f84..31e2381dc7 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -766,6 +766,8 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000), RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10), RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), + RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240), + RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500), }); export type Environment = z.infer; diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 6a4c9c4ee5..ad27724410 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -45,6 +45,8 @@ function initializeRunsReplicationInstance() { flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE, leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS, leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS, + leaderLockRetryCount: env.RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT, + leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS, ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS, logLevel: env.RUN_REPLICATION_LOG_LEVEL, }); diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 9299e545f9..c7fe512474 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -43,6 +43,8 @@ export type RunsReplicationServiceOptions = { flushBatchSize?: number; leaderLockTimeoutMs?: number; leaderLockExtendIntervalMs?: number; + leaderLockRetryCount?: number; + leaderLockRetryIntervalMs?: number; ackIntervalSeconds?: number; acknowledgeTimeoutMs?: number; logger?: Logger; @@ -103,6 +105,8 @@ export class RunsReplicationService { leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000, leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000, ackIntervalSeconds: options.ackIntervalSeconds ?? 10, + leaderLockRetryCount: options.leaderLockRetryCount ?? 240, + leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500, }); this._concurrentFlushScheduler = new ConcurrentFlushScheduler({ diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index c522c38231..d5c56cdfae 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -593,6 +593,8 @@ export class LogicalReplicationClient { table: this.options.table, slotName: this.options.slotName, publicationName: this.options.publicationName, + retryCount: this.leaderLockRetryCount, + retryIntervalMs: this.leaderLockRetryIntervalMs, error: err, });