6 changes: 6 additions & 0 deletions .changeset/hot-pets-itch.md
@@ -0,0 +1,6 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-module-mongodb': patch
---

Improve initial replication performance for MongoDB by avoiding sessions.
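
As a rough, self-contained sketch of the idea in this changeset (not code from this PR): the initial snapshot can read a collection with a plain majority-read cursor instead of a `{ snapshot: true }` client session, which sidesteps the `minSnapshotHistoryWindowInSeconds` limit noted in the ChangeStream diff below. The `snapshotCollection` helper, connection handling, and URI are illustrative assumptions.

```ts
import { MongoClient } from 'mongodb';

// Sketch only: read a collection for an initial snapshot without starting a
// client session. The helper name and connection handling are hypothetical.
async function snapshotCollection(uri: string, dbName: string, collectionName: string): Promise<number> {
  const client = new MongoClient(uri);
  await client.connect();
  try {
    const collection = client.db(dbName).collection(collectionName);
    // A plain majority-read cursor instead of a snapshot session avoids the
    // 5-minute minSnapshotHistoryWindowInSeconds expiry on long snapshots.
    const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });
    let count = 0;
    for await (const document of cursor) {
      // Hand each document to the replication batch here.
      count += 1;
    }
    return count;
  } finally {
    await client.close();
  }
}
```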
@@ -270,7 +270,7 @@ export class MongoBucketBatch
}
}

return resumeBatch;
return resumeBatch?.hasData() ? resumeBatch : null;
}

private saveOperation(
@@ -41,6 +41,10 @@ export class OperationBatch {
return this.batch.length >= MAX_BATCH_COUNT || this.currentSize > MAX_RECORD_BATCH_SIZE;
}

hasData() {
return this.length > 0;
}

/**
*
* @param sizes Map of source key to estimated size of the current_data document, or undefined if current_data is not persisted.
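
Taken together, the two hunks above let the caller drop empty batches: `hasData()` reports whether anything was queued, and the resume path returns `null` instead of an empty batch so no empty flush gets scheduled. A simplified, hypothetical sketch of that pattern (the class shape and names below are assumptions, not the real OperationBatch):

```ts
// Hypothetical, simplified stand-in for OperationBatch: it only tracks queued
// operations, without the size accounting the real class does.
class SimpleOperationBatch<T> {
  private batch: T[] = [];

  get length(): number {
    return this.batch.length;
  }

  push(op: T): void {
    this.batch.push(op);
  }

  hasData(): boolean {
    return this.length > 0;
  }
}

// Returning null for an empty batch lets callers skip scheduling a no-op flush,
// mirroring the `resumeBatch?.hasData() ? resumeBatch : null` change above.
function takeResumeBatch<T>(resumeBatch: SimpleOperationBatch<T> | null): SimpleOperationBatch<T> | null {
  return resumeBatch?.hasData() ? resumeBatch : null;
}
```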
@@ -245,6 +245,7 @@ export class PersistedBatch {
}

async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
const startAt = performance.now();
if (this.bucketData.length > 0) {
await db.bucket_data.bulkWrite(this.bucketData, {
session,
@@ -267,10 +268,11 @@
});
}

const duration = performance.now() - startAt;
logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
this.currentData.length
} updates, ${Math.round(this.currentSize / 1024)}kb. Last op_id: ${this.debugLastOpId}`
} updates, ${Math.round(this.currentSize / 1024)}kb in ${duration.toFixed(0)}ms. Last op_id: ${this.debugLastOpId}`
);

this.bucketData = [];
115 changes: 55 additions & 60 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -8,7 +8,14 @@ import {
ReplicationAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import {
BSON_DESERIALIZE_DATA_OPTIONS,
Metrics,
SaveOperationTag,
SourceEntityDescriptor,
SourceTable,
storage
} from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
@@ -193,39 +200,31 @@ export class ChangeStream {
// Not known where this would happen apart from the above cases
throw new ReplicationAssertionError('MongoDB lastWrite timestamp not found.');
}
// We previously used {snapshot: true} for the snapshot session.
// While it gives nice consistency guarantees, it fails when the
// snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
// expiring the snapshot.
const session = await this.client.startSession();
try {
await this.storage.startBatch(
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
// Start by resolving all tables.
// This checks postImage configuration, and that should fail as
// early as possible.
let allSourceTables: SourceTable[] = [];
for (let tablePattern of sourceTables) {
const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
allSourceTables.push(...tables);
}

for (let table of allSourceTables) {
await this.snapshotTable(batch, table, session);
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
await this.storage.startBatch(
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
// Start by resolving all tables.
// This checks postImage configuration, and that should fail as
// early as possible.
let allSourceTables: SourceTable[] = [];
for (let tablePattern of sourceTables) {
const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
allSourceTables.push(...tables);
}

await touch();
}
for (let table of allSourceTables) {
await this.snapshotTable(batch, table);
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);

const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
await touch();
}
);
} finally {
session.endSession();
}

const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
}
);
}

private async setupCheckpointsCollection() {
@@ -283,46 +282,42 @@
}
}

private async snapshotTable(
batch: storage.BucketStorageBatch,
table: storage.SourceTable,
session?: mongo.ClientSession
) {
private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
const estimatedCount = await this.estimatedCount(table);
let at = 0;
let lastLogIndex = 0;

const db = this.client.db(table.schema);
const collection = db.collection(table.table);
const query = collection.find({}, { session, readConcern: { level: 'majority' } });

const cursor = query.stream();

for await (let document of cursor) {
if (this.abort_signal.aborted) {
throw new ReplicationAbortedError(`Aborted initial replication`);
}
const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });

const record = constructAfterRecord(document);
let lastBatch = performance.now();
while (await cursor.hasNext()) {
const docBatch = cursor.readBufferedDocuments();
for (let document of docBatch) {
if (this.abort_signal.aborted) {
throw new ReplicationAbortedError(`Aborted initial replication`);
}

// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: document._id
});
const record = constructAfterRecord(document);

at += 1;
if (at - lastLogIndex >= 5000) {
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: document._id
});
}
Metrics.getInstance().rows_replicated_total.add(1);

at += docBatch.length;
Metrics.getInstance().rows_replicated_total.add(docBatch.length);
const duration = performance.now() - lastBatch;
lastBatch = performance.now();
logger.info(
`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount} in ${duration.toFixed(0)}ms`
);
await touch();
}

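
For reference, a standalone sketch of the batch-at-a-time cursor consumption used in `snapshotTable` above, assuming the MongoDB Node.js driver: `hasNext()` fetches the next batch from the server, and `readBufferedDocuments()` drains the already-buffered documents without another round trip. The `replicateCollection` helper and `processDocument` callback are hypothetical stand-ins for the batch/save wiring in this PR.

```ts
import { Collection, Document } from 'mongodb';

// Sketch: consume a find cursor one driver batch at a time, logging per-batch timing.
// processDocument stands in for per-row work such as batch.save in this PR.
async function replicateCollection(
  collection: Collection<Document>,
  processDocument: (doc: Document) => Promise<void>
): Promise<void> {
  const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });
  let replicated = 0;
  let lastBatch = performance.now();

  while (await cursor.hasNext()) {
    // hasNext() has already pulled the next batch into the cursor's buffer;
    // readBufferedDocuments() returns those documents without a server round trip.
    const docs = cursor.readBufferedDocuments();
    for (const doc of docs) {
      await processDocument(doc);
    }
    replicated += docs.length;

    const duration = performance.now() - lastBatch;
    lastBatch = performance.now();
    console.log(`replicated ${replicated} documents (+${docs.length} in ${duration.toFixed(0)}ms)`);
  }
}
```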