@@ -714,22 +714,28 @@ export class MongoSyncBucketStorage
714714 let lastCheckpoint : ReplicationCheckpoint | null = null ;
715715
716716 const iter = this . sharedIter [ Symbol . asyncIterator ] ( options . signal ) ;
717+
717718 let writeCheckpoint : bigint | null = null ;
719+ // true if we queried the initial write checkpoint, even if it doesn't exist
720+ let queriedInitialWriteCheckpoint = false ;
718721
719722 for await ( const nextCheckpoint of iter ) {
720723 // lsn changes are not important by itself.
721724 // What is important is:
722725 // 1. checkpoint (op_id) changes.
723726 // 2. write checkpoint changes for the specific user
724727
725- if ( nextCheckpoint . lsn != null ) {
726- writeCheckpoint ??= await this . writeCheckpointAPI . lastWriteCheckpoint ( {
728+ if ( nextCheckpoint . lsn != null && ! queriedInitialWriteCheckpoint ) {
729+ // Lookup the first write checkpoint for the user when we can.
730+ // There will not actually be one in all cases.
731+ writeCheckpoint = await this . writeCheckpointAPI . lastWriteCheckpoint ( {
727732 sync_rules_id : this . group_id ,
728733 user_id : options . user_id ,
729734 heads : {
730735 '1' : nextCheckpoint . lsn
731736 }
732737 } ) ;
738+ queriedInitialWriteCheckpoint = true ;
733739 }
734740
735741 if (
@@ -739,12 +745,13 @@ export class MongoSyncBucketStorage
739745 ) {
740746 // No change - wait for next one
741747 // In some cases, many LSNs may be produced in a short time.
742- // Add a delay to throttle the write checkpoint lookup a bit.
748+ // Add a delay to throttle the loop a bit.
743749 await timers . setTimeout ( 20 + 10 * Math . random ( ) ) ;
744750 continue ;
745751 }
746752
747753 if ( lastCheckpoint == null ) {
754+ // First message for this stream - "INVALIDATE_ALL" means it will lookup all data
748755 yield {
749756 base : nextCheckpoint ,
750757 writeCheckpoint,
@@ -758,7 +765,9 @@ export class MongoSyncBucketStorage
758765
759766 let updatedWriteCheckpoint = updates . updatedWriteCheckpoints . get ( options . user_id ) ?? null ;
760767 if ( updates . invalidateWriteCheckpoints ) {
761- updatedWriteCheckpoint ??= await this . writeCheckpointAPI . lastWriteCheckpoint ( {
768+ // Invalidated means there were too many updates to track the individual ones,
769+ // so we switch to "polling" (querying directly in each stream).
770+ updatedWriteCheckpoint = await this . writeCheckpointAPI . lastWriteCheckpoint ( {
762771 sync_rules_id : this . group_id ,
763772 user_id : options . user_id ,
764773 heads : {
@@ -768,6 +777,9 @@ export class MongoSyncBucketStorage
768777 }
769778 if ( updatedWriteCheckpoint != null && ( writeCheckpoint == null || updatedWriteCheckpoint > writeCheckpoint ) ) {
770779 writeCheckpoint = updatedWriteCheckpoint ;
780+ // If it happened that we haven't queried a write checkpoint at this point,
781+ // then we don't need to anymore, since we got an updated one.
782+ queriedInitialWriteCheckpoint = true ;
771783 }
772784
773785 yield {
0 commit comments