
Commit a67da4b

provide: handle reprovide (#1125)
* provider: adding provide and reprovide queue
* provider: network operations
* add some tests
* schedule prefix len computations
* provider schedule
* provider: handleProvide
* addressed review
* use go-test/random
* satisfy linter
* log errors during initial prefix len measurement
* address review
* satisfy linter
* address review
* provider: explore swarm
* provider: batch provide
* provider: batch reprovide
* provider: catchup pending work
* provider: options
* provide: handle reprovide
* fix panic when adding key to trie if superstring already exists
* address review
* decrease minimal region size from replicationFactor+1 to replicationFactor
* simplify unscheduleSubsumedPrefixesNoClock
* address review
* fix test to match region size (now: replicationFactor, before: replicationFactor+1)
* dequeue outside of go routine
* refactor and test groupAndScheduleKeysByPrefix
* moved maxPrefixSize const to top
* address review
* address review
* address review
* don't allocate capacity to avgPrefixLenReady
* address review
1 parent 0cb418a commit a67da4b
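Background for this change: the provider schedule maps each keyspace prefix (a "region") to a reprovide time inside the reprovide interval, so regions are swept one after another. The exact mapping lives in reprovideTimeForPrefix, which is not part of this diff; the snippet below is only a rough sketch of the idea, with the hypothetical offsetInCycle standing in for it and spreading prefixes uniformly over one interval.

package main

import (
    "fmt"
    "time"
)

// offsetInCycle is a hypothetical stand-in for reprovideTimeForPrefix: it
// reads a binary prefix as a fraction of the keyspace and spreads regions
// uniformly across a single reprovide interval.
func offsetInCycle(prefix string, interval time.Duration) time.Duration {
    frac := 0.0
    for i, b := range prefix {
        if b == '1' {
            frac += 1.0 / float64(uint64(1)<<(i+1))
        }
    }
    return time.Duration(frac * float64(interval))
}

func main() {
    for _, p := range []string{"00", "10", "11"} {
        fmt.Printf("prefix %s -> reprovide at +%v\n", p, offsetInCycle(p, time.Minute))
    }
}

Under this toy mapping and a one-minute interval, the three prefixes used in TestHandleReprovide get offsets of 0s, 30s and 45s, i.e. the order in which the schedule cursor visits them.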

2 files changed: 224 additions & 0 deletions

provider/provider.go

Lines changed: 107 additions & 0 deletions
@@ -235,6 +235,9 @@ func New(opts ...Option) (*SweepingProvider, error) {
 func (s *SweepingProvider) run() {
     logger.Debug("Starting SweepingProvider")
     go s.measureInitialPrefixLen()
+
+    // TODO: complete me
+    s.handleReprovide() // FIXME: to satisfy linter
 }

 // Close stops the provider and releases all resources.
@@ -724,6 +727,110 @@ func (s *SweepingProvider) provideKeysToPeer(p peer.ID, keys []mh.Multihash, pme
     return nil
 }

+// handleReprovide advances the reprovider schedule and (asynchronously)
+// reprovides the region at the current schedule cursor.
+//
+// Behavior:
+//   - Determines the next region to reprovide based on the current cursor and
+//     the schedule, reprovides the region under the cursor, and moves the cursor
+//     to the next region.
+//   - Programs the schedule timer (alarm) for the next region’s reprovide
+//     time. When the timer fires, this method must be invoked again.
+//   - If the node has been blocked past the reprovide interval or if one or
+//     more regions’ times are already in the past, those regions are added to
+//     the reprovide queue for catch-up and a connectivity check is triggered.
+//   - If the node is currently offline, it skips the immediate reprovide of
+//     the current region and enqueues it to the reprovide queue for later.
+//   - If the node is online it removes the current region from the reprovide
+//     queue (if present) and starts an asynchronous batch reprovide using a
+//     periodic worker.
+func (s *SweepingProvider) handleReprovide() {
+    s.scheduleLk.Lock()
+    currentPrefix := s.scheduleCursor
+    // Get next prefix to reprovide, and set timer for it.
+    next := keyspace.NextNonEmptyLeaf(s.schedule, currentPrefix, s.order)
+
+    if next == nil {
+        // Schedule is empty, don't reprovide anything.
+        s.scheduleLk.Unlock()
+        return
+    }
+
+    var nextPrefix bitstr.Key
+    var timeUntilNextReprovide time.Duration
+    if next.Key == currentPrefix {
+        // There is a single prefix in the schedule.
+        nextPrefix = currentPrefix
+        timeUntilNextReprovide = s.timeUntil(s.reprovideTimeForPrefix(currentPrefix))
+    } else {
+        currentTimeOffset := s.currentTimeOffset()
+        timeSinceTimerRunning := s.timeBetween(s.timeOffset(s.scheduleTimerStartedAt), currentTimeOffset)
+        timeSinceTimerUntilNext := s.timeBetween(s.timeOffset(s.scheduleTimerStartedAt), next.Data)
+
+        if s.scheduleTimerStartedAt.Add(s.reprovideInterval).Before(s.clock.Now()) {
+            // Alarm was programmed more than reprovideInterval ago, which means that
+            // no regions have been reprovided since. Add all regions to the reprovide
+            // queue. This only happens if the main thread gets blocked for more than
+            // reprovideInterval.
+            nextKeyFound := false
+            scheduleEntries := keyspace.AllEntries(s.schedule, s.order)
+            next = scheduleEntries[0]
+            for _, entry := range scheduleEntries {
+                // Add all regions from the schedule to the reprovide queue. The next
+                // region to be scheduled for reprovide is the one immediately
+                // following the current time offset in the schedule.
+                if !nextKeyFound && entry.Data > currentTimeOffset {
+                    next = entry
+                    nextKeyFound = true
+                }
+                s.reprovideQueue.Enqueue(entry.Key)
+            }
+            // Don't reprovide any region now, but schedule the next one. All regions
+            // are expected to be reprovided when the provider is catching up with
+            // failed regions.
+            s.scheduleNextReprovideNoLock(next.Key, s.timeUntil(next.Data))
+            s.scheduleLk.Unlock()
+            return
+        }
+        if timeSinceTimerUntilNext < timeSinceTimerRunning {
+            // next is scheduled in the past. While next is in the past, add next to
+            // failedRegions and take nextLeaf as next.
+            count := 0
+            scheduleSize := s.schedule.Size()
+            for timeSinceTimerUntilNext < timeSinceTimerRunning && count < scheduleSize {
+                prefix := next.Key
+                s.reprovideQueue.Enqueue(prefix)
+                next = keyspace.NextNonEmptyLeaf(s.schedule, next.Key, s.order)
+                timeSinceTimerUntilNext = s.timeBetween(s.timeOffset(s.scheduleTimerStartedAt), next.Data)
+                count++
+            }
+        }
+
+        // next is in the future
+        nextPrefix = next.Key
+        timeUntilNextReprovide = s.timeUntil(next.Data)
+    }
+
+    s.scheduleNextReprovideNoLock(nextPrefix, timeUntilNextReprovide)
+    s.scheduleLk.Unlock()
+
+    // If we are offline, don't try to reprovide region.
+    if !s.connectivity.IsOnline() {
+        s.reprovideQueue.Enqueue(currentPrefix)
+        return
+    }
+
+    // Remove prefix that is about to be reprovided from the reprovide queue if
+    // present.
+    s.reprovideQueue.Remove(currentPrefix)
+
+    go func() {
+        s.workerPool.Acquire(periodicWorker)
+        defer s.workerPool.Release(periodicWorker)
+        s.batchReprovide(currentPrefix, true)
+    }()
+}
+
 // handleProvide provides supplied keys to the network if needed and schedules
 // the keys to be reprovided if needed.
 func (s *SweepingProvider) handleProvide(force, reprovide bool, keys ...mh.Multihash) {
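Note that the run() hunk above leaves a "// TODO: complete me" and calls handleReprovide directly only to satisfy the linter. The doc comment states that when the schedule timer fires, handleReprovide must be invoked again, which suggests an event loop around that timer. The sketch below only illustrates such a loop under assumptions: the toy sweeper type, its done channel and the fixed reset interval are placeholders, not the provider's actual fields.

package main

import (
    "fmt"
    "time"
)

// sweeper is a toy stand-in for SweepingProvider, reduced to the two pieces
// the loop needs: the schedule timer and a shutdown channel.
type sweeper struct {
    scheduleTimer *time.Timer
    done          chan struct{}
}

// handleReprovide here only re-arms the timer; the real method also picks the
// next region, reprovides the current one and manages the reprovide queue.
func (s *sweeper) handleReprovide() {
    fmt.Println("reprovide current region, schedule the next one")
    s.scheduleTimer.Reset(200 * time.Millisecond)
}

// run is the event loop the TODO in SweepingProvider.run() hints at: every
// time the schedule timer fires, handleReprovide is invoked again.
func (s *sweeper) run() {
    for {
        select {
        case <-s.scheduleTimer.C:
            s.handleReprovide()
        case <-s.done:
            return
        }
    }
}

func main() {
    s := &sweeper{
        scheduleTimer: time.NewTimer(200 * time.Millisecond),
        done:          make(chan struct{}),
    }
    go s.run()
    time.Sleep(time.Second) // let the timer fire a few times
    close(s.done)
}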

provider/provider_test.go

Lines changed: 117 additions & 0 deletions
@@ -8,10 +8,12 @@ import (
     "strconv"
     "strings"
     "sync"
+    "sync/atomic"
     "testing"
     "time"

     "github.com/filecoin-project/go-clock"
+    "github.com/guillaumemichel/reservedpool"
     logging "github.com/ipfs/go-log/v2"
     "github.com/ipfs/go-test/random"
     "github.com/libp2p/go-libp2p/core/peer"
@@ -546,3 +548,118 @@ func TestIndividualProvideMultiple(t *testing.T) {
     require.True(t, r.reprovideQueue.IsEmpty())
     require.True(t, r.provideQueue.IsEmpty())
 }
+
+func waitUntil(t *testing.T, condition func() bool, maxDelay time.Duration, args ...any) {
+    step := time.Millisecond
+    for range maxDelay / step {
+        if condition() {
+            return
+        }
+        time.Sleep(step)
+    }
+    t.Fatal(args...)
+}
+
+func TestHandleReprovide(t *testing.T) {
+    mockClock := clock.NewMock()
+
+    online := atomic.Bool{}
+    online.Store(true)
+    connectivityCheckInterval := time.Second
+    connChecker, err := connectivity.New(
+        func() bool { return online.Load() },
+        func() {},
+        connectivity.WithClock(mockClock),
+        connectivity.WithOfflineCheckInterval(connectivityCheckInterval),
+        connectivity.WithOnlineCheckInterval(connectivityCheckInterval),
+    )
+    require.NoError(t, err)
+    defer connChecker.Close()
+
+    prov := SweepingProvider{
+        order: bit256.ZeroKey(),
+
+        connectivity: connChecker,
+
+        clock:         mockClock,
+        cycleStart:    mockClock.Now(),
+        scheduleTimer: mockClock.Timer(time.Hour),
+        schedule:      trie.New[bitstr.Key, time.Duration](),
+
+        reprovideQueue: queue.NewReprovideQueue(),
+        workerPool:     reservedpool.New[workerType](1, nil), // single worker
+
+        reprovideInterval: time.Minute,
+        maxReprovideDelay: 5 * time.Second,
+
+        getSelfAddrs: func() []ma.Multiaddr { return nil },
+    }
+    prov.scheduleTimer.Stop()
+
+    prefixes := []bitstr.Key{
+        "00",
+        "10",
+        "11",
+    }
+
+    // Empty schedule -> early return
+    prov.handleReprovide()
+    require.Zero(t, prov.scheduleCursor)
+
+    // Single prefix in schedule
+    prov.schedule.Add(prefixes[0], prov.reprovideTimeForPrefix(prefixes[0]))
+    prov.scheduleCursor = prefixes[0]
+    prov.handleReprovide()
+    require.Equal(t, prefixes[0], prov.scheduleCursor)
+
+    // Two prefixes in schedule
+    mockClock.Add(1)
+    prov.schedule.Add(prefixes[1], prov.reprovideTimeForPrefix(prefixes[1]))
+    prov.handleReprovide() // reprovides prefixes[0], sets scheduleCursor to prefixes[1]
+    require.Equal(t, prefixes[1], prov.scheduleCursor)
+
+    // Wait more than reprovideInterval to call handleReprovide again.
+    // All prefixes should be added to the reprovide queue.
+    mockClock.Add(prov.reprovideInterval + 1)
+    require.True(t, prov.reprovideQueue.IsEmpty())
+    prov.handleReprovide()
+    require.Equal(t, prefixes[1], prov.scheduleCursor)
+
+    require.Equal(t, 2, prov.reprovideQueue.Size())
+    dequeued, ok := prov.reprovideQueue.Dequeue()
+    require.True(t, ok)
+    require.Equal(t, prefixes[0], dequeued)
+    dequeued, ok = prov.reprovideQueue.Dequeue()
+    require.True(t, ok)
+    require.Equal(t, prefixes[1], dequeued)
+    require.True(t, prov.reprovideQueue.IsEmpty())
+
+    // Advance time past prefixes[1] and prefixes[2]
+    prov.schedule.Add(prefixes[2], prov.reprovideTimeForPrefix(prefixes[2]))
+    mockClock.Add(3 * prov.reprovideInterval / 4)
+    // reprovides prefixes[1], adds prefixes[2] to the reprovide queue, sets
+    // scheduleCursor to prefixes[0]
+    prov.handleReprovide()
+    require.Equal(t, prefixes[0], prov.scheduleCursor)
+
+    require.Equal(t, 1, prov.reprovideQueue.Size())
+    dequeued, ok = prov.reprovideQueue.Dequeue()
+    require.True(t, ok)
+    require.Equal(t, prefixes[2], dequeued)
+    require.True(t, prov.reprovideQueue.IsEmpty())
+
+    mockClock.Add(prov.reprovideInterval / 4)
+
+    // Node goes offline -> prefixes are queued
+    online.Store(false)
+    prov.connectivity.TriggerCheck()
+    waitUntil(t, func() bool { return !prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be offline")
+    // require.True(t, prov.reprovideQueue.IsEmpty())
+    prov.handleReprovide()
+    // require.Equal(t, 1, prov.reprovideQueue.Size())
+
+    // Node comes back online
+    online.Store(true)
+    mockClock.Add(connectivityCheckInterval)
+    waitUntil(t, func() bool { return prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be online")
+}
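handleReprovide only enqueues regions that missed their slot or that cannot be reprovided while offline; draining that queue is the "provider: catchup pending work" step mentioned in the commit message and is not part of this diff. The sketch below shows one way such a catch-up could look, using a toy FIFO and callbacks in place of the real reprovide queue, connectivity checker and batchReprovide.

package main

import "fmt"

// prefixQueue is a toy FIFO standing in for the provider's reprovide queue.
type prefixQueue struct{ items []string }

func (q *prefixQueue) Enqueue(p string) { q.items = append(q.items, p) }

func (q *prefixQueue) Dequeue() (string, bool) {
    if len(q.items) == 0 {
        return "", false
    }
    p := q.items[0]
    q.items = q.items[1:]
    return p, true
}

// catchUp drains regions that missed their slot while the node is online, as
// the doc comment on handleReprovide expects ("reprovided when the provider
// is catching up with failed regions").
func catchUp(q *prefixQueue, online func() bool, reprovide func(string)) {
    for online() {
        prefix, ok := q.Dequeue()
        if !ok {
            return
        }
        reprovide(prefix)
    }
}

func main() {
    q := &prefixQueue{}
    q.Enqueue("00")
    q.Enqueue("10")
    catchUp(q, func() bool { return true }, func(p string) { fmt.Println("reproviding", p) })
}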
