Merged
Changes from 2 commits
7 changes: 7 additions & 0 deletions .changeset/beige-clouds-cry.md
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[MongoDB] Fix replication batching
8 changes: 2 additions & 6 deletions modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
@@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';

import { MongoManager } from '../replication/MongoManager.js';
import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js';
import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
import * as types from '../types/types.js';
import { escapeRegExp } from '../utils.js';
@@ -206,10 +206,6 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
return undefined;
}

async getReplicationHead(): Promise<string> {
return createCheckpoint(this.client, this.db);
}

async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
const session = this.client.startSession();
try {
@@ -224,7 +220,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
// Trigger a change on the changestream.
await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
{
_id: 'checkpoint' as any
_id: STANDALONE_CHECKPOINT_ID as any
},
{
$inc: { i: 1 }
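Since `getReplicationHead()` is removed from this adapter (and from `RouteAPI` further down), callers presumably go through `createReplicationHead` instead. Below is a minimal sketch of an equivalent caller; the callback signature (receiving the head LSN as a string) and the `ReplicationHeadSource` interface are assumptions for illustration, not types taken from this diff.

```ts
// Sketch only: stands in for the real RouteAPI type, which is not fully shown here.
interface ReplicationHeadSource {
  createReplicationHead<T>(callback: (lsn: string) => Promise<T>): Promise<T>;
}

// Roughly equivalent to the removed getReplicationHead(): resolve with the LSN itself.
async function currentReplicationHead(api: ReplicationHeadSource): Promise<string> {
  return api.createReplicationHead(async (lsn) => lsn);
}
```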
60 changes: 49 additions & 11 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -15,7 +15,13 @@ import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
import {
constructAfterRecord,
createCheckpoint,
getCacheIdentifier,
getMongoRelation,
STANDALONE_CHECKPOINT_ID
} from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export interface ChangeStreamOptions {
@@ -69,6 +75,8 @@ export class ChangeStream {

private relation_cache = new Map<string | number, storage.SourceTable>();

private checkpointStreamId = new mongo.ObjectId();

constructor(options: ChangeStreamOptions) {
this.storage = options.storage;
this.metrics = options.metrics;
@@ -247,6 +255,11 @@ export class ChangeStream {
await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, {
changeStreamPreAndPostImages: { enabled: true }
});
} else {
// Clear the collection on startup to keep it clean.
// We never query this collection directly, and don't want to keep the data around.
// We only use this to get data into the oplog/changestream.
await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({});
}
}

@@ -434,7 +447,7 @@ export class ChangeStream {
await batch.truncate([result.table]);

await this.snapshotTable(batch, result.table);
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb);
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);

const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn);
return table;
@@ -601,7 +614,7 @@ export class ChangeStream {
// Always start with a checkpoint.
// This helps us to clear errors when restarting, even if there is
// no data to replicate.
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
let waitForCheckpointLsn: string | null = await createCheckpoint(
this.client,
this.defaultDb,
this.checkpointStreamId
);

let splitDocument: mongo.ChangeStreamDocument | null = null;

@@ -700,13 +717,9 @@ export class ChangeStream {
}
}

if (
(changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace' ||
changeDocument.operationType == 'drop') &&
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
) {
const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined;

if (ns?.coll == CHECKPOINTS_COLLECTION) {
/**
* Dropping the database does not provide an `invalidate` event.
* We typically would receive `drop` events for the collection which we
@@ -727,6 +740,31 @@
);
}

if (
!(
changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace'
)
) {
continue;
}

// We handle two types of checkpoint events:
// 1. "Standalone" checkpoints, typically write checkpoints. We want to process these
// immediately, regardless of where they were created.
// 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate
// limiting of commits, so we specifically want to exclude checkpoints from other streams.
//
// It may be useful to also throttle commits due to standalone checkpoints in the future.
// However, these typically have a much lower rate than batch checkpoints, so we don't do that for now.

const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId;
if (
!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))
) {
continue;
}
const { comparable: lsn } = new MongoLSN({
timestamp: changeDocument.clusterTime!,
resume_token: changeDocument._id
@@ -743,7 +781,7 @@
changeDocument.operationType == 'delete'
) {
if (waitForCheckpointLsn == null) {
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId);
}
const rel = getMongoRelation(changeDocument.ns);
const table = await this.getRelation(batch, rel, {
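The checkpoint filtering added above boils down to a small predicate: standalone checkpoints (such as write checkpoints) are always processed, while batch checkpoints are only processed when they carry this stream's own `checkpointStreamId`. A condensed sketch of that intent, not the module's actual code:

```ts
import * as mongo from 'mongodb';

const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint';

// Decide whether a change event on the checkpoints collection should trigger a commit.
function shouldProcessCheckpoint(
  checkpointId: string | mongo.ObjectId,
  ownStreamId: mongo.ObjectId
): boolean {
  // Standalone checkpoints (e.g. write checkpoints) are processed immediately,
  // regardless of which process created them.
  if (checkpointId == STANDALONE_CHECKPOINT_ID) {
    return true;
  }
  // Batch checkpoints from other replication streams are ignored, so they
  // don't drive this stream's commit rate.
  return ownStreamId.equals(checkpointId);
}
```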
22 changes: 17 additions & 5 deletions modules/module-mongodb/src/replication/MongoRelation.ts
@@ -147,15 +147,27 @@ function filterJsonData(data: any, depth = 0): any {
}
}

export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise<string> {
/**
* Id for checkpoints not associated with any specific replication stream.
*
* Use this for write checkpoints, or any other case where we want to process
* the checkpoint immediately, and not wait for batching.
*/
export const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint';

export async function createCheckpoint(
client: mongo.MongoClient,
db: mongo.Db,
id: mongo.ObjectId | string
): Promise<string> {
const session = client.startSession();
try {
// Note: If multiple PowerSync instances are replicating the same source database,
// they'll modify the same checkpoint document. This is fine - it could create
// more replication load than required, but won't break anything.
// We use a unique id per process, and clear documents on startup.
// This is so that we can filter events for our own process only, and ignore
// events from other processes.
await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
{
_id: 'checkpoint' as any
_id: id as any
},
{
$inc: { i: 1 }
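For reference, the updated `createCheckpoint` signature takes the checkpoint document id explicitly: a replication stream passes its own `ObjectId` for batch checkpoints, while paths that need immediate processing pass `STANDALONE_CHECKPOINT_ID`. A hedged usage sketch (the wrapper function and its parameters are illustrative only):

```ts
import * as mongo from 'mongodb';
import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from './MongoRelation.js';

async function exampleCheckpoints(client: mongo.MongoClient, db: mongo.Db) {
  // Per-stream id, generated once per ChangeStream instance.
  const streamId = new mongo.ObjectId();

  // Batch checkpoint: only the stream that owns streamId reacts to it.
  const batchLsn = await createCheckpoint(client, db, streamId);

  // Standalone checkpoint: processed immediately by any stream that sees it.
  const standaloneLsn = await createCheckpoint(client, db, STANDALONE_CHECKPOINT_ID);

  return { batchLsn, standaloneLsn };
}
```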
4 changes: 2 additions & 2 deletions modules/module-mongodb/test/src/change_stream_utils.ts
@@ -12,7 +12,7 @@ import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';

import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
import { MongoManager } from '@module/replication/MongoManager.js';
import { createCheckpoint } from '@module/replication/MongoRelation.js';
import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js';
import { NormalizedMongoConnectionConfig } from '@module/types/types.js';

import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js';
@@ -160,7 +160,7 @@ export async function getClientCheckpoint(
options?: { timeout?: number }
): Promise<InternalOpId> {
const start = Date.now();
const lsn = await createCheckpoint(client, db);
const lsn = await createCheckpoint(client, db, STANDALONE_CHECKPOINT_ID);
// This old API needs a persisted checkpoint id.
// Since we don't use LSNs anymore, the only way to get that is to wait.

5 changes: 0 additions & 5 deletions packages/service-core/src/api/RouteAPI.ts
@@ -49,11 +49,6 @@
*/
getReplicationLag(options: ReplicationLagOptions): Promise<number | undefined>;

/**
* Get the current LSN or equivalent replication HEAD position identifier
*/
getReplicationHead(): Promise<string>;

/**
* Get the current LSN or equivalent replication HEAD position identifier.
*