Skip to content

Commit ffc8d98

Browse files
authored
Fix write checkpoint race condition (#201)
* Refactor write checkpoints to make testing easier.
* Add basic write checkpoint tests.
* Refactor write checkpoint creation.
* Add changeset.
* Fix for legacy checkpoint API and tests.
1 parent e683d05 commit ffc8d98

File tree

11 files changed

+204
-39
lines changed

11 files changed

+204
-39
lines changed

.changeset/shy-news-smell.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
'@powersync/service-module-mongodb': patch
4+
'@powersync/service-core': patch
5+
'@powersync/service-module-mysql': patch
6+
---
7+
8+
Fix write checkpoint race condition

modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as lib_mongo from '@powersync/lib-service-mongodb';
22
import { mongo } from '@powersync/lib-service-mongodb';
3-
import { api, ParseSyncRulesOptions, SourceTable } from '@powersync/service-core';
3+
import { api, ParseSyncRulesOptions, ReplicationHeadCallback, SourceTable } from '@powersync/service-core';
44
import * as sync_rules from '@powersync/service-sync-rules';
55
import * as service_types from '@powersync/service-types';
66

@@ -9,6 +9,8 @@ import { constructAfterRecord, createCheckpoint } from '../replication/MongoRela
99
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
1010
import * as types from '../types/types.js';
1111
import { escapeRegExp } from '../utils.js';
12+
import { ServiceAssertionError } from '@powersync/lib-services-framework';
13+
import { MongoLSN } from '../common/MongoLSN.js';
1214

1315
export class MongoRouteAPIAdapter implements api.RouteAPI {
1416
protected client: mongo.MongoClient;
@@ -208,6 +210,44 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
208210
return createCheckpoint(this.client, this.db);
209211
}
210212

213+
/**
 * Captures the current cluster time as the replication head, passes it to `callback`,
 * and then writes to the checkpoints collection so that a change is triggered on the
 * change stream after the callback has completed.
 *
 * @param callback receives the comparable LSN string built from the captured cluster time
 * @returns whatever `callback` resolved with
 * @throws ServiceAssertionError if the session exposes no cluster time or operation time,
 *   or if the operation time of the checkpoint write is earlier than the captured head
 */
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
  const session = this.client.startSession();
  try {
    // Run a command on the session, then read the cluster time the session recorded.
    await this.db.command({ hello: 1 }, { session });
    const head = session.clusterTime?.clusterTime;
    if (head == null) {
      throw new ServiceAssertionError(`clusterTime not available for write checkpoint`);
    }

    const r = await callback(new MongoLSN({ timestamp: head }).comparable);

    // Trigger a change on the changestream.
    await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
      {
        _id: 'checkpoint' as any
      },
      {
        $inc: { i: 1 }
      },
      {
        upsert: true,
        returnDocument: 'after',
        session
      }
    );

    // No non-null assertion here: the value is checked explicitly right below.
    const time = session.operationTime;
    if (time == null) {
      throw new ServiceAssertionError(`operationTime not available for write checkpoint`);
    } else if (time.lt(head)) {
      // The guard rejects only time < head, so the enforced invariant is ">=".
      throw new ServiceAssertionError(`operationTime must be >= clusterTime`);
    }

    return r;
  } finally {
    // Always release the session, including when the callback or the write throws.
    await session.endSession();
  }
}
250+
211251
async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
212252
const sampleSize = 50;
213253

modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { api, ParseSyncRulesOptions, storage } from '@powersync/service-core';
1+
import { api, ParseSyncRulesOptions, ReplicationHeadCallback, storage } from '@powersync/service-core';
22

33
import * as sync_rules from '@powersync/service-sync-rules';
44
import * as service_types from '@powersync/service-types';
@@ -275,6 +275,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
275275
return result.comparable;
276276
}
277277

