Skip to content

Commit 25b509f

Browse files
committed
Add retry logic for insert operations
Add a generic retry mechanism for task run and payload inserts to handle transient connection errors. The new #insertWithRetry method retries up to three times with exponential backoff and jitter on retryable connection errors such as connection resets or timeouts. Errors are logged and recorded in tracing spans to improve observability and robustness of the replication service.
1 parent 119bfc0 commit 25b509f

File tree

1 file changed

+96
-2
lines changed

1 file changed

+96
-2
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,35 @@ export class RunsReplicationService {
445445
payloadInserts: payloadInserts.length,
446446
});
447447

448-
await this.#insertTaskRunInserts(taskRunInserts);
449-
await this.#insertPayloadInserts(payloadInserts);
448+
// Insert task runs and payloads with retry logic for connection errors
449+
const [taskRunError, taskRunResult] = await this.#insertWithRetry(
450+
() => this.#insertTaskRunInserts(taskRunInserts),
451+
"task run inserts",
452+
flushId
453+
);
454+
455+
const [payloadError, payloadResult] = await this.#insertWithRetry(
456+
() => this.#insertPayloadInserts(payloadInserts),
457+
"payload inserts",
458+
flushId
459+
);
460+
461+
// Log any errors that occurred
462+
if (taskRunError) {
463+
this.logger.error("Error inserting task run inserts", {
464+
error: taskRunError,
465+
flushId,
466+
});
467+
recordSpanError(span, taskRunError);
468+
}
469+
470+
if (payloadError) {
471+
this.logger.error("Error inserting payload inserts", {
472+
error: payloadError,
473+
flushId,
474+
});
475+
recordSpanError(span, payloadError);
476+
}
450477

451478
this.logger.debug("Flushed inserts", {
452479
flushId,
@@ -456,6 +483,73 @@ export class RunsReplicationService {
456483
});
457484
}
458485

486+
// New method to handle inserts with retry logic for connection errors
487+
async #insertWithRetry<T>(
488+
insertFn: () => Promise<T>,
489+
operationName: string,
490+
flushId: string,
491+
maxRetries: number = 3
492+
): Promise<[Error | null, T | null]> {
493+
let lastError: Error | null = null;
494+
495+
for (let attempt = 1; attempt <= maxRetries; attempt++) {
496+
try {
497+
const result = await insertFn();
498+
return [null, result];
499+
} catch (error) {
500+
lastError = error instanceof Error ? error : new Error(String(error));
501+
502+
// Check if this is a retryable connection error
503+
if (this.#isRetryableConnectionError(lastError) && attempt < maxRetries) {
504+
const delay = this.#calculateConnectionRetryDelay(attempt);
505+
506+
this.logger.warn(`Retrying ${operationName} due to connection error`, {
507+
flushId,
508+
attempt,
509+
maxRetries,
510+
error: lastError.message,
511+
delay,
512+
});
513+
514+
await new Promise((resolve) => setTimeout(resolve, delay));
515+
continue;
516+
}
517+
break;
518+
}
519+
}
520+
521+
return [lastError, null];
522+
}
523+
524+
// New method to check if an error is a retryable connection error
525+
#isRetryableConnectionError(error: Error): boolean {
526+
const errorMessage = error.message.toLowerCase();
527+
const retryableConnectionPatterns = [
528+
"socket hang up",
529+
"econnreset",
530+
"connection reset",
531+
"connection refused",
532+
"connection timeout",
533+
"network error",
534+
"read econnreset",
535+
"write econnreset",
536+
];
537+
538+
return retryableConnectionPatterns.some((pattern) => errorMessage.includes(pattern));
539+
}
540+
541+
// New method to calculate retry delay for connection errors
542+
#calculateConnectionRetryDelay(attempt: number): number {
543+
// Exponential backoff: 100ms, 200ms, 400ms
544+
const baseDelay = 100;
545+
const maxDelay = 2000;
546+
const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay);
547+
548+
// Add some jitter to prevent thundering herd
549+
const jitter = Math.random() * 100;
550+
return delay + jitter;
551+
}
552+
459553
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
460554
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
461555
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(

0 commit comments

Comments
 (0)