@@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru
66import * as sync_status from '../../../db/crud/SyncStatus.js' ;
77import { AbortOperation } from '../../../utils/AbortOperation.js' ;
88import { BaseListener , BaseObserver , Disposable } from '../../../utils/BaseObserver.js' ;
9- import { onAbortPromise , throttleLeadingTrailing } from '../../../utils/async.js' ;
9+ import { throttleLeadingTrailing } from '../../../utils/async.js' ;
1010import {
1111 BucketChecksum ,
1212 BucketDescription ,
@@ -19,7 +19,9 @@ import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
1919import { AbstractRemote , FetchStrategy , SyncStreamOptions } from './AbstractRemote.js' ;
2020import {
2121 BucketRequest ,
22+ CrudUploadNotification ,
2223 StreamingSyncLine ,
24+ StreamingSyncLineOrCrudUploadComplete ,
2325 StreamingSyncRequestParameterType ,
2426 isStreamingKeepalive ,
2527 isStreamingSyncCheckpoint ,
@@ -225,7 +227,7 @@ export abstract class AbstractStreamingSyncImplementation
225227 protected crudUpdateListener ?: ( ) => void ;
226228 protected streamingSyncPromise ?: Promise < void > ;
227229
228- private pendingCrudUpload ?: Promise < void > ;
230+ private isUploadingCrud : boolean = false ;
229231 private notifyCompletedUploads ?: ( ) => void ;
230232
231233 syncStatus : SyncStatus ;
@@ -247,16 +249,14 @@ export abstract class AbstractStreamingSyncImplementation
247249 this . abortController = null ;
248250
249251 this . triggerCrudUpload = throttleLeadingTrailing ( ( ) => {
250- if ( ! this . syncStatus . connected || this . pendingCrudUpload != null ) {
252+ if ( ! this . syncStatus . connected || this . isUploadingCrud ) {
251253 return ;
252254 }
253255
254- this . pendingCrudUpload = new Promise ( ( resolve ) => {
255- this . _uploadAllCrud ( ) . finally ( ( ) => {
256- this . notifyCompletedUploads ?.( ) ;
257- this . pendingCrudUpload = undefined ;
258- resolve ( ) ;
259- } ) ;
256+ this . isUploadingCrud = true ;
257+ this . _uploadAllCrud ( ) . finally ( ( ) => {
258+ this . notifyCompletedUploads ?.( ) ;
259+ this . isUploadingCrud = false ;
260260 } ) ;
261261 } , this . options . crudUploadThrottleMs ! ) ;
262262 }
@@ -539,6 +539,8 @@ The next upload iteration will be delayed.`);
539539 }
540540 } ) ;
541541 } finally {
542+ this . notifyCompletedUploads = undefined ;
543+
542544 if ( ! signal . aborted ) {
543545 nestedAbortController . abort ( new AbortOperation ( 'Closing sync stream network requests before retry.' ) ) ;
544546 nestedAbortController = new AbortController ( ) ;
@@ -624,10 +626,9 @@ The next upload iteration will be delayed.`);
624626 this . options . adapter . startSession ( ) ;
625627 let [ req , bucketMap ] = await this . collectLocalBucketState ( ) ;
626628
627- // These are compared by reference
628629 let targetCheckpoint : Checkpoint | null = null ;
629- let validatedCheckpoint : Checkpoint | null = null ;
630- let appliedCheckpoint : Checkpoint | null = null ;
630+ // A checkpoint that has been validated but not applied (e.g. due to pending local writes)
631+ let pendingValidatedCheckpoint : Checkpoint | null = null ;
631632
632633 const clientId = await this . options . adapter . getClientId ( ) ;
633634 const usingFixedKeyFormat = await this . requireKeyFormat ( false ) ;
@@ -646,25 +647,63 @@ The next upload iteration will be delayed.`);
646647 }
647648 } ;
648649
649- let stream : DataStream < StreamingSyncLine > ;
650+ let stream : DataStream < StreamingSyncLineOrCrudUploadComplete > ;
650651 if ( resolvedOptions ?. connectionMethod == SyncStreamConnectionMethod . HTTP ) {
651- stream = await this . options . remote . postStream ( syncOptions ) ;
652- } else {
653- stream = await this . options . remote . socketStream ( {
654- ...syncOptions ,
655- ...{ fetchStrategy : resolvedOptions . fetchStrategy }
652+ stream = await this . options . remote . postStreamRaw ( syncOptions , ( line : string | CrudUploadNotification ) => {
653+ if ( typeof line == 'string' ) {
654+ return JSON . parse ( line ) as StreamingSyncLine ;
655+ } else {
656+ // Directly enqueued by us
657+ return line ;
658+ }
656659 } ) ;
660+ } else {
661+ const bson = await this . options . remote . getBSON ( ) ;
662+ stream = await this . options . remote . socketStreamRaw (
663+ {
664+ ...syncOptions ,
665+ ...{ fetchStrategy : resolvedOptions . fetchStrategy }
666+ } ,
667+ ( payload : Uint8Array | CrudUploadNotification ) => {
668+ if ( payload instanceof Uint8Array ) {
669+ return bson . deserialize ( payload ) as StreamingSyncLine ;
670+ } else {
671+ // Directly enqueued by us
672+ return payload ;
673+ }
674+ } ,
675+ bson
676+ ) ;
657677 }
658678
659679 this . logger . debug ( 'Stream established. Processing events' ) ;
660680
681+ this . notifyCompletedUploads = ( ) => {
682+ if ( ! stream . closed ) {
683+ stream . enqueueData ( { crud_upload_completed : null } ) ;
684+ }
685+ } ;
686+
661687 while ( ! stream . closed ) {
662688 const line = await stream . read ( ) ;
663689 if ( ! line ) {
664690 // The stream has closed while waiting
665691 return ;
666692 }
667693
694+ if ( 'crud_upload_completed' in line ) {
695+ if ( pendingValidatedCheckpoint != null ) {
696+ const { applied, endIteration } = await this . applyCheckpoint ( pendingValidatedCheckpoint ) ;
697+ if ( applied ) {
698+ pendingValidatedCheckpoint = null ;
699+ } else if ( endIteration ) {
700+ break ;
701+ }
702+ }
703+
704+ continue ;
705+ }
706+
668707 // A connection is active and messages are being received
669708 if ( ! this . syncStatus . connected ) {
670709 // There is a connection now
@@ -693,13 +732,12 @@ The next upload iteration will be delayed.`);
693732 await this . options . adapter . setTargetCheckpoint ( targetCheckpoint ) ;
694733 await this . updateSyncStatusForStartingCheckpoint ( targetCheckpoint ) ;
695734 } else if ( isStreamingSyncCheckpointComplete ( line ) ) {
696- const result = await this . applyCheckpoint ( targetCheckpoint ! , signal ) ;
735+ const result = await this . applyCheckpoint ( targetCheckpoint ! ) ;
697736 if ( result . endIteration ) {
698737 return ;
699- } else if ( result . applied ) {
700- appliedCheckpoint = targetCheckpoint ;
738+ } else if ( ! result . applied ) {
739+ pendingValidatedCheckpoint = targetCheckpoint ;
701740 }
702- validatedCheckpoint = targetCheckpoint ;
703741 } else if ( isStreamingSyncCheckpointPartiallyComplete ( line ) ) {
704742 const priority = line . partial_checkpoint_complete . priority ;
705743 this . logger . debug ( 'Partial checkpoint complete' , priority ) ;
@@ -809,25 +847,7 @@ The next upload iteration will be delayed.`);
809847 }
810848 this . triggerCrudUpload ( ) ;
811849 } else {
812- this . logger . debug ( 'Sync complete' ) ;
813-
814- if ( targetCheckpoint === appliedCheckpoint ) {
815- this . updateSyncStatus ( {
816- connected : true ,
817- lastSyncedAt : new Date ( ) ,
818- priorityStatusEntries : [ ] ,
819- dataFlow : {
820- downloadError : undefined
821- }
822- } ) ;
823- } else if ( validatedCheckpoint === targetCheckpoint ) {
824- const result = await this . applyCheckpoint ( targetCheckpoint ! , signal ) ;
825- if ( result . endIteration ) {
826- return ;
827- } else if ( result . applied ) {
828- appliedCheckpoint = targetCheckpoint ;
829- }
830- }
850+ this . logger . debug ( 'Received unknown sync line' , line ) ;
831851 }
832852 }
833853 this . logger . debug ( 'Stream input empty' ) ;
@@ -1066,51 +1086,35 @@ The next upload iteration will be delayed.`);
10661086 } ) ;
10671087 }
10681088
1069- private async applyCheckpoint ( checkpoint : Checkpoint , abort : AbortSignal ) {
1089+ private async applyCheckpoint ( checkpoint : Checkpoint ) {
10701090 let result = await this . options . adapter . syncLocalDatabase ( checkpoint ) ;
1071- const pending = this . pendingCrudUpload ;
10721091
10731092 if ( ! result . checkpointValid ) {
10741093 this . logger . debug ( 'Checksum mismatch in checkpoint, will reconnect' ) ;
10751094 // This means checksums failed. Start again with a new checkpoint.
10761095 // TODO: better back-off
10771096 await new Promise ( ( resolve ) => setTimeout ( resolve , 50 ) ) ;
10781097 return { applied : false , endIteration : true } ;
1079- } else if ( ! result . ready && pending != null ) {
1080- // We have pending entries in the local upload queue or are waiting to confirm a write
1081- // checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
1082- // try again.
1098+ } else if ( ! result . ready ) {
10831099 this . logger . debug (
1084- ` Could not apply checkpoint ${ checkpoint . last_op_id } due to local data. Waiting for in-progress upload before retrying.`
1100+ ' Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.'
10851101 ) ;
1086- await Promise . race ( [ pending , onAbortPromise ( abort ) ] ) ;
1087- this . logger . debug ( `Pending uploads complete, retrying local checkpoint at ${ checkpoint . last_op_id } ` ) ;
1088-
1089- if ( abort . aborted ) {
1090- return { applied : false , endIteration : true } ;
1091- }
10921102
1093- // Try again now that uploads have completed.
1094- result = await this . options . adapter . syncLocalDatabase ( checkpoint ) ;
1103+ return { applied : false , endIteration : false } ;
10951104 }
10961105
1097- if ( result . checkpointValid && result . ready ) {
1098- this . logger . debug ( 'validated checkpoint' , checkpoint ) ;
1099- this . updateSyncStatus ( {
1100- connected : true ,
1101- lastSyncedAt : new Date ( ) ,
1102- dataFlow : {
1103- downloading : false ,
1104- downloadProgress : null ,
1105- downloadError : undefined
1106- }
1107- } ) ;
1106+ this . logger . debug ( 'validated checkpoint' , checkpoint ) ;
1107+ this . updateSyncStatus ( {
1108+ connected : true ,
1109+ lastSyncedAt : new Date ( ) ,
1110+ dataFlow : {
1111+ downloading : false ,
1112+ downloadProgress : null ,
1113+ downloadError : undefined
1114+ }
1115+ } ) ;
11081116
1109- return { applied : true , endIteration : false } ;
1110- } else {
1111- this . logger . debug ( 'Could not apply checkpoint. Waiting for next sync complete line.' ) ;
1112- return { applied : false , endIteration : false } ;
1113- }
1117+ return { applied : true , endIteration : false } ;
11141118 }
11151119
11161120 protected updateSyncStatus ( options : SyncStatusOptions ) {
0 commit comments