@@ -8,7 +8,14 @@ import {
   ReplicationAssertionError,
   ServiceError
 } from '@powersync/lib-services-framework';
-import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
+import {
+  BSON_DESERIALIZE_DATA_OPTIONS,
+  Metrics,
+  SaveOperationTag,
+  SourceEntityDescriptor,
+  SourceTable,
+  storage
+} from '@powersync/service-core';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
 import { MongoLSN } from '../common/MongoLSN.js';
 import { PostImagesOption } from '../types/types.js';
@@ -193,39 +200,31 @@ export class ChangeStream {
       // Not known where this would happen apart from the above cases
       throw new ReplicationAssertionError('MongoDB lastWrite timestamp not found.');
     }
-    // We previously used {snapshot: true} for the snapshot session.
-    // While it gives nice consistency guarantees, it fails when the
-    // snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
-    // expiring the snapshot.
-    const session = await this.client.startSession();
-    try {
-      await this.storage.startBatch(
-        { zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
-        async (batch) => {
-          // Start by resolving all tables.
-          // This checks postImage configuration, and that should fail as
-          // earlier as possible.
-          let allSourceTables: SourceTable[] = [];
-          for (let tablePattern of sourceTables) {
-            const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
-            allSourceTables.push(...tables);
-          }
 
-          for (let table of allSourceTables) {
-            await this.snapshotTable(batch, table, session);
-            await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
+    await this.storage.startBatch(
+      { zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
+      async (batch) => {
+        // Start by resolving all tables.
+        // This checks the postImage configuration, which should fail
+        // as early as possible.
+        let allSourceTables: SourceTable[] = [];
+        for (let tablePattern of sourceTables) {
+          const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
+          allSourceTables.push(...tables);
+        }
 
-            await touch();
-          }
+        for (let table of allSourceTables) {
+          await this.snapshotTable(batch, table);
+          await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
 
-          const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
-          logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
-          await batch.commit(lsn);
+          await touch();
         }
-      );
-    } finally {
-      session.endSession();
-    }
+
+        const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
+        logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
+        await batch.commit(lsn);
+      }
+    );
   }
 
   private async setupCheckpointsCollection() {
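A note on the removed `{snapshot: true}` comment above: the 5-minute limit it refers to is MongoDB's default `minSnapshotHistoryWindowInSeconds` of 300 seconds. In principle the window can be raised server-side instead of dropping the snapshot session, but that requires admin access to the source deployment, which a replication service usually cannot assume. A minimal sketch of that alternative, not what this change does:

```ts
import { MongoClient } from 'mongodb';

// Hypothetical alternative to dropping the snapshot session: raise the
// server's snapshot history window so long-running { snapshot: true }
// reads are not expired after the default 300 seconds.
// Requires admin privileges on the source deployment.
async function extendSnapshotWindow(client: MongoClient, seconds: number): Promise<void> {
  await client.db('admin').command({
    setParameter: 1,
    minSnapshotHistoryWindowInSeconds: seconds
  });
}
```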
@@ -283,48 +282,52 @@
     }
   }
 
-  private async snapshotTable(
-    batch: storage.BucketStorageBatch,
-    table: storage.SourceTable,
-    session?: mongo.ClientSession
-  ) {
+  private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
     logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
     const estimatedCount = await this.estimatedCount(table);
     let at = 0;
-    let lastLogIndex = 0;
-
     const db = this.client.db(table.schema);
     const collection = db.collection(table.table);
-    const query = collection.find({}, { session, readConcern: { level: 'majority' } });
-
-    const cursor = query.stream();
-
-    for await (let document of cursor) {
-      if (this.abort_signal.aborted) {
-        throw new ReplicationAbortedError(`Aborted initial replication`);
-      }
-
-      const record = constructAfterRecord(document);
+    const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });
+
+    let lastBatch = performance.now();
+    // hasNext() is the call that triggers fetching of the next batch;
+    // we then read it with readBufferedDocuments(). This gives us semi-explicit
+    // control over the fetching of each batch, and avoids a separate promise per document.
+    let hasNextPromise = cursor.hasNext();
+    while (await hasNextPromise) {
+      const docBatch = cursor.readBufferedDocuments();
+      // Pre-fetch the next batch, so that we can read and write concurrently.
+      hasNextPromise = cursor.hasNext();
+      for (let document of docBatch) {
+        if (this.abort_signal.aborted) {
+          throw new ReplicationAbortedError(`Aborted initial replication`);
+        }
 
-      // This auto-flushes when the batch reaches its size limit
-      await batch.save({
-        tag: SaveOperationTag.INSERT,
-        sourceTable: table,
-        before: undefined,
-        beforeReplicaId: undefined,
-        after: record,
-        afterReplicaId: document._id
-      });
+        const record = constructAfterRecord(document);
 
-      at += 1;
-      if (at - lastLogIndex >= 5000) {
-        logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
-        lastLogIndex = at;
+        // This auto-flushes when the batch reaches its size limit
+        await batch.save({
+          tag: SaveOperationTag.INSERT,
+          sourceTable: table,
+          before: undefined,
+          beforeReplicaId: undefined,
+          after: record,
+          afterReplicaId: document._id
+        });
       }
-      Metrics.getInstance().rows_replicated_total.add(1);
 
+      at += docBatch.length;
+      Metrics.getInstance().rows_replicated_total.add(docBatch.length);
+      const duration = performance.now() - lastBatch;
+      lastBatch = performance.now();
+      logger.info(
+        `${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount} in ${duration.toFixed(0)}ms`
+      );
       await touch();
     }
+    // In case the loop was interrupted, make sure we await the last promise.
+    await hasNextPromise;
 
     await batch.flush();
     logger.info(`${this.logPrefix} Replicated ${at} documents for ${table.qualifiedName}`);
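The new read loop amounts to double-buffering on top of the Node.js MongoDB driver: `hasNext()` triggers the network fetch that fills the cursor's internal buffer, and `readBufferedDocuments()` drains that buffer synchronously, so the next batch is fetched while the current one is being written. A self-contained sketch of the pattern, with hypothetical database/collection names and a hypothetical `processBatch` callback standing in for `batch.save()`:

```ts
import { Document, MongoClient } from 'mongodb';

// Minimal sketch of the prefetching batch-read pattern used in snapshotTable().
async function readAllInBatches(
  client: MongoClient,
  processBatch: (docs: Document[]) => Promise<void>
): Promise<void> {
  const cursor = client
    .db('test') // hypothetical database and collection names
    .collection('docs')
    .find({}, { batchSize: 6_000, readConcern: 'majority' });

  // hasNext() fetches the next batch into the cursor's internal buffer;
  // readBufferedDocuments() then drains that buffer synchronously.
  let hasNextPromise = cursor.hasNext();
  while (await hasNextPromise) {
    const docs = cursor.readBufferedDocuments();
    // Kick off the next fetch before processing, so the network round-trip
    // overlaps with local work on the current batch.
    hasNextPromise = cursor.hasNext();
    await processBatch(docs);
  }
  // Settle the final outstanding fetch (also covers early loop exits).
  await hasNextPromise;
}
```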