Skip to content

Commit ef61e17

Browse files
provider: connectivity state machine (#1135)
* update ConnectivityChecker * ai tests * update connectivity state machine * remove useless connectivity funcs * connectivity: get rid of internal state * docs and tests * fix(dual/provider): don't prevent providing if a DHT returns an error * address review
1 parent 3a0a531 commit ef61e17

File tree

9 files changed

+1303
-510
lines changed

9 files changed

+1303
-510
lines changed

dual/provider/options.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type config struct {
2424
reprovideInterval [2]time.Duration // [0] = LAN, [1] = WAN
2525
maxReprovideDelay [2]time.Duration
2626

27+
offlineDelay [2]time.Duration
2728
connectivityCheckOnlineInterval [2]time.Duration
2829
connectivityCheckOfflineInterval [2]time.Duration
2930

@@ -75,8 +76,8 @@ var DefaultConfig = func(cfg *config) error {
7576
cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}
7677
cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}
7778

79+
cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay}
7880
cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}
79-
cfg.connectivityCheckOfflineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOfflineInterval, provider.DefaultConnectivityCheckOfflineInterval}
8081

8182
cfg.maxWorkers = [2]int{4, 4}
8283
cfg.dedicatedPeriodicWorkers = [2]int{2, 2}
@@ -144,6 +145,30 @@ func WithMaxReprovideDelayWAN(maxReprovideDelay time.Duration) Option {
144145
return withMaxReprovideDelay(maxReprovideDelay, wanID)
145146
}
146147

