diff --git a/provider/buffered/provider.go b/provider/buffered/provider.go index a1710d60f..50f4fb3ad 100644 --- a/provider/buffered/provider.go +++ b/provider/buffered/provider.go @@ -38,7 +38,7 @@ type SweepingProvider struct { closed chan struct{} newItems chan struct{} - provider internal.Provider + Provider internal.Provider queue *dsqueue.DSQueue batchSize int } @@ -53,7 +53,7 @@ func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *Sweepin closed: make(chan struct{}), newItems: make(chan struct{}, 1), - provider: prov, + Provider: prov, queue: dsqueue.New(ds, cfg.dsName, dsqueue.WithDedupCacheSize(0), // disable deduplication dsqueue.WithIdleWriteTime(cfg.idleWriteTime), @@ -73,7 +73,7 @@ func (s *SweepingProvider) Close() error { var err error s.closeOnce.Do(func() { close(s.closed) - err = errors.Join(s.queue.Close(), s.provider.Close()) + err = errors.Join(s.queue.Close(), s.Provider.Close()) <-s.done }) return err @@ -175,14 +175,14 @@ func (s *SweepingProvider) worker() { // Process `StartProviding` (force=true) ops first, so that if // `StartProviding` (force=false) is called after, there is no need to // enqueue the multihash a second time to the provide queue. - executeOperation(func(keys ...mh.Multihash) error { return s.provider.StartProviding(true, keys...) }, ops[forceStartProvidingOp]) - executeOperation(func(keys ...mh.Multihash) error { return s.provider.StartProviding(false, keys...) }, ops[startProvidingOp]) - executeOperation(s.provider.ProvideOnce, ops[provideOnceOp]) + executeOperation(func(keys ...mh.Multihash) error { return s.Provider.StartProviding(true, keys...) }, ops[forceStartProvidingOp]) + executeOperation(func(keys ...mh.Multihash) error { return s.Provider.StartProviding(false, keys...) 
}, ops[startProvidingOp]) + executeOperation(s.Provider.ProvideOnce, ops[provideOnceOp]) // Process `StopProviding` last, so that multihashes that should have been // provided, and then stopped provided in the same batch are provided only // once. Don't `StopProviding` multihashes, for which `StartProviding` has // been called after `StopProviding`. - executeOperation(s.provider.StopProviding, ops[stopProvidingOp]) + executeOperation(s.Provider.StopProviding, ops[stopProvidingOp]) } } @@ -249,7 +249,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error { // The keys are not deleted from the keystore, so they will continue to be // reprovided as scheduled. func (s *SweepingProvider) Clear() int { - return s.provider.Clear() + return s.Provider.Clear() } // RefreshSchedule scans the KeyStore for any keys that are not currently @@ -265,5 +265,5 @@ func (s *SweepingProvider) Clear() int { // `OfflineDelay`). The schedule depends on the network size, hence recent // network connectivity is essential. func (s *SweepingProvider) RefreshSchedule() error { - return s.provider.RefreshSchedule() + return s.Provider.RefreshSchedule() } diff --git a/provider/internal/connectivity/connectivity.go b/provider/internal/connectivity/connectivity.go index 8565d253a..a167dd9eb 100644 --- a/provider/internal/connectivity/connectivity.go +++ b/provider/internal/connectivity/connectivity.go @@ -43,6 +43,8 @@ type ConnectivityChecker struct { lastCheck time.Time onlineCheckInterval time.Duration // minimum check interval when online + lastStateChange time.Time + lastStateChangeLk sync.Mutex checkFunc func() bool // function to check whether node is online @@ -117,6 +119,20 @@ func (c *ConnectivityChecker) IsOnline() bool { return c.online.Load() } +// LastStateChange returns the timestamp of the last state change. 
+func (c *ConnectivityChecker) LastStateChange() time.Time { + c.lastStateChangeLk.Lock() + defer c.lastStateChangeLk.Unlock() + return c.lastStateChange +} + +// stateChanged should be called whenever the connectivity state changes. +func (c *ConnectivityChecker) stateChanged() { + c.lastStateChangeLk.Lock() + defer c.lastStateChangeLk.Unlock() + c.lastStateChange = time.Now() +} + // TriggerCheck triggers an asynchronous connectivity check. // // * If a check is already running, does nothing. @@ -148,6 +164,7 @@ func (c *ConnectivityChecker) TriggerCheck() { } // Online -> Disconnected + c.stateChanged() c.online.Store(false) // Start periodic checks until node comes back Online @@ -161,8 +178,9 @@ func (c *ConnectivityChecker) probeLoop(init bool) { var offlineC <-chan time.Time if !init { if c.offlineDelay == 0 { + // Online -> Offline + c.stateChanged() if c.onOffline != nil { - // Online -> Offline c.onOffline() } } else { @@ -187,6 +205,7 @@ func (c *ConnectivityChecker) probeLoop(init bool) { timer.Reset(delay) case <-offlineC: // Disconnected -> Offline + c.stateChanged() if c.onOffline != nil { c.onOffline() } @@ -205,6 +224,7 @@ func (c *ConnectivityChecker) probe() bool { c.online.Store(true) c.lastCheck = time.Now() + c.stateChanged() if c.onOnline != nil { c.onOnline() } diff --git a/provider/internal/connectivity/connectivity_test.go b/provider/internal/connectivity/connectivity_test.go index c30de2f2c..b3ca270a5 100644 --- a/provider/internal/connectivity/connectivity_test.go +++ b/provider/internal/connectivity/connectivity_test.go @@ -42,9 +42,11 @@ func TestNewConnectiviyChecker(t *testing.T) { connChecker.Start() <-onlineChan // wait for onOnline to be run + now := time.Now() synctest.Wait() require.True(t, connChecker.IsOnline()) + require.Equal(t, now, connChecker.LastStateChange()) }) }) @@ -139,6 +141,7 @@ func TestStateTransitions(t *testing.T) { <-onlineChan // wait for onOnline to be run require.True(t, connChecker.IsOnline()) 
require.Equal(t, time.Now(), connChecker.lastCheck) + require.Equal(t, time.Now(), connChecker.LastStateChange()) online.Store(false) // Cannot trigger check yet @@ -150,6 +153,7 @@ func TestStateTransitions(t *testing.T) { connChecker.TriggerCheck() require.True(t, connChecker.mutex.TryLock()) // node still online connChecker.mutex.Unlock() + require.NotEqual(t, time.Now(), connChecker.LastStateChange()) time.Sleep(time.Millisecond) connChecker.TriggerCheck() @@ -171,6 +175,7 @@ func TestStateTransitions(t *testing.T) { require.False(t, connChecker.IsOnline()) <-offlineChan // wait for callback to be run + require.Equal(t, time.Now(), connChecker.LastStateChange()) connChecker.TriggerCheck() // noop since Offline require.False(t, connChecker.mutex.TryLock()) @@ -205,9 +210,11 @@ func TestStateTransitions(t *testing.T) { <-onlineChan + onlineSince := time.Now() require.True(t, connChecker.IsOnline()) require.Equal(t, int32(1), checkCount.Load()) - require.Equal(t, time.Now(), connChecker.lastCheck) + require.Equal(t, onlineSince, connChecker.lastCheck) + require.Equal(t, onlineSince, connChecker.LastStateChange()) connChecker.TriggerCheck() // recent check, should be no-op synctest.Wait() @@ -229,6 +236,7 @@ func TestStateTransitions(t *testing.T) { synctest.Wait() require.Equal(t, int32(3), checkCount.Load()) require.Equal(t, time.Now(), connChecker.lastCheck) + require.Equal(t, onlineSince, connChecker.LastStateChange()) }) }) } diff --git a/provider/internal/keyspace/trie.go b/provider/internal/keyspace/trie.go index 16beb05d5..5a8ed90c4 100644 --- a/provider/internal/keyspace/trie.go +++ b/provider/internal/keyspace/trie.go @@ -391,6 +391,43 @@ func allocateToKClosestAtDepth[K kad.Key[K], V0 any, V1 comparable](items *trie. return m } +// KeyspaceCovered checks whether the trie covers the entire keyspace without +// gaps. 
+func KeyspaceCovered[D any](t *trie.Trie[bitstr.Key, D]) bool { + if t.IsLeaf() { + if t.HasKey() { + return *t.Key() == "" + } + return false + } + + stack := []bitstr.Key{"1", "0"} +outerLoop: + for _, entry := range AllEntries(t, bit256.ZeroKey()) { + p := entry.Key + stackTop := stack[len(stack)-1] + stackTopLen := len(stackTop) + if len(p) < stackTopLen { + return false + } + + for len(p) == stackTopLen { + if stackTopLen == 1 && stackTop == p { + stack = stack[:len(stack)-1] + continue outerLoop + } + // Match with stackTop, pop stack and continue + p = p[:stackTopLen-1] + stack = stack[:len(stack)-1] + stackTop = stack[len(stack)-1] + stackTopLen = len(stackTop) + } + + stack = append(stack, FlipLastBit(p)) + } + return len(stack) == 0 +} + // Region represents a subtrie of the complete DHT keyspace. // // - Prefix is the identifier of the subtrie. diff --git a/provider/internal/keyspace/trie_test.go b/provider/internal/keyspace/trie_test.go index f94c310ea..ec116ebc7 100644 --- a/provider/internal/keyspace/trie_test.go +++ b/provider/internal/keyspace/trie_test.go @@ -939,3 +939,108 @@ func TestAssignKeysToRegions(t *testing.T) { } } } + +func TestKeyspaceCovered(t *testing.T) { + t.Run("empty trie", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + require.False(t, KeyspaceCovered(tr)) + }) + + t.Run("single key covers entire keyspace", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key(""), struct{}{}) + require.True(t, KeyspaceCovered(tr)) + }) + + t.Run("two complementary keys cover keyspace", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("0"), struct{}{}) + tr.Add(bitstr.Key("1"), struct{}{}) + require.True(t, KeyspaceCovered(tr)) + }) + + t.Run("missing one key from full coverage", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("0"), struct{}{}) + // Missing "1" prefix + require.False(t, KeyspaceCovered(tr)) + }) + + t.Run("four keys at 
depth 2 covering keyspace", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("00"), struct{}{}) + tr.Add(bitstr.Key("01"), struct{}{}) + tr.Add(bitstr.Key("10"), struct{}{}) + tr.Add(bitstr.Key("11"), struct{}{}) + require.True(t, KeyspaceCovered(tr)) + }) + + t.Run("mixed depth coverage", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("0"), struct{}{}) // covers all of 0xxx + tr.Add(bitstr.Key("10"), struct{}{}) // covers 10xx + tr.Add(bitstr.Key("11"), struct{}{}) // covers 11xx + require.True(t, KeyspaceCovered(tr)) + }) + + t.Run("gaps in coverage", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("00"), struct{}{}) + tr.Add(bitstr.Key("01"), struct{}{}) + tr.Add(bitstr.Key("10"), struct{}{}) + // Missing "11" prefix + require.False(t, KeyspaceCovered(tr)) + }) + + t.Run("complex coverage pattern", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("000"), struct{}{}) + tr.Add(bitstr.Key("001"), struct{}{}) + tr.Add(bitstr.Key("01"), struct{}{}) + tr.Add(bitstr.Key("1"), struct{}{}) + require.True(t, KeyspaceCovered(tr)) + }) + + t.Run("deep unbalanced tree coverage", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("0000"), struct{}{}) + tr.Add(bitstr.Key("0001"), struct{}{}) + tr.Add(bitstr.Key("001"), struct{}{}) + tr.Add(bitstr.Key("01"), struct{}{}) + tr.Add(bitstr.Key("1"), struct{}{}) + require.True(t, KeyspaceCovered(tr)) + }) + + t.Run("shallow gap in deep tree", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("0000"), struct{}{}) + tr.Add(bitstr.Key("0001"), struct{}{}) + tr.Add(bitstr.Key("001"), struct{}{}) + tr.Add(bitstr.Key("01"), struct{}{}) + // Missing "1" prefix + require.False(t, KeyspaceCovered(tr)) + }) + + t.Run("deep gap in deep tree", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("0001"), struct{}{}) + 
tr.Add(bitstr.Key("001"), struct{}{}) + tr.Add(bitstr.Key("01"), struct{}{}) + tr.Add(bitstr.Key("1"), struct{}{}) + // Missing "0000" prefix + require.False(t, KeyspaceCovered(tr)) + }) + + t.Run("edge case: single bit differences", func(t *testing.T) { + tr := trie.New[bitstr.Key, struct{}]() + tr.Add(bitstr.Key("000"), struct{}{}) + tr.Add(bitstr.Key("001"), struct{}{}) + tr.Add(bitstr.Key("010"), struct{}{}) + tr.Add(bitstr.Key("011"), struct{}{}) + tr.Add(bitstr.Key("100"), struct{}{}) + tr.Add(bitstr.Key("101"), struct{}{}) + tr.Add(bitstr.Key("110"), struct{}{}) + tr.Add(bitstr.Key("111"), struct{}{}) + require.True(t, KeyspaceCovered(tr)) + }) +} diff --git a/provider/internal/queue/provide.go b/provider/internal/queue/provide.go index 2dc87bec1..c6fea1cb1 100644 --- a/provider/internal/queue/provide.go +++ b/provider/internal/queue/provide.go @@ -29,7 +29,7 @@ type ProvideQueue struct { mu sync.Mutex queue prefixQueue - keys *trie.Trie[bit256.Key, mh.Multihash] // used to store keys in the queue + keys *trie.Trie[bit256.Key, mh.Multihash] // stores keys currently in the queue } // NewProvideQueue creates a new ProvideQueue instance. @@ -163,16 +163,24 @@ func (q *ProvideQueue) Remove(keys ...mh.Multihash) { func (q *ProvideQueue) IsEmpty() bool { q.mu.Lock() defer q.mu.Unlock() - return q.queue.Size() == 0 + return q.keys.IsEmptyLeaf() } -// Size returns the number of regions containing at least one key in the queue. +// Size returns the number of keys currently in the queue. func (q *ProvideQueue) Size() int { q.mu.Lock() defer q.mu.Unlock() return q.keys.Size() } +// NumRegions returns the number of regions containing at least one key +// currently in the queue. +func (q *ProvideQueue) NumRegions() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.queue.Size() +} + // Clear removes all keys from the queue and returns the number of keys that // were removed. 
func (q *ProvideQueue) Clear() int { diff --git a/provider/internal/queue/provide_test.go b/provider/internal/queue/provide_test.go index 21ab8e76b..908648b9e 100644 --- a/provider/internal/queue/provide_test.go +++ b/provider/internal/queue/provide_test.go @@ -56,6 +56,8 @@ func TestProvideEnqueueSimple(t *testing.T) { } // Verify the count of multihashes matches require.Equal(t, len(prefixes)*nMultihashesPerPrefix, q.Size()) + // Verify count of regions in the queue + require.Equal(t, len(prefixes), q.NumRegions()) } func TestProvideEnqueueOverlapping(t *testing.T) { @@ -73,6 +75,7 @@ func TestProvideEnqueueOverlapping(t *testing.T) { } require.Equal(t, 1, q.queue.prefixes.Size()) // Only shortest prefix should remain + require.Equal(t, 1, q.NumRegions()) require.Equal(t, 1, q.queue.queue.Len()) require.GreaterOrEqual(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[0] }), 0) // "000" is in queue require.Negative(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[1] })) // "0000" is NOT in queue @@ -90,6 +93,7 @@ func TestProvideEnqueueOverlapping(t *testing.T) { } require.Equal(t, 2, q.queue.prefixes.Size()) // only "000" and "111" should remain + require.Equal(t, 2, q.NumRegions()) require.Equal(t, 2, q.queue.queue.Len()) require.GreaterOrEqual(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[1] }), 0) // "111" is in queue require.Negative(t, q.queue.queue.Index(func(k bitstr.Key) bool { return k == prefixes[0] })) // "1111" is NOT in queue diff --git a/provider/internal/timeseries/cycle_stats.go b/provider/internal/timeseries/cycle_stats.go new file mode 100644 index 000000000..289d0e431 --- /dev/null +++ b/provider/internal/timeseries/cycle_stats.go @@ -0,0 +1,129 @@ +package timeseries + +import ( + "time" + + "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" + "github.com/probe-lab/go-libdht/kad/key/bit256" + "github.com/probe-lab/go-libdht/kad/key/bitstr" + 
"github.com/probe-lab/go-libdht/kad/trie" +) + +type entry struct { + time time.Time + val int64 +} + +// CycleStats tracks statistics organized by keyspace prefixes with TTL-based cleanup. +// It maintains a trie structure where statistics are aggregated by prefix and +// automatically cleaned up after the retention period. +type CycleStats struct { + trie *trie.Trie[bitstr.Key, entry] + + queue *trie.Trie[bitstr.Key, entry] + + ttl, maxDelay time.Duration +} + +// NewCycleStats creates a new CycleStats with the specified TTL and maximum delay. +func NewCycleStats(ttl, maxDelay time.Duration) CycleStats { + return CycleStats{ + trie: trie.New[bitstr.Key, entry](), + queue: trie.New[bitstr.Key, entry](), + ttl: ttl, + maxDelay: maxDelay, + } +} + +// Cleanup removes entries that have exceeded their TTL plus maximum delay. +func (s *CycleStats) Cleanup() { + allEntries := keyspace.AllEntries(s.trie, bit256.ZeroKey()) + now := time.Now() + for _, e := range allEntries { + if e.Data.time.Add(s.ttl + s.maxDelay).Before(now) { + s.trie.Remove(e.Key) + if subtrie, ok := keyspace.FindSubtrie(s.queue, e.Key); ok { + for _, qe := range keyspace.AllEntries(subtrie, bit256.ZeroKey()) { + s.trie.Add(qe.Key, qe.Data) + } + } + } + } +} + +// Add records a value for the given prefix, handling prefix aggregation logic. 
+func (s *CycleStats) Add(prefix bitstr.Key, val int64) { + e := entry{time: time.Now(), val: val} + if _, ok := keyspace.FindSubtrie(s.trie, prefix); ok { + // shorter prefix + keyspace.PruneSubtrie(s.trie, prefix) + s.trie.Add(prefix, e) + return + } + // longer prefix, group with complements before replacing + target, ok := keyspace.FindPrefixOfKey(s.trie, prefix) + if !ok { + // No keys in s.trie is a prefix of `prefix` + s.trie.Add(prefix, e) + return + } + + if queuePrefix, ok := keyspace.FindPrefixOfKey(s.queue, prefix); ok { + _, entry := trie.Find(s.queue, queuePrefix) + if time.Since(entry.time) < s.maxDelay { + // A recent entry is a superset of the current one, skip. + return + } + // Remove old entry + keyspace.PruneSubtrie(s.queue, queuePrefix) + } else { + // Remove (older) superstrings from queue + keyspace.PruneSubtrie(s.queue, prefix) + } + // Add prefix to queue + s.queue.Add(prefix, e) + + subtrie, ok := keyspace.FindSubtrie(s.queue, target) + if !ok || !keyspace.KeyspaceCovered(subtrie) { + // Subtrie not complete + return + } + // Target keyspace is fully covered by queue entries. Replace target with + // queue entries. + keyspace.PruneSubtrie(s.trie, target) + for _, e := range keyspace.AllEntries(subtrie, bit256.ZeroKey()) { + s.trie.Add(e.Key, e.Data) + } +} + +// Sum returns the sum of all values in the trie. +func (s *CycleStats) Sum() int64 { + var sum int64 + for _, v := range keyspace.AllValues(s.trie, bit256.ZeroKey()) { + sum += v.val + } + return sum +} + +// Avg returns the average of all values in the trie. +func (s *CycleStats) Avg() float64 { + allValues := keyspace.AllValues(s.trie, bit256.ZeroKey()) + if len(allValues) == 0 { + return 0 + } + var sum int64 + for _, v := range allValues { + sum += v.val + } + return float64(sum) / float64(len(allValues)) +} + +// Count returns the number of entries in the trie. 
+func (s *CycleStats) Count() int { + return s.trie.Size() +} + +// FullyCovered returns true if the trie covers the complete keyspace. +func (s *CycleStats) FullyCovered() bool { + return keyspace.KeyspaceCovered(s.trie) +} diff --git a/provider/internal/timeseries/cycle_stats_test.go b/provider/internal/timeseries/cycle_stats_test.go new file mode 100644 index 000000000..0b3f0d39a --- /dev/null +++ b/provider/internal/timeseries/cycle_stats_test.go @@ -0,0 +1,100 @@ +package timeseries + +import ( + "testing" + "time" + + "github.com/probe-lab/go-libdht/kad/key/bitstr" + "github.com/stretchr/testify/require" +) + +func TestCycleStatsSimple(t *testing.T) { + cs := NewCycleStats(time.Hour, time.Minute) + + // Test empty stats + require.Equal(t, int64(0), cs.Sum(), "sum of empty CycleStats") + require.Equal(t, 0.0, cs.Avg(), "avg of empty CycleStats") + require.Equal(t, 0, cs.Count(), "count of empty CycleStats") + + // Add non-overlapping prefixes + cs.Add(bitstr.Key("0"), 10) + cs.Add(bitstr.Key("1"), 20) + + require.Equal(t, int64(30), cs.Sum(), "sum with two prefixes") + require.Equal(t, 2, cs.Count(), "count with two prefixes") + require.Equal(t, 15.0, cs.Avg(), "average with two prefixes") +} + +func TestCycleStatsSimpleFullyCovered(t *testing.T) { + cs := NewCycleStats(time.Hour, time.Minute) + + // Single bit prefixes should cover full keyspace + cs.Add(bitstr.Key("0"), 10) + cs.Add(bitstr.Key("1"), 20) + + require.True(t, cs.FullyCovered(), "should be fully covered with prefixes '0' and '1'") +} + +func TestCycleStatsSimpleCleanup(t *testing.T) { + // Use very short TTL for testing + cs := NewCycleStats(time.Millisecond, time.Millisecond) + + cs.Add(bitstr.Key("0"), 10) + cs.Add(bitstr.Key("1"), 20) + + require.Equal(t, 2, cs.Count(), "count before cleanup") + + // Wait for entries to expire + time.Sleep(5 * time.Millisecond) + cs.Cleanup() + + require.Equal(t, 0, cs.Count(), "count after cleanup") + require.Equal(t, int64(0), cs.Sum(), "sum after cleanup") 
+} + +func TestCycleStatsReplacement(t *testing.T) { + cs := NewCycleStats(time.Hour, time.Minute) + + // Add parent prefix + cs.Add(bitstr.Key(""), 100) // Root covers everything + + require.Equal(t, 1, cs.Count(), "count with root prefix") + require.True(t, cs.FullyCovered(), "should be fully covered with root prefix") + + // Add more specific prefixes that should replace the root + cs.Add(bitstr.Key("0"), 20) + cs.Add(bitstr.Key("1"), 20) + + // Should now have 2 entries instead of 1 + require.Equal(t, 2, cs.Count(), "count after replacement") + require.Equal(t, int64(40), cs.Sum(), "sum after replacement") +} + +func TestCycleStatsShorterPrefix(t *testing.T) { + cs := NewCycleStats(time.Hour, time.Minute) + + // Add some specific prefixes + cs.Add(bitstr.Key("000"), 10) + cs.Add(bitstr.Key("001"), 20) + cs.Add(bitstr.Key("010"), 30) + + require.Equal(t, 3, cs.Count(), "count with three specific prefixes") + + // Add a shorter prefix that should prune the subtrie + cs.Add(bitstr.Key("00"), 100) + + // Should now have "00"->100 and "010"->30 + require.Equal(t, 2, cs.Count(), "count after shorter prefix addition") + require.Equal(t, int64(130), cs.Sum(), "sum after shorter prefix addition") +} + +func TestCycleStatsZeroValues(t *testing.T) { + cs := NewCycleStats(time.Hour, time.Minute) + + cs.Add(bitstr.Key("0"), 0) + cs.Add(bitstr.Key("1"), 10) + + require.Equal(t, 2, cs.Count(), "count with zero value") + require.Equal(t, int64(10), cs.Sum(), "sum with zero value") + require.Equal(t, 5.0, cs.Avg(), "average with zero value") +} diff --git a/provider/internal/timeseries/doc.go b/provider/internal/timeseries/doc.go new file mode 100644 index 000000000..d760aa9fc --- /dev/null +++ b/provider/internal/timeseries/doc.go @@ -0,0 +1,40 @@ +// Package timeseries provides time-windowed data structures for collecting +// and analyzing performance metrics in the libp2p Kademlia DHT provider. 
+// +// This package contains three main types of time series collectors: +// +// IntTimeSeries maintains a rolling window of integer values with automatic +// cleanup of expired entries. It's used for tracking counts and durations +// over time, such as the number of keys provided or operation durations. +// +// FloatTimeSeries maintains a rolling window of weighted float values, +// useful for computing weighted averages. Each entry has a value and a +// weight, allowing for more sophisticated statistical calculations. +// +// CycleStats tracks statistics organized by keyspace prefixes with TTL-based +// cleanup. It uses a trie structure to efficiently aggregate statistics +// across different regions of the DHT keyspace. This is particularly useful +// for tracking reprovide operations that cover different keyspace regions. +// +// TimeSeries locks internally and is safe for concurrent use; CycleStats does +// no locking, so callers must serialize access. Retention periods are +// configurable and typically align with the provider's reprovide intervals. +// +// Example usage: +// +// // Track operation counts over the last hour +// counts := NewIntTimeSeries(time.Hour) +// counts.Add(5) +// total := counts.Sum() +// average := counts.Avg() +// +// // Track weighted averages +// averages := NewFloatTimeSeries(time.Hour) +// averages.AddWeighted(3.5, 10) // value=3.5, weight=10 +// weightedAvg := averages.Avg() +// +// // Track keyspace region statistics +// stats := NewCycleStats(time.Hour, time.Minute) +// stats.Add("101", 42) // prefix "101", value 42 +// total := stats.Sum() +package timeseries diff --git a/provider/internal/timeseries/series.go b/provider/internal/timeseries/series.go new file mode 100644 index 000000000..51b2045f0 --- /dev/null +++ b/provider/internal/timeseries/series.go @@ -0,0 +1,164 @@ +package timeseries + +import ( + "sync" + "time" + + "github.com/gammazero/deque" +) + +// Numeric represents types that can be used in mathematical operations. 
+type Numeric interface { + ~int64 | ~float64 +} + +// timeEntry represents a timestamped value with optional weight. +type timeEntry[T Numeric] struct { + timestamp time.Time + value T + weight int +} + +// TimeSeries maintains a time-windowed series of numeric values with optional weights. +// It automatically removes entries older than the retention period. +type TimeSeries[T Numeric] struct { + mutex sync.Mutex + data deque.Deque[timeEntry[T]] + retention time.Duration + weighted bool // whether this series uses weights +} + +// NewTimeSeries creates a new TimeSeries for the given numeric type with the specified retention period. +func NewTimeSeries[T Numeric](retention time.Duration) TimeSeries[T] { + return TimeSeries[T]{ + data: deque.Deque[timeEntry[T]]{}, + retention: retention, + weighted: false, + } +} + +// NewWeightedTimeSeries creates a new TimeSeries that supports weighted values. +func NewWeightedTimeSeries[T Numeric](retention time.Duration) TimeSeries[T] { + return TimeSeries[T]{ + data: deque.Deque[timeEntry[T]]{}, + retention: retention, + weighted: true, + } +} + +// Add records a new value with the current timestamp and weight 1. +func (t *TimeSeries[T]) Add(value T) { + t.AddWeighted(value, 1) +} + +// AddWeighted records a new value with the current timestamp and specified weight. +// For non-weighted series, the weight parameter is ignored. +func (t *TimeSeries[T]) AddWeighted(value T, weight int) { + now := time.Now() + if !t.weighted { + weight = 1 + } + + t.mutex.Lock() + defer t.mutex.Unlock() + t.data.PushBack(timeEntry[T]{timestamp: now, value: value, weight: weight}) + t.gc(now) +} + +// Sum returns the sum of all values within the retention window. +// For weighted series, returns the weighted sum (sum of value * weight). +func (t *TimeSeries[T]) Sum() T { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.sumUnlocked() +} + +// sumUnlocked returns the sum without acquiring the mutex. +// The caller must hold the mutex. 
+func (t *TimeSeries[T]) sumUnlocked() T { + t.gc(time.Now()) + + var sum T + for i := 0; i < t.data.Len(); i++ { + entry := t.data.At(i) + if t.weighted { + sum += entry.value * T(entry.weight) + } else { + sum += entry.value + } + } + return sum +} + +// Avg returns the average of all values within the retention window. +// For weighted series, returns the weighted average (weighted sum / total weight). +// For non-weighted series, returns the arithmetic mean. +func (t *TimeSeries[T]) Avg() float64 { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.gc(time.Now()) + + length := t.data.Len() + if length == 0 { + return 0 + } + + if t.weighted { + var weightedSum T + var totalWeight int + for i := range length { + entry := t.data.At(i) + weightedSum += entry.value * T(entry.weight) + totalWeight += entry.weight + } + if totalWeight == 0 { + return 0 + } + return float64(weightedSum) / float64(totalWeight) + } else { + sum := t.sumUnlocked() + return float64(sum) / float64(length) + } +} + +// Count returns the number of entries within the retention window. +func (t *TimeSeries[T]) Count() int { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.gc(time.Now()) + return t.data.Len() +} + +// gc removes entries older than the retention period. +// The caller must hold the mutex. +func (t *TimeSeries[T]) gc(now time.Time) { + cutoff := now.Add(-t.retention) + for t.data.Len() > 0 { + if t.data.Front().timestamp.Before(cutoff) { + t.data.PopFront() + } else { + break + } + } +} + +// IntTimeSeries is a convenience type alias for TimeSeries[int64]. +type IntTimeSeries = TimeSeries[int64] + +// FloatTimeSeries is a convenience type alias for TimeSeries[float64] with weighted support. +type FloatTimeSeries = TimeSeries[float64] + +// NewIntTimeSeries creates a new TimeSeries for int64 values with the specified retention period. +// This is a convenience constructor for backward compatibility. 
+func NewIntTimeSeries(retention time.Duration) IntTimeSeries { + return NewTimeSeries[int64](retention) +} + +// NewFloatTimeSeries creates a new weighted TimeSeries for float64 values with the specified retention period. +// This series supports weighted values and calculates weighted averages. +// This is a convenience constructor for backward compatibility. +func NewFloatTimeSeries(retention time.Duration) FloatTimeSeries { + return NewWeightedTimeSeries[float64](retention) +} diff --git a/provider/internal/timeseries/series_test.go b/provider/internal/timeseries/series_test.go new file mode 100644 index 000000000..731744a79 --- /dev/null +++ b/provider/internal/timeseries/series_test.go @@ -0,0 +1,144 @@ +package timeseries + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestIntTimeSeries(t *testing.T) { + ts := NewIntTimeSeries(time.Hour) + + // Test empty series + require.Equal(t, int64(0), ts.Sum(), "sum of empty series") + require.Equal(t, 0.0, ts.Avg(), "avg of empty series") + require.Equal(t, 0, ts.Count(), "count of empty series") + + // Add some values + ts.Add(10) + ts.Add(20) + ts.Add(30) + + // Test sum + require.Equal(t, int64(60), ts.Sum(), "sum after adding values") + + // Test average + require.Equal(t, 20.0, ts.Avg(), "average after adding values") + + // Test count + require.Equal(t, 3, ts.Count(), "count after adding values") +} + +func TestIntTimeSeriesRetention(t *testing.T) { + ts := NewIntTimeSeries(time.Millisecond) + + ts.Add(10) + ts.Add(20) + + // Wait for entries to expire + time.Sleep(2 * time.Millisecond) + + // Add new value + ts.Add(30) + + // Should only have the latest value + require.Equal(t, int64(30), ts.Sum(), "sum after retention cleanup") +} + +func TestFloatTimeSeries(t *testing.T) { + ts := NewFloatTimeSeries(time.Hour) + + // Test empty series + require.Equal(t, 0.0, ts.Sum(), "sum of empty float series") + require.Equal(t, 0.0, ts.Avg(), "avg of empty float series") + + // Add 
weighted values using AddWeighted + ts.AddWeighted(10.0, 2) // 10 * 2 = 20 + ts.AddWeighted(20.0, 3) // 20 * 3 = 60 + ts.AddWeighted(30.0, 1) // 30 * 1 = 30 + + // Test weighted sum: 20 + 60 + 30 = 110 + require.Equal(t, 110.0, ts.Sum(), "weighted sum") + + // Test weighted average: 110 / (2+3+1) = 110/6 ≈ 18.33 + expectedAvg := 110.0 / 6.0 + require.Equal(t, expectedAvg, ts.Avg(), "weighted average") +} + +func TestFloatTimeSeriesRetention(t *testing.T) { + ts := NewFloatTimeSeries(time.Millisecond) + + ts.AddWeighted(10.0, 1) + ts.AddWeighted(20.0, 2) + + // Wait for entries to expire + time.Sleep(2 * time.Millisecond) + + // Add new value + ts.AddWeighted(30.0, 3) + + // Should only have the latest value: 30 * 3 = 90 + require.Equal(t, 90.0, ts.Sum(), "weighted sum after retention cleanup") +} + +func TestGenericTimeSeries(t *testing.T) { + // Test the generic version directly with int64 + ts := NewTimeSeries[int64](time.Hour) + + ts.Add(100) + ts.Add(200) + + require.Equal(t, int64(300), ts.Sum(), "generic int64 sum") + require.Equal(t, 150.0, ts.Avg(), "generic int64 average") + + // Test the generic version with float64 (non-weighted) + floatTS := NewTimeSeries[float64](time.Hour) + + floatTS.Add(1.5) + floatTS.Add(2.5) + + require.Equal(t, 4.0, floatTS.Sum(), "generic float64 sum") + require.Equal(t, 2.0, floatTS.Avg(), "generic float64 average") +} + +func TestWeightedVsNonWeighted(t *testing.T) { + // Compare weighted vs non-weighted behavior + weighted := NewWeightedTimeSeries[float64](time.Hour) + nonWeighted := NewTimeSeries[float64](time.Hour) + + // Add same values + weighted.AddWeighted(10.0, 2) + weighted.AddWeighted(20.0, 1) + + nonWeighted.Add(10.0) + nonWeighted.Add(20.0) + + // Weighted sum: (10*2) + (20*1) = 40 + require.Equal(t, 40.0, weighted.Sum(), "weighted sum") + + // Non-weighted sum: 10 + 20 = 30 + require.Equal(t, 30.0, nonWeighted.Sum(), "non-weighted sum") + + // Weighted average: 40 / (2+1) = 13.33 + expectedWeightedAvg := 40.0 / 
3.0 + require.Equal(t, expectedWeightedAvg, weighted.Avg(), "weighted average") + + // Non-weighted average: 30 / 2 = 15.0 + require.Equal(t, 15.0, nonWeighted.Avg(), "non-weighted average") +} + +func TestTimeSeriesAddWithWeight(t *testing.T) { + // Test that Add() method works on weighted series (should use weight=1) + ts := NewFloatTimeSeries(time.Hour) + + ts.Add(10.0) // Should be equivalent to AddWeighted(10.0, 1) + ts.AddWeighted(20.0, 2) + + // Sum: (10*1) + (20*2) = 50 + require.Equal(t, 50.0, ts.Sum(), "mixed Add and AddWeighted sum") + + // Average: 50 / (1+2) = 16.67 + expectedAvg := 50.0 / 3.0 + require.Equal(t, expectedAvg, ts.Avg(), "mixed Add and AddWeighted average") +} diff --git a/provider/keystore/keystore.go b/provider/keystore/keystore.go index 99efb4722..962f37d13 100644 --- a/provider/keystore/keystore.go +++ b/provider/keystore/keystore.go @@ -349,11 +349,11 @@ func (s *keystore) size(ctx context.Context) (size int, err error) { q := query.Query{KeysOnly: true} for _, err = range ds.QueryIter(ctx, s.ds, q) { if err != nil { - return + return size, err } size++ } - return + return size, err } // executeOperation sends an operation request to the worker goroutine and diff --git a/provider/provider.go b/provider/provider.go index d5258de5c..0f2bfc36d 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -1,3 +1,20 @@ +// Package provider implements a content provider for the libp2p Kademlia DHT. +// +// The core component is SweepingProvider, which efficiently announces content +// availability to the DHT network through a region-based approach. Instead of +// providing each key individually, it divides the keyspace into regions and +// provides all keys within a region together, reducing network overhead. 
+// +// Key features: +// - Batched providing: Groups keys by keyspace regions for efficient batch operations +// - Automatic reproviding: Continuously reprovides content at configurable intervals +// - Adaptive scheduling: Balances load across time using keyspace-aware scheduling +// - Network resilience: Handles connectivity issues and peer churn gracefully +// - Performance monitoring: Comprehensive statistics collection and reporting +// - Worker pool management: Configurable parallelism for provide operations +// +// The provider tracks performance metrics including operation counts, timing +// information, and network statistics over configurable time windows. package provider import ( @@ -136,18 +153,21 @@ type SweepingProvider struct { reprovideInterval time.Duration maxReprovideDelay time.Duration - schedule *trie.Trie[bitstr.Key, time.Duration] + // Schedule state - grouped for better organization scheduleLk sync.Mutex + schedule *trie.Trie[bitstr.Key, time.Duration] scheduleCursor bitstr.Key scheduleTimer *time.Timer scheduleTimerStartedAt time.Time - ongoingReprovides *trie.Trie[bitstr.Key, struct{}] - ongoingReprovidesLk sync.Mutex + // Active reprovides tracking + activeReprovidesLk sync.Mutex + activeReprovides *trie.Trie[bitstr.Key, struct{}] - cachedAvgPrefixLen int + // Prefix length estimation state avgPrefixLenLk sync.Mutex approxPrefixLenRunning sync.Mutex + cachedAvgPrefixLen int lastAvgPrefixLen time.Time avgPrefixLenValidity time.Duration @@ -156,6 +176,7 @@ type SweepingProvider struct { addLocalRecord func(mh.Multihash) error provideCounter metric.Int64Counter + stats operationStats } // New creates a new SweepingProvider instance with the supplied options. 
@@ -241,9 +262,10 @@ func New(opts ...Option) (*SweepingProvider, error) { provideQueue: queue.NewProvideQueue(), reprovideQueue: queue.NewReprovideQueue(), - ongoingReprovides: trie.New[bitstr.Key, struct{}](), + activeReprovides: trie.New[bitstr.Key, struct{}](), provideCounter: providerCounter, + stats: newOperationStats(cfg.reprovideInterval, cfg.maxReprovideDelay), } // Set up callbacks after both provider and connectivity checker are initialized // This breaks the circular dependency between connectivity, onOnline, and approxPrefixLen @@ -550,7 +572,7 @@ func (s *SweepingProvider) getAvgPrefixLenNoLock() (int, error) { // optimization. It should be used for providing a small number of keys // (typically 1 or 2), because exploring the keyspace would add too much // overhead for a small number of keys. -func (s *SweepingProvider) vanillaProvide(k mh.Multihash) (bitstr.Key, error) { +func (s *SweepingProvider) vanillaProvide(k mh.Multihash, reprovide bool) (bitstr.Key, error) { keys := []mh.Multihash{k} // Add provider record to local provider store. 
s.addLocalRecord(k) @@ -565,10 +587,22 @@ func (s *SweepingProvider) vanillaProvide(k mh.Multihash) (bitstr.Key, error) { for _, p := range peers { keysAllocations[p] = keys } - err = s.sendProviderRecords(keysAllocations, addrInfo) + reachablePeers, err := s.sendProviderRecords(keysAllocations, addrInfo, 1) + + s.stats.cycleStatsLk.Lock() + if reprovide { + s.stats.peers.Add(coveredPrefix, int64(len(keysAllocations))) + s.stats.reachable.Add(coveredPrefix, int64(reachablePeers)) + s.stats.keysPerReprovide.Add(coveredPrefix, 1) + } else { + s.stats.keysPerProvide.Add(1) + } + s.stats.cycleStatsLk.Unlock() + if err == nil { logger.Debugw("sent provider record", "prefix", coveredPrefix, "count", 1, "keys", keys) } + return coveredPrefix, err } @@ -594,6 +628,7 @@ func (s *SweepingProvider) exploreSwarm(prefix bitstr.Key) (regions []keyspace.R if len(peers) == 0 { return nil, "", fmt.Errorf("no peers found when exploring prefix %s", prefix) } + s.stats.regionSize.Add(int64(len(peers))) regions, coveredPrefix = keyspace.RegionsFromPeers(peers, s.replicationFactor, s.order) return regions, coveredPrefix, nil } @@ -686,20 +721,23 @@ type provideJob struct { // keys. Upon failure to reprovide a key, or to connect to a peer, it will NOT // retry. // -// Returns an error if we were unable to reprovide keys to a given threshold of -// peers. In this case, the region reprovide is considered failed and the -// caller is responsible for trying again. This allows detecting if we are -// offline. -func (s *SweepingProvider) sendProviderRecords(keysAllocations map[peer.ID][]mh.Multihash, addrInfo peer.AddrInfo) error { +// Returns the number of reachable peers and an error if we were unable to +// reprovide keys to a given threshold of peers. In this case, the region +// reprovide is considered failed and the caller is responsible for trying +// again. This allows detecting if we are offline. 
+func (s *SweepingProvider) sendProviderRecords(keysAllocations map[peer.ID][]mh.Multihash, addrInfo peer.AddrInfo, nKeys int) (reachablePeers int, err error) { nPeers := len(keysAllocations) if nPeers == 0 { - return nil + return reachablePeers, err } startTime := time.Now() errCount := atomic.Uint32{} nWorkers := s.maxProvideConnsPerWorker jobChan := make(chan provideJob, nWorkers) + failedKeysCount := make(map[string]int) // count the number of failures for each key + failedKeysCountLk := sync.Mutex{} + wg := sync.WaitGroup{} wg.Add(nWorkers) for range nWorkers { @@ -710,6 +748,11 @@ func (s *SweepingProvider) sendProviderRecords(keysAllocations map[peer.ID][]mh. err := s.provideKeysToPeer(job.pid, job.keys, pmes) if err != nil { errCount.Add(1) + failedKeysCountLk.Lock() + for _, k := range job.keys { + failedKeysCount[string(k)]++ + } + failedKeysCountLk.Unlock() } } }() @@ -721,13 +764,27 @@ func (s *SweepingProvider) sendProviderRecords(keysAllocations map[peer.ID][]mh. close(jobChan) wg.Wait() + var failedKeys int + holdersSum := s.replicationFactor * nKeys + for _, c := range failedKeysCount { + holdersSum -= c + if c == s.replicationFactor { + failedKeys++ + } + } + successfulKeys := nKeys - failedKeys + s.stats.avgHolders.AddWeighted(float64(holdersSum)/float64(nKeys), nKeys) + s.stats.addCompletedKeys(successfulKeys, failedKeys) + s.provideCounter.Add(s.ctx, int64(successfulKeys)) + errCountLoaded := int(errCount.Load()) logger.Infof("sent provider records to peers in %s, errors %d/%d", time.Since(startTime), errCountLoaded, len(keysAllocations)) + reachablePeers = nPeers - errCountLoaded if errCountLoaded == nPeers || errCountLoaded > int(float32(nPeers)*(1-minimalRegionReachablePeersRatio)) { - return fmt.Errorf("unable to provide to enough peers (%d/%d)", nPeers-errCountLoaded, nPeers) + err = fmt.Errorf("unable to provide to enough peers (%d/%d)", reachablePeers, nPeers) } - return nil + return reachablePeers, err } // genProvideMessage generates a 
new provide message with the supplied @@ -742,21 +799,24 @@ func genProvideMessage(addrInfo peer.AddrInfo) *pb.Message { // provideKeysToPeer performs the network operation to advertise to the given // DHT server (p) that we serve all the given keys. func (s *SweepingProvider) provideKeysToPeer(p peer.ID, keys []mh.Multihash, pmes *pb.Message) error { - errCount := 0 - for _, mh := range keys { + var errCount, errStreak int + for i, mh := range keys { pmes.Key = mh err := s.msgSender.SendMessage(s.ctx, p, pmes) if err != nil { + errStreak++ errCount++ - if errCount == len(keys) || errCount > maxConsecutiveProvideFailuresAllowed { + if errStreak == len(keys) || errStreak > maxConsecutiveProvideFailuresAllowed { + s.stats.addProvidedRecords(i + 1 - errCount) return fmt.Errorf("failed to provide to %s: %s", p, err.Error()) } - } else if errCount > 0 { + } else if errStreak > 0 { // Reset error count - errCount = 0 + errStreak = 0 } } + s.stats.addProvidedRecords(len(keys) - errCount) return nil } @@ -1102,17 +1162,26 @@ func (s *SweepingProvider) reprovideLateRegions() { } func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash) { - if len(keys) == 0 { + keyCount := len(keys) + if keyCount == 0 { return } - logger.Debugw("batchProvide called", "prefix", prefix, "count", len(keys)) + logger.Debugw("batchProvide called", "prefix", prefix, "count", keyCount) addrInfo, ok := s.selfAddrInfo() if !ok { // Don't provide if the node doesn't have a valid address to include in the // provider record. return } - if len(keys) <= individualProvideThreshold { + + startTime := time.Now() + s.stats.ongoingProvides.start(keyCount) + defer func() { + s.stats.ongoingProvides.finish(len(keys)) + s.stats.provideDuration.Add(int64(time.Since(startTime))) + }() + + if keyCount <= individualProvideThreshold { // Don't fully explore the region, execute simple DHT provides for these // keys. It isn't worth it to fully explore a region for just a few keys. 
s.individualProvide(prefix, keys, false, false) @@ -1129,7 +1198,10 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash) // Add any key matching the covered prefix from the provide queue to the // current provide batch. extraKeys := s.provideQueue.DequeueMatching(coveredPrefix) - keys = append(keys, extraKeys...) + if len(extraKeys) > 0 { + keys = append(keys, extraKeys...) + s.stats.ongoingProvides.addKeys(len(extraKeys)) + } regions = keyspace.AssignKeysToRegions(regions, keys) if !s.provideRegions(regions, addrInfo, false, false) { @@ -1154,11 +1226,20 @@ func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide b } return } - if len(keys) == 0 { + keyCount := len(keys) + if keyCount == 0 { logger.Infof("No keys to reprovide for prefix %s", prefix) return } - if len(keys) <= individualProvideThreshold { + + startTime := time.Now() + s.stats.ongoingReprovides.start(keyCount) + defer func() { + s.stats.ongoingReprovides.finish(len(keys)) + s.stats.reprovideDuration.Add(prefix, int64(time.Since(startTime))) + }() + + if keyCount <= individualProvideThreshold { // Don't fully explore the region, execute simple DHT provides for these // keys. It isn't worth it to fully explore a region for just a few keys. s.individualProvide(prefix, keys, true, periodicReprovide) @@ -1201,6 +1282,7 @@ func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide b s.reschedulePrefix(prefix) } } + s.stats.ongoingReprovides.addKeys(len(keys) - keyCount) } regions = keyspace.AssignKeysToRegions(regions, keys) @@ -1250,10 +1332,8 @@ func (s *SweepingProvider) individualProvide(prefix bitstr.Key, keys []mh.Multih var provideErr error if len(keys) == 1 { - coveredPrefix, err := s.vanillaProvide(keys[0]) - if err == nil { - s.provideCounter.Add(s.ctx, 1) - } else if !reprovide { + coveredPrefix, err := s.vanillaProvide(keys[0], reprovide) + if err != nil && !reprovide { // Put the key back in the provide queue. 
s.failedProvide(prefix, keys, fmt.Errorf("individual provide failed for prefix '%s', %w", prefix, err)) } @@ -1271,9 +1351,8 @@ func (s *SweepingProvider) individualProvide(prefix bitstr.Key, keys []mh.Multih wg.Add(1) go func() { defer wg.Done() - _, err := s.vanillaProvide(key) + _, err := s.vanillaProvide(key, reprovide) if err == nil { - s.provideCounter.Add(s.ctx, 1) success.Store(true) } else if !reprovide { // Individual provide failed, put key back in provide queue. @@ -1303,7 +1382,8 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe errCount := 0 for _, r := range regions { allKeys := keyspace.AllValues(r.Keys, s.order) - if len(allKeys) == 0 { + nKeys := len(allKeys) + if nKeys == 0 { if reprovide { s.releaseRegionReprovide(r.Prefix) } @@ -1314,12 +1394,23 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe s.addLocalRecord(h) } keysAllocations := keyspace.AllocateToKClosest(r.Keys, r.Peers, s.replicationFactor) - err := s.sendProviderRecords(keysAllocations, addrInfo) + reachablePeers, err := s.sendProviderRecords(keysAllocations, addrInfo, nKeys) + + s.stats.cycleStatsLk.Lock() if reprovide { + // reprovide stats + s.stats.peers.Add(r.Prefix, int64(len(keysAllocations))) + s.stats.reachable.Add(r.Prefix, int64(reachablePeers)) + s.stats.keysPerReprovide.Add(r.Prefix, int64(nKeys)) + s.stats.cycleStatsLk.Unlock() + s.releaseRegionReprovide(r.Prefix) if periodicReprovide { s.reschedulePrefix(r.Prefix) } + } else { + s.stats.keysPerProvide.Add(int64(nKeys)) + s.stats.cycleStatsLk.Unlock() } if err != nil { errCount++ @@ -1343,17 +1434,17 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe // another thread. If not it marks the region as being currently reprovided. 
func (s *SweepingProvider) claimRegionReprovide(regions []keyspace.Region) []keyspace.Region { out := regions[:0] - s.ongoingReprovidesLk.Lock() - defer s.ongoingReprovidesLk.Unlock() + s.activeReprovidesLk.Lock() + defer s.activeReprovidesLk.Unlock() for _, r := range regions { if r.Peers.IsEmptyLeaf() { continue } - if _, ok := keyspace.FindPrefixOfKey(s.ongoingReprovides, r.Prefix); !ok { + if _, ok := keyspace.FindPrefixOfKey(s.activeReprovides, r.Prefix); !ok { // Prune superstrings of r.Prefix if any - keyspace.PruneSubtrie(s.ongoingReprovides, r.Prefix) + keyspace.PruneSubtrie(s.activeReprovides, r.Prefix) out = append(out, r) - s.ongoingReprovides.Add(r.Prefix, struct{}{}) + s.activeReprovides.Add(r.Prefix, struct{}{}) } } return out @@ -1361,9 +1452,9 @@ func (s *SweepingProvider) claimRegionReprovide(regions []keyspace.Region) []key // releaseRegionReprovide marks the region as no longer being reprovided. func (s *SweepingProvider) releaseRegionReprovide(prefix bitstr.Key) { - s.ongoingReprovidesLk.Lock() - defer s.ongoingReprovidesLk.Unlock() - s.ongoingReprovides.Remove(prefix) + s.activeReprovidesLk.Lock() + defer s.activeReprovidesLk.Unlock() + s.activeReprovides.Remove(prefix) } // ProvideOnce only sends provider records for the given keys out to the DHT diff --git a/provider/provider_test.go b/provider/provider_test.go index d9c2fbd22..214b5e2b6 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -489,10 +489,13 @@ func TestIndividualProvideMultiple(t *testing.T) { return nil }, } + reprovideInterval := time.Hour + maxDelay := time.Minute r := SweepingProvider{ router: router, msgSender: msgSender, - reprovideInterval: time.Hour, + reprovideInterval: reprovideInterval, + maxReprovideDelay: maxDelay, maxProvideConnsPerWorker: 2, provideQueue: queue.NewProvideQueue(), reprovideQueue: queue.NewReprovideQueue(), @@ -502,6 +505,7 @@ func TestIndividualProvideMultiple(t *testing.T) { getSelfAddrs: func() []ma.Multiaddr { return nil 
}, addLocalRecord: func(mh mh.Multihash) error { return nil }, provideCounter: provideCounter(), + stats: newOperationStats(reprovideInterval, maxDelay), } assertAdvertisementCount := func(n int) { @@ -642,7 +646,7 @@ func TestHandleReprovide(t *testing.T) { require.Equal(t, prefixes[0], prov.scheduleCursor) // Two prefixes in schedule - time.Sleep(1) // advance 1 tick into the reprovide cycle + time.Sleep(time.Nanosecond) // advance 1 tick into the reprovide cycle prov.schedule.Add(prefixes[1], prov.reprovideTimeForPrefix(prefixes[1])) prov.handleReprovide() // reprovides prefixes[0], set scheduleCursor to prefixes[1] require.Equal(t, prefixes[1], prov.scheduleCursor) @@ -908,7 +912,7 @@ func TestStartProvidingSingle(t *testing.T) { synctest.Wait() require.True(t, prov.connectivity.IsOnline()) prov.avgPrefixLenLk.Lock() - require.Greater(t, prov.cachedAvgPrefixLen, 0) + require.GreaterOrEqual(t, prov.cachedAvgPrefixLen, 0) prov.avgPrefixLenLk.Unlock() err = prov.StartProviding(true, h) @@ -1182,7 +1186,7 @@ func TestStartProvidingUnstableNetwork(t *testing.T) { synctest.Wait() prov.avgPrefixLenLk.Lock() - require.Greater(t, prov.cachedAvgPrefixLen, 0) + require.GreaterOrEqual(t, prov.cachedAvgPrefixLen, 0) prov.avgPrefixLenLk.Unlock() routerOffline.Store(true) diff --git a/provider/stats.go b/provider/stats.go new file mode 100644 index 000000000..383b84f5c --- /dev/null +++ b/provider/stats.go @@ -0,0 +1,258 @@ +package provider + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal/timeseries" + "github.com/libp2p/go-libp2p-kad-dht/provider/stats" + "github.com/probe-lab/go-libdht/kad/trie" +) + +func (s *SweepingProvider) Stats() stats.Stats { + now := time.Now() + snapshot := stats.Stats{ + Closed: s.closed(), + } + + if snapshot.Closed { + return snapshot + } + + // Queue metrics + snapshot.Queues = stats.Queues{ + 
PendingKeyProvides: s.provideQueue.Size(), + PendingRegionProvides: s.provideQueue.NumRegions(), + PendingRegionReprovides: s.reprovideQueue.Size(), + } + + s.avgPrefixLenLk.Lock() + avgPrefixLenCached := s.cachedAvgPrefixLen + s.avgPrefixLenLk.Unlock() + + // Connectivity status + var status string + if s.connectivity.IsOnline() { + status = "online" + } else { + if avgPrefixLenCached >= 0 { + status = "disconnected" + } else { + status = "offline" + } + } + snapshot.Connectivity = stats.Connectivity{ + Status: status, + Since: s.connectivity.LastStateChange(), + } + + // Schedule information + s.scheduleLk.Lock() + var scheduleSize int + var avgPrefixLen float64 + nextPrefix := s.scheduleCursor + ok, nextReprovideOffset := trie.Find(s.schedule, nextPrefix) + if avgPrefixLenCached >= 0 && !s.schedule.IsEmptyLeaf() { + scheduleEntries := keyspace.AllEntries(s.schedule, s.order) + scheduleSize = len(scheduleEntries) + prefixSum := 0. + for _, e := range scheduleEntries { + prefixSum += float64(e.Key.BitLen()) + } + avgPrefixLen = prefixSum / float64(scheduleSize) + } else { + scheduleSize = s.schedule.Size() + avgPrefixLen = float64(avgPrefixLenCached) + } + s.scheduleLk.Unlock() + + currentOffset := s.currentTimeOffset() + nextReprovideAt := time.Time{} + if ok { + nextReprovideAt = now.Add(s.timeUntil(nextReprovideOffset)) + } + + keys := -1 // Default value if keyStore.Size() fails + if keyCount, err := s.keystore.Size(context.Background()); err == nil { + keys = keyCount + } + snapshot.Schedule = stats.Schedule{ + Keys: keys, + Regions: scheduleSize, + AvgPrefixLength: avgPrefixLen, + NextReprovideAt: nextReprovideAt, + NextReprovidePrefix: nextPrefix, + } + + // Worker pool status + workerStats := s.workerPool.Stats() + active := 0 + for _, v := range workerStats.Used { + active += v + } + snapshot.Workers = stats.Workers{ + Max: workerStats.Max, + Active: active, + ActivePeriodic: workerStats.Used[periodicWorker], + ActiveBurst: workerStats.Used[burstWorker], 
+ DedicatedPeriodic: workerStats.Reserves[periodicWorker], + DedicatedBurst: workerStats.Reserves[burstWorker], + QueuedPeriodic: workerStats.Queued[periodicWorker], + QueuedBurst: workerStats.Queued[burstWorker], + MaxProvideConnsPerWorker: s.maxProvideConnsPerWorker, + } + + // Timing information + snapshot.Timing = stats.Timing{ + Uptime: time.Since(s.cycleStart), + ReprovidesInterval: s.reprovideInterval, + CycleStart: now.Add(-currentOffset), + CurrentTimeOffset: currentOffset, + MaxReprovideDelay: s.maxReprovideDelay, + } + + ongoingOps := stats.OngoingOperations{ + RegionProvides: int(s.stats.ongoingProvides.opCount.Load()), + KeyProvides: int(s.stats.ongoingProvides.keyCount.Load()), + RegionReprovides: int(s.stats.ongoingReprovides.opCount.Load()), + KeyReprovides: int(s.stats.ongoingReprovides.keyCount.Load()), + } + + // Take snapshots of cycle stats data while holding the lock + s.stats.cycleStatsLk.Lock() + s.stats.keysPerReprovide.Cleanup() + s.stats.reprovideDuration.Cleanup() + s.stats.peers.Cleanup() + s.stats.reachable.Cleanup() + + // Capture data for calculations outside the lock + keysPerProvideSum := s.stats.keysPerProvide.Sum() + provideDurationSum := s.stats.provideDuration.Sum() + keysPerReprovideSum := s.stats.keysPerReprovide.Sum() + reprovideDurationSum := s.stats.reprovideDuration.Sum() + reprovideDurationAvg := s.stats.reprovideDuration.Avg() + keysPerReprovideAvg := s.stats.keysPerReprovide.Avg() + reprovideDurationCount := s.stats.reprovideDuration.Count() + peersSum := s.stats.peers.Sum() + peersFullyCovered := s.stats.peers.FullyCovered() + reachableSum := s.stats.reachable.Sum() + avgRegionSize := s.stats.regionSize.Avg() + avgHoldersAvg := s.stats.avgHolders.Avg() + + keysProvidedPerMinute := 0. + if time.Duration(provideDurationSum) > 0 { + keysProvidedPerMinute = float64(keysPerProvideSum) / time.Duration(provideDurationSum).Minutes() + } + keysReprovidedPerMinute := 0. 
+ if time.Duration(reprovideDurationSum) > 0 { + keysReprovidedPerMinute = float64(keysPerReprovideSum) / time.Duration(reprovideDurationSum).Minutes() + } + s.stats.cycleStatsLk.Unlock() + + pastOps := stats.PastOperations{ + RecordsProvided: int(s.stats.recordsProvided.Load()), + KeysProvided: int(s.stats.keysProvided.Load()), + KeysFailed: int(s.stats.keysFailed.Load()), + + KeysProvidedPerMinute: keysProvidedPerMinute, + KeysReprovidedPerMinute: keysReprovidedPerMinute, + RegionReprovideDuration: time.Duration(reprovideDurationAvg), + AvgKeysPerReprovide: keysPerReprovideAvg, + RegionReprovidedLastCycle: reprovideDurationCount, + } + + snapshot.Operations = stats.Operations{ + Ongoing: ongoingOps, + Past: pastOps, + } + + snapshot.Network = stats.Network{ // in the last reprovide cycle + Peers: int(peersSum), + CompleteKeyspaceCoverage: peersFullyCovered, + Reachable: int(reachableSum), + AvgRegionSize: avgRegionSize, + AvgHolders: avgHoldersAvg, + ReplicationFactor: s.replicationFactor, + } + + return snapshot +} + +// operationStats tracks provider operation metrics over time windows. 
+type operationStats struct { + // Cumulative counters since provider started + recordsProvided atomic.Int32 // total provider records sent + keysProvided atomic.Int32 // total keys successfully provided + keysFailed atomic.Int32 // total keys that failed to provide + + // Current ongoing operations + ongoingProvides ongoingOpStats // active provide operations + ongoingReprovides ongoingOpStats // active reprovide operations + + // Time-windowed metrics for provide operations + keysPerProvide timeseries.IntTimeSeries // keys provided per operation + provideDuration timeseries.IntTimeSeries // duration of provide operations + + // Time-windowed metrics for reprovide operations (by keyspace region) + keysPerReprovide timeseries.CycleStats // keys reprovided per region + reprovideDuration timeseries.CycleStats // duration of reprovide operations per region + + // Network topology metrics (by keyspace region) + peers timeseries.CycleStats // number of peers per region + reachable timeseries.CycleStats // number of reachable peers per region + regionSize timeseries.IntTimeSeries // average size of keyspace regions + avgHolders timeseries.FloatTimeSeries // average holders per key (weighted) + + cycleStatsLk sync.Mutex // protects cycle-based statistics +} + +func newOperationStats(reprovideInterval, maxDelay time.Duration) operationStats { + return operationStats{ + keysPerProvide: timeseries.NewIntTimeSeries(reprovideInterval), + provideDuration: timeseries.NewIntTimeSeries(reprovideInterval), + regionSize: timeseries.NewIntTimeSeries(reprovideInterval), + avgHolders: timeseries.NewFloatTimeSeries(reprovideInterval), + + keysPerReprovide: timeseries.NewCycleStats(reprovideInterval, maxDelay), + reprovideDuration: timeseries.NewCycleStats(reprovideInterval, maxDelay), + peers: timeseries.NewCycleStats(reprovideInterval, maxDelay), + reachable: timeseries.NewCycleStats(reprovideInterval, maxDelay), + } +} + +// addProvidedRecords increments the total count of provider 
records sent. +func (s *operationStats) addProvidedRecords(count int) { + s.recordsProvided.Add(int32(count)) +} + +// addCompletedKeys updates the counts of successfully provided and failed keys. +func (s *operationStats) addCompletedKeys(successes, failures int) { + s.keysProvided.Add(int32(successes)) + s.keysFailed.Add(int32(failures)) +} + +// ongoingOpStats tracks currently active operations. +type ongoingOpStats struct { + opCount atomic.Int32 // number of active operations + keyCount atomic.Int32 // total keys being processed in active operations +} + +// start records the beginning of a new operation with the given number of keys. +func (s *ongoingOpStats) start(keyCount int) { + s.opCount.Add(1) + s.keyCount.Add(int32(keyCount)) +} + +// addKeys adds more keys to the current active operations. +func (s *ongoingOpStats) addKeys(keyCount int) { + s.keyCount.Add(int32(keyCount)) +} + +// finish records the completion of an operation and removes its keys from the active count. +func (s *ongoingOpStats) finish(keyCount int) { + s.opCount.Add(-1) + s.keyCount.Add(-int32(keyCount)) +} diff --git a/provider/stats/types.go b/provider/stats/types.go new file mode 100644 index 000000000..6151d5e17 --- /dev/null +++ b/provider/stats/types.go @@ -0,0 +1,97 @@ +// Package stats defines the data structures for provider performance metrics +// and statistics exported by the libp2p kademlia DHT provider. 
+package stats + +import ( + "time" + + "github.com/probe-lab/go-libdht/kad/key/bitstr" +) + +type Stats struct { + Closed bool `json:"closed"` + Connectivity Connectivity `json:"connectivity"` + Queues Queues `json:"queues"` + Schedule Schedule `json:"schedule"` + Workers Workers `json:"workers"` + Timing Timing `json:"timing"` + Operations Operations `json:"operations"` + Network Network `json:"network"` +} + +type Queues struct { + PendingKeyProvides int `json:"pending_key_provides"` + PendingRegionProvides int `json:"pending_region_provides"` + PendingRegionReprovides int `json:"pending_region_reprovides"` +} + +type Connectivity struct { + Status string `json:"status"` + Since time.Time `json:"since"` +} + +type Schedule struct { + Keys int `json:"keys"` + Regions int `json:"regions"` + AvgPrefixLength float64 `json:"avg_prefix_length"` + NextReprovideAt time.Time `json:"next_reprovide_at"` + NextReprovidePrefix bitstr.Key `json:"next_reprovide_prefix"` +} + +type Workers struct { + Max int `json:"max"` + Active int `json:"active"` + ActivePeriodic int `json:"active_periodic"` + ActiveBurst int `json:"active_burst"` + DedicatedPeriodic int `json:"dedicated_periodic"` + DedicatedBurst int `json:"dedicated_burst"` + QueuedPeriodic int `json:"queued_periodic"` + QueuedBurst int `json:"queued_burst"` + MaxProvideConnsPerWorker int `json:"max_provide_conns_per_worker"` +} + +type Timing struct { + Uptime time.Duration `json:"uptime"` + ReprovidesInterval time.Duration `json:"reprovides_interval"` + CycleStart time.Time `json:"cycle_start"` + CurrentTimeOffset time.Duration `json:"current_time_offset"` + MaxReprovideDelay time.Duration `json:"max_reprovide_delay"` +} + +type Operations struct { + Ongoing OngoingOperations `json:"ongoing"` + Past PastOperations `json:"past"` +} + +type OngoingOperations struct { + RegionProvides int `json:"region_provides"` + KeyProvides int `json:"key_provides"` + RegionReprovides int `json:"region_reprovides"` + KeyReprovides int 
`json:"key_reprovides"` +} + +type PastOperations struct { + // Cumulative totals since provider started + RecordsProvided int `json:"records_provided"` // total provider records sent + KeysProvided int `json:"keys_provided"` // total keys successfully provided + KeysFailed int `json:"keys_failed"` // total keys that failed to provide + + // Performance metrics from last reprovide cycle + KeysProvidedPerMinute float64 `json:"keys_provided_per_minute"` // provide rate + KeysReprovidedPerMinute float64 `json:"keys_reprovided_per_minute"` // reprovide rate + RegionReprovideDuration time.Duration `json:"reprovide_duration"` // avg time per region + AvgKeysPerReprovide float64 `json:"avg_keys_per_reprovide"` // avg keys per region + RegionReprovidedLastCycle int `json:"regions_reprovided_last_cycle"` // regions processed +} + +type Network struct { + // Peer counts from last reprovide cycle + Peers int `json:"peers"` // total peers contacted + Reachable int `json:"reachable"` // peers that responded successfully + + // Keyspace coverage analysis + CompleteKeyspaceCoverage bool `json:"complete_keyspace_coverage"` // whether all regions were covered + AvgRegionSize float64 `json:"avg_region_size"` // average size of keyspace regions + AvgHolders float64 `json:"avg_holders"` // average holders per key + ReplicationFactor int `json:"replication_factor"` // target replication factor +} diff --git a/provider/stats_test.go b/provider/stats_test.go new file mode 100644 index 000000000..b7b1444ba --- /dev/null +++ b/provider/stats_test.go @@ -0,0 +1,780 @@ +//go:build go1.25 +// +build go1.25 + +package provider + +import ( + "context" + "errors" + "strings" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multihash" + "github.com/probe-lab/go-libdht/kad/key" + "github.com/probe-lab/go-libdht/kad/key/bit256" + "github.com/probe-lab/go-libdht/kad/key/bitstr" + + 
pb "github.com/libp2p/go-libp2p-kad-dht/pb" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" + kb "github.com/libp2p/go-libp2p-kbucket" + + "github.com/stretchr/testify/require" +) + +func TestStats(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + replicationFactor := 4 + peerPrefixBitlen := 7 + require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte) + + reprovideInterval := time.Hour + + var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen + // Generate balanced peers over the prefix space + peers := make([]peer.ID, nPeers) + var err error + for i := range nPeers { + b := i << (bitsPerByte - peerPrefixBitlen) + k := [32]byte{b} + peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen)) + require.NoError(t, err) + } + reachabilityModulo := 8 // 1/8 peers unreachable + peerReachability := make(map[peer.ID]bool) + for i, p := range peers { + peerReachability[p] = i%reachabilityModulo != 0 + } + + pid, err := peer.Decode("12D3KooWCPQTeFYCDkru8nza3Af6u77aoVLA71Vb74eHxeR91Gka") // kadid starts with 16*"0" + require.NoError(t, err) + + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) + return sortedPeers[:min(replicationFactor, len(peers))], nil + }, + } + maxWorkers := 12 + dedicatedBurstWorkers := 6 + dedicatedPeriodicWorkers := 2 + maxConnsPerWorker := 3 + + blocked := false + workersBlocked := make(chan struct{}, maxWorkers*maxConnsPerWorker) + blockedCond := sync.NewCond(&sync.Mutex{}) + msgSender := &mockMsgSender{ + sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { + blockedCond.L.Lock() + if blocked { + // Signal once that this worker is blocked + select { + case workersBlocked <- struct{}{}: + default: + t.Fatal("too many workers blocked") + } + // Wait until unblocked + for blocked { + blockedCond.Wait() + } + } + blockedCond.L.Unlock() + if reachable := 
peerReachability[p]; !reachable { + return errors.New("peer not reachable") + } + return nil + }, + } + + offlineDelay := 10 * time.Minute + startTime := time.Now() + + opts := []Option{ + WithReprovideInterval(reprovideInterval), + WithReplicationFactor(replicationFactor), + WithMaxWorkers(maxWorkers), + WithDedicatedBurstWorkers(dedicatedBurstWorkers), + WithDedicatedPeriodicWorkers(dedicatedPeriodicWorkers), + WithMaxProvideConnsPerWorker(maxConnsPerWorker), + WithPeerID(pid), + WithRouter(router), + WithMessageSender(msgSender), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + WithOfflineDelay(offlineDelay), + } + prov, err := New(opts...) + require.NoError(t, err) + synctest.Wait() + + require.True(t, prov.connectivity.IsOnline()) + + // Wait 1 minute and do nothing + time.Sleep(time.Minute) + + avgPrefixLen := 5 + avgPrefixLenFloat := float64(avgPrefixLen) + + // Initial stats check + stats := prov.Stats() + require.False(t, stats.Closed) + require.Equal(t, "online", stats.Connectivity.Status) + require.Equal(t, startTime, stats.Connectivity.Since) + require.Equal(t, 0, stats.Queues.PendingKeyProvides) + require.Equal(t, 0, stats.Queues.PendingRegionProvides) + require.Equal(t, 0, stats.Queues.PendingRegionReprovides) + require.Equal(t, 0, stats.Schedule.Keys) + require.Equal(t, 0, stats.Schedule.Regions) + require.Equal(t, avgPrefixLenFloat, stats.Schedule.AvgPrefixLength) + require.Equal(t, bitstr.Key(""), stats.Schedule.NextReprovidePrefix) + require.Equal(t, time.Time{}, stats.Schedule.NextReprovideAt) + require.Equal(t, maxWorkers, stats.Workers.Max) + require.Equal(t, 0, stats.Workers.Active) + require.Equal(t, 0, stats.Workers.ActiveBurst) + require.Equal(t, 0, stats.Workers.ActivePeriodic) + require.Equal(t, dedicatedBurstWorkers, stats.Workers.DedicatedBurst) + require.Equal(t, dedicatedPeriodicWorkers, stats.Workers.DedicatedPeriodic) + 
require.Equal(t, 0, stats.Workers.QueuedBurst) + require.Equal(t, 0, stats.Workers.QueuedPeriodic) + require.Equal(t, maxConnsPerWorker, stats.Workers.MaxProvideConnsPerWorker) + require.Equal(t, time.Since(startTime), stats.Timing.Uptime) + require.Equal(t, reprovideInterval, stats.Timing.ReprovidesInterval) + require.Equal(t, time.Since(startTime), stats.Timing.CurrentTimeOffset) + require.Equal(t, DefaultMaxReprovideDelay, stats.Timing.MaxReprovideDelay) + require.Equal(t, 0, stats.Operations.Ongoing.KeyProvides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionProvides) + require.Equal(t, 0, stats.Operations.Ongoing.KeyReprovides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionReprovides) + require.Equal(t, 0, stats.Operations.Past.RecordsProvided) + require.Equal(t, 0, stats.Operations.Past.KeysProvided) + require.Equal(t, 0, stats.Operations.Past.KeysFailed) + require.Equal(t, 0., stats.Operations.Past.KeysProvidedPerMinute) + require.Equal(t, 0., stats.Operations.Past.KeysReprovidedPerMinute) + require.Equal(t, time.Duration(0), stats.Operations.Past.RegionReprovideDuration) + require.Equal(t, 0., stats.Operations.Past.AvgKeysPerReprovide) + require.Equal(t, 0, stats.Operations.Past.RegionReprovidedLastCycle) + require.Equal(t, 0, stats.Network.Peers) + require.Equal(t, 0, stats.Network.Reachable) + require.False(t, stats.Network.CompleteKeyspaceCoverage) + require.Equal(t, 0., stats.Network.AvgHolders) + require.Equal(t, 0., stats.Network.AvgRegionSize) + require.Equal(t, replicationFactor, stats.Network.ReplicationFactor) + + wg := sync.WaitGroup{} + + // Block the provide operation + blockedCond.L.Lock() + blocked = true + blockedCond.L.Unlock() + + h, err := multihash.FromB58String("QmTQjzSazBrNFUiNkDEBFyJXrEBiQj55dBEcthNgSJm6pV") + require.NoError(t, err) + require.True(t, keyspace.IsPrefix(bitstr.Key("0000000"), keyspace.MhToBit256(h))) + + wg.Go(func() { + err := prov.StartProviding(false, h) + require.NoError(t, err) + }) + + // Wait for 
provide operation to be blocked (single worker) + for range maxConnsPerWorker { + <-workersBlocked + } + require.Empty(t, workersBlocked) + + // Check stats during a provide operation (blocked) + stats = prov.Stats() + + require.False(t, stats.Closed) + require.Equal(t, "online", stats.Connectivity.Status) + require.Equal(t, startTime, stats.Connectivity.Since) + require.Equal(t, 0, stats.Queues.PendingKeyProvides) + require.Equal(t, 0, stats.Queues.PendingRegionProvides) + require.Equal(t, 0, stats.Queues.PendingRegionReprovides) + require.Equal(t, 1, stats.Schedule.Keys) + require.Equal(t, 1, stats.Schedule.Regions) + require.Equal(t, avgPrefixLenFloat, stats.Schedule.AvgPrefixLength) + require.Equal(t, bitstr.Key(strings.Repeat("0", avgPrefixLen)), stats.Schedule.NextReprovidePrefix) + require.Equal(t, startTime.Add(reprovideInterval), stats.Schedule.NextReprovideAt) + require.Equal(t, maxWorkers, stats.Workers.Max) + require.Equal(t, 1, stats.Workers.Active) + require.Equal(t, 1, stats.Workers.ActiveBurst) + require.Equal(t, 0, stats.Workers.ActivePeriodic) + require.Equal(t, dedicatedBurstWorkers, stats.Workers.DedicatedBurst) + require.Equal(t, dedicatedPeriodicWorkers, stats.Workers.DedicatedPeriodic) + require.Equal(t, 0, stats.Workers.QueuedBurst) + require.Equal(t, 0, stats.Workers.QueuedPeriodic) + require.Equal(t, maxConnsPerWorker, stats.Workers.MaxProvideConnsPerWorker) + require.Equal(t, time.Since(startTime), stats.Timing.Uptime) + require.Equal(t, reprovideInterval, stats.Timing.ReprovidesInterval) + require.Equal(t, time.Since(startTime), stats.Timing.CurrentTimeOffset) + require.Equal(t, DefaultMaxReprovideDelay, stats.Timing.MaxReprovideDelay) + require.Equal(t, 1, stats.Operations.Ongoing.KeyProvides) + require.Equal(t, 1, stats.Operations.Ongoing.RegionProvides) + require.Equal(t, 0, stats.Operations.Ongoing.KeyReprovides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionReprovides) + require.Equal(t, 0, 
stats.Operations.Past.RecordsProvided) + require.Equal(t, 0, stats.Operations.Past.KeysProvided) + require.Equal(t, 0, stats.Operations.Past.KeysFailed) + require.Equal(t, 0., stats.Operations.Past.KeysProvidedPerMinute) + require.Equal(t, 0., stats.Operations.Past.KeysReprovidedPerMinute) + require.Equal(t, time.Duration(0), stats.Operations.Past.RegionReprovideDuration) + require.Equal(t, 0., stats.Operations.Past.AvgKeysPerReprovide) + require.Equal(t, 0, stats.Operations.Past.RegionReprovidedLastCycle) + require.Equal(t, 0, stats.Network.Peers) + require.Equal(t, 0, stats.Network.Reachable) + require.False(t, stats.Network.CompleteKeyspaceCoverage) + require.Equal(t, 0., stats.Network.AvgHolders) + require.Equal(t, 0., stats.Network.AvgRegionSize) + require.Equal(t, replicationFactor, stats.Network.ReplicationFactor) + + // Sleep 1 minute, so that we provide 1 key per minute + time.Sleep(time.Minute) + + blockedCond.L.Lock() + blocked = false + blockedCond.Broadcast() + blockedCond.L.Unlock() + wg.Wait() + synctest.Wait() + + // Check stats after the provide operation + stats = prov.Stats() + + // Count to how many peers the record was actually provided + recordsProvided := 0 + sollicitedPeers := make(map[peer.ID]struct{}) + reachablePeers := make(map[peer.ID]struct{}) + closestPeers, err := router.GetClosestPeers(context.Background(), string(h)) + require.NoError(t, err) + for _, p := range closestPeers { + sollicitedPeers[p] = struct{}{} + if peerReachability[p] { + recordsProvided++ + reachablePeers[p] = struct{}{} + } + } + + require.False(t, stats.Closed) + require.Equal(t, "online", stats.Connectivity.Status) + require.Equal(t, startTime, stats.Connectivity.Since) + require.Equal(t, 0, stats.Queues.PendingKeyProvides) + require.Equal(t, 0, stats.Queues.PendingRegionProvides) + require.Equal(t, 0, stats.Queues.PendingRegionReprovides) + require.Equal(t, 1, stats.Schedule.Keys) + require.Equal(t, 1, stats.Schedule.Regions) + require.Equal(t, 
avgPrefixLenFloat, stats.Schedule.AvgPrefixLength) + require.Equal(t, bitstr.Key(strings.Repeat("0", avgPrefixLen)), stats.Schedule.NextReprovidePrefix) + require.Equal(t, startTime.Add(reprovideInterval), stats.Schedule.NextReprovideAt) + require.Equal(t, maxWorkers, stats.Workers.Max) + require.Equal(t, 0, stats.Workers.Active) + require.Equal(t, 0, stats.Workers.ActiveBurst) + require.Equal(t, 0, stats.Workers.ActivePeriodic) + require.Equal(t, dedicatedBurstWorkers, stats.Workers.DedicatedBurst) + require.Equal(t, dedicatedPeriodicWorkers, stats.Workers.DedicatedPeriodic) + require.Equal(t, 0, stats.Workers.QueuedBurst) + require.Equal(t, 0, stats.Workers.QueuedPeriodic) + require.Equal(t, maxConnsPerWorker, stats.Workers.MaxProvideConnsPerWorker) + require.Equal(t, time.Since(startTime), stats.Timing.Uptime) + require.Equal(t, reprovideInterval, stats.Timing.ReprovidesInterval) + require.Equal(t, time.Since(startTime), stats.Timing.CurrentTimeOffset) + require.Equal(t, DefaultMaxReprovideDelay, stats.Timing.MaxReprovideDelay) + require.Equal(t, 0, stats.Operations.Ongoing.KeyProvides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionProvides) + require.Equal(t, 0, stats.Operations.Ongoing.KeyReprovides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionReprovides) + require.Equal(t, recordsProvided, stats.Operations.Past.RecordsProvided) + require.Equal(t, 1, stats.Operations.Past.KeysProvided) + require.Equal(t, 0, stats.Operations.Past.KeysFailed) + require.Equal(t, 1., stats.Operations.Past.KeysProvidedPerMinute) + require.Equal(t, 0., stats.Operations.Past.KeysReprovidedPerMinute) + require.Equal(t, time.Duration(0), stats.Operations.Past.RegionReprovideDuration) + require.Equal(t, 0., stats.Operations.Past.AvgKeysPerReprovide) + require.Equal(t, 0, stats.Operations.Past.RegionReprovidedLastCycle) + require.Equal(t, 0, stats.Network.Peers) + require.Equal(t, 0, stats.Network.Reachable) + require.False(t, stats.Network.CompleteKeyspaceCoverage) + 
require.Equal(t, float64(recordsProvided), stats.Network.AvgHolders) + require.Equal(t, 0., stats.Network.AvgRegionSize) + require.Equal(t, replicationFactor, stats.Network.ReplicationFactor) + + // Add more keys from the same keyspace region as `h` to the keystore. They + // will all be reprovided together. Prefix is "0000000". + newKeys := make([]multihash.Multihash, len(b58KeysPrefix0000000)) + for i, k := range b58KeysPrefix0000000 { + newKeys[i], err = multihash.FromB58String(k) + require.NoError(t, err) + require.True(t, keyspace.IsPrefix(bitstr.Key("0000000"), keyspace.MhToBit256(newKeys[i]))) + } + uniqueNewKeys, err := prov.keystore.Put(context.Background(), newKeys...) + require.NoError(t, err) + require.Len(t, uniqueNewKeys, len(newKeys)) // all keys are new + + // Block the reprovide operation + blockedCond.L.Lock() + blocked = true + blockedCond.L.Unlock() + + // Sleep until the keys must be reprovided + time.Sleep(time.Until(startTime.Add(reprovideInterval))) + + // Wait for reprovide to be blocked (single worker) + for range maxConnsPerWorker { + <-workersBlocked + } + require.Empty(t, workersBlocked) + + stats = prov.Stats() + + reprovidedKeys := len(newKeys) + 1 + + regionSizeInt := 1 << (peerPrefixBitlen - prov.cachedAvgPrefixLen) + regionSize := float64(regionSizeInt) + + require.False(t, stats.Closed) + require.Equal(t, "online", stats.Connectivity.Status) + require.Equal(t, startTime, stats.Connectivity.Since) + require.Equal(t, 0, stats.Queues.PendingKeyProvides) + require.Equal(t, 0, stats.Queues.PendingRegionProvides) + require.Equal(t, 0, stats.Queues.PendingRegionReprovides) + require.Equal(t, reprovidedKeys, stats.Schedule.Keys) + require.Equal(t, 0, stats.Schedule.Regions) // during the reprovide region is removed from schedule, it will be added back after reprovide is complete + require.Equal(t, avgPrefixLenFloat, stats.Schedule.AvgPrefixLength) + require.Equal(t, bitstr.Key(strings.Repeat("0", avgPrefixLen)), 
stats.Schedule.NextReprovidePrefix) + require.Equal(t, time.Time{}, stats.Schedule.NextReprovideAt) // region removed from schedule + require.Equal(t, maxWorkers, stats.Workers.Max) + require.Equal(t, 1, stats.Workers.Active) + require.Equal(t, 0, stats.Workers.ActiveBurst) + require.Equal(t, 1, stats.Workers.ActivePeriodic) + require.Equal(t, dedicatedBurstWorkers, stats.Workers.DedicatedBurst) + require.Equal(t, dedicatedPeriodicWorkers, stats.Workers.DedicatedPeriodic) + require.Equal(t, 0, stats.Workers.QueuedBurst) + require.Equal(t, 0, stats.Workers.QueuedPeriodic) + require.Equal(t, maxConnsPerWorker, stats.Workers.MaxProvideConnsPerWorker) + require.Equal(t, time.Since(startTime), stats.Timing.Uptime) + require.Equal(t, reprovideInterval, stats.Timing.ReprovidesInterval) + require.Equal(t, time.Since(startTime)%reprovideInterval, stats.Timing.CurrentTimeOffset) + require.Equal(t, DefaultMaxReprovideDelay, stats.Timing.MaxReprovideDelay) + require.Equal(t, 0, stats.Operations.Ongoing.KeyProvides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionProvides) + require.Equal(t, reprovidedKeys, stats.Operations.Ongoing.KeyReprovides) + require.Equal(t, 1, stats.Operations.Ongoing.RegionReprovides) + require.Equal(t, recordsProvided, stats.Operations.Past.RecordsProvided) + require.Equal(t, 1, stats.Operations.Past.KeysProvided) + require.Equal(t, 0, stats.Operations.Past.KeysFailed) + require.Equal(t, 1., stats.Operations.Past.KeysProvidedPerMinute) + require.Equal(t, 0., stats.Operations.Past.KeysReprovidedPerMinute) + require.Equal(t, time.Duration(0), stats.Operations.Past.RegionReprovideDuration) + require.Equal(t, 0., stats.Operations.Past.AvgKeysPerReprovide) + require.Equal(t, 0, stats.Operations.Past.RegionReprovidedLastCycle) + require.Equal(t, 0, stats.Network.Peers) + require.Equal(t, 0, stats.Network.Reachable) + require.False(t, stats.Network.CompleteKeyspaceCoverage) + require.Equal(t, float64(recordsProvided), stats.Network.AvgHolders) + 
require.Equal(t, regionSize, stats.Network.AvgRegionSize) + require.Equal(t, replicationFactor, stats.Network.ReplicationFactor) + + // Reprovide takes 2 minutes + reprovideDuration := 2 * time.Minute + time.Sleep(reprovideDuration) + + // Unblock the reprovide operation + blockedCond.L.Lock() + blocked = false + blockedCond.Broadcast() + blockedCond.L.Unlock() + synctest.Wait() + + // Check stats after the reprovide operation + stats = prov.Stats() + + newKeysProvidedRecords := 0 + for _, k := range newKeys { + closestPeers, err := router.GetClosestPeers(context.Background(), string(k)) + require.NoError(t, err) + for _, p := range closestPeers { + sollicitedPeers[p] = struct{}{} + if peerReachability[p] { + newKeysProvidedRecords++ + reachablePeers[p] = struct{}{} + } + } + } + + require.False(t, stats.Closed) + require.Equal(t, "online", stats.Connectivity.Status) + require.Equal(t, startTime, stats.Connectivity.Since) + require.Equal(t, 0, stats.Queues.PendingKeyProvides) + require.Equal(t, 0, stats.Queues.PendingRegionProvides) + require.Equal(t, 0, stats.Queues.PendingRegionReprovides) + require.Equal(t, reprovidedKeys, stats.Schedule.Keys) + require.Equal(t, 1, stats.Schedule.Regions) + require.Equal(t, avgPrefixLenFloat, stats.Schedule.AvgPrefixLength) + require.Equal(t, bitstr.Key(strings.Repeat("0", avgPrefixLen)), stats.Schedule.NextReprovidePrefix) + require.Equal(t, startTime.Add(2*reprovideInterval), stats.Schedule.NextReprovideAt) + require.Equal(t, maxWorkers, stats.Workers.Max) + require.Equal(t, 0, stats.Workers.Active) + require.Equal(t, 0, stats.Workers.ActiveBurst) + require.Equal(t, 0, stats.Workers.ActivePeriodic) + require.Equal(t, dedicatedBurstWorkers, stats.Workers.DedicatedBurst) + require.Equal(t, dedicatedPeriodicWorkers, stats.Workers.DedicatedPeriodic) + require.Equal(t, 0, stats.Workers.QueuedBurst) + require.Equal(t, 0, stats.Workers.QueuedPeriodic) + require.Equal(t, maxConnsPerWorker, stats.Workers.MaxProvideConnsPerWorker) + 
require.Equal(t, time.Since(startTime), stats.Timing.Uptime) + require.Equal(t, reprovideInterval, stats.Timing.ReprovidesInterval) + require.Equal(t, time.Since(startTime)%reprovideInterval, stats.Timing.CurrentTimeOffset) + require.Equal(t, DefaultMaxReprovideDelay, stats.Timing.MaxReprovideDelay) + require.Equal(t, 0, stats.Operations.Ongoing.KeyProvides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionProvides) + require.Equal(t, 0, stats.Operations.Ongoing.KeyReprovides) + require.Equal(t, 0, stats.Operations.Ongoing.RegionReprovides) + require.Equal(t, newKeysProvidedRecords+2*recordsProvided, stats.Operations.Past.RecordsProvided) + require.Equal(t, reprovidedKeys+1, stats.Operations.Past.KeysProvided) // 1 provided, 6 reprovided + require.Equal(t, 0, stats.Operations.Past.KeysFailed) + require.Equal(t, 1., stats.Operations.Past.KeysProvidedPerMinute) + require.Equal(t, float64(reprovidedKeys)/float64(reprovideDuration.Minutes()), stats.Operations.Past.KeysReprovidedPerMinute) + require.Equal(t, reprovideDuration, stats.Operations.Past.RegionReprovideDuration) + require.Equal(t, float64(reprovidedKeys), stats.Operations.Past.AvgKeysPerReprovide) + require.Equal(t, 1, stats.Operations.Past.RegionReprovidedLastCycle) + require.Equal(t, len(sollicitedPeers), stats.Network.Peers) + require.Equal(t, len(reachablePeers), stats.Network.Reachable) + require.False(t, stats.Network.CompleteKeyspaceCoverage) + require.Equal(t, float64(len(reachablePeers))/float64(len(sollicitedPeers))*float64(replicationFactor), stats.Network.AvgHolders) + require.Equal(t, regionSize, stats.Network.AvgRegionSize) + require.Equal(t, replicationFactor, stats.Network.ReplicationFactor) + + keysPerPrefix := 3 + balancedKeys := make([]multihash.Multihash, len(b58KeysAllPrefixesBut00000)) + require.Len(t, b58KeysAllPrefixesBut00000, keysPerPrefix*(1<