@@ -14,7 +14,7 @@ import { MongoLSN } from '../common/MongoLSN.js';
1414import { PostImagesOption } from '../types/types.js' ;
1515import { escapeRegExp } from '../utils.js' ;
1616import { MongoManager } from './MongoManager.js' ;
17- import { constructAfterRecord , createCheckpoint , getMongoRelation } from './MongoRelation.js' ;
17+ import { constructAfterRecord , createCheckpoint , getCacheIdentifier , getMongoRelation } from './MongoRelation.js' ;
1818import { CHECKPOINTS_COLLECTION } from './replication-utils.js' ;
1919
2020export interface ChangeStreamOptions {
@@ -89,6 +89,10 @@ export class ChangeStream {
8989 return this . connections . options . postImages == PostImagesOption . AUTO_CONFIGURE ;
9090 }
9191
92+ private get logPrefix ( ) {
93+ return `[powersync_${ this . group_id } ]` ;
94+ }
95+
9296 /**
9397 * This resolves a pattern, persists the related metadata, and returns
9498 * the resulting SourceTables.
@@ -124,18 +128,13 @@ export class ChangeStream {
124128 . toArray ( ) ;
125129
126130 if ( ! tablePattern . isWildcard && collections . length == 0 ) {
127- logger . warn ( `Collection ${ schema } .${ tablePattern . name } not found` ) ;
131+ logger . warn ( `${ this . logPrefix } Collection ${ schema } .${ tablePattern . name } not found` ) ;
128132 }
129133
130134 for ( let collection of collections ) {
131135 const table = await this . handleRelation (
132136 batch ,
133- {
134- name : collection . name ,
135- schema,
136- objectId : collection . name ,
137- replicationColumns : [ { name : '_id' } ]
138- } as SourceEntityDescriptor ,
137+ getMongoRelation ( { db : schema , coll : collection . name } ) ,
139138 // This is done as part of the initial setup - snapshot is handled elsewhere
140139 { snapshot : false , collectionInfo : collection }
141140 ) ;
@@ -149,7 +148,7 @@ export class ChangeStream {
149148 async initSlot ( ) : Promise < InitResult > {
150149 const status = await this . storage . getStatus ( ) ;
151150 if ( status . snapshot_done && status . checkpoint_lsn ) {
152- logger . info ( `Initial replication already done` ) ;
151+ logger . info ( `${ this . logPrefix } Initial replication already done` ) ;
153152 return { needsInitialSync : false } ;
154153 }
155154
@@ -220,7 +219,7 @@ export class ChangeStream {
220219 }
221220
222221 const { comparable : lsn } = new MongoLSN ( { timestamp : snapshotTime } ) ;
223- logger . info ( `Snapshot commit at ${ snapshotTime . inspect ( ) } / ${ lsn } ` ) ;
222+ logger . info ( `${ this . logPrefix } Snapshot commit at ${ snapshotTime . inspect ( ) } / ${ lsn } ` ) ;
224223 await batch . commit ( lsn ) ;
225224 }
226225 ) ;
@@ -289,7 +288,7 @@ export class ChangeStream {
289288 table : storage . SourceTable ,
290289 session ?: mongo . ClientSession
291290 ) {
292- logger . info ( `Replicating ${ table . qualifiedName } ` ) ;
291+ logger . info ( `${ this . logPrefix } Replicating ${ table . qualifiedName } ` ) ;
293292 const estimatedCount = await this . estimatedCount ( table ) ;
294293 let at = 0 ;
295294 let lastLogIndex = 0 ;
@@ -319,7 +318,7 @@ export class ChangeStream {
319318
320319 at += 1 ;
321320 if ( at - lastLogIndex >= 5000 ) {
322- logger . info ( `[ ${ this . group_id } ] Replicating ${ table . qualifiedName } ${ at } /${ estimatedCount } ` ) ;
321+ logger . info ( `${ this . logPrefix } Replicating ${ table . qualifiedName } ${ at } /${ estimatedCount } ` ) ;
323322 lastLogIndex = at ;
324323 }
325324 Metrics . getInstance ( ) . rows_replicated_total . add ( 1 ) ;
@@ -328,14 +327,16 @@ export class ChangeStream {
328327 }
329328
330329 await batch . flush ( ) ;
331- logger . info ( `Replicated ${ at } documents for ${ table . qualifiedName } ` ) ;
330+ logger . info ( `${ this . logPrefix } Replicated ${ at } documents for ${ table . qualifiedName } ` ) ;
332331 }
333332
334333 private async getRelation (
335334 batch : storage . BucketStorageBatch ,
336- descriptor : SourceEntityDescriptor
335+ descriptor : SourceEntityDescriptor ,
336+ options : { snapshot : boolean }
337337 ) : Promise < SourceTable > {
338- const existing = this . relation_cache . get ( descriptor . objectId ) ;
338+ const cacheId = getCacheIdentifier ( descriptor ) ;
339+ const existing = this . relation_cache . get ( cacheId ) ;
339340 if ( existing != null ) {
340341 return existing ;
341342 }
@@ -344,7 +345,7 @@ export class ChangeStream {
344345 // missing values.
345346 const collection = await this . getCollectionInfo ( descriptor . schema , descriptor . name ) ;
346347
347- return this . handleRelation ( batch , descriptor , { snapshot : false , collectionInfo : collection } ) ;
348+ return this . handleRelation ( batch , descriptor , { snapshot : options . snapshot , collectionInfo : collection } ) ;
348349 }
349350
350351 private async getCollectionInfo ( db : string , name : string ) : Promise < mongo . CollectionInfo | undefined > {
@@ -375,7 +376,7 @@ export class ChangeStream {
375376 collMod : collectionInfo . name ,
376377 changeStreamPreAndPostImages : { enabled : true }
377378 } ) ;
378- logger . info ( `Enabled postImages on ${ db } .${ collectionInfo . name } ` ) ;
379+ logger . info ( `${ this . logPrefix } Enabled postImages on ${ db } .${ collectionInfo . name } ` ) ;
379380 } else if ( ! enabled ) {
380381 throw new ServiceError ( ErrorCode . PSYNC_S1343 , `postImages not enabled on ${ db } .${ collectionInfo . name } ` ) ;
381382 }
@@ -394,27 +395,31 @@ export class ChangeStream {
394395 }
395396
396397 const snapshot = options . snapshot ;
397- if ( ! descriptor . objectId && typeof descriptor . objectId != 'string' ) {
398- throw new ReplicationAssertionError ( 'MongoDB replication - objectId expected' ) ;
399- }
400398 const result = await this . storage . resolveTable ( {
401399 group_id : this . group_id ,
402400 connection_id : this . connection_id ,
403401 connection_tag : this . connections . connectionTag ,
404402 entity_descriptor : descriptor ,
405403 sync_rules : this . sync_rules
406404 } ) ;
407- this . relation_cache . set ( descriptor . objectId , result . table ) ;
405+ this . relation_cache . set ( getCacheIdentifier ( descriptor ) , result . table ) ;
408406
409- // Drop conflicting tables. This includes for example renamed tables.
410- await batch . drop ( result . dropTables ) ;
407+ // Drop conflicting collections.
408+ // This is generally not expected for MongoDB source dbs, so we log an error.
409+ if ( result . dropTables . length > 0 ) {
410+ logger . error (
411+ `Conflicting collections found for ${ JSON . stringify ( descriptor ) } . Dropping: ${ result . dropTables . map ( ( t ) => t . id ) . join ( ', ' ) } `
412+ ) ;
413+ await batch . drop ( result . dropTables ) ;
414+ }
411415
412416 // Snapshot if:
413417 // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
414418 // 2. Snapshot is not already done, AND:
415419 // 3. The table is used in sync rules.
416420 const shouldSnapshot = snapshot && ! result . table . snapshotComplete && result . table . syncAny ;
417421 if ( shouldSnapshot ) {
422+ logger . info ( `${ this . logPrefix } New collection: ${ descriptor . schema } .${ descriptor . name } ` ) ;
418423 // Truncate this table, in case a previous snapshot was interrupted.
419424 await batch . truncate ( [ result . table ] ) ;
420425
@@ -434,7 +439,7 @@ export class ChangeStream {
434439 change : mongo . ChangeStreamDocument
435440 ) : Promise < storage . FlushedResult | null > {
436441 if ( ! table . syncAny ) {
437- logger . debug ( `Collection ${ table . qualifiedName } not used in sync rules - skipping` ) ;
442+ logger . debug ( `${ this . logPrefix } Collection ${ table . qualifiedName } not used in sync rules - skipping` ) ;
438443 return null ;
439444 }
440445
@@ -528,7 +533,7 @@ export class ChangeStream {
528533 const startAfter = lastLsn ?. timestamp ;
529534 const resumeAfter = lastLsn ?. resumeToken ;
530535
531- logger . info ( `Resume streaming at ${ startAfter ?. inspect ( ) } / ${ lastLsn } ` ) ;
536+ logger . info ( `${ this . logPrefix } Resume streaming at ${ startAfter ?. inspect ( ) } / ${ lastLsn } ` ) ;
532537
533538 const filters = this . getSourceNamespaceFilters ( ) ;
534539
@@ -590,13 +595,14 @@ export class ChangeStream {
590595
591596 let splitDocument : mongo . ChangeStreamDocument | null = null ;
592597
598+ let flexDbNameWorkaroundLogged = false ;
599+
593600 while ( true ) {
594601 if ( this . abort_signal . aborted ) {
595602 break ;
596603 }
597604
598605 const originalChangeDocument = await stream . tryNext ( ) ;
599-
600606 // The stream was closed, we will only ever receive `null` from it
601607 if ( ! originalChangeDocument && stream . closed ) {
602608 break ;
@@ -636,6 +642,29 @@ export class ChangeStream {
636642 throw new ReplicationAssertionError ( `Incomplete splitEvent: ${ JSON . stringify ( splitDocument . splitEvent ) } ` ) ;
637643 }
638644
645+ if (
646+ ! filters . multipleDatabases &&
647+ 'ns' in changeDocument &&
648+ changeDocument . ns . db != this . defaultDb . databaseName &&
649+ changeDocument . ns . db . endsWith ( `_${ this . defaultDb . databaseName } ` )
650+ ) {
651+ // When all of the following conditions are met:
652+ // 1. We're replicating from an Atlas Flex instance.
653+ // 2. There were changestream events recorded while the PowerSync service is paused.
654+ // 3. We're only replicating from a single database.
655+ // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'},
656+ // instead of the expected {db: 'ps'}.
657+ // We correct this.
658+ changeDocument . ns . db = this . defaultDb . databaseName ;
659+
660+ if ( ! flexDbNameWorkaroundLogged ) {
661+ flexDbNameWorkaroundLogged = true ;
662+ logger . warn (
663+ `${ this . logPrefix } Incorrect DB name in change stream: ${ changeDocument . ns . db } . Changed to ${ this . defaultDb . databaseName } .`
664+ ) ;
665+ }
666+ }
667+
639668 if (
640669 ( changeDocument . operationType == 'insert' ||
641670 changeDocument . operationType == 'update' ||
@@ -682,28 +711,44 @@ export class ChangeStream {
682711 waitForCheckpointLsn = await createCheckpoint ( this . client , this . defaultDb ) ;
683712 }
684713 const rel = getMongoRelation ( changeDocument . ns ) ;
685- const table = await this . getRelation ( batch , rel ) ;
714+ const table = await this . getRelation ( batch , rel , {
715+ // In most cases, we should not need to snapshot this. But if this is the first time we see the collection
716+ // for whatever reason, then we do need to snapshot it.
717+ // This may result in some duplicate operations when a collection is created for the first time after
718+ // sync rules was deployed.
719+ snapshot : true
720+ } ) ;
686721 if ( table . syncAny ) {
687722 await this . writeChange ( batch , table , changeDocument ) ;
688723 }
689724 } else if ( changeDocument . operationType == 'drop' ) {
690725 const rel = getMongoRelation ( changeDocument . ns ) ;
691- const table = await this . getRelation ( batch , rel ) ;
726+ const table = await this . getRelation ( batch , rel , {
727+ // We're "dropping" this collection, so never snapshot it.
728+ snapshot : false
729+ } ) ;
692730 if ( table . syncAny ) {
693731 await batch . drop ( [ table ] ) ;
694- this . relation_cache . delete ( table . objectId ) ;
732+ this . relation_cache . delete ( getCacheIdentifier ( rel ) ) ;
695733 }
696734 } else if ( changeDocument . operationType == 'rename' ) {
697735 const relFrom = getMongoRelation ( changeDocument . ns ) ;
698736 const relTo = getMongoRelation ( changeDocument . to ) ;
699- const tableFrom = await this . getRelation ( batch , relFrom ) ;
737+ const tableFrom = await this . getRelation ( batch , relFrom , {
738+ // We're "dropping" this collection, so never snapshot it.
739+ snapshot : false
740+ } ) ;
700741 if ( tableFrom . syncAny ) {
701742 await batch . drop ( [ tableFrom ] ) ;
702- this . relation_cache . delete ( tableFrom . objectId ) ;
743+ this . relation_cache . delete ( getCacheIdentifier ( relFrom ) ) ;
703744 }
704745 // Here we do need to snapshot the new table
705746 const collection = await this . getCollectionInfo ( relTo . schema , relTo . name ) ;
706- await this . handleRelation ( batch , relTo , { snapshot : true , collectionInfo : collection } ) ;
747+ await this . handleRelation ( batch , relTo , {
748+ // This is a new (renamed) collection, so always snapshot it.
749+ snapshot : true ,
750+ collectionInfo : collection
751+ } ) ;
707752 }
708753 }
709754 }
0 commit comments