diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 97ceb23ea8..8e32b2dbdd 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -462,13 +462,13 @@ export class RunsReplicationService { // Insert task runs and payloads with retry logic for connection errors const [taskRunError, taskRunResult] = await this.#insertWithRetry( - () => this.#insertTaskRunInserts(taskRunInserts), + (attempt) => this.#insertTaskRunInserts(taskRunInserts, attempt), "task run inserts", flushId ); const [payloadError, payloadResult] = await this.#insertWithRetry( - () => this.#insertPayloadInserts(payloadInserts), + (attempt) => this.#insertPayloadInserts(payloadInserts, attempt), "payload inserts", flushId ); @@ -502,7 +502,7 @@ export class RunsReplicationService { // New method to handle inserts with retry logic for connection errors async #insertWithRetry( - insertFn: () => Promise, + insertFn: (attempt: number) => Promise, operationName: string, flushId: string ): Promise<[Error | null, T | null]> { @@ -510,16 +510,17 @@ export class RunsReplicationService { for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) { try { - const result = await insertFn(); + const result = await insertFn(attempt); return [null, result]; } catch (error) { lastError = error instanceof Error ? error : new Error(String(error)); // Check if this is a retryable connection error - if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) { + if (this.#isRetryableConnectionError(lastError)) { const delay = this.#calculateConnectionRetryDelay(attempt); - this.logger.warn(`Retrying ${operationName} due to connection error`, { + this.logger.warn(`Retrying RunReplication insert due to connection error`, { + operationName, flushId, attempt, maxRetries: this._insertMaxRetries, @@ -567,7 +568,7 @@ export class RunsReplicationService { return delay + jitter; } - async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) { + async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) { return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert( taskRunInserts, @@ -581,18 +582,20 @@ export class RunsReplicationService { ); if (insertError) { - this.logger.error("Error inserting task run inserts", { + this.logger.error("Error inserting task run inserts attempt", { error: insertError, + attempt, }); recordSpanError(span, insertError); + throw insertError; } return insertResult; }); } - async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) { + async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) { return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads( payloadInserts, @@ -606,11 +609,13 @@ export class RunsReplicationService { ); if (insertError) { - this.logger.error("Error inserting payload inserts", { + this.logger.error("Error inserting payload inserts attempt", { error: insertError, + attempt, }); recordSpanError(span, insertError); + throw insertError; } return insertResult;