@@ -3,19 +3,16 @@ package provider
3
3
import (
4
4
"context"
5
5
"errors"
6
- "sync/atomic "
6
+ "fmt "
7
7
8
8
"github.com/ipfs/go-cid"
9
- logging "github.com/ipfs/go-log/v2"
10
9
dht "github.com/libp2p/go-libp2p-kad-dht"
11
10
"github.com/libp2p/go-libp2p-kad-dht/dual"
12
11
"github.com/libp2p/go-libp2p-kad-dht/provider"
13
12
"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
14
13
mh "github.com/multiformats/go-multihash"
15
14
)
16
15
17
- var logger = logging .Logger (provider .LoggerName )
18
-
19
16
// SweepingProvider manages provides and reprovides for both DHT swarms (LAN
20
17
// and WAN) in the dual DHT setup.
21
18
type SweepingProvider struct {
@@ -60,8 +57,8 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
60
57
provider .WithMessageSender (cfg .msgSenders [i ]),
61
58
provider .WithReprovideInterval (cfg .reprovideInterval [i ]),
62
59
provider .WithMaxReprovideDelay (cfg .maxReprovideDelay [i ]),
60
+ provider .WithOfflineDelay (cfg .offlineDelay [i ]),
63
61
provider .WithConnectivityCheckOnlineInterval (cfg .connectivityCheckOnlineInterval [i ]),
64
- provider .WithConnectivityCheckOfflineInterval (cfg .connectivityCheckOfflineInterval [i ]),
65
62
provider .WithMaxWorkers (cfg .maxWorkers [i ]),
66
63
provider .WithDedicatedPeriodicWorkers (cfg .dedicatedPeriodicWorkers [i ]),
67
64
provider .WithDedicatedBurstWorkers (cfg .dedicatedBurstWorkers [i ]),
@@ -83,29 +80,38 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
83
80
84
81
// runOnBoth runs the provided function on both the LAN and WAN providers in
85
82
// parallel and waits for both to complete.
86
- func (s * SweepingProvider ) runOnBoth (wait bool , f func (* provider.SweepingProvider )) {
87
- if wait {
88
- done := make (chan struct {})
89
- go func () {
90
- defer close (done )
91
- f (s .LAN )
92
- }()
93
- f (s .WAN )
94
- <- done
95
- return
83
+ func (s * SweepingProvider ) runOnBoth (f func (* provider.SweepingProvider ) error ) error {
84
+ var errs [2 ]error
85
+ done := make (chan struct {})
86
+ go func () {
87
+ defer close (done )
88
+ err := f (s .LAN )
89
+ if err != nil {
90
+ errs [0 ] = fmt .Errorf ("LAN provider: %w" , err )
91
+ }
92
+ }()
93
+ err := f (s .WAN )
94
+ if err != nil {
95
+ errs [1 ] = fmt .Errorf ("WAN provider: %w" , err )
96
96
}
97
- go f ( s . LAN )
98
- go f ( s . WAN )
97
+ <- done
98
+ return errors . Join ( errs [:] ... )
99
99
}
100
100
101
101
// ProvideOnce sends provider records for the specified keys to both DHT swarms
102
102
// only once. It does not automatically reprovide those keys afterward.
103
103
//
104
- // Add the supplied multihashes to the provide queue , and return immediately .
104
+ // Add the supplied multihashes to the provide queues , and return right after .
105
105
// The provide operation happens asynchronously.
106
- func (s * SweepingProvider ) ProvideOnce (keys ... mh.Multihash ) {
107
- s .runOnBoth (false , func (p * provider.SweepingProvider ) {
108
- p .ProvideOnce (keys ... )
106
+ //
107
+ // Returns an error if the keys couldn't be added to the provide queue. This
108
+ // can happen if the provider is closed or if the node is currently Offline
109
+ // (either never bootstrapped, or disconnected since more than `OfflineDelay`).
110
+ // The schedule and provide queue depend on the network size, hence recent
111
+ // network connectivity is essential.
112
+ func (s * SweepingProvider ) ProvideOnce (keys ... mh.Multihash ) error {
113
+ return s .runOnBoth (func (p * provider.SweepingProvider ) error {
114
+ return p .ProvideOnce (keys ... )
109
115
})
110
116
}
111
117
@@ -120,23 +126,28 @@ func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
120
126
//
121
127
// This operation is asynchronous, it returns as soon as the `keys` are added
122
128
// to the provide queue, and provides happens asynchronously.
123
- func (s * SweepingProvider ) StartProviding (force bool , keys ... mh.Multihash ) {
129
+ //
130
+ // Returns an error if the keys couldn't be added to the provide queue. This
131
+ // can happen if the provider is closed or if the node is currently Offline
132
+ // (either never bootstrapped, or disconnected since more than `OfflineDelay`).
133
+ // The schedule and provide queue depend on the network size, hence recent
134
+ // network connectivity is essential.
135
+ func (s * SweepingProvider ) StartProviding (force bool , keys ... mh.Multihash ) error {
124
136
ctx := context .Background ()
125
137
newKeys , err := s .keyStore .Put (ctx , keys ... )
126
138
if err != nil {
127
- logger .Warnf ("failed to store multihashes: %v" , err )
128
- return
139
+ return fmt .Errorf ("failed to store multihashes: %w" , err )
129
140
}
130
141
131
- s .runOnBoth (false , func (p * provider.SweepingProvider ) {
132
- p .AddToSchedule (newKeys ... )
142
+ s .runOnBoth (func (p * provider.SweepingProvider ) error {
143
+ return p .AddToSchedule (newKeys ... )
133
144
})
134
145
135
146
if ! force {
136
147
keys = newKeys
137
148
}
138
149
139
- s .ProvideOnce (keys ... )
150
+ return s .ProvideOnce (keys ... )
140
151
}
141
152
142
153
// StopProviding stops reproviding the given keys to both DHT swarms. The node
@@ -146,11 +157,12 @@ func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
146
157
// Remove the `keys` from the schedule and return immediately. Valid records
147
158
// can remain in the DHT swarms up to the provider record TTL after calling
148
159
// `StopProviding`.
149
- func (s * SweepingProvider ) StopProviding (keys ... mh.Multihash ) {
160
+ func (s * SweepingProvider ) StopProviding (keys ... mh.Multihash ) error {
150
161
err := s .keyStore .Delete (context .Background (), keys ... )
151
162
if err != nil {
152
- logger . Warnf ("failed to stop providing keys: %s " , err )
163
+ return fmt . Errorf ("failed to stop providing keys: %w " , err )
153
164
}
165
+ return nil
154
166
}
155
167
156
168
// Clear clears the all the keys from the provide queues of both DHTs and
@@ -159,11 +171,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
159
171
// The keys are not deleted from the keystore, so they will continue to be
160
172
// reprovided as scheduled.
161
173
func (s * SweepingProvider ) Clear () int {
162
- var total atomic.Int32
163
- s .runOnBoth (true , func (p * provider.SweepingProvider ) {
164
- total .Add (int32 (p .Clear ()))
165
- })
166
- return int (total .Load ())
174
+ return s .LAN .Clear () + s .WAN .Clear ()
167
175
}
168
176
169
177
// RefreshSchedule scans the KeyStore for any keys that are not currently
@@ -173,9 +181,14 @@ func (s *SweepingProvider) Clear() int {
173
181
// This function doesn't remove prefixes that have no keys from the schedule.
174
182
// This is done automatically during the reprovide operation if a region has no
175
183
// keys.
176
- func (s * SweepingProvider ) RefreshSchedule () {
177
- go s .runOnBoth (false , func (p * provider.SweepingProvider ) {
178
- p .RefreshSchedule ()
184
+ //
185
+ // Returns an error if the provider is closed or if the node is currently
186
+ // Offline (either never bootstrapped, or disconnected since more than
187
+ // `OfflineDelay`). The schedule depends on the network size, hence recent
188
+ // network connectivity is essential.
189
+ func (s * SweepingProvider ) RefreshSchedule () error {
190
+ return s .runOnBoth (func (p * provider.SweepingProvider ) error {
191
+ return p .RefreshSchedule ()
179
192
})
180
193
}
181
194
@@ -187,9 +200,9 @@ var (
187
200
// dhtProvider is the interface to ensure that SweepingProvider and
188
201
// provider.SweepingProvider share the same interface.
189
202
type dhtProvider interface {
190
- StartProviding (force bool , keys ... mh.Multihash )
191
- StopProviding (keys ... mh.Multihash )
192
- ProvideOnce (keys ... mh.Multihash )
203
+ StartProviding (force bool , keys ... mh.Multihash ) error
204
+ StopProviding (keys ... mh.Multihash ) error
205
+ ProvideOnce (keys ... mh.Multihash ) error
193
206
Clear () int
194
- RefreshSchedule ()
207
+ RefreshSchedule () error
195
208
}
0 commit comments