Skip to content

Commit 1704edb

Browse files
provider: catchup pending work (#1123)
* provider: adding provide and reprovide queue * provider: network operations * add some tests * schedule prefix len computations * provider schedule * provider: handleProvide * addressed review * use go-test/random * satisfy linter * log errors during initial prefix len measurement * address review * satisfy linter * address review * provider: explore swarm * provider: batch provide * provider: batch reprovide * provider: catchup pending work * fix panic when adding key to trie if superstring already exists * address review * decrease minimal region size from replicationFactor+1 to replicationFactor * simplify unscheduleSubsumedPrefixesNoClock * address review * fix test to match region size (now: replicationFactor, before: replicationFactor+1) * dequeue outside of go routine * refactor and test groupAndScheduleKeysByPrefix * moved maxPrefixSize const to top * address review * address review * address review
1 parent fb1534c commit 1704edb

File tree

1 file changed

+61
-5
lines changed

1 file changed

+61
-5
lines changed

provider/provider.go

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ const (
9595
)
9696

9797
type SweepingProvider struct {
98-
// TODO: complete me
9998
done chan struct{}
10099
closeOnce sync.Once
101100

@@ -109,9 +108,10 @@ type SweepingProvider struct {
109108

110109
replicationFactor int
111110

112-
provideQueue *queue.ProvideQueue
113-
provideRunning sync.Mutex
114-
reprovideQueue *queue.ReprovideQueue
111+
provideQueue *queue.ProvideQueue
112+
provideRunning sync.Mutex
113+
reprovideQueue *queue.ReprovideQueue
114+
lateReprovideRunning sync.Mutex
115115

116116
workerPool *pool.Pool[workerType]
117117
maxProvideConnsPerWorker int
@@ -146,7 +146,7 @@ type SweepingProvider struct {
146146
// FIXME: remove me
147147
func (s *SweepingProvider) SatisfyLinter() {
148148
s.measureInitialPrefixLen()
149-
s.batchReprovide("", true)
149+
s.catchupPendingWork()
150150
}
151151

152152
// Close stops the provider and releases all resources.
@@ -718,6 +718,32 @@ func (s *SweepingProvider) groupAndScheduleKeysByPrefix(keys []mh.Multihash, sch
718718
return prefixes
719719
}
720720

721+
// catchupPendingWork is called when the provider comes back online after being offline.
722+
//
723+
// 1. Try again to reprovide regions that failed to be reprovided on time.
724+
// 2. Try again to provide keys that failed to be provided.
725+
//
726+
// This function is guarded by s.lateReprovideRunning, ensuring the function
727+
// cannot be called again while it is working on reproviding late regions.
728+
func (s *SweepingProvider) catchupPendingWork() {
729+
if !s.lateReprovideRunning.TryLock() {
730+
return
731+
}
732+
go func() {
733+
// Reprovide late regions if any.
734+
s.reprovideLateRegions()
735+
s.lateReprovideRunning.Unlock()
736+
737+
// Provides are handled after reprovides, because keys pending to be
738+
// provided will be provided as part of a region reprovide if they belong
739+
// to that region. Hence, the provideLoop will use less resources if run
740+
// after the reprovides.
741+
742+
// Restart provide loop if it was stopped.
743+
s.provideLoop()
744+
}()
745+
}
746+
721747
// provideLoop is the loop providing keys to the DHT swarm as long as the
722748
// provide queue isn't empty.
723749
//
@@ -756,6 +782,36 @@ func (s *SweepingProvider) provideLoop() {
756782
}
757783
}
758784

785+
// reprovideLateRegions is the loop reproviding regions that failed to be
786+
// reprovided on time. It returns once the reprovide queue is empty.
787+
func (s *SweepingProvider) reprovideLateRegions() {
788+
for !s.reprovideQueue.IsEmpty() {
789+
if s.closed() {
790+
// Exit loop if provider is closed.
791+
return
792+
}
793+
if !s.connectivity.IsOnline() {
794+
// Don't try to reprovide a region if node is offline.
795+
return
796+
}
797+
// Block until we can acquire a worker from the pool.
798+
err := s.workerPool.Acquire(burstWorker)
799+
if err != nil {
800+
// Provider was closed while waiting for a worker.
801+
return
802+
}
803+
prefix, ok := s.reprovideQueue.Dequeue()
804+
if ok {
805+
go func(prefix bitstr.Key) {
806+
defer s.workerPool.Release(burstWorker)
807+
s.batchReprovide(prefix, false)
808+
}(prefix)
809+
} else {
810+
s.workerPool.Release(burstWorker)
811+
}
812+
}
813+
}
814+
759815
func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash) {
760816
if len(keys) == 0 {
761817
return

0 commit comments

Comments
 (0)