2 changes: 1 addition & 1 deletion apps/webapp/app/env.server.ts
@@ -766,7 +766,7 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
});
2 changes: 1 addition & 1 deletion apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -45,7 +45,7 @@ function initializeRunsReplicationInstance() {
flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE,
leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS,
leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS,
leaderLockRetryCount: env.RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT,
leaderLockAcquireAdditionalTimeMs: env.RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS,
leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
6 changes: 3 additions & 3 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -43,7 +43,7 @@ export type RunsReplicationServiceOptions = {
flushBatchSize?: number;
leaderLockTimeoutMs?: number;
leaderLockExtendIntervalMs?: number;
leaderLockRetryCount?: number;
leaderLockAcquireAdditionalTimeMs?: number;
leaderLockRetryIntervalMs?: number;
ackIntervalSeconds?: number;
acknowledgeTimeoutMs?: number;
@@ -102,11 +102,11 @@ export class RunsReplicationService {
redisOptions: options.redisOptions,
autoAcknowledge: false,
publicationActions: ["insert", "update", "delete"],
logger: new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000,
leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
tracer: options.tracer,
});
129 changes: 129 additions & 0 deletions apps/webapp/test/runsReplicationService.test.ts
@@ -1030,6 +1030,135 @@ describe("RunsReplicationService", () => {
}
);

containerTest(
"should handover leadership to a second service, and the second service should be able to extend the leader lock",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);

const clickhouse = new ClickHouse({
url: clickhouseContainer.getConnectionUrl(),
name: "runs-replication-shutdown-handover",
});

// Service A
const runsReplicationServiceA = new RunsReplicationService({
clickhouse,
pgConnectionUrl: postgresContainer.getConnectionUri(),
serviceName: "runs-replication-shutdown-handover",
slotName: "task_runs_to_clickhouse_v1",
publicationName: "task_runs_to_clickhouse_v1_publication",
redisOptions,
maxFlushConcurrency: 1,
flushIntervalMs: 100,
flushBatchSize: 1,
leaderLockTimeoutMs: 5000,
leaderLockExtendIntervalMs: 1000,
leaderLockAcquireAdditionalTimeMs: 10_000,
ackIntervalSeconds: 5,
logger: new Logger("runs-replication-shutdown-handover-a", "debug"),
});

await runsReplicationServiceA.start();

// Service B
const runsReplicationServiceB = new RunsReplicationService({
clickhouse,
pgConnectionUrl: postgresContainer.getConnectionUri(),
serviceName: "runs-replication-shutdown-handover",
slotName: "task_runs_to_clickhouse_v1",
publicationName: "task_runs_to_clickhouse_v1_publication",
redisOptions,
maxFlushConcurrency: 1,
flushIntervalMs: 100,
flushBatchSize: 1,
leaderLockTimeoutMs: 5000,
leaderLockExtendIntervalMs: 1000,
leaderLockAcquireAdditionalTimeMs: 10_000,
ackIntervalSeconds: 5,
logger: new Logger("runs-replication-shutdown-handover-b", "debug"),
});

// Start the second service, then shut down the first service after 6 seconds so leadership hands over
await Promise.all([
setTimeout(6000).then(() => runsReplicationServiceA.stop()),
runsReplicationServiceB.start(),
]);

const organization = await prisma.organization.create({
data: {
title: "test",
slug: "test",
},
});

const project = await prisma.project.create({
data: {
name: "test",
slug: "test",
organizationId: organization.id,
externalRef: "test",
},
});

const runtimeEnvironment = await prisma.runtimeEnvironment.create({
data: {
slug: "test",
type: "DEVELOPMENT",
projectId: project.id,
organizationId: organization.id,
apiKey: "test",
pkApiKey: "test",
shortcode: "test",
},
});

// Now we insert a row into the table
const taskRun = await prisma.taskRun.create({
data: {
friendlyId: "run_1234",
taskIdentifier: "my-task",
payload: JSON.stringify({ foo: "bar" }),
traceId: "1234",
spanId: "1234",
queue: "test",
runtimeEnvironmentId: runtimeEnvironment.id,
projectId: project.id,
organizationId: organization.id,
environmentType: "DEVELOPMENT",
engine: "V2",
},
});

await setTimeout(10_000);

// Check that the row was replicated to clickhouse
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2",
schema: z.any(),
});

const [queryError, result] = await queryRuns({});

expect(queryError).toBeNull();
expect(result?.length).toBe(1);
expect(result?.[0]).toEqual(
expect.objectContaining({
run_id: taskRun.id,
friendly_id: taskRun.friendlyId,
task_identifier: taskRun.taskIdentifier,
environment_id: runtimeEnvironment.id,
project_id: project.id,
organization_id: organization.id,
environment_type: "DEVELOPMENT",
engine: "V2",
})
);

await runsReplicationServiceB.stop();
}
);
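
For context, a rough sketch of the timing budget behind this handover (values taken from the test config above; the retry interval is the client default, not set by the test):

// Illustrative arithmetic only, mirroring the options used by services A and B above.
const leaderLockTimeoutMs = 5_000;
const leaderLockAcquireAdditionalTimeMs = 10_000;
const leaderLockRetryIntervalMs = 500; // client default; not overridden in the test

const maxAcquireWaitMs = leaderLockTimeoutMs + leaderLockAcquireAdditionalTimeMs; // 15_000 ms
const maxAttempts = Math.floor(maxAcquireWaitMs / leaderLockRetryIntervalMs); // ~30 attempts

// Service B starts acquiring immediately; service A stops after 6_000 ms, so once A's
// lock is released (or expires after leaderLockTimeoutMs), B still has roughly 9 seconds
// of retry budget before #acquireLeaderLock gives up.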

containerTest(
"should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
100 changes: 73 additions & 27 deletions internal-packages/replication/src/client.ts
@@ -53,14 +53,14 @@ export interface LogicalReplicationClientOptions {
leaderLockExtendIntervalMs?: number;

/**
* The number of times to retry acquiring the leader lock (default: 120)
* The interval in ms to retry acquiring the leader lock (default: 500)
*/
leaderLockRetryCount?: number;
leaderLockRetryIntervalMs?: number;

/**
* The interval in ms to retry acquiring the leader lock (default: 500)
* The additional time in ms, on top of the leader lock timeout, to keep retrying to acquire the leader lock (default: 1000)
*/
leaderLockRetryIntervalMs?: number;
leaderLockAcquireAdditionalTimeMs?: number;

/**
* The interval in seconds to automatically acknowledge the last LSN if no ack has been sent (default: 10)
@@ -97,7 +97,7 @@ export class LogicalReplicationClient {
private lastAcknowledgedLsn: string | null = null;
private leaderLockTimeoutMs: number;
private leaderLockExtendIntervalMs: number;
private leaderLockRetryCount: number;
private leaderLockAcquireAdditionalTimeMs: number;
private leaderLockRetryIntervalMs: number;
private leaderLockHeartbeatTimer: NodeJS.Timeout | null = null;
private ackIntervalSeconds: number;
@@ -124,7 +124,7 @@ export class LogicalReplicationClient {

this.leaderLockTimeoutMs = options.leaderLockTimeoutMs ?? 30000;
this.leaderLockExtendIntervalMs = options.leaderLockExtendIntervalMs ?? 10000;
this.leaderLockRetryCount = options.leaderLockRetryCount ?? 120;
this.leaderLockAcquireAdditionalTimeMs = options.leaderLockAcquireAdditionalTimeMs ?? 1000;
this.leaderLockRetryIntervalMs = options.leaderLockRetryIntervalMs ?? 500;
this.ackIntervalSeconds = options.ackIntervalSeconds ?? 10;

@@ -578,34 +578,74 @@
}

async #acquireLeaderLock(): Promise<boolean> {
try {
this.leaderLock = await this.redlock.acquire(
[`logical-replication-client:${this.options.name}`],
this.leaderLockTimeoutMs,
{
retryCount: this.leaderLockRetryCount,
retryDelay: this.leaderLockRetryIntervalMs,
}
);
} catch (err) {
this.logger.error("Leader election failed", {
name: this.options.name,
table: this.options.table,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
retryCount: this.leaderLockRetryCount,
retryIntervalMs: this.leaderLockRetryIntervalMs,
error: err,
});
const startTime = Date.now();
const maxWaitTime = this.leaderLockTimeoutMs + this.leaderLockAcquireAdditionalTimeMs;

return false;
this.logger.debug("Acquiring leader lock", {
name: this.options.name,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
maxWaitTime,
});

let attempt = 0;

while (Date.now() - startTime < maxWaitTime) {
try {
this.leaderLock = await this.redlock.acquire(
[`logical-replication-client:${this.options.name}`],
this.leaderLockTimeoutMs
);

this.logger.debug("Acquired leader lock", {
name: this.options.name,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
lockTimeoutMs: this.leaderLockTimeoutMs,
lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
lock: this.leaderLock,
attempt,
});
return true;
} catch (err) {
attempt++;

this.logger.debug("Failed to acquire leader lock, retrying", {
name: this.options.name,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
attempt,
retryIntervalMs: this.leaderLockRetryIntervalMs,
error: err,
});

await new Promise((resolve) => setTimeout(resolve, this.leaderLockRetryIntervalMs));
}
}

return true;
this.logger.error("Leader election failed after retries", {
name: this.options.name,
table: this.options.table,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
totalAttempts: attempt,
totalWaitTimeMs: Date.now() - startTime,
});
return false;
}

async #releaseLeaderLock() {
if (!this.leaderLock) return;

this.logger.debug("Releasing leader lock", {
name: this.options.name,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
lockTimeoutMs: this.leaderLockTimeoutMs,
lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
lock: this.leaderLock,
});

const [releaseError] = await tryCatch(this.leaderLock.release());
this.leaderLock = null;

name: this.options.name,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
lockTimeoutMs: this.leaderLockTimeoutMs,
lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
lock: this.leaderLock,
});
} catch (err) {
this.logger.error("Failed to extend leader lock", {
name: this.options.name,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
error: err,
lockTimeoutMs: this.leaderLockTimeoutMs,
lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
lock: this.leaderLock,
});
// Optionally emit an error or handle loss of leadership
this.events.emit("error", err instanceof Error ? err : new Error(String(err)));
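
For comparison, a rough sketch of how this change alters the worst-case wait for leadership, assuming each acquire attempt returns quickly so only the retry interval matters:

// Before this PR: redlock retried internally a fixed number of times.
const retryIntervalMs = 500;
const oldRetryCount = 120; // previous client default (the webapp env default was 240)
const oldMaxWaitMs = oldRetryCount * retryIntervalMs; // ~60_000 ms, ignoring redlock's jitter

// After this PR: #acquireLeaderLock loops on a wall-clock budget tied to the lock timeout.
const leaderLockTimeoutMs = 30_000; // client default
const leaderLockAcquireAdditionalTimeMs = 1_000; // client default; the webapp env default is 10_000
const newMaxWaitMs = leaderLockTimeoutMs + leaderLockAcquireAdditionalTimeMs; // 31_000 ms

In other words, the acquisition window now scales with the configured lock timeout instead of being governed by an independent retry count.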