@@ -23,14 +23,7 @@ import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
2323import * as timers from 'timers/promises' ;
2424import { MongoBucketStorage } from '../MongoBucketStorage.js' ;
2525import { PowerSyncMongo } from './db.js' ;
26- import {
27- BucketDataDocument ,
28- BucketDataKey ,
29- BucketParameterDocument ,
30- SourceKey ,
31- SyncRuleCheckpointState ,
32- SyncRuleDocument
33- } from './models.js' ;
26+ import { BucketDataDocument , BucketDataKey , SourceKey , SyncRuleCheckpointState , SyncRuleDocument } from './models.js' ;
3427import { MongoBucketBatch } from './MongoBucketBatch.js' ;
3528import { MongoCompactor } from './MongoCompactor.js' ;
3629import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js' ;
@@ -642,7 +635,7 @@ export class MongoSyncBucketStorage
642635 /**
643636 * Instance-wide watch on the latest available checkpoint (op_id + lsn).
644637 */
645- private async * watchActiveCheckpoint ( signal : AbortSignal ) : AsyncIterable < BucketCheckpointEvent > {
638+ private async * watchActiveCheckpoint ( signal : AbortSignal ) : AsyncIterable < ReplicationCheckpoint > {
646639 // Use this form instead of (doc: SyncRuleCheckpointState | null = null),
647640 // otherwise we get weird "doc: never" issues.
648641 let doc = null as SyncRuleCheckpointState | null ;
@@ -685,14 +678,14 @@ export class MongoSyncBucketStorage
685678 return ;
686679 }
687680
688- yield { checkpoint : this . makeActiveCheckpoint ( doc ) , invalidate : true } ;
681+ yield this . makeActiveCheckpoint ( doc ) ;
689682
690683 // We only watch changes to the active sync rules.
691684 // If it changes to inactive, we abort and restart with the new sync rules.
692685
693686 const pipeline = this . getChangeStreamPipeline ( ) ;
694687
695- const stream = this . db . db . watch ( pipeline , {
688+ const stream = this . db . sync_rules . watch ( pipeline , {
696689 // Start at the cluster time where we got the initial doc, to make sure
697690 // we don't skip any updates.
698691 // This may result in the first operation being a duplicate, but we filter
@@ -719,35 +712,24 @@ export class MongoSyncBucketStorage
719712 continue ;
720713 }
721714
722- if ( update . ns . coll == this . db . sync_rules . collectionName ) {
723- const doc = await this . getOperationDoc (
724- lastDoc ,
725- update as lib_mongo . mongo . ChangeStreamDocument < SyncRuleDocument >
726- ) ;
727- if ( doc == null ) {
728- // Irrelevant update
729- continue ;
730- }
731- if ( doc . state != storage . SyncRuleState . ACTIVE ) {
732- // Sync rules have changed - abort and restart.
733- // Should this error instead?
734- break ;
735- }
715+ const doc = await this . getOperationDoc ( lastDoc , update as lib_mongo . mongo . ChangeStreamDocument < SyncRuleDocument > ) ;
716+ if ( doc == null ) {
717+ // Irrelevant update
718+ continue ;
719+ }
720+ if ( doc . state != storage . SyncRuleState . ACTIVE ) {
721+ // Sync rules have changed - abort and restart.
722+ // Should this error instead?
723+ break ;
724+ }
736725
737- lastDoc = doc ;
726+ lastDoc = doc ;
738727
739- const op = this . makeActiveCheckpoint ( doc ) ;
740- // Check for LSN / checkpoint changes - ignore other metadata changes
741- if ( lastOp == null || op . lsn != lastOp . lsn || op . checkpoint != lastOp . checkpoint ) {
742- lastOp = op ;
743- yield { checkpoint : op } ;
744- }
745- } else if ( update . ns . coll == this . db . bucket_data . collectionName ) {
746- const bucket = ( update . documentKey . _id as unknown as BucketDataKey ) . b ;
747- yield { dataBucket : bucket } ;
748- } else if ( update . ns . coll == this . db . bucket_parameters . collectionName && update . fullDocument != null ) {
749- const bucketDefinition = getLookupBucketDefinitionName ( ( update . fullDocument as BucketParameterDocument ) . lookup ) ;
750- yield { parameterBucketDefinition : bucketDefinition } ;
728+ const op = this . makeActiveCheckpoint ( doc ) ;
729+ // Check for LSN / checkpoint changes - ignore other metadata changes
730+ if ( lastOp == null || op . lsn != lastOp . lsn || op . checkpoint != lastOp . checkpoint ) {
731+ lastOp = op ;
732+ yield op ;
751733 }
752734 }
753735 }
@@ -767,53 +749,51 @@ export class MongoSyncBucketStorage
767749
768750 const iter = wrapWithAbort ( this . sharedIter , signal ) ;
769751 for await ( const event of iter ) {
770- if ( event . checkpoint ) {
771- const { checkpoint, lsn } = event . checkpoint ;
752+ const { checkpoint, lsn } = event ;
772753
773- // lsn changes are not important by itself.
774- // What is important is:
775- // 1. checkpoint (op_id) changes.
776- // 2. write checkpoint changes for the specific user
754+ // lsn changes are not important by itself.
755+ // What is important is:
756+ // 1. checkpoint (op_id) changes.
757+ // 2. write checkpoint changes for the specific user
777758
778- const lsnFilters : Record < string , string > = lsn ? { 1 : lsn } : { } ;
759+ const lsnFilters : Record < string , string > = lsn ? { 1 : lsn } : { } ;
779760
780- const currentWriteCheckpoint = await this . lastWriteCheckpoint ( {
781- user_id,
782- heads : {
783- ...lsnFilters
784- }
785- } ) ;
786-
787- if ( currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint ) {
788- // No change - wait for next one
789- // In some cases, many LSNs may be produced in a short time.
790- // Add a delay to throttle the write checkpoint lookup a bit.
791- await timers . setTimeout ( 20 + 10 * Math . random ( ) ) ;
792- continue ;
761+ const currentWriteCheckpoint = await this . lastWriteCheckpoint ( {
762+ user_id,
763+ heads : {
764+ ...lsnFilters
793765 }
766+ } ) ;
794767
795- const updates : CheckpointChanges =
796- lastCheckpoint == null
797- ? {
798- invalidateDataBuckets : true ,
799- invalidateParameterBuckets : true ,
800- updatedDataBuckets : [ ] ,
801- updatedParameterBucketDefinitions : [ ]
802- }
803- : await this . getCheckpointChanges ( {
804- lastCheckpoint : lastCheckpoint ,
805- nextCheckpoint : checkpoint
806- } ) ;
807-
808- lastWriteCheckpoint = currentWriteCheckpoint ;
809- lastCheckpoint = checkpoint ;
810-
811- yield {
812- base : event . checkpoint ! ,
813- writeCheckpoint : currentWriteCheckpoint ,
814- update : updates
815- } ;
768+ if ( currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint ) {
769+ // No change - wait for next one
770+ // In some cases, many LSNs may be produced in a short time.
771+ // Add a delay to throttle the write checkpoint lookup a bit.
772+ await timers . setTimeout ( 20 + 10 * Math . random ( ) ) ;
773+ continue ;
816774 }
775+
776+ const updates : CheckpointChanges =
777+ lastCheckpoint == null
778+ ? {
779+ invalidateDataBuckets : true ,
780+ invalidateParameterBuckets : true ,
781+ updatedDataBuckets : [ ] ,
782+ updatedParameterBucketDefinitions : [ ]
783+ }
784+ : await this . getCheckpointChanges ( {
785+ lastCheckpoint : lastCheckpoint ,
786+ nextCheckpoint : checkpoint
787+ } ) ;
788+
789+ lastWriteCheckpoint = currentWriteCheckpoint ;
790+ lastCheckpoint = checkpoint ;
791+
792+ yield {
793+ base : event ,
794+ writeCheckpoint : currentWriteCheckpoint ,
795+ update : updates
796+ } ;
817797 }
818798 }
819799
@@ -845,74 +825,17 @@ export class MongoSyncBucketStorage
845825
846826 private getChangeStreamPipeline ( ) {
847827 const syncRulesId = this . group_id ;
848- const parsedSyncRules = this . getParsedSyncRules ( {
849- // Not applicable here
850- defaultSchema : 'default'
851- } ) ;
852- const staticBucketDefinitions = parsedSyncRules . getStaticBucketDefinitionList ( ) ;
853- const staticBucketFilters = staticBucketDefinitions . map ( ( name ) => {
854- return {
855- // Check that the docuemnt starts with bucket name
856- 'documentKey._id.b' : {
857- $gte : name + '[' ,
858- $lt : name + '[\uFFFF'
859- }
860- } ;
861- } ) ;
862-
863- let filters : mongo . Document [ ] = [
864- // sync_rules events
865- {
866- 'ns.coll' : this . db . sync_rules . collectionName ,
867- 'documentKey._id' : syncRulesId ,
868- operationType : { $in : [ 'insert' , 'update' , 'replace' ] }
869- } ,
870- // bucket_data events
871- {
872- 'ns.coll' : this . db . bucket_data . collectionName ,
873- 'documentKey._id.g' : syncRulesId ,
874- operationType : 'insert' ,
875- $or : staticBucketFilters
876- } ,
877- // bucket_parameters events
878- {
879- 'ns.coll' : this . db . bucket_parameters . collectionName ,
880- 'fullDocument.key.g' : syncRulesId ,
881- operationType : 'insert'
882- }
883- ] ;
884-
885- if ( staticBucketFilters . length > 0 ) {
886- // bucket_data events, only if there are static bucket definitions
887- filters . push ( {
888- 'ns.coll' : this . db . bucket_data . collectionName ,
889- 'documentKey._id.g' : syncRulesId ,
890- operationType : 'insert' ,
891- $or : staticBucketFilters
892- } ) ;
893- }
894-
895828 const pipeline : mongo . Document [ ] = [
896829 {
897830 $match : {
898- $or : filters
831+ 'documentKey._id' : syncRulesId ,
832+ operationType : { $in : [ 'insert' , 'update' , 'replace' ] }
899833 }
900834 } ,
901835 {
902836 $project : {
903- // Common fields
904837 operationType : 1 ,
905- ns : 1 ,
906-
907- // For bucket_data, this contains the bucket
908- // For sync_rules, this contains the sync_rules id
909838 'documentKey._id' : 1 ,
910-
911- // For bucket_parameters
912- 'fullDocument.key' : 1 ,
913- 'fullDocument.lookup' : 1 ,
914-
915- // For sync_rules events
916839 'updateDescription.updatedFields.state' : 1 ,
917840 'updateDescription.updatedFields.last_checkpoint' : 1 ,
918841 'updateDescription.updatedFields.last_checkpoint_lsn' : 1 ,
@@ -974,10 +897,3 @@ export class MongoSyncBucketStorage
974897 } ;
975898 }
976899}
977-
978- interface BucketCheckpointEvent {
979- checkpoint ?: ReplicationCheckpoint ;
980- dataBucket ?: string ;
981- parameterBucketDefinition ?: string ;
982- invalidate ?: boolean ;
983- }
0 commit comments