Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/shy-news-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-module-postgres': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-module-mysql': patch
---

Fix write checkpoint race condition
42 changes: 41 additions & 1 deletion modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lib_mongo from '@powersync/lib-service-mongodb';
import { mongo } from '@powersync/lib-service-mongodb';
import { api, ParseSyncRulesOptions, SourceTable } from '@powersync/service-core';
import { api, ParseSyncRulesOptions, ReplicationHeadCallback, SourceTable } from '@powersync/service-core';
import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';

Expand All @@ -9,6 +9,8 @@ import { constructAfterRecord, createCheckpoint } from '../replication/MongoRela
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
import * as types from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { ServiceAssertionError } from '@powersync/lib-services-framework';
import { MongoLSN } from '../common/MongoLSN.js';

export class MongoRouteAPIAdapter implements api.RouteAPI {
protected client: mongo.MongoClient;
Expand Down Expand Up @@ -208,6 +210,44 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
return createCheckpoint(this.client, this.db);
}

/**
 * Provides the current MongoDB cluster time (as a comparable LSN) to the callback,
 * then triggers a no-op write on the checkpoints collection so that the change
 * stream advances past that position — guaranteeing the head (or a greater one)
 * is eventually replicated.
 *
 * @param callback receives the comparable LSN string for the current cluster time.
 * @returns the callback's result.
 * @throws ServiceAssertionError if cluster/operation time is unavailable, or if
 *   the post-write operationTime is behind the captured head.
 */
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
  const session = this.client.startSession();
  try {
    // Run a cheap command on the session so clusterTime is populated.
    await this.db.command({ hello: 1 }, { session });
    const head = session.clusterTime?.clusterTime;
    if (head == null) {
      throw new ServiceAssertionError(`clusterTime not available for write checkpoint`);
    }

    const r = await callback(new MongoLSN({ timestamp: head }).comparable);

    // Trigger a change on the changestream, so replication advances past `head`.
    await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
      {
        _id: 'checkpoint' as any
      },
      {
        $inc: { i: 1 }
      },
      {
        upsert: true,
        returnDocument: 'after',
        session
      }
    );
    // No non-null assertion here: operationTime can legitimately be undefined,
    // and the null check below must stay reachable. (The original `!` made it
    // dead code.)
    const time = session.operationTime;
    if (time == null) {
      throw new ServiceAssertionError(`operationTime not available for write checkpoint`);
    } else if (time.lt(head)) {
      // Equality is allowed — the update may commit at the same cluster time.
      throw new ServiceAssertionError(`operationTime must be >= clusterTime`);
    }

    return r;
  } finally {
    await session.endSession();
  }
}

async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
const sampleSize = 50;

Expand Down
11 changes: 10 additions & 1 deletion modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { api, ParseSyncRulesOptions, storage } from '@powersync/service-core';
import { api, ParseSyncRulesOptions, ReplicationHeadCallback, storage } from '@powersync/service-core';

import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';
Expand Down Expand Up @@ -275,6 +275,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
return result.comparable;
}

/**
 * Provides the current replication head position to the callback.
 *
 * @param callback receives the current replication head (GTID) string.
 * @returns the callback's result.
 */
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
  const currentHead = await this.getReplicationHead();

  // TODO: make sure another message is replicated

  return await callback(currentHead);
}

