Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions provider/buffered/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type SweepingProvider struct {
closed chan struct{}

newItems chan struct{}
provider internal.Provider
Provider internal.Provider
queue *dsqueue.DSQueue
batchSize int
}
Expand All @@ -53,7 +53,7 @@ func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *Sweepin
closed: make(chan struct{}),

newItems: make(chan struct{}, 1),
provider: prov,
Provider: prov,
queue: dsqueue.New(ds, cfg.dsName,
dsqueue.WithDedupCacheSize(0), // disable deduplication
dsqueue.WithIdleWriteTime(cfg.idleWriteTime),
Expand All @@ -73,7 +73,7 @@ func (s *SweepingProvider) Close() error {
var err error
s.closeOnce.Do(func() {
close(s.closed)
err = errors.Join(s.queue.Close(), s.provider.Close())
err = errors.Join(s.queue.Close(), s.Provider.Close())
<-s.done
})
return err
Expand Down Expand Up @@ -175,14 +175,14 @@ func (s *SweepingProvider) worker() {
// Process `StartProviding` (force=true) ops first, so that if
// `StartProviding` (force=false) is called after, there is no need to
// enqueue the multihash a second time to the provide queue.
executeOperation(func(keys ...mh.Multihash) error { return s.provider.StartProviding(true, keys...) }, ops[forceStartProvidingOp])
executeOperation(func(keys ...mh.Multihash) error { return s.provider.StartProviding(false, keys...) }, ops[startProvidingOp])
executeOperation(s.provider.ProvideOnce, ops[provideOnceOp])
executeOperation(func(keys ...mh.Multihash) error { return s.Provider.StartProviding(true, keys...) }, ops[forceStartProvidingOp])
executeOperation(func(keys ...mh.Multihash) error { return s.Provider.StartProviding(false, keys...) }, ops[startProvidingOp])
executeOperation(s.Provider.ProvideOnce, ops[provideOnceOp])
// Process `StopProviding` last, so that multihashes that should have been
// provided, and then stopped provided in the same batch are provided only
// once. Don't `StopProviding` multihashes, for which `StartProviding` has
// been called after `StopProviding`.
executeOperation(s.provider.StopProviding, ops[stopProvidingOp])
executeOperation(s.Provider.StopProviding, ops[stopProvidingOp])
}
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error {
// The keys are not deleted from the keystore, so they will continue to be
// reprovided as scheduled.
func (s *SweepingProvider) Clear() int {
	// Delegate to the wrapped provider; the unreachable duplicate return
	// statement referencing the old unexported field was removed.
	return s.Provider.Clear()
}

// RefreshSchedule scans the KeyStore for any keys that are not currently
Expand All @@ -265,5 +265,5 @@ func (s *SweepingProvider) Clear() int {
// `OfflineDelay`). The schedule depends on the network size, hence recent
// network connectivity is essential.
func (s *SweepingProvider) RefreshSchedule() error {
	// Delegate to the wrapped provider; the unreachable duplicate return
	// statement referencing the old unexported field was removed.
	return s.Provider.RefreshSchedule()
}
22 changes: 21 additions & 1 deletion provider/internal/connectivity/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type ConnectivityChecker struct {

lastCheck time.Time
onlineCheckInterval time.Duration // minimum check interval when online
lastStateChange time.Time
lastStateChangeLk sync.Mutex

checkFunc func() bool // function to check whether node is online

Expand Down Expand Up @@ -117,6 +119,20 @@ func (c *ConnectivityChecker) IsOnline() bool {
return c.online.Load()
}

// LastStateChange returns the timestamp of the most recent connectivity
// state transition.
func (c *ConnectivityChecker) LastStateChange() time.Time {
	c.lastStateChangeLk.Lock()
	t := c.lastStateChange
	c.lastStateChangeLk.Unlock()
	return t
}

// stateChanged records time.Now as the moment of the latest connectivity
// state transition. It must be called on every state change.
func (c *ConnectivityChecker) stateChanged() {
	c.lastStateChangeLk.Lock()
	c.lastStateChange = time.Now()
	c.lastStateChangeLk.Unlock()
}

// TriggerCheck triggers an asynchronous connectivity check.
//
// * If a check is already running, does nothing.
Expand Down Expand Up @@ -148,6 +164,7 @@ func (c *ConnectivityChecker) TriggerCheck() {
}

// Online -> Disconnected
c.stateChanged()
c.online.Store(false)

// Start periodic checks until node comes back Online
Expand All @@ -161,8 +178,9 @@ func (c *ConnectivityChecker) probeLoop(init bool) {
var offlineC <-chan time.Time
if !init {
if c.offlineDelay == 0 {
// Online -> Offline
c.stateChanged()
if c.onOffline != nil {
// Online -> Offline
c.onOffline()
}
} else {
Expand All @@ -187,6 +205,7 @@ func (c *ConnectivityChecker) probeLoop(init bool) {
timer.Reset(delay)
case <-offlineC:
// Disconnected -> Offline
c.stateChanged()
if c.onOffline != nil {
c.onOffline()
}
Expand All @@ -205,6 +224,7 @@ func (c *ConnectivityChecker) probe() bool {
c.online.Store(true)

c.lastCheck = time.Now()
c.stateChanged()
if c.onOnline != nil {
c.onOnline()
}
Expand Down
10 changes: 9 additions & 1 deletion provider/internal/connectivity/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ func TestNewConnectiviyChecker(t *testing.T) {
connChecker.Start()

<-onlineChan // wait for onOnline to be run
now := time.Now()
synctest.Wait()

require.True(t, connChecker.IsOnline())
require.Equal(t, now, connChecker.LastStateChange())
})
})

Expand Down Expand Up @@ -139,6 +141,7 @@ func TestStateTransitions(t *testing.T) {
<-onlineChan // wait for onOnline to be run
require.True(t, connChecker.IsOnline())
require.Equal(t, time.Now(), connChecker.lastCheck)
require.Equal(t, time.Now(), connChecker.LastStateChange())

online.Store(false)
// Cannot trigger check yet
Expand All @@ -150,6 +153,7 @@ func TestStateTransitions(t *testing.T) {
connChecker.TriggerCheck()
require.True(t, connChecker.mutex.TryLock()) // node still online
connChecker.mutex.Unlock()
require.NotEqual(t, time.Now(), connChecker.LastStateChange())

time.Sleep(time.Millisecond)
connChecker.TriggerCheck()
Expand All @@ -171,6 +175,7 @@ func TestStateTransitions(t *testing.T) {

require.False(t, connChecker.IsOnline())
<-offlineChan // wait for callback to be run
require.Equal(t, time.Now(), connChecker.LastStateChange())

connChecker.TriggerCheck() // noop since Offline
require.False(t, connChecker.mutex.TryLock())
Expand Down Expand Up @@ -205,9 +210,11 @@ func TestStateTransitions(t *testing.T) {

<-onlineChan

onlineSince := time.Now()
require.True(t, connChecker.IsOnline())
require.Equal(t, int32(1), checkCount.Load())
require.Equal(t, time.Now(), connChecker.lastCheck)
require.Equal(t, onlineSince, connChecker.lastCheck)
require.Equal(t, onlineSince, connChecker.LastStateChange())

connChecker.TriggerCheck() // recent check, should be no-op
synctest.Wait()
Expand All @@ -229,6 +236,7 @@ func TestStateTransitions(t *testing.T) {
synctest.Wait()
require.Equal(t, int32(3), checkCount.Load())
require.Equal(t, time.Now(), connChecker.lastCheck)
require.Equal(t, onlineSince, connChecker.LastStateChange())
})
})
}
Expand Down
37 changes: 37 additions & 0 deletions provider/internal/keyspace/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,43 @@ func allocateToKClosestAtDepth[K kad.Key[K], V0 any, V1 comparable](items *trie.
return m
}

// KeyspaceCovered checks whether the trie covers the entire keyspace without
// gaps, i.e. whether every possible key falls under some prefix stored in the
// trie.
//
// It walks all entries while maintaining a stack of prefixes still awaiting
// coverage (initially "0" on top of "1"). Each visited prefix either exactly
// completes the region on top of the stack — popping it and merging completed
// sibling regions upward — or covers only part of it, in which case the
// visited prefix's sibling is pushed as the next region to cover. The
// keyspace is covered exactly when the stack ends up empty.
//
// NOTE(review): this relies on AllEntries(t, bit256.ZeroKey()) yielding
// prefixes in ascending binary order — confirm against AllEntries' contract.
func KeyspaceCovered[D any](t *trie.Trie[bitstr.Key, D]) bool {
	if t.IsLeaf() {
		if t.HasKey() {
			// A single key covers everything only if it is the empty
			// prefix, which denotes the whole keyspace.
			return *t.Key() == ""
		}
		// Empty trie: nothing is covered.
		return false
	}

	// Prefixes still awaiting coverage; the top ("0") is the region
	// expected next in traversal order.
	stack := []bitstr.Key{"1", "0"}
outerLoop:
	for _, entry := range AllEntries(t, bit256.ZeroKey()) {
		p := entry.Key
		stackTop := stack[len(stack)-1]
		stackTopLen := len(stackTop)
		if len(p) < stackTopLen {
			// p is shorter than the awaited prefix, so it overlaps a
			// region that was already accounted for: not a clean cover.
			return false
		}

		// While p is exactly as long as the awaited prefix, it completes
		// that region: pop it and merge one level up.
		for len(p) == stackTopLen {
			if stackTopLen == 1 && stackTop == p {
				// A whole root-level half of the keyspace is done.
				stack = stack[:len(stack)-1]
				continue outerLoop
			}
			// Match with stackTop, pop stack and continue merging one
			// level up (drop p's last bit to get the parent region).
			p = p[:stackTopLen-1]
			stack = stack[:len(stack)-1]
			stackTop = stack[len(stack)-1]
			stackTopLen = len(stackTop)
		}

		// p covers only part of the awaited region: its sibling (same
		// prefix with the last bit flipped) must be covered next.
		stack = append(stack, FlipLastBit(p))
	}
	// Covered iff no region remains pending.
	return len(stack) == 0
}

// Region represents a subtrie of the complete DHT keyspace.
//
// - Prefix is the identifier of the subtrie.
Expand Down
105 changes: 105 additions & 0 deletions provider/internal/keyspace/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,3 +939,108 @@ func TestAssignKeysToRegions(t *testing.T) {
}
}
}

func TestKeyspaceCovered(t *testing.T) {
t.Run("empty trie", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
require.False(t, KeyspaceCovered(tr))
})

t.Run("single key covers entire keyspace", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key(""), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("two complementary keys cover keyspace", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("missing one key from full coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0"), struct{}{})
// Missing "1" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("four keys at depth 2 covering keyspace", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("00"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("10"), struct{}{})
tr.Add(bitstr.Key("11"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("mixed depth coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0"), struct{}{}) // covers all of 0xxx
tr.Add(bitstr.Key("10"), struct{}{}) // covers 10xx
tr.Add(bitstr.Key("11"), struct{}{}) // covers 11xx
require.True(t, KeyspaceCovered(tr))
})

t.Run("gaps in coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("00"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("10"), struct{}{})
// Missing "11" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("complex coverage pattern", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("000"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("deep unbalanced tree coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0000"), struct{}{})
tr.Add(bitstr.Key("0001"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("shallow gap in deep tree", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0000"), struct{}{})
tr.Add(bitstr.Key("0001"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
// Missing "1" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("deep gap in deep tree", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0001"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
// Missing "0000" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("edge case: single bit differences", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("000"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("010"), struct{}{})
tr.Add(bitstr.Key("011"), struct{}{})
tr.Add(bitstr.Key("100"), struct{}{})
tr.Add(bitstr.Key("101"), struct{}{})
tr.Add(bitstr.Key("110"), struct{}{})
tr.Add(bitstr.Key("111"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})
}
14 changes: 11 additions & 3 deletions provider/internal/queue/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ProvideQueue struct {
mu sync.Mutex

queue prefixQueue
keys *trie.Trie[bit256.Key, mh.Multihash] // used to store keys in the queue
keys *trie.Trie[bit256.Key, mh.Multihash] // stores keys currently in the queue
}

// NewProvideQueue creates a new ProvideQueue instance.
Expand Down Expand Up @@ -163,16 +163,24 @@ func (q *ProvideQueue) Remove(keys ...mh.Multihash) {
// IsEmpty reports whether the queue currently holds no keys.
func (q *ProvideQueue) IsEmpty() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	// The unreachable duplicate return (stale `q.queue.Size() == 0` line)
	// was removed; emptiness is determined by the key trie.
	return q.keys.IsEmptyLeaf()
}

// Size returns the number of keys currently in the queue.
// (The stale duplicate comment describing regions was removed; region counts
// are reported by NumRegions.)
func (q *ProvideQueue) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.keys.Size()
}

// NumRegions returns how many keyspace regions currently hold at least one
// queued key.
func (q *ProvideQueue) NumRegions() int {
	q.mu.Lock()
	n := q.queue.Size()
	q.mu.Unlock()
	return n
}

// Clear removes all keys from the queue and returns the number of keys that
// were removed.
func (q *ProvideQueue) Clear() int {
Expand Down
4 changes: 4 additions & 0 deletions provider/internal/queue/provide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestProvideEnqueueSimple(t *testing.T) {
}
// Verify the count of multihashes matches
require.Equal(t, len(prefixes)*nMultihashesPerPrefix, q.Size())
// Verify count of regions in the queue
require.Equal(t, len(prefixes), q.NumRegions())
}

func TestProvideEnqueueOverlapping(t *testing.T) {
Expand All @@ -73,6 +75,7 @@ func TestProvideEnqueueOverlapping(t *testing.T) {
}

require.Equal(t, 1, q.queue.prefixes.Size()) // Only shortest prefix should remain
require.Equal(t, 1, q.NumRegions())
require.Equal(t, 1, q.queue.queue.Len())
require.GreaterOrEqual(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[0] }), 0) // "000" is in queue
require.Negative(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[1] })) // "0000" is NOT in queue
Expand All @@ -90,6 +93,7 @@ func TestProvideEnqueueOverlapping(t *testing.T) {
}

require.Equal(t, 2, q.queue.prefixes.Size()) // only "000" and "111" should remain
require.Equal(t, 2, q.NumRegions())
require.Equal(t, 2, q.queue.queue.Len())
require.GreaterOrEqual(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[1] }), 0) // "111" is in queue
require.Negative(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[0] })) // "1111" is NOT in queue
Expand Down
Loading