Skip to content

Commit 2ddbeca

Browse files
dual: provider (#1132)
* provider: refresh schedule * dual: provider * fix: flaky TestStartProvidingUnstableNetwork * addressing review
1 parent 9446753 commit 2ddbeca

File tree

8 files changed

+551
-53
lines changed

8 files changed

+551
-53
lines changed

dht.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,11 @@ func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
855855
return dht.routingTable
856856
}
857857

858+
// BucketSize returns the size of the DHT's routing table buckets.
859+
func (dht *IpfsDHT) BucketSize() int {
860+
return dht.bucketSize
861+
}
862+
858863
// Close calls Process Close.
859864
func (dht *IpfsDHT) Close() error {
860865
dht.cancel()
@@ -897,6 +902,11 @@ func (dht *IpfsDHT) Host() host.Host {
897902
return dht.host
898903
}
899904

905+
// MessageSender returns the DHT's message sender.
906+
func (dht *IpfsDHT) MessageSender() pb.MessageSender {
907+
return dht.msgSender
908+
}
909+
900910
// Ping sends a ping message to the passed peer and waits for a response.
901911
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
902912
ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(attribute.Stringer("PeerID", p)))
@@ -932,6 +942,16 @@ func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Dura
932942
dht.peerstore.AddAddrs(p, dht.filterAddrs(addrs), ttl)
933943
}
934944