278+
/**
 * Reads the current replication head via `getReplicationHead()`, hands it to `callback`,
 * and resolves with the callback's result.
 *
 * @param callback receives the replication head position
 * @returns whatever `callback` resolved with
 */
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
  const replicationHead = await this.getReplicationHead();
  const callbackResult = await callback(replicationHead);

  // TODO: make sure another message is replicated

  return callbackResult;
}
286+
278287
async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
279288
const [results] = await this.retriedQuery({
280289
query: `

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
22
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
3-
import { api, ParseSyncRulesOptions } from '@powersync/service-core';
3+
import { api, ParseSyncRulesOptions, ReplicationHeadCallback } from '@powersync/service-core';
44
import * as pgwire from '@powersync/service-jpgwire';
55
import * as sync_rules from '@powersync/service-sync-rules';
66
import * as service_types from '@powersync/service-types';
@@ -241,17 +241,22 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
241241
// However, on Aurora (Postgres compatible), it can return an entirely different LSN,
242242
// causing the write checkpoints to never be replicated back to the client.
243243
// For those, we need to use pg_current_wal_lsn() instead.
244-
const { results } = await lib_postgres.retriedQuery(
245-
this.pool,
246-
{ statement: `SELECT pg_current_wal_lsn() as lsn` },
247-
KEEPALIVE_STATEMENT
248-
);
244+
const { results } = await lib_postgres.retriedQuery(this.pool, `SELECT pg_current_wal_lsn() as lsn`);
249245

250-
// Specifically use the lsn from the first statement, not the second one.
251246
const lsn = results[0].rows[0][0];
252247
return String(lsn);
253248
}
254249

250+
/**
 * Reads the current replication head via `getReplicationHead()`, hands it to `callback`,
 * and then runs KEEPALIVE_STATEMENT against the pool before resolving.
 *
 * @param callback receives the replication head LSN
 * @returns whatever `callback` resolved with
 */
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
  const head = await this.getReplicationHead();
  const callbackResult = await callback(head);

  // Issued only once the callback has finished, i.e. after the head was handed out.
  await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT);

  return callbackResult;
}
259+
255260
async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
256261
// https://github.com/Borvik/vscode-postgres/blob/88ec5ed061a0c9bced6c5d4ec122d0759c3f3247/src/language/server.ts
257262
const results = await lib_postgres.retriedQuery(
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js';
2+
import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core';
3+
import { describe, test } from 'vitest';
4+
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
5+
import { WalStreamTestContext } from './wal_stream_utils.js';
6+
7+
import timers from 'node:timers/promises';
8+
9+
// Minimal sync rules used by the checkpoint test: one `global` bucket selecting from "test_data".
// The nesting indentation is significant because the string is parsed as YAML.
const BASIC_SYNC_RULES = `bucket_definitions:
  global:
    data:
      - SELECT id, description, other FROM "test_data"`;
13+
14+
describe('checkpoint tests', () => {
  // Creates ten write checkpoints in sequence and checks that each one is observed on the
  // write checkpoint watch stream within two seconds.
  test('write checkpoints', { timeout: 30_000 }, async () => {
    const factory = INITIALIZED_MONGO_STORAGE_FACTORY;
    await using context = await WalStreamTestContext.open(factory);

    await context.updateSyncRules(BASIC_SYNC_RULES);
    const { pool } = context;
    const api = new PostgresRouteAPIAdapter(pool);

    await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`);

    await context.replicateSnapshot();

    context.startStreaming();

    const controller = new AbortController();
    try {
      const stream = context.factory.watchWriteCheckpoint(
        checkpointUserId('test_user', 'test_client'),
        controller.signal
      );

      let lastWriteCheckpoint: bigint | null = null;
      // Failure of the background consumer, surfaced by the polling loop below instead of
      // being lost as an unhandled rejection.
      const consumer: { failed: boolean; error: unknown } = { failed: false, error: undefined };

      // Deliberately not awaited: the stream is consumed in the background until aborted.
      void (async () => {
        try {
          for await (const cp of stream) {
            lastWriteCheckpoint = cp.writeCheckpoint;
          }
        } catch (e: unknown) {
          const aborted = typeof e === 'object' && e !== null && 'name' in e && e.name === 'AbortError';
          if (!aborted) {
            consumer.failed = true;
            consumer.error = e;
          }
        }
      })();

      for (let i = 0; i < 10; i++) {
        const cp = await createWriteCheckpoint({
          userId: 'test_user',
          clientId: 'test_client',
          api,
          storage: context.factory
        });

        const start = Date.now();
        while (lastWriteCheckpoint == null || lastWriteCheckpoint < BigInt(cp.writeCheckpoint)) {
          if (consumer.failed) {
            // Report the real stream error rather than a generic timeout.
            throw consumer.error;
          }
          if (Date.now() - start > 2_000) {
            throw new Error(`Timeout while waiting for checkpoint`);
          }
          // Yield to the event loop so the background consumer can make progress.
          await timers.setTimeout(0, undefined, { signal: controller.signal });
        }
      }
    } finally {
      controller.abort();
    }
  });
});

