Skip to content

Commit 4a79b3b

Browse files
authored
Fixed retrying on run replication inserts on connection errors (#2216)
1 parent 0da6a19 commit 4a79b3b

File tree

1 file changed

+15
-10
lines changed

1 file changed

+15
-10
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -462,13 +462,13 @@ export class RunsReplicationService {
462462

463463
// Insert task runs and payloads with retry logic for connection errors
464464
const [taskRunError, taskRunResult] = await this.#insertWithRetry(
465-
() => this.#insertTaskRunInserts(taskRunInserts),
465+
(attempt) => this.#insertTaskRunInserts(taskRunInserts, attempt),
466466
"task run inserts",
467467
flushId
468468
);
469469

470470
const [payloadError, payloadResult] = await this.#insertWithRetry(
471-
() => this.#insertPayloadInserts(payloadInserts),
471+
(attempt) => this.#insertPayloadInserts(payloadInserts, attempt),
472472
"payload inserts",
473473
flushId
474474
);
@@ -502,24 +502,25 @@ export class RunsReplicationService {
502502

503503
// New method to handle inserts with retry logic for connection errors
504504
async #insertWithRetry<T>(
505-
insertFn: () => Promise<T>,
505+
insertFn: (attempt: number) => Promise<T>,
506506
operationName: string,
507507
flushId: string
508508
): Promise<[Error | null, T | null]> {
509509
let lastError: Error | null = null;
510510

511511
for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) {
512512
try {
513-
const result = await insertFn();
513+
const result = await insertFn(attempt);
514514
return [null, result];
515515
} catch (error) {
516516
lastError = error instanceof Error ? error : new Error(String(error));
517517

518518
// Check if this is a retryable connection error
519-
if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) {
519+
if (this.#isRetryableConnectionError(lastError)) {
520520
const delay = this.#calculateConnectionRetryDelay(attempt);
521521

522-
this.logger.warn(`Retrying ${operationName} due to connection error`, {
522+
this.logger.warn(`Retrying RunReplication insert due to connection error`, {
523+
operationName,
523524
flushId,
524525
attempt,
525526
maxRetries: this._insertMaxRetries,
@@ -567,7 +568,7 @@ export class RunsReplicationService {
567568
return delay + jitter;
568569
}
569570

570-
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
571+
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
571572
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
572573
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
573574
taskRunInserts,
@@ -581,18 +582,20 @@ export class RunsReplicationService {
581582
);
582583

583584
if (insertError) {
584-
this.logger.error("Error inserting task run inserts", {
585+
this.logger.error("Error inserting task run inserts attempt", {
585586
error: insertError,
587+
attempt,
586588
});
587589

588590
recordSpanError(span, insertError);
591+
throw insertError;
589592
}
590593

591594
return insertResult;
592595
});
593596
}
594597

595-
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) {
598+
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
596599
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
597600
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
598601
payloadInserts,
@@ -606,11 +609,13 @@ export class RunsReplicationService {
606609
);
607610

608611
if (insertError) {
609-
this.logger.error("Error inserting payload inserts", {
612+
this.logger.error("Error inserting payload inserts attempt", {
610613
error: insertError,
614+
attempt,
611615
});
612616

613617
recordSpanError(span, insertError);
618+
throw insertError;
614619
}
615620

616621
return insertResult;

0 commit comments

Comments
 (0)