Skip to content

Commit faef0ad

Browse files
committed
Replication settings are configurable
1 parent 25b509f commit faef0ad

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,10 @@ const EnvironmentSchema = z.object({
840840
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
841841
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
842842
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
843+
// Retry configuration for insert operations
844+
RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
845+
RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
846+
RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
843847

844848
// Clickhouse
845849
CLICKHOUSE_URL: z.string().optional(),

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ function initializeRunsReplicationInstance() {
6262
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
6363
waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
6464
tracer: provider.getTracer("runs-replication-service"),
65+
insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES,
66+
insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS,
67+
insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS,
6568
});
6669

6770
if (env.RUN_REPLICATION_ENABLED === "1") {

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ export type RunsReplicationServiceOptions = {
5151
logLevel?: LogLevel;
5252
tracer?: Tracer;
5353
waitForAsyncInsert?: boolean;
54+
// Retry configuration for insert operations
55+
insertMaxRetries?: number;
56+
insertBaseDelayMs?: number;
57+
insertMaxDelayMs?: number;
5458
};
5559

5660
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
@@ -80,6 +84,10 @@ export class RunsReplicationService {
8084
private _latestCommitEndLsn: string | null = null;
8185
private _lastAcknowledgedLsn: string | null = null;
8286
private _acknowledgeInterval: NodeJS.Timeout | null = null;
87+
// Retry configuration
88+
private _insertMaxRetries: number;
89+
private _insertBaseDelayMs: number;
90+
private _insertMaxDelayMs: number;
8391

8492
public readonly events: EventEmitter<RunsReplicationServiceEvents>;
8593

@@ -151,6 +159,11 @@ export class RunsReplicationService {
151159
this._replicationClient.events.on("leaderElection", (isLeader) => {
152160
this.logger.info("Leader election", { isLeader });
153161
});
162+
163+
// Initialize retry configuration
164+
this._insertMaxRetries = options.insertMaxRetries ?? 3;
165+
this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100;
166+
this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
154167
}
155168

156169
public async shutdown() {
@@ -487,26 +500,25 @@ export class RunsReplicationService {
487500
async #insertWithRetry<T>(
488501
insertFn: () => Promise<T>,
489502
operationName: string,
490-
flushId: string,
491-
maxRetries: number = 3
503+
flushId: string
492504
): Promise<[Error | null, T | null]> {
493505
let lastError: Error | null = null;
494506

495-
for (let attempt = 1; attempt <= maxRetries; attempt++) {
507+
for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) {
496508
try {
497509
const result = await insertFn();
498510
return [null, result];
499511
} catch (error) {
500512
lastError = error instanceof Error ? error : new Error(String(error));
501513

502514
// Check if this is a retryable connection error
503-
if (this.#isRetryableConnectionError(lastError) && attempt < maxRetries) {
515+
if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) {
504516
const delay = this.#calculateConnectionRetryDelay(attempt);
505517

506518
this.logger.warn(`Retrying ${operationName} due to connection error`, {
507519
flushId,
508520
attempt,
509-
maxRetries,
521+
maxRetries: this._insertMaxRetries,
510522
error: lastError.message,
511523
delay,
512524
});
@@ -540,10 +552,11 @@ export class RunsReplicationService {
540552

541553
// New method to calculate retry delay for connection errors
542554
#calculateConnectionRetryDelay(attempt: number): number {
543-
// Exponential backoff: 100ms, 200ms, 400ms
544-
const baseDelay = 100;
545-
const maxDelay = 2000;
546-
const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay);
555+
// Exponential backoff: baseDelay, baseDelay*2, baseDelay*4, etc.
556+
const delay = Math.min(
557+
this._insertBaseDelayMs * Math.pow(2, attempt - 1),
558+
this._insertMaxDelayMs
559+
);
547560

548561
// Add some jitter to prevent thundering herd
549562
const jitter = Math.random() * 100;

0 commit comments

Comments
 (0)