
Commit e0da085

provider: minor fixes (#1133)
* provider: minor fixes
* adjusting for kubo PR
* address review
1 parent 2ddbeca commit e0da085

File tree: 6 files changed (+82 -43 lines)

dual/provider/provider.go
fullrt/dht.go
provider/datastore/keystore.go
provider/internal/keyspace/trie.go
provider/internal/keyspace/trie_test.go
provider/provider.go

dual/provider/provider.go

Lines changed: 16 additions & 16 deletions
@@ -3,7 +3,6 @@ package provider
 import (
     "context"
     "errors"
-    "sync"
     "sync/atomic"

     "github.com/ipfs/go-cid"
@@ -84,18 +83,19 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {

 // runOnBoth runs the provided function on both the LAN and WAN providers in
 // parallel and waits for both to complete.
-func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider)) {
-    wg := sync.WaitGroup{}
-    wg.Add(2)
-    go func() {
-        defer wg.Done()
-        f(s.LAN)
-    }()
-    go func() {
-        defer wg.Done()
+func (s *SweepingProvider) runOnBoth(wait bool, f func(*provider.SweepingProvider)) {
+    if wait {
+        done := make(chan struct{})
+        go func() {
+            defer close(done)
+            f(s.LAN)
+        }()
         f(s.WAN)
-    }()
-    wg.Wait()
+        <-done
+        return
+    }
+    go f(s.LAN)
+    go f(s.WAN)
 }

 // ProvideOnce sends provider records for the specified keys to both DHT swarms
@@ -104,7 +104,7 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider)) {
 // Add the supplied multihashes to the provide queue, and return immediately.
 // The provide operation happens asynchronously.
 func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
-    s.runOnBoth(func(p *provider.SweepingProvider) {
+    s.runOnBoth(false, func(p *provider.SweepingProvider) {
         p.ProvideOnce(keys...)
     })
 }
@@ -128,7 +128,7 @@ func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
         return
     }

-    s.runOnBoth(func(p *provider.SweepingProvider) {
+    s.runOnBoth(false, func(p *provider.SweepingProvider) {
         p.AddToSchedule(newKeys...)
     })

@@ -160,7 +160,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
 // reprovided as scheduled.
 func (s *SweepingProvider) Clear() int {
     var total atomic.Int32
-    s.runOnBoth(func(p *provider.SweepingProvider) {
+    s.runOnBoth(true, func(p *provider.SweepingProvider) {
         total.Add(int32(p.Clear()))
     })
     return int(total.Load())
@@ -174,7 +174,7 @@ func (s *SweepingProvider) Clear() int {
 // This is done automatically during the reprovide operation if a region has no
 // keys.
 func (s *SweepingProvider) RefreshSchedule() {
-    s.runOnBoth(func(p *provider.SweepingProvider) {
+    go s.runOnBoth(false, func(p *provider.SweepingProvider) {
         p.RefreshSchedule()
     })
 }
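The new boolean controls whether runOnBoth blocks: Clear passes wait=true because it aggregates a count from both networks, while ProvideOnce, StartProviding, and RefreshSchedule (now additionally wrapped in go) fire and forget. A minimal, self-contained sketch of the pattern, using illustrative names rather than the dual provider API:

package main

import (
    "fmt"
    "sync/atomic"
)

// runOnBoth mirrors the pattern from the diff above (illustrative names, not
// the dual provider API). With wait=true the call returns only after f has run
// on both sides, which an aggregating method such as Clear needs; with
// wait=false both calls are fired as goroutines and the method returns
// immediately, as ProvideOnce and StartProviding now do.
func runOnBoth(wait bool, f func(side string)) {
    if wait {
        done := make(chan struct{})
        go func() {
            defer close(done)
            f("LAN")
        }()
        f("WAN")
        <-done
        return
    }
    go f("LAN")
    go f("WAN")
}

func main() {
    var total atomic.Int32
    runOnBoth(true, func(string) { total.Add(1) }) // both sides finished before Load
    fmt.Println("cleared providers on", total.Load(), "DHT swarms")
}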

fullrt/dht.go

Lines changed: 2 additions & 1 deletion
@@ -150,6 +150,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
         EnableProviders: true,
         EnableValues: true,
         ProtocolPrefix: protocolPrefix,
+        MsgSenderBuilder: net.NewMessageSenderImpl,
     }

     if err := dhtcfg.Apply(fullrtcfg.dhtOpts...); err != nil {
@@ -163,7 +164,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
         return nil, err
     }

-    ms := net.NewMessageSenderImpl(h, amino.Protocols)
+    ms := dhtcfg.MsgSenderBuilder(h, amino.Protocols)
     protoMessenger, err := dht_pb.NewProtocolMessenger(ms)
     if err != nil {
         return nil, err
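The message-sender constructor is now taken from the DHT config rather than hard-coded, so a caller (presumably the kubo PR mentioned in the commit message) can inject its own implementation. A standalone sketch of that builder-injection pattern, with placeholder types instead of the real config, host, and protocol types:

package main

import "fmt"

// Placeholder types; the real code uses a libp2p host and protocol IDs.
type messageSender interface {
    Send(to, msg string) error
}

type defaultSender struct{}

func (defaultSender) Send(to, msg string) error {
    fmt.Printf("sending %q to %s\n", msg, to)
    return nil
}

// config carries a constructor for the message sender instead of a concrete
// value, mirroring the MsgSenderBuilder field used above: it defaults to the
// stock implementation and an option can swap it before the messenger is built.
type config struct {
    MsgSenderBuilder func(protocols []string) messageSender
}

func main() {
    cfg := config{MsgSenderBuilder: func([]string) messageSender { return defaultSender{} }}
    ms := cfg.MsgSenderBuilder([]string{"/ipfs/kad/1.0.0"})
    _ = ms.Send("peerA", "FIND_NODE")
}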

provider/datastore/keystore.go

Lines changed: 35 additions & 19 deletions
@@ -46,6 +46,7 @@ type KeyStore struct {
     prefixLen int

     gcFunc      KeyChanFunc // optional function to get keys for garbage collection
+    gcFuncLk    sync.Mutex
     gcInterval  time.Duration
     gcBatchSize int
 }
@@ -103,17 +104,6 @@ func WithDatastorePrefix(base string) KeyStoreOption {
     }
 }

-// WithGCFunc sets the function periodically used to garbage collect keys that
-// shouldn't be provided anymore. During the garbage collection process, the
-// store is entirely purged, and is repopulated from the keys supplied by the
-// KeyChanFunc.
-func WithGCFunc(gcFunc KeyChanFunc) KeyStoreOption {
-    return func(cfg *keyStoreCfg) error {
-        cfg.gcFunc = gcFunc
-        return nil
-    }
-}
-
 // WithGCInterval defines the interval at which the KeyStore is garbage
 // collected. During the garbage collection process, the store is entirely
 // purged, and is repopulated from the keys supplied by the gcFunc.
@@ -141,6 +131,33 @@ func WithGCBatchSize(size int) KeyStoreOption {
     }
 }

+// WithGCFunc sets the function periodically used to garbage collect keys that
+// shouldn't be provided anymore. During the garbage collection process, the
+// store is entirely purged, and is repopulated from the keys supplied by the
+// KeyChanFunc.
+func WithGCFunc(gcFunc KeyChanFunc) KeyStoreOption {
+    return func(cfg *keyStoreCfg) error {
+        cfg.gcFunc = gcFunc
+        return nil
+    }
+}
+
+func (s *KeyStore) SetGCFunc(gcFunc KeyChanFunc) {
+    if gcFunc == nil {
+        return
+    }
+    s.gcFuncLk.Lock()
+    defer s.gcFuncLk.Unlock()
+
+    startGC := s.gcFunc == nil && s.gcInterval > 0
+    s.gcFunc = gcFunc
+
+    if startGC {
+        s.wg.Add(1)
+        go s.runGC()
+    }
+}
+
 // NewKeyStore creates a new KeyStore backed by the provided datastore.
 func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (*KeyStore, error) {
     var cfg keyStoreCfg
@@ -161,6 +178,7 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (*KeyStore, error) {
         gcBatchSize: cfg.gcBatchSize,
     }
     if cfg.gcFunc != nil && cfg.gcInterval > 0 {
+        keyStore.wg.Add(1)
         go keyStore.runGC()
     }
     return &keyStore, nil
@@ -188,15 +206,16 @@ func (s *KeyStore) closed() bool {
 // resets the state of the KeyStore to match the state returned by the GC
 // function.
 func (s *KeyStore) runGC() {
-    s.wg.Add(1)
     defer s.wg.Done()
     ticker := time.NewTicker(s.gcInterval)
     for {
         select {
         case <-s.done:
             return
         case <-ticker.C:
+            s.gcFuncLk.Lock()
             keysChan, err := s.gcFunc(context.Background())
+            s.gcFuncLk.Unlock()
             if err != nil {
                 logger.Errorf("garbage collection failed: %v", err)
                 continue
@@ -218,17 +237,14 @@ func (s *KeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error
     if err != nil {
         return fmt.Errorf("KeyStore empty failed during reset: %w", err)
     }
-    keys := make([]mh.Multihash, s.gcBatchSize)
-    i := 0
+    keys := make([]mh.Multihash, 0, s.gcBatchSize)
     for c := range keysChan {
-        keys[i] = c.Hash()
-        i++
-        if i == s.gcBatchSize {
+        keys = append(keys, c.Hash())
+        if len(keys) == s.gcBatchSize {
             _, err = s.Put(ctx, keys...)
             if err != nil {
                 return fmt.Errorf("KeyStore put failed during reset: %w", err)
             }
-            i = 0
             keys = keys[:0]
         }
     }
@@ -245,7 +261,7 @@ func (s *KeyStore) dsKey(prefix bitstr.Key) ds.Key {

 // putLocked stores the provided keys while assuming s.lk is already held.
 func (s *KeyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) {
-    groups := make(map[bitstr.Key][]mh.Multihash)
+    groups := make(map[bitstr.Key][]mh.Multihash, len(keys))
     for _, h := range keys {
         k := keyspace.MhToBit256(h)
         bs := bitstr.Key(key.BitString(k)[:s.prefixLen])
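SetGCFunc lets the garbage-collection callback be installed after the KeyStore is constructed (useful when the key source only becomes available later, which the "adjusting for kubo PR" note suggests is the motivating case), and it starts the GC goroutine lazily at most once, guarded by gcFuncLk. A standalone sketch of that lazy-start pattern, using simplified types in place of the real KeyStore and KeyChanFunc:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Simplified stand-in for the KeyStore: the GC callback can be installed after
// construction, and only the first non-nil assignment (with a configured
// interval) starts the background GC goroutine.
type store struct {
    gcFuncLk   sync.Mutex
    gcFunc     func() []string
    gcInterval time.Duration
    wg         sync.WaitGroup
    done       chan struct{}
}

func (s *store) SetGCFunc(gcFunc func() []string) {
    if gcFunc == nil {
        return
    }
    s.gcFuncLk.Lock()
    defer s.gcFuncLk.Unlock()

    startGC := s.gcFunc == nil && s.gcInterval > 0
    s.gcFunc = gcFunc

    if startGC {
        s.wg.Add(1) // Add before starting the goroutine, matching the diff above
        go s.runGC()
    }
}

func (s *store) runGC() {
    defer s.wg.Done()
    ticker := time.NewTicker(s.gcInterval)
    defer ticker.Stop()
    for {
        select {
        case <-s.done:
            return
        case <-ticker.C:
            s.gcFuncLk.Lock()
            keys := s.gcFunc() // hold the lock around the call, as runGC does above
            s.gcFuncLk.Unlock()
            fmt.Println("GC pass over", len(keys), "keys")
        }
    }
}

func main() {
    s := &store{gcInterval: 10 * time.Millisecond, done: make(chan struct{})}
    // The store was built without a GC function; install it later.
    s.SetGCFunc(func() []string { return []string{"key1", "key2"} })
    time.Sleep(35 * time.Millisecond)
    close(s.done)
    s.wg.Wait()
}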

provider/internal/keyspace/trie.go

Lines changed: 4 additions & 1 deletion
@@ -50,13 +50,16 @@ func FindPrefixOfKey[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D],
 }

 func findPrefixOfKeyAtDepth[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D], k K1, depth int) (K0, bool) {
+    var zero K0
     if t.IsLeaf() {
         if !t.HasKey() {
-            var zero K0
             return zero, false
         }
         return *t.Key(), key.CommonPrefixLength(*t.Key(), k) == (*t.Key()).BitLen()
     }
+    if depth == k.BitLen() {
+        return zero, false
+    }
     b := int(k.Bit(depth))
     return findPrefixOfKeyAtDepth(t.Branch(b), k, depth+1)
 }

provider/internal/keyspace/trie_test.go

Lines changed: 15 additions & 0 deletions
@@ -89,6 +89,21 @@ func TestFindPrefixOfKey(t *testing.T) {
     require.False(t, ok)
 }

+func TestFindPrefixOfTooShortKey(t *testing.T) {
+    tr := trie.New[bitstr.Key, struct{}]()
+    keys := []bitstr.Key{
+        "0000",
+        "0001",
+        "0010",
+        "0011",
+    }
+    for _, k := range keys {
+        tr.Add(k, struct{}{})
+    }
+    _, ok := FindPrefixOfKey(tr, bitstr.Key("000"))
+    require.False(t, ok)
+}
+
 func TestFindSubtrie(t *testing.T) {
     keys := []bitstr.Key{
         "0000",

provider/provider.go

Lines changed: 10 additions & 6 deletions
@@ -46,6 +46,10 @@ const (
     // the measurement, but adds network load and latency before the initial
     // provide request can be performed.
     initialGetClosestPeersCount = 4
+    // defaultPrefixLenValidity is the validity of the cached average region
+    // prefix length computed from the schedule. It allows to avoid recomputing
+    // the average everytime we need to average prefix length.
+    defaultPrefixLenValidity = 5 * time.Minute

     // retryInterval is the interval at which the provider tries to perform any
     // previously failed work (provide or reprovide).
@@ -197,7 +201,7 @@ func New(opts ...Option) (*SweepingProvider, error) {
         }),
         maxProvideConnsPerWorker: cfg.maxProvideConnsPerWorker,

-        avgPrefixLenValidity: 5 * time.Minute,
+        avgPrefixLenValidity: defaultPrefixLenValidity,
         cachedAvgPrefixLen: -1,
         avgPrefixLenReady: make(chan struct{}),

@@ -380,7 +384,7 @@ func (s *SweepingProvider) unscheduleSubsumedPrefixesNoLock(prefix bitstr.Key) {
         } else {
             timeUntilReprovide := s.timeUntil(next.Data)
             s.scheduleNextReprovideNoLock(next.Key, timeUntilReprovide)
-            logger.Warnf("next scheduled prefix now is %s", s.scheduleCursor)
+            logger.Debugf("next scheduled prefix now is %s", s.scheduleCursor)
         }
     }
 }
@@ -1083,7 +1087,7 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash)
     regions = keyspace.AssignKeysToRegions(regions, keys)

     if !s.provideRegions(regions, addrInfo, false, false) {
-        logger.Errorf("failed to provide any region for prefix %s", prefix)
+        logger.Warnf("failed to provide any region for prefix %s", prefix)
     }
 }

@@ -1155,20 +1159,20 @@ func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide b
     regions = keyspace.AssignKeysToRegions(regions, keys)

     if !s.provideRegions(regions, addrInfo, true, periodicReprovide) {
-        logger.Errorf("failed to reprovide any region for prefix %s", prefix)
+        logger.Warnf("failed to reprovide any region for prefix %s", prefix)
     }
 }

 func (s *SweepingProvider) failedProvide(prefix bitstr.Key, keys []mh.Multihash, err error) {
-    logger.Error(err)
+    logger.Warn(err)
     // Put keys back to the provide queue.
     s.provideQueue.Enqueue(prefix, keys...)

     s.connectivity.TriggerCheck()
 }

 func (s *SweepingProvider) failedReprovide(prefix bitstr.Key, err error) {
-    logger.Error(err)
+    logger.Warn(err)
     // Put prefix in the reprovide queue.
     s.reprovideQueue.Enqueue(prefix)
