6 changes: 6 additions & 0 deletions .changeset/hot-pets-itch.md
@@ -0,0 +1,6 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-module-mongodb': patch
---

Improve initial replication performance for MongoDB by avoiding sessions.
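
For context, here is a minimal sketch of the snapshot-read pattern this change moves toward, assuming the standard `mongodb` Node.js driver. The function and collection names are illustrative, not the actual PowerSync implementation:

```ts
import { MongoClient } from 'mongodb';

// Read an initial snapshot with a plain cursor and a majority read concern, without
// holding a server session (or a {snapshot: true} session) open for the whole copy.
// Long-running snapshot sessions can expire on large collections, since
// minSnapshotHistoryWindowInSeconds defaults to 5 minutes.
async function snapshotCollection(client: MongoClient, dbName: string, collectionName: string) {
  const collection = client.db(dbName).collection(collectionName);
  const cursor = collection.find({}, { readConcern: 'majority', batchSize: 6_000 });
  for await (const document of cursor) {
    // hand each document to the replication batch here
  }
}
```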
@@ -270,7 +270,7 @@ export class MongoBucketBatch
}
}

return resumeBatch;
return resumeBatch?.hasData() ? resumeBatch : null;
}

private saveOperation(
@@ -41,6 +41,10 @@ export class OperationBatch {
return this.batch.length >= MAX_BATCH_COUNT || this.currentSize > MAX_RECORD_BATCH_SIZE;
}

hasData() {
return this.length > 0;
}

/**
*
* @param sizes Map of source key to estimated size of the current_data document, or undefined if current_data is not persisted.
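Together, the two hunks above let an empty resume batch collapse to `null` instead of scheduling a no-op flush. A simplified sketch of the bookkeeping, with assumed details where the diff does not show them:

```ts
// Illustrative only; the real OperationBatch also tracks record sizes and flush thresholds.
class OperationBatchSketch {
  private batch: unknown[] = [];

  get length(): number {
    return this.batch.length;
  }

  hasData(): boolean {
    // Callers use this to decide whether a partially filled batch is worth resuming,
    // as in: return resumeBatch?.hasData() ? resumeBatch : null;
    return this.length > 0;
  }
}
```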
@@ -245,6 +245,7 @@ export class PersistedBatch {
}

async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
const startAt = performance.now();
if (this.bucketData.length > 0) {
await db.bucket_data.bulkWrite(this.bucketData, {
session,
@@ -267,10 +268,11 @@
});
}

const duration = performance.now() - startAt;
logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
this.currentData.length
} updates, ${Math.round(this.currentSize / 1024)}kb. Last op_id: ${this.debugLastOpId}`
} updates, ${Math.round(this.currentSize / 1024)}kb in ${duration.toFixed(0)}ms. Last op_id: ${this.debugLastOpId}`
);

this.bucketData = [];
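The timing added to `flush()` above reduces to a small pattern; a standalone sketch with illustrative names, not the actual storage code:

```ts
// Time a batched write and report its size and duration in one log line.
async function timedFlush(writeAll: () => Promise<void>, sizeBytes: number, label: string): Promise<void> {
  const startAt = performance.now();
  await writeAll();
  const duration = performance.now() - startAt;
  console.log(`${label}: flushed ${Math.round(sizeBytes / 1024)}kb in ${duration.toFixed(0)}ms`);
}
```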
125 changes: 64 additions & 61 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -8,7 +8,14 @@ import {
ReplicationAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import {
BSON_DESERIALIZE_DATA_OPTIONS,
Metrics,
SaveOperationTag,
SourceEntityDescriptor,
SourceTable,
storage
} from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
@@ -193,39 +200,31 @@ export class ChangeStream {
// Not known where this would happen apart from the above cases
throw new ReplicationAssertionError('MongoDB lastWrite timestamp not found.');
}
// We previously used {snapshot: true} for the snapshot session.
// While it gives nice consistency guarantees, it fails when the
// snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
// expiring the snapshot.
const session = await this.client.startSession();
try {
await this.storage.startBatch(
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
// Start by resolving all tables.
// This checks postImage configuration, and that should fail as
// early as possible.
let allSourceTables: SourceTable[] = [];
for (let tablePattern of sourceTables) {
const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
allSourceTables.push(...tables);
}

for (let table of allSourceTables) {
await this.snapshotTable(batch, table, session);
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
await this.storage.startBatch(
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
// Start by resolving all tables.
// This checks postImage configuration, and that should fail as
// early as possible.
let allSourceTables: SourceTable[] = [];
for (let tablePattern of sourceTables) {
const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
allSourceTables.push(...tables);
}

await touch();
}
for (let table of allSourceTables) {
await this.snapshotTable(batch, table);
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);

const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
await touch();
}
);
} finally {
session.endSession();
}

const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
}
);
}

private async setupCheckpointsCollection() {
@@ -283,48 +282,52 @@
}
}

private async snapshotTable(
batch: storage.BucketStorageBatch,
table: storage.SourceTable,
session?: mongo.ClientSession
) {
private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
const estimatedCount = await this.estimatedCount(table);
let at = 0;
let lastLogIndex = 0;

const db = this.client.db(table.schema);
const collection = db.collection(table.table);
const query = collection.find({}, { session, readConcern: { level: 'majority' } });

const cursor = query.stream();

for await (let document of cursor) {
if (this.abort_signal.aborted) {
throw new ReplicationAbortedError(`Aborted initial replication`);
}

const record = constructAfterRecord(document);
const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });

let lastBatch = performance.now();
// hasNext() is the call that triggers fetching of the next batch,
// then we read it with readBufferedDocuments(). This gives us semi-explicit
// control over the fetching of each batch, and avoids a separate promise per document
let hasNextPromise = cursor.hasNext();
while (await hasNextPromise) {
const docBatch = cursor.readBufferedDocuments();
// Pre-fetch next batch, so that we can read and write concurrently
hasNextPromise = cursor.hasNext();
for (let document of docBatch) {
if (this.abort_signal.aborted) {
throw new ReplicationAbortedError(`Aborted initial replication`);
}

// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: document._id
});
const record = constructAfterRecord(document);

at += 1;
if (at - lastLogIndex >= 5000) {
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: document._id
});
}
Metrics.getInstance().rows_replicated_total.add(1);

at += docBatch.length;
Metrics.getInstance().rows_replicated_total.add(docBatch.length);
const duration = performance.now() - lastBatch;
lastBatch = performance.now();
logger.info(
`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount} in ${duration.toFixed(0)}ms`
);
await touch();
}
// In case the loop was interrupted, make sure we await the last promise.
await hasNextPromise;

await batch.flush();
logger.info(`${this.logPrefix} Replicated ${at} documents for ${table.qualifiedName}`);
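
To summarize the new snapshot loop in isolation, here is a self-contained sketch of the explicit batch-fetching pattern, assuming the standard `mongodb` Node.js driver (`hasNext()` and `readBufferedDocuments()` are cursor methods in that driver); the collection and handler names are illustrative:

```ts
import { Collection, Document } from 'mongodb';

async function copyCollection(collection: Collection<Document>, handle: (doc: Document) => Promise<void>) {
  const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });

  // hasNext() fills the cursor's buffer with the next batch when the buffer is empty;
  // readBufferedDocuments() then drains that buffer synchronously.
  let hasNextPromise = cursor.hasNext();
  while (await hasNextPromise) {
    const docBatch = cursor.readBufferedDocuments();
    // Start fetching the following batch while this one is still being processed,
    // rather than awaiting a separate promise per document.
    hasNextPromise = cursor.hasNext();
    for (const document of docBatch) {
      await handle(document);
    }
  }
}
```

The pre-fetch overlaps network reads with storage writes, and `batchSize` bounds how many documents are buffered per iteration.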