diff --git a/.changeset/forty-doors-deliver.md b/.changeset/forty-doors-deliver.md new file mode 100644 index 000000000..bcd4c2107 --- /dev/null +++ b/.changeset/forty-doors-deliver.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mongodb': minor +--- + +Added progress logs to initial snapshot diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index cb2e1ffa6..555348a76 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -164,7 +164,7 @@ export class ChangeStream { async estimatedCount(table: storage.SourceTable): Promise { const db = this.client.db(table.schema); - const count = db.collection(table.table).estimatedDocumentCount(); + const count = await db.collection(table.table).estimatedDocumentCount(); return `~${count}`; } @@ -298,6 +298,7 @@ export class ChangeStream { logger.info(`Replicating ${table.qualifiedName}`); const estimatedCount = await this.estimatedCount(table); let at = 0; + let lastLogIndex = 0; const db = this.client.db(table.schema); const collection = db.collection(table.table); @@ -310,8 +311,6 @@ export class ChangeStream { throw new ReplicationAbortedError(`Aborted initial replication`); } - at += 1; - const record = constructAfterRecord(document); // This auto-flushes when the batch reaches its size limit @@ -325,6 +324,10 @@ export class ChangeStream { }); at += 1; + if (at - lastLogIndex >= 5000) { + logger.info(`[${this.group_id}] Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); + lastLogIndex = at; + } Metrics.getInstance().rows_replicated_total.add(1); await touch();