Skip to content

Commit 0cb418a

Browse files
provider: options (#1124)
* provider: adding provide and reprovide queue * provider: network operations * add some tests * schedule prefix len computations * provider schedule * provider: handleProvide * addressed review * use go-test/random * satisfy linter * log errors during initial prefix len measurement * address review * satisfy linter * address review * provider: explore swarm * provider: batch provide * provider: batch reprovide * provider: catchup pending work * provider: options * fix panic when adding key to trie if superstring already exists * address review * decrease minimal region size from replicationFactor+1 to replicationFactor * simplify unscheduleSubsumedPrefixesNoClock * address review * fix test to match region size (now: replicationFactor, before: replicationFactor+1) * dequeue outside of go routine * refactor and test groupAndScheduleKeysByPrefix * moved maxPrefixSize const to top * address review * address review * address review * don't allocate capacity to avgPrefixLenReady
1 parent 1704edb commit 0cb418a

File tree

2 files changed

+385
-6
lines changed

2 files changed

+385
-6
lines changed

provider/options.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package provider
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"time"
7+
8+
"github.com/filecoin-project/go-clock"
9+
"github.com/libp2p/go-libp2p-kad-dht/amino"
10+
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
11+
"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
12+
"github.com/libp2p/go-libp2p/core/peer"
13+
ma "github.com/multiformats/go-multiaddr"
14+
mh "github.com/multiformats/go-multihash"
15+
)
16+
17+
const (
18+
// DefaultMaxReprovideDelay is the default maximum delay allowed when
19+
// reproviding a region. The interval between 2 reprovides of the same region
20+
// is at most ReprovideInterval+MaxReprovideDelay. This variable is necessary
21+
// since regions can grow and shrink depending on the network churn.
22+
DefaultMaxReprovideDelay = 1 * time.Hour
23+
24+
// DefaultConnectivityCheckOnlineInterval is the default minimum interval for
25+
// checking whether the node is still online. Such a check is performed when
26+
// a network operation fails, and the ConnectivityCheckOnlineInterval limits
27+
// how often such a check is performed.
28+
DefaultConnectivityCheckOnlineInterval = 1 * time.Minute
29+
// DefaultConnectivityCheckOfflineInterval is the default interval for
30+
// checking if the offline node has come online again.
31+
DefaultConnectivityCheckOfflineInterval = 5 * time.Minute
32+
)
33+
34+
type config struct {
35+
replicationFactor int
36+
reprovideInterval time.Duration
37+
maxReprovideDelay time.Duration
38+
connectivityCheckOnlineInterval time.Duration
39+
connectivityCheckOfflineInterval time.Duration
40+
41+
peerid peer.ID
42+
router KadClosestPeersRouter
43+
44+
keyStore *datastore.KeyStore
45+
46+
msgSender pb.MessageSender
47+
selfAddrs func() []ma.Multiaddr
48+
addLocalRecord func(mh.Multihash) error
49+
50+
clock clock.Clock
51+
52+
maxWorkers int
53+
dedicatedPeriodicWorkers int
54+
dedicatedBurstWorkers int
55+
maxProvideConnsPerWorker int
56+
}
57+
58+
func (cfg *config) apply(opts ...Option) error {
59+
for i, o := range opts {
60+
if err := o(cfg); err != nil {
61+
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
62+
}
63+
}
64+
return nil
65+
}
66+
67+
func (c *config) validate() error {
68+
if len(c.peerid) == 0 {
69+
return errors.New("reprovider config: peer id is required")
70+
}
71+
if c.router == nil {
72+
return errors.New("reprovider config: router is required")
73+
}
74+
if c.msgSender == nil {
75+
return errors.New("reprovider config: message sender is required")
76+
}
77+
if c.selfAddrs == nil {
78+
return errors.New("reprovider config: self addrs func is required")
79+
}
80+
if c.dedicatedPeriodicWorkers+c.dedicatedBurstWorkers > c.maxWorkers {
81+
return errors.New("reprovider config: total dedicated workers exceed max workers")
82+
}
83+
return nil
84+
}
85+
86+
type Option func(opt *config) error
87+
88+
var DefaultConfig = func(cfg *config) error {
89+
cfg.replicationFactor = amino.DefaultBucketSize
90+
cfg.reprovideInterval = amino.DefaultReprovideInterval
91+
cfg.maxReprovideDelay = DefaultMaxReprovideDelay
92+
cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval
93+
cfg.connectivityCheckOfflineInterval = DefaultConnectivityCheckOfflineInterval
94+
95+
cfg.clock = clock.New()
96+
97+
cfg.maxWorkers = 4
98+
cfg.dedicatedPeriodicWorkers = 2
99+
cfg.dedicatedBurstWorkers = 1
100+
cfg.maxProvideConnsPerWorker = 20
101+
102+
cfg.addLocalRecord = func(mh mh.Multihash) error { return nil }
103+
104+
return nil
105+
}
106+
107+
// WithReplicationFactor sets the replication factor for provider records. It
108+
// means that during provide and reprovide operations, each provider records is
109+
// allocated to the ReplicationFactor closest peers in the DHT swarm.
110+
func WithReplicationFactor(n int) Option {
111+
return func(cfg *config) error {
112+
if n <= 0 {
113+
return errors.New("reprovider config: replication factor must be a positive integer")
114+
}
115+
cfg.replicationFactor = n
116+
return nil
117+
}
118+
}
119+
120+
// WithReprovideInterval sets the interval at which regions are reprovided.
121+
func WithReprovideInterval(d time.Duration) Option {
122+
return func(cfg *config) error {
123+
if d <= 0 {
124+
return errors.New("reprovider config: reprovide interval must be greater than 0")
125+
}
126+
cfg.reprovideInterval = d
127+
return nil
128+
}
129+
}
130+
131+
// WithMaxReprovideDelay sets the maximum delay allowed when reproviding a
132+
// region. The interval between 2 reprovides of the same region is at most
133+
// ReprovideInterval+MaxReprovideDelay.
134+
//
135+
// This parameter is necessary since regions can grow and shrink depending on
136+
// the network churn.
137+
func WithMaxReprovideDelay(d time.Duration) Option {
138+
return func(cfg *config) error {
139+
if d <= 0 {
140+
return errors.New("reprovider config: max reprovide delay must be greater than 0")
141+
}
142+
cfg.maxReprovideDelay = d
143+
return nil
144+
}
145+
}
146+
147+
// WithConnectivityCheckOnlineInterval sets the minimal interval for checking
148+
// whether the node is still online. Such a check is performed when a network
149+
// operation fails, and the ConnectivityCheckOnlineInterval limits how often
150+
// such a check is performed.
151+
func WithConnectivityCheckOnlineInterval(d time.Duration) Option {
152+
return func(cfg *config) error {
153+
cfg.connectivityCheckOnlineInterval = d
154+
return nil
155+
}
156+
}
157+
158+
// WithConnectivityCheckOfflineInterval sets the interval for periodically
159+
// checking whether the offline node has come online again.
160+
func WithConnectivityCheckOfflineInterval(d time.Duration) Option {
161+
return func(cfg *config) error {
162+
cfg.connectivityCheckOfflineInterval = d
163+
return nil
164+
}
165+
}
166+
167+
// WithPeerID sets the peer ID of the node running the provider.
168+
func WithPeerID(p peer.ID) Option {
169+
return func(cfg *config) error {
170+
cfg.peerid = p
171+
return nil
172+
}
173+
}
174+
175+
// WithRouter sets the router used to find closest peers in the DHT.
176+
func WithRouter(r KadClosestPeersRouter) Option {
177+
return func(cfg *config) error {
178+
cfg.router = r
179+
return nil
180+
}
181+
}
182+
183+
// WithMessageSender sets the message sender used to send messages out to the
184+
// DHT swarm.
185+
func WithMessageSender(m pb.MessageSender) Option {
186+
return func(cfg *config) error {
187+
cfg.msgSender = m
188+
return nil
189+
}
190+
}
191+
192+
// WithSelfAddrs sets the function that returns the self addresses of the node.
193+
// These addresses are written in the provider records advertised by the node.
194+
func WithSelfAddrs(f func() []ma.Multiaddr) Option {
195+
return func(cfg *config) error {
196+
cfg.selfAddrs = f
197+
return nil
198+
}
199+
}
200+
201+
// WithAddLocalRecord sets the function that adds a provider record to the
202+
// local provider record store.
203+
func WithAddLocalRecord(f func(mh.Multihash) error) Option {
204+
return func(cfg *config) error {
205+
if f == nil {
206+
return errors.New("reprovider config: add local record function cannot be nil")
207+
}
208+
cfg.addLocalRecord = f
209+
return nil
210+
}
211+
}
212+
213+
// WithClock sets the clock used by the provider. This is useful for testing
214+
// purposes, allowing to control time in tests.
215+
func WithClock(c clock.Clock) Option {
216+
return func(cfg *config) error {
217+
cfg.clock = c
218+
return nil
219+
}
220+
}
221+
222+
// WithMaxWorkers sets the maximum number of workers that can be used for
223+
// provide and reprovide jobs. The job of a worker is to explore a region of
224+
// the keyspace and (re)provide the keys matching the region to the closest
225+
// peers.
226+
//
227+
// You can configure a number of workers dedicated to periodic jobs, and a
228+
// number of workers dedicated to burst jobs. MaxWorkers should be greater or
229+
// equal to DedicatedPeriodicWorkers+DedicatedBurstWorkers. The additional
230+
// workers that aren't dedicated to specific jobs can be used for either job
231+
// type where needed.
232+
func WithMaxWorkers(n int) Option {
233+
return func(cfg *config) error {
234+
if n < 0 {
235+
return errors.New("reprovider config: max workers must be non-negative")
236+
}
237+
cfg.maxWorkers = n
238+
return nil
239+
}
240+
}
241+
242+
// WithDedicatedPeriodicWorkers sets the number of workers dedicated to
243+
// periodic region reprovides.
244+
func WithDedicatedPeriodicWorkers(n int) Option {
245+
return func(cfg *config) error {
246+
if n < 0 {
247+
return errors.New("reprovider config: dedicated periodic workers must be non-negative")
248+
}
249+
cfg.dedicatedPeriodicWorkers = n
250+
return nil
251+
}
252+
}
253+
254+
// WithDedicatedBurstWorkers sets the number of workers dedicated to burst
255+
// operations. Burst operations consist in work that isn't scheduled
256+
// beforehands, such as initial provides and catching up with reproviding after
257+
// the node went offline for a while.
258+
func WithDedicatedBurstWorkers(n int) Option {
259+
return func(cfg *config) error {
260+
if n < 0 {
261+
return errors.New("reprovider config: dedicated burst workers must be non-negative")
262+
}
263+
cfg.dedicatedBurstWorkers = n
264+
return nil
265+
}
266+
}
267+
268+
// WithMaxProvideConnsPerWorker sets the maximum number of connections to
269+
// distinct peers that can be opened by a single worker during a provide
270+
// operation.
271+
func WithMaxProvideConnsPerWorker(n int) Option {
272+
return func(cfg *config) error {
273+
if n <= 0 {
274+
return errors.New("reprovider config: max provide conns per worker must be greater than 0")
275+
}
276+
cfg.maxProvideConnsPerWorker = n
277+
return nil
278+
}
279+
}
280+
281+
// WithKeyStore defines the KeyStore used to keep track of the keys that need
282+
// to be reprovided.
283+
func WithKeyStore(keyStore *datastore.KeyStore) Option {
284+
return func(cfg *config) error {
285+
if keyStore == nil {
286+
return errors.New("reprovider config: multihash store cannot be nil")
287+
}
288+
cfg.keyStore = keyStore
289+
return nil
290+
}
291+
}

0 commit comments

Comments
 (0)