@@ -20,6 +20,7 @@ import {
2020 isStreamingSyncData
2121} from './streaming-sync-types.js' ;
2222import { DataStream } from 'src/utils/DataStream.js' ;
23+ import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js' ;
2324
2425export enum LockType {
2526 CRUD = 'crud' ,
@@ -163,21 +164,23 @@ export abstract class AbstractStreamingSyncImplementation
163164 protected streamingSyncPromise ?: Promise < void > ;
164165
165166 syncStatus : SyncStatus ;
167+ private syncStatusOptions : SyncStatusOptions ;
166168 triggerCrudUpload : ( ) => void ;
167169
168170 constructor ( options : AbstractStreamingSyncImplementationOptions ) {
169171 super ( ) ;
170172 this . options = { ...DEFAULT_STREAMING_SYNC_OPTIONS , ...options } ;
171173
172- this . syncStatus = new SyncStatus ( {
174+ this . syncStatusOptions = {
173175 connected : false ,
174176 connecting : false ,
175177 lastSyncedAt : undefined ,
176178 dataFlow : {
177179 uploading : false ,
178180 downloading : false
179181 }
180- } ) ;
182+ } ;
183+ this . syncStatus = new SyncStatus ( this . syncStatusOptions ) ;
181184 this . abortController = null ;
182185
183186 this . triggerCrudUpload = throttleLeadingTrailing ( ( ) => {
@@ -411,7 +414,8 @@ The next upload iteration will be delayed.`);
411414 connected : false ,
412415 connecting : false ,
413416 dataFlow : {
414- downloading : false
417+ downloading : false ,
418+ downloadProgress : null ,
415419 }
416420 } ) ;
417421 } ) ;
@@ -569,6 +573,7 @@ The next upload iteration will be delayed.`);
569573 bucketMap = newBuckets ;
570574 await this . options . adapter . removeBuckets ( [ ...bucketsToDelete ] ) ;
571575 await this . options . adapter . setTargetCheckpoint ( targetCheckpoint ) ;
576+ await this . updateSyncStatusForStartingCheckpoint ( targetCheckpoint ) ;
572577 } else if ( isStreamingSyncCheckpointComplete ( line ) ) {
573578 this . logger . debug ( 'Checkpoint complete' , targetCheckpoint ) ;
574579 const result = await this . options . adapter . syncLocalDatabase ( targetCheckpoint ! ) ;
@@ -588,7 +593,8 @@ The next upload iteration will be delayed.`);
588593 connected : true ,
589594 lastSyncedAt : new Date ( ) ,
590595 dataFlow : {
591- downloading : false
596+ downloading : false ,
597+ downloadProgress : null ,
592598 }
593599 } ) ;
594600 }
@@ -645,6 +651,7 @@ The next upload iteration will be delayed.`);
645651 write_checkpoint : diff . write_checkpoint
646652 } ;
647653 targetCheckpoint = newCheckpoint ;
654+ await this . updateSyncStatusForStartingCheckpoint ( targetCheckpoint ) ;
648655
649656 bucketMap = new Map ( ) ;
650657 newBuckets . forEach ( ( checksum , name ) =>
@@ -662,9 +669,23 @@ The next upload iteration will be delayed.`);
662669 await this . options . adapter . setTargetCheckpoint ( targetCheckpoint ) ;
663670 } else if ( isStreamingSyncData ( line ) ) {
664671 const { data } = line ;
672+ const previousProgress = this . syncStatusOptions . dataFlow ?. downloadProgress ;
673+ let updatedProgress : InternalProgressInformation | null = null ;
674+ if ( previousProgress ) {
675+ updatedProgress = { ...previousProgress } ;
676+ const progressForBucket = updatedProgress [ data . bucket ] ;
677+ if ( progressForBucket ) {
678+ updatedProgress [ data . bucket ] = {
679+ ...progressForBucket ,
680+ sinceLast : progressForBucket . sinceLast + data . data . length ,
681+ } ;
682+ }
683+ }
684+
665685 this . updateSyncStatus ( {
666686 dataFlow : {
667- downloading : true
687+ downloading : true ,
688+ downloadProgress : updatedProgress ,
668689 }
669690 } ) ;
670691 await this . options . adapter . saveSyncData ( { buckets : [ SyncDataBucket . fromRow ( data ) ] } ) ;
@@ -707,7 +728,8 @@ The next upload iteration will be delayed.`);
707728 lastSyncedAt : new Date ( ) ,
708729 priorityStatusEntries : [ ] ,
709730 dataFlow : {
710- downloading : false
731+ downloading : false ,
732+ downloadProgress : null ,
711733 }
712734 } ) ;
713735 }
@@ -721,6 +743,30 @@ The next upload iteration will be delayed.`);
721743 } ) ;
722744 }
723745
746+ private async updateSyncStatusForStartingCheckpoint ( checkpoint : Checkpoint ) {
747+ const localProgress = await this . options . adapter . getBucketOperationProgress ( ) ;
748+ const progress : InternalProgressInformation = { } ;
749+
750+ for ( const bucket of checkpoint . buckets ) {
751+ const savedProgress = localProgress [ bucket . bucket ] ;
752+ progress [ bucket . bucket ] = {
753+ // The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service
754+ // will use by default.
755+ priority : bucket . priority ?? 3 ,
756+ atLast : savedProgress ?. atLast ?? 0 ,
757+ sinceLast : savedProgress . sinceLast ?? 0 ,
758+ targetCount : bucket . count ?? 0 ,
759+ } ;
760+ }
761+
762+ this . updateSyncStatus ( {
763+ dataFlow : {
764+ downloading : true ,
765+ downloadProgress : progress ,
766+ }
767+ } ) ;
768+ }
769+
724770 protected updateSyncStatus ( options : SyncStatusOptions ) {
725771 const updatedStatus = new SyncStatus ( {
726772 connected : options . connected ?? this . syncStatus . connected ,
@@ -734,6 +780,7 @@ The next upload iteration will be delayed.`);
734780 } ) ;
735781
736782 if ( ! this . syncStatus . isEqual ( updatedStatus ) ) {
783+ this . syncStatusOptions = options ;
737784 this . syncStatus = updatedStatus ;
738785 // Only trigger this is there was a change
739786 this . iterateListeners ( ( cb ) => cb . statusChanged ?.( updatedStatus ) ) ;
0 commit comments