@@ -729,150 +729,102 @@ class SyncSequenceLogService {
729729 }
730730
731731 /// Populate the sequence log from existing journal entries.
732- /// This is used to backfill the sequence log for entries that were
733- /// created before the sequence log feature was added.
734- ///
735- /// This method streams journal entries in batches and records their vector
736- /// clocks in the sequence log. Records entries for ALL hosts in each entry's
737- /// vector clock so any device with the entry can respond to backfill requests.
738- ///
739- /// [onProgress] is called with progress from 0.0 to 1.0 as entries are
740- /// processed.
741- ///
742- /// Returns the number of entries populated.
732+ /// Returns the number of sequence log entries populated.
743733 Future <int > populateFromJournal ({
744734 required Stream <List <({String id, Map <String , int >? vectorClock})>>
745735 entryStream,
746736 required Future <int > Function () getTotalCount,
747737 void Function (double progress)? onProgress,
748- }) async {
749- final total = await getTotalCount ();
750- var processed = 0 ;
751- var populated = 0 ;
752- final now = DateTime .now ();
753-
754- // Cache of existing (hostId, counter) pairs to avoid duplicates
755- // We'll populate this lazily per-host as we encounter them
756- final existingByHost = < String , Set <int >> {};
757-
758- await for (final batch in entryStream) {
759- final toInsert = < SyncSequenceLogCompanion > [];
760-
761- for (final entry in batch) {
762- processed++ ;
763-
764- final vc = entry.vectorClock;
765- if (vc == null || vc.isEmpty) continue ;
766-
767- // Find the originating host (the one with the highest counter,
768- // which is typically the creator of this specific entry version)
769- String ? originatingHost;
770- var maxCounter = 0 ;
771- for (final e in vc.entries) {
772- if (e.value > maxCounter) {
773- maxCounter = e.value;
774- originatingHost = e.key;
775- }
776- }
777-
778- // Record entry for each host in the vector clock
779- for (final vcEntry in vc.entries) {
780- final hostId = vcEntry.key;
781- final counter = vcEntry.value;
782-
783- // Lazily load existing counters for this host
784- if (! existingByHost.containsKey (hostId)) {
785- existingByHost[hostId] =
786- await _syncDatabase.getCountersForHost (hostId);
787- }
788-
789- final existing = existingByHost[hostId]! ;
790-
791- // Skip if already exists
792- if (existing.contains (counter)) continue ;
793-
794- // Mark as existing to avoid duplicates within this run
795- existing.add (counter);
796-
797- toInsert.add (
798- SyncSequenceLogCompanion (
799- hostId: Value (hostId),
800- counter: Value (counter),
801- entryId: Value (entry.id),
802- originatingHostId: Value (originatingHost ?? hostId),
803- status: Value (SyncSequenceStatus .received.index),
804- createdAt: Value (now),
805- updatedAt: Value (now),
806- ),
807- );
808- }
809- }
810-
811- // Batch insert
812- if (toInsert.isNotEmpty) {
813- await _syncDatabase.batchInsertSequenceEntries (toInsert);
814- populated += toInsert.length;
815- }
816-
817- // Report progress after each batch
818- if (onProgress != null && total > 0 ) {
819- onProgress (processed / total);
820- }
821- }
822-
823- if (populated > 0 ) {
824- _loggingService.captureEvent (
825- 'populateFromJournal: added $populated sequence log entries' ,
826- domain: 'SYNC_SEQUENCE' ,
827- subDomain: 'populate' ,
828- );
829- }
830-
831- return populated;
738+ }) {
739+ return _populateFromStream (
740+ dataStream: entryStream,
741+ getTotalCount: getTotalCount,
742+ onProgress: onProgress,
743+ payloadType: SyncSequencePayloadType .journalEntity,
744+ label: 'populateFromJournal' ,
745+ );
832746 }
833747
834748 /// Populate the sequence log from existing entry links.
835- /// This is used to backfill the sequence log for entry links that were
836- /// created before the sequence log feature was added, or to resolve
837- /// "ghost missing" counters that correspond to EntryLink operations.
838- ///
839- /// This method streams entry links in batches and records their vector
840- /// clocks in the sequence log with [SyncSequencePayloadType.entryLink] .
841- /// Records entries for ALL hosts in each link's vector clock so any device
842- /// with the link can respond to backfill requests.
843- ///
844- /// [onProgress] is called with progress from 0.0 to 1.0 as links are
845- /// processed.
846- ///
847- /// Returns the number of entries populated.
749+ /// Returns the number of sequence log entries populated.
848750 Future <int > populateFromEntryLinks ({
849751 required Stream <List <({String id, Map <String , int >? vectorClock})>>
850752 linkStream,
851753 required Future <int > Function () getTotalCount,
852754 void Function (double progress)? onProgress,
755+ }) {
756+ return _populateFromStream (
757+ dataStream: linkStream,
758+ getTotalCount: getTotalCount,
759+ onProgress: onProgress,
760+ payloadType: SyncSequencePayloadType .entryLink,
761+ label: 'populateFromEntryLinks' ,
762+ );
763+ }
764+
765+ /// Populate the sequence log from existing agent entities.
766+ /// Returns the number of sequence log entries populated.
767+ Future <int > populateFromAgentEntities ({
768+ required Stream <List <({String id, Map <String , int >? vectorClock})>>
769+ entityStream,
770+ required Future <int > Function () getTotalCount,
771+ void Function (double progress)? onProgress,
772+ }) {
773+ return _populateFromStream (
774+ dataStream: entityStream,
775+ getTotalCount: getTotalCount,
776+ onProgress: onProgress,
777+ payloadType: SyncSequencePayloadType .agentEntity,
778+ label: 'populateFromAgentEntities' ,
779+ );
780+ }
781+
782+ /// Populate the sequence log from existing agent links.
783+ /// Returns the number of sequence log entries populated.
784+ Future <int > populateFromAgentLinks ({
785+ required Stream <List <({String id, Map <String , int >? vectorClock})>>
786+ linkStream,
787+ required Future <int > Function () getTotalCount,
788+ void Function (double progress)? onProgress,
789+ }) {
790+ return _populateFromStream (
791+ dataStream: linkStream,
792+ getTotalCount: getTotalCount,
793+ onProgress: onProgress,
794+ payloadType: SyncSequencePayloadType .agentLink,
795+ label: 'populateFromAgentLinks' ,
796+ );
797+ }
798+
799+ /// Shared implementation for populating the sequence log from a paginated
800+ /// stream of records with vector clocks. Used by all four populate methods.
801+ Future <int > _populateFromStream ({
802+ required Stream <List <({String id, Map <String , int >? vectorClock})>>
803+ dataStream,
804+ required Future <int > Function () getTotalCount,
805+ required SyncSequencePayloadType payloadType,
806+ required String label,
807+ void Function (double progress)? onProgress,
853808 }) async {
854809 final total = await getTotalCount ();
855810 var processed = 0 ;
856811 var populated = 0 ;
857812 final now = DateTime .now ();
858813
859814 // Cache of existing (hostId, counter) pairs to avoid duplicates
860- // We'll populate this lazily per-host as we encounter them
861815 final existingByHost = < String , Set <int >> {};
862816
863- await for (final batch in linkStream ) {
817+ await for (final batch in dataStream ) {
864818 final toInsert = < SyncSequenceLogCompanion > [];
865819
866- for (final link in batch) {
820+ for (final record in batch) {
867821 processed++ ;
868822
869- final vc = link .vectorClock;
823+ final vc = record .vectorClock;
870824 if (vc == null || vc.isEmpty) continue ;
871825
872- // Find the originating host (the one with the highest counter,
873- // which is typically the creator of this specific link version).
874- // Sort entries by host ID first to ensure deterministic tie-breaking
875- // when multiple hosts have the same max counter.
826+ // Find the originating host (the one with the highest counter).
827+ // Sort entries by host ID for deterministic tie-breaking.
876828 String ? originatingHost;
877829 var maxCounter = - 1 ;
878830 final sortedEntries = vc.entries.toList ()
@@ -907,8 +859,8 @@ class SyncSequenceLogService {
907859 SyncSequenceLogCompanion (
908860 hostId: Value (hostId),
909861 counter: Value (counter),
910- entryId: Value (link .id),
911- payloadType: Value (SyncSequencePayloadType .entryLink .index),
862+ entryId: Value (record .id),
863+ payloadType: Value (payloadType .index),
912864 originatingHostId: Value (originatingHost ?? hostId),
913865 status: Value (SyncSequenceStatus .received.index),
914866 createdAt: Value (now),
@@ -932,7 +884,7 @@ class SyncSequenceLogService {
932884
933885 if (populated > 0 ) {
934886 _loggingService.captureEvent (
935- 'populateFromEntryLinks : added $populated sequence log entries' ,
887+ '$ label : added $populated sequence log entries' ,
936888 domain: 'SYNC_SEQUENCE' ,
937889 subDomain: 'populate' ,
938890 );
0 commit comments