modules/module-postgres/test/src/util.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ export async function getClientCheckpoint(
6868
const start = Date.now();
6969

7070
const api = new PostgresRouteAPIAdapter(db);
71-
const lsn = await api.getReplicationHead();
71+
const lsn = await api.createReplicationHead(async (lsn) => lsn);
7272

7373
// This old API needs a persisted checkpoint id.
7474
// Since we don't use LSNs anymore, the only way to get that is to wait.

packages/service-core/src/api/RouteAPI.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ export interface RouteAPI {
5454
*/
5555
getReplicationHead(): Promise<string>;
5656

57+
/**
58+
* Get the current LSN or equivalent replication HEAD position identifier.
59+
*
60+
* The position is provided to the callback. After the callback returns,
61+
* the replication head or a greater one will be streamed on the replication stream.
62+
*/
63+
createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T>;
64+
5765
/**
5866
* @returns The schema for tables inside the connected database. This is typically
5967
* used to validate sync rules.
@@ -76,3 +84,5 @@ export interface RouteAPI {
7684
*/
7785
getParseSyncRulesOptions(): ParseSyncRulesOptions;
7886
}
87+
88+
/**
 * Callback accepted by `RouteAPI.createReplicationHead`.
 *
 * @param head the LSN or equivalent replication HEAD position identifier
 * @returns a promise for the value that `createReplicationHead` resolves with
 */
export type ReplicationHeadCallback<T> = (head: string) => Promise<T>;

packages/service-core/src/routes/endpoints/checkpointing.ts

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const writeCheckpoint = routeDefinition({
2525
// Since we don't use LSNs anymore, the only way to get that is to wait.
2626
const start = Date.now();
2727

28-
const head = await apiHandler.getReplicationHead();
28+
const head = await apiHandler.createReplicationHead(async (head) => head);
2929

3030
const timeout = 50_000;
3131

@@ -56,25 +56,14 @@ export const writeCheckpoint2 = routeDefinition({
5656

5757
const apiHandler = service_context.routerEngine!.getAPI();
5858

59-
const client_id = payload.params.client_id;
60-
const full_user_id = util.checkpointUserId(user_id, client_id);
61-
62-
const currentCheckpoint = await apiHandler.getReplicationHead();
63-
const {
64-
storageEngine: { activeBucketStorage }
65-
} = service_context;
66-
67-
const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent();
68-
if (!activeSyncRules) {
69-
throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`);
70-
}
71-
72-
using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules);
73-
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
74-
user_id: full_user_id,
75-
heads: { '1': currentCheckpoint }
59+
const { replicationHead, writeCheckpoint } = await util.createWriteCheckpoint({
60+
userId: user_id,
61+
clientId: payload.params.client_id,
62+
api: apiHandler,
63+
storage: service_context.storageEngine.activeBucketStorage
7664
});
77-
logger.info(`Write checkpoint 2: ${JSON.stringify({ currentCheckpoint, id: String(full_user_id) })}`);
65+
66+
logger.info(`Write checkpoint for ${user_id}/${payload.params.client_id}: ${writeCheckpoint} | ${replicationHead}`);
7867

7968
return {
8069
write_checkpoint: String(writeCheckpoint)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
2+
import { RouteAPI } from '../api/RouteAPI.js';
3+
import { BucketStorageFactory } from '../storage/BucketStorage.js';
4+
5+
/**
 * Inputs for {@link createWriteCheckpoint}.
 */
export interface CreateWriteCheckpointOptions {
  /** User the checkpoint belongs to. Typed as optional, but `checkpointUserId` throws when it is missing. */
  userId: string | undefined;
  /** Optional client identifier; when present it is appended to the user id as `user/client`. */
  clientId: string | undefined;
  /** Source database API used to obtain the replication head. */
  api: RouteAPI;
  /** Storage factory used to look up the active sync rules and their storage instance. */
  storage: BucketStorageFactory;
}
11+
/**
 * Creates a managed write checkpoint for the given user/client against the active sync rules.
 *
 * The checkpoint is persisted inside the `createReplicationHead` callback, i.e. before
 * that method resolves.
 *
 * @param options user/client identity plus the API and storage to use
 * @returns the write checkpoint id as a string, and the replication head it was recorded against
 * @throws ServiceError (PSYNC_S2302) when no sync rules are active
 * @throws Error when `options.userId` is missing (from `checkpointUserId`)
 */
export async function createWriteCheckpoint(
  options: CreateWriteCheckpointOptions
): Promise<{ writeCheckpoint: string; replicationHead: string }> {
  const full_user_id = checkpointUserId(options.userId, options.clientId);

  const activeSyncRules = await options.storage.getActiveSyncRulesContent();
  if (!activeSyncRules) {
    throw new ServiceError(ErrorCode.PSYNC_S2302, `Cannot create Write Checkpoint since no sync rules are active.`);
  }

  // `using` disposes the storage instance when this function exits.
  using syncBucketStorage = options.storage.getInstance(activeSyncRules);

  const { checkpointId, replicationHead } = await options.api.createReplicationHead(async (head) => {
    const id = await syncBucketStorage.createManagedWriteCheckpoint({
      user_id: full_user_id,
      heads: { '1': head }
    });
    return { checkpointId: id, replicationHead: head };
  });

  return {
    writeCheckpoint: String(checkpointId),
    replicationHead
  };
}
34+
35+
/**
 * Builds the identifier under which write checkpoints are stored for a user/client pair.
 *
 * @param user_id required user id
 * @param client_id optional client id
 * @returns `user_id` alone when `client_id` is null/undefined, otherwise `user_id/client_id`
 * @throws Error when `user_id` is null or undefined
 */
export function checkpointUserId(user_id: string | undefined, client_id: string | undefined): string {
  if (user_id == null) {
    throw new Error('user_id is required');
  }
  if (client_id == null) {
    return user_id;
  }
  return `${user_id}/${client_id}`;
}

packages/service-core/src/util/util-index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export * from './Mutex.js';
55
export * from './protocol-types.js';
66
export * from './secs.js';
77
export * from './utils.js';
8+
export * from './checkpointing.js';
89

910
export * from './config.js';
1011
export * from './config/compound-config-collector.js';

0 commit comments

Comments
 (0)