945+
// FilteredAddrs returns the set of addresses that this DHT instance
946+
// advertises to the swarm, after applying the configured addrFilter.
947+
//
948+
// For example:
949+
// - In a public DHT, local and loopback addresses are filtered out.
950+
// - In a LAN DHT, only loopback addresses are filtered out.
951+
func (dht *IpfsDHT) FilteredAddrs() []ma.Multiaddr {
952+
return dht.filterAddrs(dht.host.Addrs())
953+
}
954+
935955
func (dht *IpfsDHT) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
936956
if f := dht.addrFilter; f != nil {
937957
return f(addrs)

dual/provider/options.go

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package provider
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"time"
7+
8+
ds "github.com/ipfs/go-datastore"
9+
"github.com/libp2p/go-libp2p-kad-dht/amino"
10+
"github.com/libp2p/go-libp2p-kad-dht/dual"
11+
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
12+
"github.com/libp2p/go-libp2p-kad-dht/provider"
13+
"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
14+
)
15+
16+
const (
17+
lanID uint8 = iota
18+
wanID
19+
)
20+
21+
type config struct {
22+
keyStore *datastore.KeyStore
23+
24+
reprovideInterval [2]time.Duration // [0] = LAN, [1] = WAN
25+
maxReprovideDelay [2]time.Duration
26+
27+
connectivityCheckOnlineInterval [2]time.Duration
28+
connectivityCheckOfflineInterval [2]time.Duration
29+
30+
maxWorkers [2]int
31+
dedicatedPeriodicWorkers [2]int
32+
dedicatedBurstWorkers [2]int
33+
maxProvideConnsPerWorker [2]int
34+
35+
msgSenders [2]pb.MessageSender
36+
}
37+
38+
type Option func(opt *config) error
39+
40+
func (cfg *config) apply(opts ...Option) error {
41+
for i, o := range opts {
42+
if err := o(cfg); err != nil {
43+
return fmt.Errorf("dual dht provider option %d failed: %w", i, err)
44+
}
45+
}
46+
return nil
47+
}
48+
49+
func (cfg *config) resolveDefaults(d *dual.DHT) {
50+
if cfg.msgSenders[lanID] == nil {
51+
cfg.msgSenders[lanID] = d.LAN.MessageSender()
52+
}
53+
if cfg.msgSenders[wanID] == nil {
54+
cfg.msgSenders[wanID] = d.WAN.MessageSender()
55+
}
56+
}
57+
58+
func (c *config) validate() error {
59+
if c.dedicatedPeriodicWorkers[lanID]+c.dedicatedBurstWorkers[lanID] > c.maxWorkers[lanID] {
60+
return errors.New("provider config: total dedicated workers exceed max workers")
61+
}
62+
if c.dedicatedPeriodicWorkers[wanID]+c.dedicatedBurstWorkers[wanID] > c.maxWorkers[wanID] {
63+
return errors.New("provider config: total dedicated workers exceed max workers")
64+
}
65+
return nil
66+
}
67+
68+
var DefaultConfig = func(cfg *config) error {
69+
var err error
70+
cfg.keyStore, err = datastore.NewKeyStore(ds.NewMapDatastore())
71+
if err != nil {
72+
return err
73+
}
74+
75+
cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}
76+
cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}
77+
78+
cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}
79+
cfg.connectivityCheckOfflineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOfflineInterval, provider.DefaultConnectivityCheckOfflineInterval}
80+
81+
cfg.maxWorkers = [2]int{4, 4}
82+
cfg.dedicatedPeriodicWorkers = [2]int{2, 2}
83+
cfg.dedicatedBurstWorkers = [2]int{1, 1}
84+
cfg.maxProvideConnsPerWorker = [2]int{20, 20}
85+
86+
return nil
87+
}
88+
89+
func WithKeyStore(keyStore *datastore.KeyStore) Option {
90+
return func(cfg *config) error {
91+
if keyStore == nil {
92+
return errors.New("provider config: keyStore cannot be nil")
93+
}
94+
cfg.keyStore = keyStore
95+
return nil
96+
}
97+
}
98+
99+
func withReprovideInterval(reprovideInterval time.Duration, dhts ...uint8) Option {
100+
return func(cfg *config) error {
101+
if reprovideInterval <= 0 {
102+
return fmt.Errorf("reprovide interval must be positive, got %s", reprovideInterval)
103+
}
104+
for _, dht := range dhts {
105+
cfg.reprovideInterval[dht] = reprovideInterval
106+
}
107+
return nil
108+
}
109+
}
110+
111+
func WithReprovideInterval(reprovideInterval time.Duration) Option {
112+
return withReprovideInterval(reprovideInterval, lanID, wanID)
113+
}
114+
115+
func WithReprovideIntervalLAN(reprovideInterval time.Duration) Option {
116+
return withReprovideInterval(reprovideInterval, lanID)
117+
}
118+
119+
func WithReprovideIntervalWAN(reprovideInterval time.Duration) Option {
120+
return withReprovideInterval(reprovideInterval, wanID)
121+
}
122+
123+
func withMaxReprovideDelay(maxReprovideDelay time.Duration, dhts ...uint8) Option {
124+
return func(cfg *config) error {
125+
if maxReprovideDelay <= 0 {
126+
return fmt.Errorf("max reprovide delay must be positive, got %s", maxReprovideDelay)
127+
}
128+
for _, dht := range dhts {
129+
cfg.maxReprovideDelay[dht] = maxReprovideDelay
130+
}
131+
return nil
132+
}
133+
}
134+
135+
func WithMaxReprovideDelay(maxReprovideDelay time.Duration) Option {
136+
return withMaxReprovideDelay(maxReprovideDelay, lanID, wanID)
137+
}
138+
139+
func WithMaxReprovideDelayLAN(maxReprovideDelay time.Duration) Option {
140+
return withMaxReprovideDelay(maxReprovideDelay, lanID)
141+
}
142+
143+
func WithMaxReprovideDelayWAN(maxReprovideDelay time.Duration) Option {
144+
return withMaxReprovideDelay(maxReprovideDelay, wanID)
145+
}
146+
147+
func withConnectivityCheckOnlineInterval(onlineInterval time.Duration, dhts ...uint8) Option {
148+
return func(cfg *config) error {
149+
if onlineInterval <= 0 {
150+
return fmt.Errorf("invalid connectivity check online interval %s", onlineInterval)
151+
}
152+
for _, dht := range dhts {
153+
cfg.connectivityCheckOnlineInterval[dht] = onlineInterval
154+
}
155+
return nil
156+
}
157+
}
158+
159+
func WithConnectivityCheckOnlineInterval(onlineInterval time.Duration) Option {
160+
return withConnectivityCheckOnlineInterval(onlineInterval, lanID, wanID)
161+
}
162+
163+
func WithConnectivityCheckOnlineIntervalLAN(onlineInterval time.Duration) Option {
164+
return withConnectivityCheckOnlineInterval(onlineInterval, lanID)
165+
}
166+
167+
func WithConnectivityCheckOnlineIntervalWAN(onlineInterval time.Duration) Option {
168+
return withConnectivityCheckOnlineInterval(onlineInterval, wanID)
169+
}
170+
171+
func withConnectivityCheckOfflineInterval(offlineInterval time.Duration, dhts ...uint8) Option {
172+
return func(cfg *config) error {
173+
if offlineInterval <= 0 {
174+
return fmt.Errorf("invalid connectivity check offline interval %s", offlineInterval)
175+
}
176+
for _, dht := range dhts {
177+
cfg.connectivityCheckOfflineInterval[dht] = offlineInterval
178+
}
179+
return nil
180+
}
181+
}
182+
183+
func WithConnectivityCheckOfflineInterval(offlineInterval time.Duration) Option {
184+
return withConnectivityCheckOfflineInterval(offlineInterval, lanID, wanID)
185+
}
186+
187+
func WithConnectivityCheckOfflineIntervalLAN(offlineInterval time.Duration) Option {
188+
return withConnectivityCheckOfflineInterval(offlineInterval, lanID)
189+
}
190+
191+
func WithConnectivityCheckOfflineIntervalWAN(offlineInterval time.Duration) Option {
192+
return withConnectivityCheckOfflineInterval(offlineInterval, wanID)
193+
}
194+
195+
func withMaxWorkers(maxWorkers int, dhts ...uint8) Option {
196+
return func(cfg *config) error {
197+
if maxWorkers <= 0 {
198+
return fmt.Errorf("invalid max workers %d", maxWorkers)
199+
}
200+
for _, dht := range dhts {
201+
cfg.maxWorkers[dht] = maxWorkers
202+
}
203+
return nil
204+
}
205+
}
206+
207+
func WithMaxWorkers(maxWorkers int) Option {
208+
return withMaxWorkers(maxWorkers, lanID, wanID)
209+
}
210+
211+
func WithMaxWorkersLAN(maxWorkers int) Option {
212+
return withMaxWorkers(maxWorkers, lanID)
213+
}
214+
215+
func WithMaxWorkersWAN(maxWorkers int) Option {
216+
return withMaxWorkers(maxWorkers, wanID)
217+
}
218+
219+
func withDedicatedPeriodicWorkers(dedicatedPeriodicWorkers int, dhts ...uint8) Option {
220+
return func(cfg *config) error {
221+
if dedicatedPeriodicWorkers < 0 {
222+
return fmt.Errorf("invalid dedicated periodic workers %d", dedicatedPeriodicWorkers)
223+
}
224+
for _, dht := range dhts {
225+
cfg.dedicatedPeriodicWorkers[dht] = dedicatedPeriodicWorkers
226+
}
227+
return nil
228+
}
229+
}
230+
231+
func WithDedicatedPeriodicWorkers(dedicatedPeriodicWorkers int) Option {
232+
return withDedicatedPeriodicWorkers(dedicatedPeriodicWorkers, lanID, wanID)
233+
}
234+
235+
func WithDedicatedPeriodicWorkersLAN(dedicatedPeriodicWorkers int) Option {
236+
return withDedicatedPeriodicWorkers(dedicatedPeriodicWorkers, lanID)
237+
}
238+
239+
func WithDedicatedPeriodicWorkersWAN(dedicatedPeriodicWorkers int) Option {
240+
return withDedicatedPeriodicWorkers(dedicatedPeriodicWorkers, wanID)
241+
}
242+
243+
func withDedicatedBurstWorkers(dedicatedBurstWorkers int, dhts ...uint8) Option {
244+
return func(cfg *config) error {
245+
if dedicatedBurstWorkers < 0 {
246+
return fmt.Errorf("invalid dedicated burst workers %d", dedicatedBurstWorkers)
247+
}
248+
for _, dht := range dhts {
249+
cfg.dedicatedBurstWorkers[dht] = dedicatedBurstWorkers
250+
}
251+
return nil
252+
}
253+
}
254+
255+
func WithDedicatedBurstWorkers(dedicatedBurstWorkers int) Option {
256+
return withDedicatedBurstWorkers(dedicatedBurstWorkers, lanID, wanID)
257+
}
258+
259+
func WithDedicatedBurstWorkersLAN(dedicatedBurstWorkers int) Option {
260+
return withDedicatedBurstWorkers(dedicatedBurstWorkers, lanID)
261+
}
262+
263+
func WithDedicatedBurstWorkersWAN(dedicatedBurstWorkers int) Option {
264+
return withDedicatedBurstWorkers(dedicatedBurstWorkers, wanID)
265+
}
266+
267+
func withMaxProvideConnsPerWorker(maxProvideConnsPerWorker int, dhts ...uint8) Option {
268+
return func(cfg *config) error {
269+
if maxProvideConnsPerWorker <= 0 {
270+
return fmt.Errorf("invalid max provide conns per worker %d", maxProvideConnsPerWorker)
271+
}
272+
for _, dht := range dhts {
273+
cfg.maxProvideConnsPerWorker[dht] = maxProvideConnsPerWorker
274+
}
275+
return nil
276+
}
277+
}
278+
279+
func WithMaxProvideConnsPerWorker(maxProvideConnsPerWorker int) Option {
280+
return withMaxProvideConnsPerWorker(maxProvideConnsPerWorker, lanID, wanID)
281+
}
282+
283+
func WithMaxProvideConnsPerWorkerLAN(maxProvideConnsPerWorker int) Option {
284+
return withMaxProvideConnsPerWorker(maxProvideConnsPerWorker, lanID)
285+
}
286+
287+
func WithMaxProvideConnsPerWorkerWAN(maxProvideConnsPerWorker int) Option {
288+
return withMaxProvideConnsPerWorker(maxProvideConnsPerWorker, wanID)
289+
}
290+
291+
func withMessageSender(msgSender pb.MessageSender, dhts ...uint8) Option {
292+
return func(cfg *config) error {
293+
if msgSender == nil {
294+
return errors.New("provider config: message sender cannot be nil")
295+
}
296+
for _, dht := range dhts {
297+
cfg.msgSenders[dht] = msgSender
298+
}
299+
return nil
300+
}
301+
}
302+
303+
func WithMessageSenderLAN(msgSender pb.MessageSender) Option {
304+
return withMessageSender(msgSender, lanID)
305+
}
306+
307+
func WithMessageSenderWAN(msgSender pb.MessageSender) Option {
308+
return withMessageSender(msgSender, wanID)
309+
}

0 commit comments

Comments
 (0)