Skip to content

Commit f38d35e

Browse files
authored
ClickHouse replication improvements (retrying, strip bad unicode chars) (#2205)
* Add retry logic for insert operations Add a generic retry mechanism for task run and payload inserts to handle transient connection errors. The new #insertWithRetry method retries up to three times with exponential backoff and jitter on retryable connection errors such as connection resets or timeouts. Errors are logged and recorded in tracing spans to improve observability and robustness of the replication service. * Replication settings are configurable * Log out the runIds for failed batches * Detecting bad JSON in run replication and ignoring it * Reproduced split unicode error * Move output file * Massively improved the performance * Minor performance improvements * Unskip tests * Remove unused test in CH package * Fix for the ClickHouse UI explorer * RunReplication keepAlive defaults to false * Add concurrency_key and bulk_action_group_ids to ClickHouse task runs * ClickHouse package doesn't need to be built anymore for the webapp * Set the concurrency_key from the run replication service
1 parent 1b4dabb commit f38d35e

12 files changed

+598
-8
lines changed

apps/webapp/app/env.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,9 +842,13 @@ const EnvironmentSchema = z.object({
842842
RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
843843
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
844844
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
845-
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
845+
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
846846
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
847847
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
848+
// Retry configuration for insert operations
849+
RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
850+
RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
851+
RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
848852

849853
// Clickhouse
850854
CLICKHOUSE_URL: z.string().optional(),

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ function initializeRunsReplicationInstance() {
6262
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
6363
waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
6464
tracer: provider.getTracer("runs-replication-service"),
65+
insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES,
66+
insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS,
67+
insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS,
6568
});
6669

6770
if (env.RUN_REPLICATION_ENABLED === "1") {

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import { TaskRun } from "@trigger.dev/database";
1515
import { nanoid } from "nanoid";
1616
import EventEmitter from "node:events";
1717
import pLimit from "p-limit";
18+
import { logger } from "./logger.server";
19+
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
1820

1921
interface TransactionEvent<T = any> {
2022
tag: "insert" | "update" | "delete";
@@ -51,6 +53,10 @@ export type RunsReplicationServiceOptions = {
5153
logLevel?: LogLevel;
5254
tracer?: Tracer;
5355
waitForAsyncInsert?: boolean;
56+
// Retry configuration for insert operations
57+
insertMaxRetries?: number;
58+
insertBaseDelayMs?: number;
59+
insertMaxDelayMs?: number;
5460
};
5561

5662
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
@@ -80,6 +86,10 @@ export class RunsReplicationService {
8086
private _latestCommitEndLsn: string | null = null;
8187
private _lastAcknowledgedLsn: string | null = null;
8288
private _acknowledgeInterval: NodeJS.Timeout | null = null;
89+
// Retry configuration
90+
private _insertMaxRetries: number;
91+
private _insertBaseDelayMs: number;
92+
private _insertMaxDelayMs: number;
8393

8494
public readonly events: EventEmitter<RunsReplicationServiceEvents>;
8595

@@ -151,6 +161,11 @@ export class RunsReplicationService {
151161
this._replicationClient.events.on("leaderElection", (isLeader) => {
152162
this.logger.info("Leader election", { isLeader });
153163
});
164+
165+
// Initialize retry configuration
166+
this._insertMaxRetries = options.insertMaxRetries ?? 3;
167+
this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100;
168+
this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
154169
}
155170

156171
public async shutdown() {
@@ -445,8 +460,37 @@ export class RunsReplicationService {
445460
payloadInserts: payloadInserts.length,
446461
});
447462

448-
await this.#insertTaskRunInserts(taskRunInserts);
449-
await this.#insertPayloadInserts(payloadInserts);
463+
// Insert task runs and payloads with retry logic for connection errors
464+
const [taskRunError, taskRunResult] = await this.#insertWithRetry(
465+
() => this.#insertTaskRunInserts(taskRunInserts),
466+
"task run inserts",
467+
flushId
468+
);
469+
470+
const [payloadError, payloadResult] = await this.#insertWithRetry(
471+
() => this.#insertPayloadInserts(payloadInserts),
472+
"payload inserts",
473+
flushId
474+
);
475+
476+
// Log any errors that occurred
477+
if (taskRunError) {
478+
this.logger.error("Error inserting task run inserts", {
479+
error: taskRunError,
480+
flushId,
481+
runIds: taskRunInserts.map((r) => r.run_id),
482+
});
483+
recordSpanError(span, taskRunError);
484+
}
485+
486+
if (payloadError) {
487+
this.logger.error("Error inserting payload inserts", {
488+
error: payloadError,
489+
flushId,
490+
runIds: payloadInserts.map((r) => r.run_id),
491+
});
492+
recordSpanError(span, payloadError);
493+
}
450494

451495
this.logger.debug("Flushed inserts", {
452496
flushId,
@@ -456,6 +500,73 @@ export class RunsReplicationService {
456500
});
457501
}
458502

503+
// New method to handle inserts with retry logic for connection errors
504+
async #insertWithRetry<T>(
505+
insertFn: () => Promise<T>,
506+
operationName: string,
507+
flushId: string
508+
): Promise<[Error | null, T | null]> {
509+
let lastError: Error | null = null;
510+
511+
for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) {
512+
try {
513+
const result = await insertFn();
514+
return [null, result];
515+
} catch (error) {
516+
lastError = error instanceof Error ? error : new Error(String(error));
517+
518+
// Check if this is a retryable connection error
519+
if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) {
520+
const delay = this.#calculateConnectionRetryDelay(attempt);
521+
522+
this.logger.warn(`Retrying ${operationName} due to connection error`, {
523+
flushId,
524+
attempt,
525+
maxRetries: this._insertMaxRetries,
526+
error: lastError.message,
527+
delay,
528+
});
529+
530+
await new Promise((resolve) => setTimeout(resolve, delay));
531+
continue;
532+
}
533+
break;
534+
}
535+
}
536+
537+
return [lastError, null];
538+
}
539+
540+
// New method to check if an error is a retryable connection error
541+
#isRetryableConnectionError(error: Error): boolean {
542+
const errorMessage = error.message.toLowerCase();
543+
const retryableConnectionPatterns = [
544+
"socket hang up",
545+
"econnreset",
546+
"connection reset",
547+
"connection refused",
548+
"connection timeout",
549+
"network error",
550+
"read econnreset",
551+
"write econnreset",
552+
];
553+
554+
return retryableConnectionPatterns.some((pattern) => errorMessage.includes(pattern));
555+
}
556+
557+
// New method to calculate retry delay for connection errors
558+
#calculateConnectionRetryDelay(attempt: number): number {
559+
// Exponential backoff: baseDelay, baseDelay*2, baseDelay*4, etc.
560+
const delay = Math.min(
561+
this._insertBaseDelayMs * Math.pow(2, attempt - 1),
562+
this._insertMaxDelayMs
563+
);
564+
565+
// Add some jitter to prevent thundering herd
566+
const jitter = Math.random() * 100;
567+
return delay + jitter;
568+
}
569+
459570
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
460571
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
461572
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
@@ -604,6 +715,7 @@ export class RunsReplicationService {
604715
idempotency_key: run.idempotencyKey ?? "",
605716
expiration_ttl: run.ttl ?? "",
606717
output,
718+
concurrency_key: run.concurrencyKey ?? "",
607719
_version: _version.toString(),
608720
_is_deleted: event === "delete" ? 1 : 0,
609721
};
@@ -631,6 +743,14 @@ export class RunsReplicationService {
631743
return { data: undefined };
632744
}
633745

