Commit fb1534c

provider: batch reprovide (#1122)
* provider: adding provide and reprovide queue
* provider: network operations
* add some tests
* schedule prefix len computations
* provider schedule
* provider: handleProvide
* addressed review
* use go-test/random
* satisfy linter
* log errors during initial prefix len measurement
* address review
* satisfy linter
* address review
* provider: explore swarm
* provider: batch provide
* provider: batch reprovide
* fix panic when adding key to trie if superstring already exists
* address review
* decrease minimal region size from replicationFactor+1 to replicationFactor
* simplify unscheduleSubsumedPrefixesNoClock
* address review
* fix test to match region size (now: replicationFactor, before: replicationFactor+1)
* refactor and test groupAndScheduleKeysByPrefix
* moved maxPrefixSize const to top
* address review
* address review
1 parent e3761f9 commit fb1534c
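The diff below introduces claimRegionReprovide and releaseRegionReprovide, which use the new ongoingReprovides trie to keep two workers from reproviding overlapping keyspace regions at the same time; the "fix panic when adding key to trie if superstring already exists" item refers to pruning longer (superstring) prefixes before inserting a shorter one. The following is a minimal, self-contained sketch of that claim/release pattern, assuming a plain map of prefix strings in place of the trie.Trie[bitstr.Key, struct{}] the provider actually uses; regionClaims, claim, and release are names invented for this illustration.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// regionClaims is a hypothetical stand-in for the provider's ongoingReprovides
// trie: it tracks which keyspace prefixes are currently being reprovided.
type regionClaims struct {
	mu      sync.Mutex
	claimed map[string]struct{}
}

func newRegionClaims() *regionClaims {
	return &regionClaims{claimed: make(map[string]struct{})}
}

// claim returns false if an equal or shorter claimed prefix already covers
// prefix. Otherwise it drops any longer claimed prefixes subsumed by prefix
// (the "prune superstrings before adding" idea) and records the claim.
func (c *regionClaims) claim(prefix string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	for p := range c.claimed {
		if strings.HasPrefix(prefix, p) {
			return false // region already covered by an ongoing reprovide
		}
	}
	for p := range c.claimed {
		if strings.HasPrefix(p, prefix) {
			delete(c.claimed, p) // subsumed by the new, shorter prefix
		}
	}
	c.claimed[prefix] = struct{}{}
	return true
}

// release marks the prefix as no longer being reprovided.
func (c *regionClaims) release(prefix string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.claimed, prefix)
}

func main() {
	claims := newRegionClaims()
	fmt.Println(claims.claim("0010")) // true: nothing claimed yet
	fmt.Println(claims.claim("00"))   // true: prunes "0010", claims "00" instead
	fmt.Println(claims.claim("0011")) // false: covered by the ongoing "00" reprovide
	claims.release("00")
	fmt.Println(claims.claim("0011")) // true: the region was released
}
```

In this sketch the prune-before-insert step mirrors that fix: a shorter prefix replaces any longer claimed prefixes it covers rather than coexisting with them, and doing the check, prune, and insert under one mutex keeps the sequence atomic.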

2 files changed: +505 -6 lines


provider/provider.go

Lines changed: 184 additions & 5 deletions
@@ -111,6 +111,7 @@ type SweepingProvider struct {
 
 	provideQueue *queue.ProvideQueue
 	provideRunning sync.Mutex
+	reprovideQueue *queue.ReprovideQueue
 
 	workerPool *pool.Pool[workerType]
 	maxProvideConnsPerWorker int
@@ -126,6 +127,9 @@ type SweepingProvider struct {
 	scheduleTimer *clock.Timer
 	scheduleTimerStartedAt time.Time
 
+	ongoingReprovides *trie.Trie[bitstr.Key, struct{}]
+	ongoingReprovidesLk sync.Mutex
+
 	avgPrefixLenLk sync.Mutex
 	avgPrefixLenReady chan struct{}
 	cachedAvgPrefixLen int
@@ -141,8 +145,8 @@ type SweepingProvider struct {
 
 // FIXME: remove me
 func (s *SweepingProvider) SatisfyLinter() {
-	s.vanillaProvide([]byte{})
 	s.measureInitialPrefixLen()
+	s.batchReprovide("", true)
 }
 
 // Close stops the provider and releases all resources.
@@ -167,6 +171,12 @@ func (s *SweepingProvider) scheduleNextReprovideNoLock(prefix bitstr.Key, timeUn
 	s.scheduleTimerStartedAt = s.clock.Now()
 }
 
+func (s *SweepingProvider) reschedulePrefix(prefix bitstr.Key) {
+	s.scheduleLk.Lock()
+	s.schedulePrefixNoLock(prefix, true)
+	s.scheduleLk.Unlock()
+}
+
 // schedulePrefixNoLock adds the supplied prefix to the schedule, unless
 // already present.
 //
@@ -776,7 +786,79 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash)
 	keys = append(keys, extraKeys...)
 	regions = keyspace.AssignKeysToRegions(regions, keys)
 
-	if !s.provideRegions(regions, addrInfo) {
+	if !s.provideRegions(regions, addrInfo, false, false) {
+		logger.Errorf("failed to reprovide any region for prefix %s", prefix)
+	}
+}
+
+func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide bool) {
+	addrInfo, ok := s.selfAddrInfo()
+	if !ok {
+		// Don't provide if the node doesn't have a valid address to include in the
+		// provider record.
+		return
+	}
+
+	// Load keys matching prefix from the keystore.
+	keys, err := s.keyStore.Get(context.Background(), prefix)
+	if err != nil {
+		s.failedReprovide(prefix, fmt.Errorf("couldn't reprovide, error when loading keys: %s", err))
+		if periodicReprovide {
+			s.reschedulePrefix(prefix)
+		}
+		return
+	}
+	if len(keys) == 0 {
+		logger.Infof("No keys to reprovide for prefix %s", prefix)
+		return
+	}
+	if len(keys) <= individualProvideThreshold {
+		// Don't fully explore the region, execute simple DHT provides for these
+		// keys. It isn't worth it to fully explore a region for just a few keys.
+		s.individualProvide(prefix, keys, true, periodicReprovide)
+		return
+	}
+
+	regions, coveredPrefix, err := s.exploreSwarm(prefix)
+	if err != nil {
+		s.failedReprovide(prefix, fmt.Errorf("reprovide '%s': %w", prefix, err))
+		if periodicReprovide {
+			s.reschedulePrefix(prefix)
+		}
+		return
+	}
+	logger.Debugf("reprovide: requested prefix '%s' (len %d), prefix covered '%s' (len %d)", prefix, len(prefix), coveredPrefix, len(coveredPrefix))
+
+	regions = s.claimRegionReprovide(regions)
+
+	// Remove all keys matching coveredPrefix from provide queue. No need to
+	// provide them anymore since they are about to be reprovided.
+	s.provideQueue.DequeueMatching(coveredPrefix)
+	// Remove covered prefix from the reprovide queue, so since we are about the
+	// reprovide the region.
+	s.reprovideQueue.Remove(coveredPrefix)
+
+	// When reproviding a region, remove all scheduled regions starting with
+	// the currently covered prefix.
+	s.scheduleLk.Lock()
+	s.unscheduleSubsumedPrefixesNoLock(coveredPrefix)
+	s.scheduleLk.Unlock()
+
+	if len(coveredPrefix) < len(prefix) {
+		// Covered prefix is shorter than the requested one, load all the keys
+		// matching the covered prefix from the keystore.
+		keys, err = s.keyStore.Get(context.Background(), coveredPrefix)
+		if err != nil {
+			err = fmt.Errorf("couldn't reprovide, error when loading keys: %s", err)
+			s.failedReprovide(prefix, err)
+			if periodicReprovide {
+				s.reschedulePrefix(prefix)
+			}
+		}
+	}
+	regions = keyspace.AssignKeysToRegions(regions, keys)
+
+	if !s.provideRegions(regions, addrInfo, true, periodicReprovide) {
 		logger.Errorf("failed to reprovide any region for prefix %s", prefix)
 	}
 }
@@ -789,6 +871,14 @@ func (s *SweepingProvider) failedProvide(prefix bitstr.Key, keys []mh.Multihash,
 	s.connectivity.TriggerCheck()
 }
 
+func (s *SweepingProvider) failedReprovide(prefix bitstr.Key, err error) {
+	logger.Error(err)
+	// Put prefix in the reprovide queue.
+	s.reprovideQueue.Enqueue(prefix)
+
+	s.connectivity.TriggerCheck()
+}
+
 // selfAddrInfo returns the current peer.AddrInfo to be used in the provider
 // records sent to remote peers.
 //
@@ -808,17 +898,69 @@ func (s *SweepingProvider) selfAddrInfo() (peer.AddrInfo, bool) {
 // provides for the supplied keys, handles failures and schedules next
 // reprovide is necessary.
 func (s *SweepingProvider) individualProvide(prefix bitstr.Key, keys []mh.Multihash, reprovide bool, periodicReprovide bool) {
-	// TODO: implement me
+	if len(keys) == 0 {
+		return
+	}
+
+	var provideErr error
+	if len(keys) == 1 {
+		coveredPrefix, err := s.vanillaProvide(keys[0])
+		if err == nil {
+			s.provideCounter.Add(context.Background(), 1)
+		} else if !reprovide {
+			// Put the key back in the provide queue.
+			s.failedProvide(prefix, keys, fmt.Errorf("individual provide failed for prefix '%s', %w", prefix, err))
+		}
+		provideErr = err
+		if periodicReprovide {
+			// Schedule next reprovide for the prefix that was actually covered by
+			// the GCP, otherwise we may schedule a reprovide for a prefix too short
+			// or too long.
+			s.reschedulePrefix(coveredPrefix)
+		}
+	} else {
+		wg := sync.WaitGroup{}
+		success := atomic.Bool{}
+		for _, key := range keys {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				_, err := s.vanillaProvide(key)
+				if err == nil {
+					s.provideCounter.Add(context.Background(), 1)
+					success.Store(true)
+				} else if !reprovide {
+					// Individual provide failed, put key back in provide queue.
+					s.failedProvide(prefix, []mh.Multihash{key}, err)
+				}
+			}()
+		}
+		wg.Wait()
+
+		if !success.Load() {
+			// Only errors if all provides failed.
+			provideErr = fmt.Errorf("all individual provides failed for prefix %s", prefix)
+		}
+		if periodicReprovide {
+			s.reschedulePrefix(prefix)
+		}
+	}
+	if reprovide && provideErr != nil {
+		s.failedReprovide(prefix, provideErr)
+	}
 }
 
 // provideRegions contains common logic to batchProvide() and batchReprovide().
 // It iterate over supplied regions, and allocates the regions provider records
 // to the appropriate DHT servers.
-func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo peer.AddrInfo) bool {
+func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo peer.AddrInfo, reprovide, periodicReprovide bool) bool {
 	errCount := 0
 	for _, r := range regions {
 		nKeys := r.Keys.Size()
 		if nKeys == 0 {
+			if reprovide {
+				s.releaseRegionReprovide(r.Prefix)
+			}
 			continue
 		}
 		// Add keys to local provider store
@@ -827,10 +969,20 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe
 		}
 		keysAllocations := keyspace.AllocateToKClosest(r.Keys, r.Peers, s.replicationFactor)
 		err := s.sendProviderRecords(keysAllocations, addrInfo)
+		if reprovide {
+			s.releaseRegionReprovide(r.Prefix)
+			if periodicReprovide {
+				s.reschedulePrefix(r.Prefix)
+			}
+		}
 		if err != nil {
 			errCount++
 			err = fmt.Errorf("cannot send provider records for region %s: %s", r.Prefix, err)
-			s.failedProvide(r.Prefix, keyspace.AllValues(r.Keys, s.order), err)
+			if reprovide {
+				s.failedReprovide(r.Prefix, err)
+			} else { // provide operation
+				s.failedProvide(r.Prefix, keyspace.AllValues(r.Keys, s.order), err)
+			}
 			continue
 		}
 		s.provideCounter.Add(context.Background(), int64(nKeys))
@@ -840,6 +992,33 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe
 	return errCount < len(regions)
 }
 
+// claimRegionReprovide checks if the region is already being reprovided by
+// another thread. If not it marks the region as being currently reprovided.
+func (s *SweepingProvider) claimRegionReprovide(regions []keyspace.Region) []keyspace.Region {
+	out := regions[:0]
+	s.ongoingReprovidesLk.Lock()
+	defer s.ongoingReprovidesLk.Unlock()
+	for _, r := range regions {
+		if r.Peers.IsEmptyLeaf() {
+			continue
+		}
+		if _, ok := keyspace.FindPrefixOfKey(s.ongoingReprovides, r.Prefix); !ok {
+			// Prune superstrings of r.Prefix if any
+			keyspace.PruneSubtrie(s.ongoingReprovides, r.Prefix)
+			out = append(out, r)
+			s.ongoingReprovides.Add(r.Prefix, struct{}{})
+		}
+	}
+	return out
+}
+
+// releaseRegionReprovide marks the region as no longer being reprovided.
+func (s *SweepingProvider) releaseRegionReprovide(prefix bitstr.Key) {
+	s.ongoingReprovidesLk.Lock()
+	defer s.ongoingReprovidesLk.Unlock()
+	s.ongoingReprovides.Remove(prefix)
+}
+
 // ProvideOnce only sends provider records for the given keys out to the DHT
 // swarm. It does NOT take the responsibility to reprovide these keys.
 func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
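For context on the fan-out in individualProvide above: each key gets its own goroutine calling vanillaProvide, and the batch is treated as failed only when no key succeeds. Below is a minimal, self-contained sketch of that pattern under simplified assumptions; provideOne and provideAll are hypothetical stand-ins, not part of the provider API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// provideOne is a hypothetical stand-in for a single DHT provide
// (vanillaProvide in the diff); it fails for the key "bad".
func provideOne(key string) error {
	if key == "bad" {
		return errors.New("no peer accepted the provider record")
	}
	return nil
}

// provideAll fans out one goroutine per key and, like individualProvide,
// reports an overall error only when every provide failed.
func provideAll(keys []string) error {
	var wg sync.WaitGroup
	var success atomic.Bool
	for _, key := range keys {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			if err := provideOne(key); err == nil {
				success.Store(true)
			}
		}(key)
	}
	wg.Wait()
	if !success.Load() {
		return fmt.Errorf("all individual provides failed")
	}
	return nil
}

func main() {
	fmt.Println(provideAll([]string{"good", "bad"})) // <nil>: one success is enough
	fmt.Println(provideAll([]string{"bad"}))         // error: every provide failed
}
```

Passing key as a goroutine argument keeps the sketch correct on Go versions before 1.22, where loop variables were shared across iterations.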
