Skip to content

Commit 9292db4

Browse files
committed
Refactor write checkpoint creation.
1 parent be0ea66 commit 9292db4

File tree

5 files changed

+81
-14
lines changed

5 files changed

+81
-14
lines changed

modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as lib_mongo from '@powersync/lib-service-mongodb';
22
import { mongo } from '@powersync/lib-service-mongodb';
3-
import { api, ParseSyncRulesOptions, SourceTable } from '@powersync/service-core';
3+
import { api, ParseSyncRulesOptions, ReplicationHeadCallback, SourceTable } from '@powersync/service-core';
44
import * as sync_rules from '@powersync/service-sync-rules';
55
import * as service_types from '@powersync/service-types';
66

@@ -9,6 +9,8 @@ import { constructAfterRecord, createCheckpoint } from '../replication/MongoRela
99
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
1010
import * as types from '../types/types.js';
1111
import { escapeRegExp } from '../utils.js';
12+
import { ServiceAssertionError } from '@powersync/lib-services-framework';
13+
import { MongoLSN } from '../common/MongoLSN.js';
1214

1315
export class MongoRouteAPIAdapter implements api.RouteAPI {
1416
protected client: mongo.MongoClient;
@@ -208,6 +210,44 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
208210
return createCheckpoint(this.client, this.db);
209211
}
210212

213+
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
214+
const session = this.client.startSession();
215+
try {
216+
await this.db.command({ hello: 1 }, { session });
217+
const head = session.clusterTime?.clusterTime;
218+
if (head == null) {
219+
throw new ServiceAssertionError(`clusterTime not available for write checkpoint`);
220+
}
221+
222+
const r = await callback(new MongoLSN({ timestamp: head }).comparable);
223+
224+
// Trigger a change on the changestream.
225+
await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
226+
{
227+
_id: 'checkpoint' as any
228+
},
229+
{
230+
$inc: { i: 1 }
231+
},
232+
{
233+
upsert: true,
234+
returnDocument: 'after',
235+
session
236+
}
237+
);
238+
const time = session.operationTime!;
239+
if (time == null) {
240+
throw new ServiceAssertionError(`operationTime not available for write checkpoint`);
241+
} else if (time.lt(head)) {
242+
throw new ServiceAssertionError(`operationTime must be > clusterTime`);
243+
}
244+
245+
return r;
246+
} finally {
247+
await session.endSession();
248+
}
249+
}
250+
211251
async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
212252
const sampleSize = 50;
213253

modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { api, ParseSyncRulesOptions, storage } from '@powersync/service-core';
1+
import { api, ParseSyncRulesOptions, ReplicationHeadCallback, storage } from '@powersync/service-core';
22

33
import * as sync_rules from '@powersync/service-sync-rules';
44
import * as service_types from '@powersync/service-types';
@@ -275,6 +275,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
275275
return result.comparable;
276276
}
277277

278+
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
279+
const head = await this.getReplicationHead();
280+
const r = await callback(head);
281+
282+
// TODO: make sure another message is replicated
283+
284+
return r;
285+
}
286+
278287
async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
279288
const [results] = await this.retriedQuery({
280289
query: `

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
22
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
3-
import { api, ParseSyncRulesOptions } from '@powersync/service-core';
3+
import { api, ParseSyncRulesOptions, ReplicationHeadCallback } from '@powersync/service-core';
44
import * as pgwire from '@powersync/service-jpgwire';
55
import * as sync_rules from '@powersync/service-sync-rules';
66
import * as service_types from '@powersync/service-types';
@@ -241,17 +241,22 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
241241
// However, on Aurora (Postgres compatible), it can return an entirely different LSN,
242242
// causing the write checkpoints to never be replicated back to the client.
243243
// For those, we need to use pg_current_wal_lsn() instead.
244-
const { results } = await lib_postgres.retriedQuery(
245-
this.pool,
246-
{ statement: `SELECT pg_current_wal_lsn() as lsn` },
247-
KEEPALIVE_STATEMENT
248-
);
244+
const { results } = await lib_postgres.retriedQuery(this.pool, `SELECT pg_current_wal_lsn() as lsn`);
249245

250-
// Specifically use the lsn from the first statement, not the second one.
251246
const lsn = results[0].rows[0][0];
252247
return String(lsn);
253248
}
254249

250+
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
251+
const currentLsn = await this.getReplicationHead();
252+
253+
const r = await callback(currentLsn);
254+
255+
await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT);
256+
257+
return r;
258+
}
259+
255260
async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
256261
// https://github.com/Borvik/vscode-postgres/blob/88ec5ed061a0c9bced6c5d4ec122d0759c3f3247/src/language/server.ts
257262
const results = await lib_postgres.retriedQuery(

packages/service-core/src/api/RouteAPI.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ export interface RouteAPI {
5454
*/
5555
getReplicationHead(): Promise<string>;
5656

57+
/**
58+
* Get the current LSN or equivalent replication HEAD position identifier.
59+
*
60+
* The position is provided to the callback. After the callback returns,
61+
* the replication head or a greater one will be streamed on the replication stream.
62+
*/
63+
createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T>;
64+
5765
/**
5866
* @returns The schema for tables inside the connected database. This is typically
5967
* used to validate sync rules.
@@ -76,3 +84,5 @@ export interface RouteAPI {
7684
*/
7785
getParseSyncRulesOptions(): ParseSyncRulesOptions;
7886
}
87+
88+
export type ReplicationHeadCallback<T> = (head: string) => Promise<T>;

packages/service-core/src/util/checkpointing.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,21 @@ export interface CreateWriteCheckpointOptions {
1111
export async function createWriteCheckpoint(options: CreateWriteCheckpointOptions) {
1212
const full_user_id = checkpointUserId(options.userId, options.clientId);
1313

14-
const currentCheckpoint = await options.api.getReplicationHead();
15-
1614
const activeSyncRules = await options.storage.getActiveSyncRulesContent();
1715
if (!activeSyncRules) {
1816
throw new ServiceError(ErrorCode.PSYNC_S2302, `Cannot create Write Checkpoint since no sync rules are active.`);
1917
}
2018

2119
using syncBucketStorage = options.storage.getInstance(activeSyncRules);
22-
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
23-
user_id: full_user_id,
24-
heads: { '1': currentCheckpoint }
20+
21+
const { writeCheckpoint, currentCheckpoint } = await options.api.createReplicationHead(async (currentCheckpoint) => {
22+
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
23+
user_id: full_user_id,
24+
heads: { '1': currentCheckpoint }
25+
});
26+
return { writeCheckpoint, currentCheckpoint };
2527
});
28+
2629
return {
2730
writeCheckpoint: String(writeCheckpoint),
2831
replicationHead: currentCheckpoint

0 commit comments

Comments
 (0)