Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/gammazero/deque v1.1.0
github.com/google/gopacket v1.1.19
github.com/google/uuid v1.6.0
github.com/guillaumemichel/reservedpool v0.2.0
github.com/guillaumemichel/reservedpool v0.3.0
github.com/hashicorp/golang-lru v1.0.2
github.com/ipfs/boxo v0.33.1
github.com/ipfs/go-cid v0.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/guillaumemichel/reservedpool v0.2.0 h1:q73gtdMFJHtW+dDJ/fwtk34p7JprQv8fJSK7dEjf8Sw=
github.com/guillaumemichel/reservedpool v0.2.0/go.mod h1:sXSDIaef81TFdAJglsCFCMfgF5E5Z5xK1tFhjDhvbUc=
github.com/guillaumemichel/reservedpool v0.3.0 h1:eqqO/QvTllLBrit7LVtVJBqw4cD0WdV9ajUe7WNTajw=
github.com/guillaumemichel/reservedpool v0.3.0/go.mod h1:sXSDIaef81TFdAJglsCFCMfgF5E5Z5xK1tFhjDhvbUc=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down
2 changes: 1 addition & 1 deletion provider/buffered/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestQueueingMechanism(t *testing.T) {
}

// Wait for operations to be processed by expecting 4 signals
for i := 0; i < 4; i++ {
for i := range 4 {
select {
case <-fake.processed:
case <-time.After(time.Second):
Expand Down
22 changes: 21 additions & 1 deletion provider/internal/connectivity/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type ConnectivityChecker struct {

lastCheck time.Time
onlineCheckInterval time.Duration // minimum check interval when online
lastStateChange time.Time
lastStateChangeLk sync.Mutex

checkFunc func() bool // function to check whether node is online

Expand Down Expand Up @@ -118,6 +120,20 @@ func (c *ConnectivityChecker) IsOnline() bool {
return c.online.Load()
}

// LastStateChange returns the timestamp of the last state change.
func (c *ConnectivityChecker) LastStateChange() time.Time {
c.lastStateChangeLk.Lock()
defer c.lastStateChangeLk.Unlock()
return c.lastStateChange
}

// stateChanged should be called whenever the connectivity state changes.
func (c *ConnectivityChecker) stateChanged() {
c.lastStateChangeLk.Lock()
defer c.lastStateChangeLk.Unlock()
c.lastStateChange = time.Now()
}

// TriggerCheck triggers an asynchronous connectivity check.
//
// * If a check is already running, does nothing.
Expand Down Expand Up @@ -149,6 +165,7 @@ func (c *ConnectivityChecker) TriggerCheck() {
}

// Online -> Disconnected
c.stateChanged()
c.online.Store(false)

// Start periodic checks until node comes back Online
Expand All @@ -162,8 +179,9 @@ func (c *ConnectivityChecker) probeLoop(init bool) {
var offlineC <-chan time.Time
if !init {
if c.offlineDelay == 0 {
// Online -> Offline
c.stateChanged()
if c.onOffline != nil {
// Online -> Offline
c.onOffline()
}
} else {
Expand All @@ -188,6 +206,7 @@ func (c *ConnectivityChecker) probeLoop(init bool) {
timer.Reset(delay)
case <-offlineC:
// Disconnected -> Offline
c.stateChanged()
if c.onOffline != nil {
c.onOffline()
}
Expand All @@ -206,6 +225,7 @@ func (c *ConnectivityChecker) probe() bool {
c.online.Store(true)

c.lastCheck = time.Now()
c.stateChanged()
if c.onOnline != nil {
c.onOnline()
}
Expand Down
10 changes: 9 additions & 1 deletion provider/internal/connectivity/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ func TestNewConnectiviyChecker(t *testing.T) {
connChecker.Start()

<-onlineChan // wait for onOnline to be run
now := time.Now()
synctest.Wait()

require.True(t, connChecker.IsOnline())
require.Equal(t, now, connChecker.LastStateChange())
})
})

Expand Down Expand Up @@ -139,6 +141,7 @@ func TestStateTransitions(t *testing.T) {
<-onlineChan // wait for onOnline to be run
require.True(t, connChecker.IsOnline())
require.Equal(t, time.Now(), connChecker.lastCheck)
require.Equal(t, time.Now(), connChecker.LastStateChange())

online.Store(false)
// Cannot trigger check yet
Expand All @@ -150,6 +153,7 @@ func TestStateTransitions(t *testing.T) {
connChecker.TriggerCheck()
require.True(t, connChecker.mutex.TryLock()) // node still online
connChecker.mutex.Unlock()
require.NotEqual(t, time.Now(), connChecker.LastStateChange())

time.Sleep(time.Millisecond)
connChecker.TriggerCheck()
Expand All @@ -171,6 +175,7 @@ func TestStateTransitions(t *testing.T) {

require.False(t, connChecker.IsOnline())
<-offlineChan // wait for callback to be run
require.Equal(t, time.Now(), connChecker.LastStateChange())

connChecker.TriggerCheck() // noop since Offline
require.False(t, connChecker.mutex.TryLock())
Expand Down Expand Up @@ -205,9 +210,11 @@ func TestStateTransitions(t *testing.T) {

<-onlineChan

onlineSince := time.Now()
require.True(t, connChecker.IsOnline())
require.Equal(t, int32(1), checkCount.Load())
require.Equal(t, time.Now(), connChecker.lastCheck)
require.Equal(t, onlineSince, connChecker.lastCheck)
require.Equal(t, onlineSince, connChecker.LastStateChange())

connChecker.TriggerCheck() // recent check, should be no-op
synctest.Wait()
Expand All @@ -229,6 +236,7 @@ func TestStateTransitions(t *testing.T) {
synctest.Wait()
require.Equal(t, int32(3), checkCount.Load())
require.Equal(t, time.Now(), connChecker.lastCheck)
require.Equal(t, onlineSince, connChecker.LastStateChange())
})
})
}
Expand Down
39 changes: 39 additions & 0 deletions provider/internal/keyspace/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,45 @@ func allocateToKClosestAtDepth[K kad.Key[K], V0 any, V1 comparable](items *trie.
return m
}

// KeyspaceCovered checks whether the trie covers the entire keyspace without
// gaps.
func KeyspaceCovered[V any](t *trie.Trie[bitstr.Key, V]) bool {
if t.IsLeaf() {
if t.HasKey() {
return *t.Key() == ""
}
return false
}

stack := []bitstr.Key{"1", "0"}
outerLoop:
for _, entry := range AllEntries(t, bit256.ZeroKey()) {
p := entry.Key
stackTop := stack[len(stack)-1]
stackTopLen := len(stackTop)
if len(p) < stackTopLen {
return false
}

if len(p) == stackTopLen {
for len(p) == stackTopLen {
if stackTopLen == 1 && stackTop == p {
stack = stack[:len(stack)-1]
continue outerLoop
}
// Match with stackTop, pop stack and continue
p = p[:stackTopLen-1]
stack = stack[:len(stack)-1]
stackTop = stack[len(stack)-1]
stackTopLen = len(stackTop)
}
}

stack = append(stack, FlipLastBit(p))
}
return len(stack) == 0
}

// Region represents a subtrie of the complete DHT keyspace.
//
// - Prefix is the identifier of the subtrie.
Expand Down
105 changes: 105 additions & 0 deletions provider/internal/keyspace/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,3 +805,108 @@ func TestAssignKeysToRegions(t *testing.T) {
}
}
}

func TestKeyspaceCovered(t *testing.T) {
t.Run("empty trie", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
require.False(t, KeyspaceCovered(tr))
})

t.Run("single key covers entire keyspace", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key(""), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("two complementary keys cover keyspace", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("missing one key from full coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0"), struct{}{})
// Missing "1" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("four keys at depth 2 covering keyspace", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("00"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("10"), struct{}{})
tr.Add(bitstr.Key("11"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("mixed depth coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0"), struct{}{}) // covers all of 0xxx
tr.Add(bitstr.Key("10"), struct{}{}) // covers 10xx
tr.Add(bitstr.Key("11"), struct{}{}) // covers 11xx
require.True(t, KeyspaceCovered(tr))
})

t.Run("gaps in coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("00"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("10"), struct{}{})
// Missing "11" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("complex coverage pattern", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("000"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("deep unbalanced tree coverage", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0000"), struct{}{})
tr.Add(bitstr.Key("0001"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})

t.Run("shallow gap in deep tree", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0000"), struct{}{})
tr.Add(bitstr.Key("0001"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
// Missing "1" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("deep gap in deep tree", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("0001"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("01"), struct{}{})
tr.Add(bitstr.Key("1"), struct{}{})
// Missing "0000" prefix
require.False(t, KeyspaceCovered(tr))
})

t.Run("edge case: single bit differences", func(t *testing.T) {
tr := trie.New[bitstr.Key, struct{}]()
tr.Add(bitstr.Key("000"), struct{}{})
tr.Add(bitstr.Key("001"), struct{}{})
tr.Add(bitstr.Key("010"), struct{}{})
tr.Add(bitstr.Key("011"), struct{}{})
tr.Add(bitstr.Key("100"), struct{}{})
tr.Add(bitstr.Key("101"), struct{}{})
tr.Add(bitstr.Key("110"), struct{}{})
tr.Add(bitstr.Key("111"), struct{}{})
require.True(t, KeyspaceCovered(tr))
})
}
14 changes: 11 additions & 3 deletions provider/internal/queue/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ProvideQueue struct {
mu sync.Mutex

queue prefixQueue
keys *trie.Trie[bit256.Key, mh.Multihash] // used to store keys in the queue
keys *trie.Trie[bit256.Key, mh.Multihash] // stores keys currently in the queue
}

// NewProvideQueue creates a new ProvideQueue instance.
Expand Down Expand Up @@ -161,16 +161,24 @@ func (q *ProvideQueue) Remove(keys ...mh.Multihash) {
func (q *ProvideQueue) IsEmpty() bool {
q.mu.Lock()
defer q.mu.Unlock()
return q.queue.Size() == 0
return q.keys.IsEmptyLeaf()
}

// Size returns the number of regions containing at least one key in the queue.
// Size returns the number of keys currently in the queue.
func (q *ProvideQueue) Size() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.keys.Size()
}

// NumRegions returns the number of regions containing at least one key
// currently in the queue.
func (q *ProvideQueue) NumRegions() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.queue.Size()
}

// Clear removes all keys from the queue and returns the number of keys that
// were removed.
func (q *ProvideQueue) Clear() int {
Expand Down
4 changes: 4 additions & 0 deletions provider/internal/queue/provide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestProvideEnqueueSimple(t *testing.T) {
}
// Verify the count of multihashes matches
require.Equal(t, len(prefixes)*nMultihashesPerPrefix, q.Size())
// Verify count of regions in the queue
require.Equal(t, len(prefixes), q.NumRegions())
}

func TestProvideEnqueueOverlapping(t *testing.T) {
Expand All @@ -73,6 +75,7 @@ func TestProvideEnqueueOverlapping(t *testing.T) {
}

require.Equal(t, 1, q.queue.prefixes.Size()) // Only shortest prefix should remain
require.Equal(t, 1, q.NumRegions())
require.Equal(t, 1, q.queue.queue.Len())
require.GreaterOrEqual(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[0] }), 0) // "000" is in queue
require.Negative(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[1] })) // "0000" is NOT in queue
Expand All @@ -90,6 +93,7 @@ func TestProvideEnqueueOverlapping(t *testing.T) {
}

require.Equal(t, 2, q.queue.prefixes.Size()) // only "000" and "111" should remain
require.Equal(t, 2, q.NumRegions())
require.Equal(t, 2, q.queue.queue.Len())
require.GreaterOrEqual(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[1] }), 0) // "111" is in queue
require.Negative(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[0] })) // "1111" is NOT in queue
Expand Down
Loading
Loading