746+
if (detectBadJsonStrings(data)) {
747+
this.logger.warn("Detected bad JSON strings", {
748+
data,
749+
dataType,
750+
});
751+
return { data: undefined };
752+
}
753+
634754
const packet = {
635755
data,
636756
dataType,
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
export function detectBadJsonStrings(jsonString: string): boolean {
2+
// Fast path: skip everything if no \u
3+
let idx = jsonString.indexOf("\\u");
4+
if (idx === -1) return false;
5+
6+
// Use a more efficient scanning strategy
7+
const length = jsonString.length;
8+
9+
while (idx !== -1 && idx < length - 5) {
10+
// Only check if we have enough characters left
11+
if (idx + 6 > length) break;
12+
13+
if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") {
14+
const third = jsonString[idx + 3];
15+
16+
// High surrogate check
17+
if (
18+
/[89ab]/.test(third) &&
19+
/[0-9a-f]/.test(jsonString[idx + 4]) &&
20+
/[0-9a-f]/.test(jsonString[idx + 5])
21+
) {
22+
// Check for low surrogate after (need at least 6 more chars)
23+
if (idx + 12 > length) {
24+
return true; // Incomplete high surrogate (not enough chars left)
25+
}
26+
27+
if (
28+
jsonString[idx + 6] !== "\\" ||
29+
jsonString[idx + 7] !== "u" ||
30+
jsonString[idx + 8] !== "d" ||
31+
!/[cd]/.test(jsonString[idx + 9]) ||
32+
!/[0-9a-f]/.test(jsonString[idx + 10]) ||
33+
!/[0-9a-f]/.test(jsonString[idx + 11])
34+
) {
35+
return true; // Incomplete high surrogate
36+
}
37+
}
38+
39+
// Low surrogate check
40+
if (
41+
(third === "c" || third === "d") &&
42+
/[0-9a-f]/.test(jsonString[idx + 4]) &&
43+
/[0-9a-f]/.test(jsonString[idx + 5])
44+
) {
45+
// Check for high surrogate before (need at least 6 chars before)
46+
if (idx < 6) {
47+
return true; // Incomplete low surrogate (not enough chars before)
48+
}
49+
50+
if (
51+
jsonString[idx - 6] !== "\\" ||
52+
jsonString[idx - 5] !== "u" ||
53+
jsonString[idx - 4] !== "d" ||
54+
!/[89ab]/.test(jsonString[idx - 3]) ||
55+
!/[0-9a-f]/.test(jsonString[idx - 2]) ||
56+
!/[0-9a-f]/.test(jsonString[idx - 1])
57+
) {
58+
return true; // Incomplete low surrogate
59+
}
60+
}
61+
}
62+
63+
// More efficient next search - skip ahead by 2 to avoid overlapping matches
64+
idx = jsonString.indexOf("\\u", idx + 2);
65+
}
66+
67+
return false;
68+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"title": "❜ 𝐒 𝐏𝗈𝗌𝗍 . . . 𝐍𝖾𝗐 𝐂𝗈𝗇𝗍𝖾𝗇𝗍 ꒰ ⚔️ ꒱ 𝐒𝐋 ❜ 𝐔𝐋\n\n꒰ ❤️ ꒱ 𓃊 𝐋𝗲𝗮𝘃𝗲 𝖺 𝗹𝗶𝗸𝗲 𝖺𝗇\ud835"
3+
}

0 commit comments

Comments
 (0)