diff --git a/.changeset/shy-news-smell.md b/.changeset/shy-news-smell.md
new file mode 100644
index 000000000..e0fc8ffe2
--- /dev/null
+++ b/.changeset/shy-news-smell.md
@@ -0,0 +1,8 @@
+---
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+---
+
+Fix write checkpoint race condition
diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
index 694fb021d..40e561343 100644
--- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
+++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
@@ -1,6 +1,6 @@
 import * as lib_mongo from '@powersync/lib-service-mongodb';
 import { mongo } from '@powersync/lib-service-mongodb';
-import { api, ParseSyncRulesOptions, SourceTable } from '@powersync/service-core';
+import { api, ParseSyncRulesOptions, ReplicationHeadCallback, SourceTable } from '@powersync/service-core';
 import * as sync_rules from '@powersync/service-sync-rules';
 import * as service_types from '@powersync/service-types';
 
@@ -9,6 +9,8 @@ import { constructAfterRecord, createCheckpoint } from '../replication/MongoRela
 import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
 import * as types from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
+import { ServiceAssertionError } from '@powersync/lib-services-framework';
+import { MongoLSN } from '../common/MongoLSN.js';
 
 export class MongoRouteAPIAdapter implements api.RouteAPI {
   protected client: mongo.MongoClient;
@@ -208,6 +210,44 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
     return createCheckpoint(this.client, this.db);
   }
 
+  async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
+    const session = this.client.startSession();
+    try {
+      await this.db.command({ hello: 1 }, { session });
+      const head = session.clusterTime?.clusterTime;
+      if (head == null) {
+        throw new ServiceAssertionError(`clusterTime not available for write checkpoint`);
+      }
+
+      const r = await callback(new MongoLSN({ timestamp: head }).comparable);
+
+      // Trigger a change on the changestream.
+      await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
+        {
+          _id: 'checkpoint' as any
+        },
+        {
+          $inc: { i: 1 }
+        },
+        {
+          upsert: true,
+          returnDocument: 'after',
+          session
+        }
+      );
+      const time = session.operationTime!;
+      if (time == null) {
+        throw new ServiceAssertionError(`operationTime not available for write checkpoint`);
+      } else if (time.lt(head)) {
+        throw new ServiceAssertionError(`operationTime must be > clusterTime`);
+      }
+
+      return r;
+    } finally {
+      await session.endSession();
+    }
+  }
+
   async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
     const sampleSize = 50;
 
diff --git a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
index faa140adc..ab0a74817 100644
--- a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
+++ b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
@@ -1,4 +1,4 @@
-import { api, ParseSyncRulesOptions, storage } from '@powersync/service-core';
+import { api, ParseSyncRulesOptions, ReplicationHeadCallback, storage } from '@powersync/service-core';
 import * as sync_rules from '@powersync/service-sync-rules';
 import * as service_types from '@powersync/service-types';
 
@@ -275,6 +275,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
     return result.comparable;
   }
 
+  async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
+    const head = await this.getReplicationHead();
+    const r = await callback(head);
+
+    // TODO: make sure another message is replicated
+
+    return r;
+  }
+
   async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
     const [results] = await this.retriedQuery({
       query: `
diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
index 755b3fad7..320f1b2b6 100644
--- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
+++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
@@ -1,6 +1,6 @@
 import * as lib_postgres from '@powersync/lib-service-postgres';
 import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
-import { api, ParseSyncRulesOptions } from '@powersync/service-core';
+import { api, ParseSyncRulesOptions, ReplicationHeadCallback } from '@powersync/service-core';
 import * as pgwire from '@powersync/service-jpgwire';
 import * as sync_rules from '@powersync/service-sync-rules';
 import * as service_types from '@powersync/service-types';
@@ -241,17 +241,22 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
     // However, on Aurora (Postgres compatible), it can return an entirely different LSN,
     // causing the write checkpoints to never be replicated back to the client.
     // For those, we need to use pg_current_wal_lsn() instead.
-    const { results } = await lib_postgres.retriedQuery(
-      this.pool,
-      { statement: `SELECT pg_current_wal_lsn() as lsn` },
-      KEEPALIVE_STATEMENT
-    );
+    const { results } = await lib_postgres.retriedQuery(this.pool, `SELECT pg_current_wal_lsn() as lsn`);
 
-    // Specifically use the lsn from the first statement, not the second one.
     const lsn = results[0].rows[0][0];
     return String(lsn);
   }
 
+  async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
+    const currentLsn = await this.getReplicationHead();
+
+    const r = await callback(currentLsn);
+
+    await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT);
+
+    return r;
+  }
+
   async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
     // https://github.com/Borvik/vscode-postgres/blob/88ec5ed061a0c9bced6c5d4ec122d0759c3f3247/src/language/server.ts
     const results = await lib_postgres.retriedQuery(
diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts
new file mode 100644
index 000000000..84907cdea
--- /dev/null
+++ b/modules/module-postgres/test/src/checkpoints.test.ts
@@ -0,0 +1,70 @@
+import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js';
+import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core';
+import { describe, test } from 'vitest';
+import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
+import { WalStreamTestContext } from './wal_stream_utils.js';
+
+import timers from 'node:timers/promises';
+
+const BASIC_SYNC_RULES = `bucket_definitions:
+  global:
+    data:
+      - SELECT id, description, other FROM "test_data"`;
+
+describe('checkpoint tests', () => {
+  test('write checkpoints', { timeout: 30_000 }, async () => {
+    const factory = INITIALIZED_MONGO_STORAGE_FACTORY;
+    await using context = await WalStreamTestContext.open(factory);
+
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+    const { pool } = context;
+    const api = new PostgresRouteAPIAdapter(pool);
+
+    await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`);
+
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const controller = new AbortController();
+    try {
+      const stream = context.factory.watchWriteCheckpoint(
+        checkpointUserId('test_user', 'test_client'),
+        controller.signal
+      );
+
+      let lastWriteCheckpoint: bigint | null = null;
+
+      (async () => {
+        try {
+          for await (const cp of stream) {
+            lastWriteCheckpoint = cp.writeCheckpoint;
+          }
+        } catch (e) {
+          if (e.name != 'AbortError') {
+            throw e;
+          }
+        }
+      })();
+
+      for (let i = 0; i < 10; i++) {
+        const cp = await createWriteCheckpoint({
+          userId: 'test_user',
+          clientId: 'test_client',
+          api,
+          storage: context.factory
+        });
+
+        const start = Date.now();
+        while (lastWriteCheckpoint == null || lastWriteCheckpoint < BigInt(cp.writeCheckpoint)) {
+          if (Date.now() - start > 2_000) {
+            throw new Error(`Timeout while waiting for checkpoint`);
+          }
+          await timers.setTimeout(0, undefined, { signal: controller.signal });
+        }
+      }
+    } finally {
+      controller.abort();
+    }
+  });
+});
diff --git a/modules/module-postgres/test/src/util.ts b/modules/module-postgres/test/src/util.ts
index dca5521fa..0f3ad3519 100644
--- a/modules/module-postgres/test/src/util.ts
+++ b/modules/module-postgres/test/src/util.ts
@@ -68,7 +68,7 @@ export async function getClientCheckpoint(
   const start = Date.now();
 
   const api = new PostgresRouteAPIAdapter(db);
-  const lsn = await api.getReplicationHead();
+  const lsn = await api.createReplicationHead(async (lsn) => lsn);
 
   // This old API needs a persisted checkpoint id.
   // Since we don't use LSNs anymore, the only way to get that is to wait.
diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts
index c4212aa2b..4cebc47a5 100644
--- a/packages/service-core/src/api/RouteAPI.ts
+++ b/packages/service-core/src/api/RouteAPI.ts
@@ -54,6 +54,14 @@
    */
   getReplicationHead(): Promise<string>;
 
+  /**
+   * Get the current LSN or equivalent replication HEAD position identifier.
+   *
+   * The position is provided to the callback. After the callback returns,
+   * the replication head or a greater one will be streamed on the replication stream.
+   */
+  createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T>;
+
   /**
    * @returns The schema for tables inside the connected database. This is typically
    * used to validate sync rules.
@@ -76,3 +84,5 @@
    */
   getParseSyncRulesOptions(): ParseSyncRulesOptions;
 }
+
+export type ReplicationHeadCallback<T> = (head: string) => Promise<T>;
diff --git a/packages/service-core/src/routes/endpoints/checkpointing.ts b/packages/service-core/src/routes/endpoints/checkpointing.ts
index 3ce913ad2..516cb7e10 100644
--- a/packages/service-core/src/routes/endpoints/checkpointing.ts
+++ b/packages/service-core/src/routes/endpoints/checkpointing.ts
@@ -25,7 +25,7 @@ export const writeCheckpoint = routeDefinition({
     // Since we don't use LSNs anymore, the only way to get that is to wait.
     const start = Date.now();
 
-    const head = await apiHandler.getReplicationHead();
+    const head = await apiHandler.createReplicationHead(async (head) => head);
 
     const timeout = 50_000;
 
@@ -56,25 +56,14 @@ export const writeCheckpoint2 = routeDefinition({
 
     const apiHandler = service_context.routerEngine!.getAPI();
 
-    const client_id = payload.params.client_id;
-    const full_user_id = util.checkpointUserId(user_id, client_id);
-
-    const currentCheckpoint = await apiHandler.getReplicationHead();
-    const {
-      storageEngine: { activeBucketStorage }
-    } = service_context;
-
-    const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent();
-    if (!activeSyncRules) {
-      throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`);
-    }
-
-    using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules);
-    const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
-      user_id: full_user_id,
-      heads: { '1': currentCheckpoint }
+    const { replicationHead, writeCheckpoint } = await util.createWriteCheckpoint({
+      userId: user_id,
+      clientId: payload.params.client_id,
+      api: apiHandler,
+      storage: service_context.storageEngine.activeBucketStorage
     });
-    logger.info(`Write checkpoint 2: ${JSON.stringify({ currentCheckpoint, id: String(full_user_id) })}`);
+
+    logger.info(`Write checkpoint for ${user_id}/${payload.params.client_id}: ${writeCheckpoint} | ${replicationHead}`);
 
     return {
       write_checkpoint: String(writeCheckpoint)
diff --git a/packages/service-core/src/util/checkpointing.ts b/packages/service-core/src/util/checkpointing.ts
new file mode 100644
index 000000000..3f4b3f19b
--- /dev/null
+++ b/packages/service-core/src/util/checkpointing.ts
@@ -0,0 +1,43 @@
+import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
+import { RouteAPI } from '../api/RouteAPI.js';
+import { BucketStorageFactory } from '../storage/BucketStorage.js';
+
+export interface CreateWriteCheckpointOptions {
+  userId: string | undefined;
+  clientId: string | undefined;
+  api: RouteAPI;
+  storage: BucketStorageFactory;
+}
+export async function createWriteCheckpoint(options: CreateWriteCheckpointOptions) {
+  const full_user_id = checkpointUserId(options.userId, options.clientId);
+
+  const activeSyncRules = await options.storage.getActiveSyncRulesContent();
+  if (!activeSyncRules) {
+    throw new ServiceError(ErrorCode.PSYNC_S2302, `Cannot create Write Checkpoint since no sync rules are active.`);
+  }
+
+  using syncBucketStorage = options.storage.getInstance(activeSyncRules);
+
+  const { writeCheckpoint, currentCheckpoint } = await options.api.createReplicationHead(async (currentCheckpoint) => {
+    const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
+      user_id: full_user_id,
+      heads: { '1': currentCheckpoint }
+    });
+    return { writeCheckpoint, currentCheckpoint };
+  });
+
+  return {
+    writeCheckpoint: String(writeCheckpoint),
+    replicationHead: currentCheckpoint
+  };
+}
+
+export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
+  if (user_id == null) {
+    throw new Error('user_id is required');
+  }
+  if (client_id == null) {
+    return user_id;
+  }
+  return `${user_id}/${client_id}`;
+}
diff --git a/packages/service-core/src/util/util-index.ts b/packages/service-core/src/util/util-index.ts
index 50ad85dc7..8e19e2461 100644
--- a/packages/service-core/src/util/util-index.ts
+++ b/packages/service-core/src/util/util-index.ts
@@ -5,6 +5,7 @@ export * from './Mutex.js';
 export * from './protocol-types.js';
 export * from './secs.js';
 export * from './utils.js';
+export * from './checkpointing.js';
 export * from './config.js';
 export * from './config/compound-config-collector.js';
 
diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts
index 89be107dd..0768bdb67 100644
--- a/packages/service-core/src/util/utils.ts
+++ b/packages/service-core/src/util/utils.ts
@@ -145,16 +145,6 @@ export function isCompleteRow(storeData: boolean, row: sync_rules.ToastableSqlit
   return !hasToastedValues(row);
 }
 
-export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
-  if (user_id == null) {
-    throw new Error('user_id is required');
-  }
-  if (client_id == null) {
-    return user_id;
-  }
-  return `${user_id}/${client_id}`;
-}
-
 /**
  * Reduce a bucket to the final state as stored on the client.
  *
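
For reference, a minimal sketch of how the pieces in this patch are intended to be consumed together. It assumes the `createWriteCheckpoint` export added above and the `api`/`storage` namespace exports of `@powersync/service-core`; the wrapper function itself is hypothetical and not part of the patch.

```ts
import { api, storage, createWriteCheckpoint } from '@powersync/service-core';

// Hypothetical wrapper illustrating the intended call pattern. `routeApi` is any
// RouteAPI implementation (PostgresRouteAPIAdapter, MongoRouteAPIAdapter, ...) and
// `bucketStorage` is the active BucketStorageFactory.
export async function issueWriteCheckpoint(
  routeApi: api.RouteAPI,
  bucketStorage: storage.BucketStorageFactory,
  userId: string,
  clientId?: string
) {
  // createWriteCheckpoint persists the managed write checkpoint inside
  // routeApi.createReplicationHead(), so the replication head recorded with the
  // checkpoint is always followed by replicated activity (a keepalive statement on
  // Postgres, a checkpoint-document update on MongoDB). That ordering is what closes
  // the write checkpoint race condition described in the changeset.
  const { writeCheckpoint, replicationHead } = await createWriteCheckpoint({
    userId,
    clientId,
    api: routeApi,
    storage: bucketStorage
  });
  return { write_checkpoint: writeCheckpoint, replication_head: replicationHead };
}
```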