async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
const [results] = await this.retriedQuery({
query: `
Expand Down
19 changes: 12 additions & 7 deletions modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lib_postgres from '@powersync/lib-service-postgres';
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
import { api, ParseSyncRulesOptions } from '@powersync/service-core';
import { api, ParseSyncRulesOptions, ReplicationHeadCallback } from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';
Expand Down Expand Up @@ -241,17 +241,22 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
// However, on Aurora (Postgres compatible), it can return an entirely different LSN,
// causing the write checkpoints to never be replicated back to the client.
// For those, we need to use pg_current_wal_lsn() instead.
const { results } = await lib_postgres.retriedQuery(
this.pool,
{ statement: `SELECT pg_current_wal_lsn() as lsn` },
KEEPALIVE_STATEMENT
);
const { results } = await lib_postgres.retriedQuery(this.pool, `SELECT pg_current_wal_lsn() as lsn`);

// Specifically use the lsn from the first statement, not the second one.
const lsn = results[0].rows[0][0];
return String(lsn);
}

/**
 * Provides the current WAL LSN to the callback, then issues a keepalive
 * statement so the logical replication stream advances past that LSN.
 *
 * @param callback receives the current LSN string.
 * @returns the callback's result.
 */
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
  const head = await this.getReplicationHead();
  const result = await callback(head);

  // Generate WAL traffic so the stream is guaranteed to reach `head` or later.
  await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT);

  return result;
}

async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
// https://github.com/Borvik/vscode-postgres/blob/88ec5ed061a0c9bced6c5d4ec122d0759c3f3247/src/language/server.ts
const results = await lib_postgres.retriedQuery(
Expand Down
70 changes: 70 additions & 0 deletions modules/module-postgres/test/src/checkpoints.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js';
import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core';
import { describe, test } from 'vitest';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
import { WalStreamTestContext } from './wal_stream_utils.js';

import timers from 'node:timers/promises';

const BASIC_SYNC_RULES = `bucket_definitions:
global:
data:
- SELECT id, description, other FROM "test_data"`;

describe('checkpoint tests', () => {
  test('write checkpoints', { timeout: 30_000 }, async () => {
    const factory = INITIALIZED_MONGO_STORAGE_FACTORY;
    await using context = await WalStreamTestContext.open(factory);

    await context.updateSyncRules(BASIC_SYNC_RULES);
    const { pool } = context;
    const api = new PostgresRouteAPIAdapter(pool);

    await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`);

    await context.replicateSnapshot();

    context.startStreaming();

    const controller = new AbortController();
    try {
      const stream = context.factory.watchWriteCheckpoint(
        checkpointUserId('test_user', 'test_client'),
        controller.signal
      );

      let lastWriteCheckpoint: bigint | null = null;

      // Keep a handle on the consumer promise: fire-and-forget would turn any
      // non-abort failure into an unhandled rejection instead of failing the test.
      const consumer = (async () => {
        try {
          for await (const cp of stream) {
            lastWriteCheckpoint = cp.writeCheckpoint;
          }
        } catch (e) {
          // catch variables are `unknown` under strict settings — narrow first.
          if (e instanceof Error && e.name == 'AbortError') {
            return;
          }
          throw e;
        }
      })();

      for (let i = 0; i < 10; i++) {
        const cp = await createWriteCheckpoint({
          userId: 'test_user',
          clientId: 'test_client',
          api,
          storage: context.factory
        });

        // Poll until the checkpoint is observed on the stream.
        const start = Date.now();
        while (lastWriteCheckpoint == null || lastWriteCheckpoint < BigInt(cp.writeCheckpoint)) {
          if (Date.now() - start > 2_000) {
            throw new Error(`Timeout while waiting for checkpoint`);
          }
          await timers.setTimeout(0, undefined, { signal: controller.signal });
        }
      }

      controller.abort();
      // Surface any failure from the stream consumer.
      await consumer;
    } finally {
      controller.abort();
    }
  });
});
2 changes: 1 addition & 1 deletion modules/module-postgres/test/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export async function getClientCheckpoint(
const start = Date.now();

const api = new PostgresRouteAPIAdapter(db);
const lsn = await api.getReplicationHead();
const lsn = await api.createReplicationHead(async (lsn) => lsn);

// This old API needs a persisted checkpoint id.
// Since we don't use LSNs anymore, the only way to get that is to wait.
Expand Down
10 changes: 10 additions & 0 deletions packages/service-core/src/api/RouteAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ export interface RouteAPI {
*/
getReplicationHead(): Promise<string>;

/**
 * Provide the current LSN (or equivalent replication HEAD position identifier)
 * to the callback, typically so it can persist a write checkpoint at that position.
 *
 * After the callback returns, the implementation guarantees that this replication
 * head, or a greater one, will be streamed on the replication stream — ensuring
 * anything the callback persisted is eventually replicated back to clients.
 */
createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T>;

/**
* @returns The schema for tables inside the connected database. This is typically
* used to validate sync rules.
Expand All @@ -76,3 +84,5 @@ export interface RouteAPI {
*/
getParseSyncRulesOptions(): ParseSyncRulesOptions;
}

export type ReplicationHeadCallback<T> = (head: string) => Promise<T>;
27 changes: 8 additions & 19 deletions packages/service-core/src/routes/endpoints/checkpointing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const writeCheckpoint = routeDefinition({
// Since we don't use LSNs anymore, the only way to get that is to wait.
const start = Date.now();

const head = await apiHandler.getReplicationHead();
const head = await apiHandler.createReplicationHead(async (head) => head);

const timeout = 50_000;

Expand Down Expand Up @@ -56,25 +56,14 @@ export const writeCheckpoint2 = routeDefinition({

const apiHandler = service_context.routerEngine!.getAPI();

const client_id = payload.params.client_id;
const full_user_id = util.checkpointUserId(user_id, client_id);

const currentCheckpoint = await apiHandler.getReplicationHead();
const {
storageEngine: { activeBucketStorage }
} = service_context;

const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent();
if (!activeSyncRules) {
throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`);
}

using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules);
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
user_id: full_user_id,
heads: { '1': currentCheckpoint }
const { replicationHead, writeCheckpoint } = await util.createWriteCheckpoint({
userId: user_id,
clientId: payload.params.client_id,
api: apiHandler,
storage: service_context.storageEngine.activeBucketStorage
});
logger.info(`Write checkpoint 2: ${JSON.stringify({ currentCheckpoint, id: String(full_user_id) })}`);

logger.info(`Write checkpoint for ${user_id}/${payload.params.client_id}: ${writeCheckpoint} | ${replicationHead}`);

return {
write_checkpoint: String(writeCheckpoint)
Expand Down
43 changes: 43 additions & 0 deletions packages/service-core/src/util/checkpointing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { RouteAPI } from '../api/RouteAPI.js';
import { BucketStorageFactory } from '../storage/BucketStorage.js';

export interface CreateWriteCheckpointOptions {
userId: string | undefined;
clientId: string | undefined;
api: RouteAPI;
storage: BucketStorageFactory;
}
/**
 * Creates a managed write checkpoint for a user/client pair at the connection's
 * current replication head, ensuring that head is subsequently replicated.
 *
 * @returns the checkpoint id (as a string) and the replication head it was created at.
 * @throws ServiceError (PSYNC_S2302) when no sync rules are active.
 */
export async function createWriteCheckpoint(options: CreateWriteCheckpointOptions) {
  const fullUserId = checkpointUserId(options.userId, options.clientId);

  const syncRules = await options.storage.getActiveSyncRulesContent();
  if (syncRules == null) {
    throw new ServiceError(ErrorCode.PSYNC_S2302, `Cannot create Write Checkpoint since no sync rules are active.`);
  }

  // Disposed automatically when this function returns.
  using bucketStorage = options.storage.getInstance(syncRules);

  // The checkpoint must be persisted inside the callback, so the connector can
  // guarantee the head is replicated only after the checkpoint exists.
  const { checkpoint, head } = await options.api.createReplicationHead(async (currentHead) => {
    const created = await bucketStorage.createManagedWriteCheckpoint({
      user_id: fullUserId,
      heads: { '1': currentHead }
    });
    return { checkpoint: created, head: currentHead };
  });

  return {
    writeCheckpoint: String(checkpoint),
    replicationHead: head
  };
}

/**
 * Builds the composite checkpoint identity for a user/client pair.
 *
 * @returns `user_id` alone when no client id is given, otherwise `user_id/client_id`.
 * @throws Error when `user_id` is missing.
 */
export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
  if (user_id == null) {
    throw new Error('user_id is required');
  }
  return client_id == null ? user_id : `${user_id}/${client_id}`;
}
1 change: 1 addition & 0 deletions packages/service-core/src/util/util-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './Mutex.js';
export * from './protocol-types.js';
export * from './secs.js';
export * from './utils.js';
export * from './checkpointing.js';

export * from './config.js';
export * from './config/compound-config-collector.js';
Expand Down
10 changes: 0 additions & 10 deletions packages/service-core/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,6 @@ export function isCompleteRow(storeData: boolean, row: sync_rules.ToastableSqlit
return !hasToastedValues(row);
}

export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
if (user_id == null) {
throw new Error('user_id is required');
}
if (client_id == null) {
return user_id;
}
return `${user_id}/${client_id}`;
}

/**
* Reduce a bucket to the final state as stored on the client.
*
Expand Down
Loading