148+
func withOfflineDelay(offlineDelay time.Duration, dhts ...uint8) Option {
149+
return func(cfg *config) error {
150+
if offlineDelay < 0 {
151+
return fmt.Errorf("invalid offline delay %s", offlineDelay)
152+
}
153+
for _, dht := range dhts {
154+
cfg.offlineDelay[dht] = offlineDelay
155+
}
156+
return nil
157+
}
158+
}
159+
160+
func WithOfflineDelay(offlineDelay time.Duration) Option {
161+
return withOfflineDelay(offlineDelay, lanID, wanID)
162+
}
163+
164+
func WithOfflineDelayLAN(offlineDelay time.Duration) Option {
165+
return withOfflineDelay(offlineDelay, lanID)
166+
}
167+
168+
func WithOfflineDelayWAN(offlineDelay time.Duration) Option {
169+
return withOfflineDelay(offlineDelay, wanID)
170+
}
171+
147172
func withConnectivityCheckOnlineInterval(onlineInterval time.Duration, dhts ...uint8) Option {
148173
return func(cfg *config) error {
149174
if onlineInterval <= 0 {

dual/provider/provider.go

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,16 @@ package provider
33
import (
44
"context"
55
"errors"
6-
"sync/atomic"
6+
"fmt"
77

88
"github.com/ipfs/go-cid"
9-
logging "github.com/ipfs/go-log/v2"
109
dht "github.com/libp2p/go-libp2p-kad-dht"
1110
"github.com/libp2p/go-libp2p-kad-dht/dual"
1211
"github.com/libp2p/go-libp2p-kad-dht/provider"
1312
"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
1413
mh "github.com/multiformats/go-multihash"
1514
)
1615

17-
var logger = logging.Logger(provider.LoggerName)
18-
1916
// SweepingProvider manages provides and reprovides for both DHT swarms (LAN
2017
// and WAN) in the dual DHT setup.
2118
type SweepingProvider struct {
@@ -60,8 +57,8 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
6057
provider.WithMessageSender(cfg.msgSenders[i]),
6158
provider.WithReprovideInterval(cfg.reprovideInterval[i]),
6259
provider.WithMaxReprovideDelay(cfg.maxReprovideDelay[i]),
60+
provider.WithOfflineDelay(cfg.offlineDelay[i]),
6361
provider.WithConnectivityCheckOnlineInterval(cfg.connectivityCheckOnlineInterval[i]),
64-
provider.WithConnectivityCheckOfflineInterval(cfg.connectivityCheckOfflineInterval[i]),
6562
provider.WithMaxWorkers(cfg.maxWorkers[i]),
6663
provider.WithDedicatedPeriodicWorkers(cfg.dedicatedPeriodicWorkers[i]),
6764
provider.WithDedicatedBurstWorkers(cfg.dedicatedBurstWorkers[i]),
@@ -83,29 +80,38 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
8380

8481
// runOnBoth runs the provided function on both the LAN and WAN providers in
8582
// parallel and waits for both to complete.
86-
func (s *SweepingProvider) runOnBoth(wait bool, f func(*provider.SweepingProvider)) {
87-
if wait {
88-
done := make(chan struct{})
89-
go func() {
90-
defer close(done)
91-
f(s.LAN)
92-
}()
93-
f(s.WAN)
94-
<-done
95-
return
83+
func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) error {
84+
var errs [2]error
85+
done := make(chan struct{})
86+
go func() {
87+
defer close(done)
88+
err := f(s.LAN)
89+
if err != nil {
90+
errs[0] = fmt.Errorf("LAN provider: %w", err)
91+
}
92+
}()
93+
err := f(s.WAN)
94+
if err != nil {
95+
errs[1] = fmt.Errorf("WAN provider: %w", err)
9696
}
97-
go f(s.LAN)
98-
go f(s.WAN)
97+
<-done
98+
return errors.Join(errs[:]...)
9999
}
100100

101101
// ProvideOnce sends provider records for the specified keys to both DHT swarms
102102
// only once. It does not automatically reprovide those keys afterward.
103103
//
104-
// Add the supplied multihashes to the provide queue, and return immediately.
104+
// Add the supplied multihashes to the provide queues, and return right after.
105105
// The provide operation happens asynchronously.
106-
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
107-
s.runOnBoth(false, func(p *provider.SweepingProvider) {
108-
p.ProvideOnce(keys...)
106+
//
107+
// Returns an error if the keys couldn't be added to the provide queue. This
108+
// can happen if the provider is closed or if the node is currently Offline
109+
// (either never bootstrapped, or disconnected for more than `OfflineDelay`).
110+
// The schedule and provide queue depend on the network size, hence recent
111+
// network connectivity is essential.
112+
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error {
113+
return s.runOnBoth(func(p *provider.SweepingProvider) error {
114+
return p.ProvideOnce(keys...)
109115
})
110116
}
111117

@@ -120,23 +126,28 @@ func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
120126
//
121127
// This operation is asynchronous, it returns as soon as the `keys` are added
122128
// to the provide queue, and the provide operations happen asynchronously.
123-
func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
129+
//
130+
// Returns an error if the keys couldn't be added to the provide queue. This
131+
// can happen if the provider is closed or if the node is currently Offline
132+
// (either never bootstrapped, or disconnected for more than `OfflineDelay`).
133+
// The schedule and provide queue depend on the network size, hence recent
134+
// network connectivity is essential.
135+
func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error {
124136
ctx := context.Background()
125137
newKeys, err := s.keyStore.Put(ctx, keys...)
126138
if err != nil {
127-
logger.Warnf("failed to store multihashes: %v", err)
128-
return
139+
return fmt.Errorf("failed to store multihashes: %w", err)
129140
}
130141

131-
s.runOnBoth(false, func(p *provider.SweepingProvider) {
132-
p.AddToSchedule(newKeys...)
142+
s.runOnBoth(func(p *provider.SweepingProvider) error {
143+
return p.AddToSchedule(newKeys...)
133144
})
134145

135146
if !force {
136147
keys = newKeys
137148
}
138149

139-
s.ProvideOnce(keys...)
150+
return s.ProvideOnce(keys...)
140151
}
141152

142153
// StopProviding stops reproviding the given keys to both DHT swarms. The node
@@ -146,11 +157,12 @@ func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
146157
// Remove the `keys` from the schedule and return immediately. Valid records
147158
// can remain in the DHT swarms up to the provider record TTL after calling
148159
// `StopProviding`.
149-
func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
160+
func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error {
150161
err := s.keyStore.Delete(context.Background(), keys...)
151162
if err != nil {
152-
logger.Warnf("failed to stop providing keys: %s", err)
163+
return fmt.Errorf("failed to stop providing keys: %w", err)
153164
}
165+
return nil
154166
}
155167

156168
// Clear clears all the keys from the provide queues of both DHTs and
@@ -159,11 +171,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
159171
// The keys are not deleted from the keystore, so they will continue to be
160172
// reprovided as scheduled.
161173
func (s *SweepingProvider) Clear() int {
162-
var total atomic.Int32
163-
s.runOnBoth(true, func(p *provider.SweepingProvider) {
164-
total.Add(int32(p.Clear()))
165-
})
166-
return int(total.Load())
174+
return s.LAN.Clear() + s.WAN.Clear()
167175
}
168176

169177
// RefreshSchedule scans the KeyStore for any keys that are not currently
@@ -173,9 +181,14 @@ func (s *SweepingProvider) Clear() int {
173181
// This function doesn't remove prefixes that have no keys from the schedule.
174182
// This is done automatically during the reprovide operation if a region has no
175183
// keys.
176-
func (s *SweepingProvider) RefreshSchedule() {
177-
go s.runOnBoth(false, func(p *provider.SweepingProvider) {
178-
p.RefreshSchedule()
184+
//
185+
// Returns an error if the provider is closed or if the node is currently
186+
// Offline (either never bootstrapped, or disconnected for more than
187+
// `OfflineDelay`). The schedule depends on the network size, hence recent
188+
// network connectivity is essential.
189+
func (s *SweepingProvider) RefreshSchedule() error {
190+
return s.runOnBoth(func(p *provider.SweepingProvider) error {
191+
return p.RefreshSchedule()
179192
})
180193
}
181194

@@ -187,9 +200,9 @@ var (
187200
// dhtProvider is the interface to ensure that SweepingProvider and
188201
// provider.SweepingProvider share the same interface.
189202
type dhtProvider interface {
190-
StartProviding(force bool, keys ...mh.Multihash)
191-
StopProviding(keys ...mh.Multihash)
192-
ProvideOnce(keys ...mh.Multihash)
203+
StartProviding(force bool, keys ...mh.Multihash) error
204+
StopProviding(keys ...mh.Multihash) error
205+
ProvideOnce(keys ...mh.Multihash) error
193206
Clear() int
194-
RefreshSchedule()
207+
RefreshSchedule() error
195208
}

provider/datastore/keystore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type KeyStoreOption func(*keyStoreCfg) error
6767

6868
// Default settings for the key store. This commit changes the base prefix
// from "/reprovider/mhs" to "/reprovider/keystore".
// NOTE(review): the prefix is presumably the datastore namespace under which
// keys are persisted, so data written under the old prefix may no longer be
// found — confirm whether a migration is needed.
const (
6969
// Number of prefix bits used by the key store (exact role not visible here).
DefaultKeyStorePrefixBits = 10
70-
DefaultKeyStoreBasePrefix = "/reprovider/mhs"
70+
DefaultKeyStoreBasePrefix = "/reprovider/keystore"
7171
// GC interval: twice the default Amino reprovide interval.
DefaultKeyStoreGCInterval = 2 * amino.DefaultReprovideInterval
7272
// GC batch size: 1 << 14 = 16384.
DefaultKeyStoreGCBatchSize = 1 << 14
7373
)

0 commit comments

Comments
 (0)