Commit 47a1bf4

feat: buffer inbound provides and add a non-blocking option
That way, overloaded nodes can drop provides.
1 parent 3cf54bb commit 47a1bf4
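
For context, here is a minimal sketch of how a node could opt in to the new behavior. This snippet is illustrative and not part of the commit; it assumes the import paths this repo used at the time (go-libp2p-core, go-datastore) and a hypothetical helper name, newManager:

package main

import (
    "context"

    ds "github.com/ipfs/go-datastore"
    dssync "github.com/ipfs/go-datastore/sync"
    "github.com/libp2p/go-libp2p-core/peer"

    "github.com/libp2p/go-libp2p-kad-dht/providers"
)

// newManager builds a ProviderManager whose AddProviderNonBlocking drops
// provides (returning ErrWouldBlock) once the inbound buffer is full,
// instead of stalling the caller.
func newManager(ctx context.Context, self peer.ID) (*providers.ProviderManager, error) {
    dstore := dssync.MutexWrap(ds.NewMapDatastore())
    return providers.NewProviderManager(ctx, self, dstore, providers.NonBlockingProvide(true))
}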

File tree: 6 files changed, +103 −32 lines

dht_test.go

Lines changed: 2 additions & 1 deletion

@@ -1343,7 +1343,8 @@ func TestClientModeConnect(t *testing.T) {
 
     c := testCaseCids[0]
     p := peer.ID("TestPeer")
-    a.ProviderManager.AddProvider(ctx, c.Hash(), p)
+    err := a.ProviderManager.AddProviderNonBlocking(ctx, c.Hash(), p)
+    require.NoError(t, err)
     time.Sleep(time.Millisecond * 5) // just in case...
 
     provs, err := b.FindProviders(ctx, c)

fullrt/dht.go

Lines changed: 8 additions & 2 deletions

@@ -773,9 +773,15 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
     logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
 
     // add self locally
-    dht.ProviderManager.AddProvider(ctx, keyMH, dht.h.ID())
+    err = dht.ProviderManager.AddProvider(ctx, keyMH, dht.h.ID())
     if !brdcst {
-        return nil
+        // If we're not broadcasting, return immediately. But also return the error because,
+        // if something went wrong, we basically failed to do anything.
+        return err
+    }
+    if err != nil {
+        // Otherwise, "local" provides are "best effort".
+        logger.Debugw("local provide failed", "error", err)
     }
 
     closerCtx := ctx

handlers.go

Lines changed: 4 additions & 1 deletion

@@ -366,7 +366,10 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
             // add the received addresses to our peerstore.
             dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
         }
-        dht.ProviderManager.AddProvider(ctx, key, p)
+        err := dht.ProviderManager.AddProviderNonBlocking(ctx, key, p)
+        if err != nil {
+            return nil, err
+        }
     }
 
     return nil, nil
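
With the handler now surfacing the provider manager's errors, a caller can tell a deliberate drop from a shutdown. A hedged sketch (not from this commit) of distinguishing the two sentinels, assuming pm, ctx, key, and p are in scope and the standard errors package is imported:

err := pm.AddProviderNonBlocking(ctx, key, p)
switch {
case errors.Is(err, providers.ErrWouldBlock):
    // Non-blocking mode and the queue is full: the provide was dropped on purpose.
case errors.Is(err, providers.ErrClosing):
    // The provider manager is shutting down.
case err != nil:
    // Context cancellation or another failure.
}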

providers/providers_manager.go

Lines changed: 64 additions & 8 deletions

@@ -3,6 +3,7 @@ package providers
 import (
     "context"
     "encoding/binary"
+    "errors"
     "fmt"
     "strings"
     "time"
@@ -19,6 +20,11 @@ import (
     base32 "github.com/multiformats/go-base32"
 )
 
+var (
+    ErrWouldBlock = errors.New("provide would block")
+    ErrClosing    = errors.New("provider manager is closing")
+)
+
 // ProvidersKeyPrefix is the prefix/namespace for ALL provider record
 // keys stored in the data store.
 const ProvidersKeyPrefix = "/providers/"
@@ -29,6 +35,7 @@ var defaultCleanupInterval = time.Hour
 var lruCacheSize = 256
 var batchBufferSize = 256
 var log = logging.Logger("providers")
+var defaultProvideBufferSize = 256
 
 // ProviderManager adds and pulls providers out of the datastore,
 // caching them in between
@@ -38,9 +45,10 @@ type ProviderManager struct {
     cache  lru.LRUCache
     dstore *autobatch.Datastore
 
-    newprovs chan *addProv
-    getprovs chan *getProv
-    proc     goprocess.Process
+    nonBlocking bool
+    newprovs    chan *addProv
+    getprovs    chan *getProv
+    proc        goprocess.Process
 
     cleanupInterval time.Duration
 }
@@ -75,9 +83,19 @@ func Cache(c lru.LRUCache) Option {
     }
 }
 
+// NonBlockingProvide causes the provide manager to drop inbound provides when the queue is full
+// instead of blocking.
+func NonBlockingProvide(nonBlocking bool) Option {
+    return func(pm *ProviderManager) error {
+        pm.nonBlocking = nonBlocking
+        return nil
+    }
+}
+
 type addProv struct {
-    key []byte
-    val peer.ID
+    key  []byte
+    val  peer.ID
+    resp chan error
 }
 
 type getProv struct {
@@ -89,7 +107,7 @@ type getProv struct {
 func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
     pm := new(ProviderManager)
     pm.getprovs = make(chan *getProv)
-    pm.newprovs = make(chan *addProv)
+    pm.newprovs = make(chan *addProv, defaultProvideBufferSize)
     pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
     cache, err := lru.NewLRU(lruCacheSize, nil)
     if err != nil {
@@ -134,6 +152,9 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
         select {
         case np := <-pm.newprovs:
             err := pm.addProv(np.key, np.val)
+            if np.resp != nil {
+                np.resp <- err
+            }
             if err != nil {
                 log.Error("error adding new providers: ", err)
                 continue
@@ -213,15 +234,50 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
     }
 }
 
-// AddProvider adds a provider
-func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
+// AddProviderNonBlocking adds a provider
+func (pm *ProviderManager) AddProviderNonBlocking(ctx context.Context, k []byte, val peer.ID) error {
     prov := &addProv{
         key: k,
         val: val,
     }
+    if pm.nonBlocking {
+        select {
+        case pm.newprovs <- prov:
+        default:
+            return ErrWouldBlock
+        }
+    } else {
+        select {
+        case pm.newprovs <- prov:
+        case <-pm.proc.Closing():
+            return ErrClosing
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+    return nil
+}
+
+func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) error {
+    prov := &addProv{
+        key:  k,
+        val:  val,
+        resp: make(chan error, 1),
+    }
     select {
     case pm.newprovs <- prov:
+    case <-pm.proc.Closing():
+        return ErrClosing
+    case <-ctx.Done():
+        return ctx.Err()
+    }
+    select {
+    case err := <-prov.resp:
+        return err
     case <-ctx.Done():
+        return ctx.Err()
+    case <-pm.proc.Closing():
+        return ErrClosing
     }
 }
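
Design note: the resp channel in the blocking AddProvider is created with capacity 1, so if the caller gives up (its context fires) after the request is enqueued, the run loop's np.resp <- err send still completes instead of wedging the event loop. A generic sketch of that request/response pattern (illustrative, not repo code; doWork is a hypothetical stand-in for the run loop calling pm.addProv, and "context" is assumed imported):

func await(ctx context.Context, doWork func() error) error {
    resp := make(chan error, 1) // one slot: the send below can never block
    go func() { resp <- doWork() }()
    select {
    case err := <-resp:
        return err
    case <-ctx.Done():
        // Abandon the wait; the worker's buffered send still succeeds,
        // so no goroutine is left stuck on a dead channel.
        return ctx.Err()
    }
}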

providers/providers_manager_test.go

Lines changed: 17 additions & 18 deletions

@@ -9,6 +9,7 @@ import (
     "time"
 
     "github.com/libp2p/go-libp2p-core/peer"
+    "github.com/stretchr/testify/require"
 
     mh "github.com/multiformats/go-multihash"
 
@@ -31,7 +32,7 @@ func TestProviderManager(t *testing.T) {
         t.Fatal(err)
     }
     a := u.Hash([]byte("test"))
-    p.AddProvider(ctx, a, peer.ID("testingprovider"))
+    require.NoError(t, p.AddProvider(ctx, a, peer.ID("testingprovider")))
 
     // Not cached
     // TODO verify that cache is empty
@@ -47,8 +48,8 @@ func TestProviderManager(t *testing.T) {
         t.Fatal("Could not retrieve provider.")
     }
 
-    p.AddProvider(ctx, a, peer.ID("testingprovider2"))
-    p.AddProvider(ctx, a, peer.ID("testingprovider3"))
+    require.NoError(t, p.AddProvider(ctx, a, peer.ID("testingprovider2")))
+    require.NoError(t, p.AddProvider(ctx, a, peer.ID("testingprovider3")))
     // TODO verify that cache is already up to date
     resp = p.GetProviders(ctx, a)
     if len(resp) != 3 {
@@ -78,7 +79,7 @@ func TestProvidersDatastore(t *testing.T) {
     for i := 0; i < 100; i++ {
         h := u.Hash([]byte(fmt.Sprint(i)))
         mhs = append(mhs, h)
-        p.AddProvider(ctx, h, friend)
+        require.NoError(t, p.AddProvider(ctx, h, friend))
     }
 
     for _, c := range mhs {
@@ -165,15 +166,15 @@ func TestProvidesExpire(t *testing.T) {
     }
 
     for _, h := range mhs[:5] {
-        p.AddProvider(ctx, h, peers[0])
-        p.AddProvider(ctx, h, peers[1])
+        require.NoError(t, p.AddProvider(ctx, h, peers[0]))
+        require.NoError(t, p.AddProvider(ctx, h, peers[1]))
     }
 
     time.Sleep(time.Second / 4)
 
     for _, h := range mhs[5:] {
-        p.AddProvider(ctx, h, peers[0])
-        p.AddProvider(ctx, h, peers[1])
+        require.NoError(t, p.AddProvider(ctx, h, peers[0]))
+        require.NoError(t, p.AddProvider(ctx, h, peers[1]))
     }
 
     for _, h := range mhs {
@@ -271,7 +272,7 @@ func TestLargeProvidersSet(t *testing.T) {
         h := u.Hash([]byte(fmt.Sprint(i)))
         mhs = append(mhs, h)
         for _, pid := range peers {
-            p.AddProvider(ctx, h, pid)
+            require.NoError(t, p.AddProvider(ctx, h, pid))
         }
     }
 
@@ -296,16 +297,14 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
     h1 := u.Hash([]byte("1"))
     h2 := u.Hash([]byte("2"))
     pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
-    if err != nil {
-        t.Fatal(err)
-    }
+    require.NoError(t, err)
 
     // add provider
-    pm.AddProvider(ctx, h1, p1)
+    require.NoError(t, pm.AddProvider(ctx, h1, p1))
     // make the cached provider for h1 go to datastore
-    pm.AddProvider(ctx, h2, p1)
+    require.NoError(t, pm.AddProvider(ctx, h2, p1))
     // now just offloaded record should be brought back and joined with p2
-    pm.AddProvider(ctx, h1, p2)
+    require.NoError(t, pm.AddProvider(ctx, h1, p2))
 
     h1Provs := pm.GetProviders(ctx, h1)
     if len(h1Provs) != 2 {
@@ -325,11 +324,11 @@ func TestWriteUpdatesCache(t *testing.T) {
     }
 
     // add provider
-    pm.AddProvider(ctx, h1, p1)
+    require.NoError(t, pm.AddProvider(ctx, h1, p1))
     // force into the cache
-    pm.GetProviders(ctx, h1)
+    _ = pm.GetProviders(ctx, h1)
     // add a second provider
-    pm.AddProvider(ctx, h1, p2)
+    require.NoError(t, pm.AddProvider(ctx, h1, p2))
 
     c1Provs := pm.GetProviders(ctx, h1)
     if len(c1Provs) != 2 {

routing.go

Lines changed: 8 additions & 2 deletions

@@ -403,9 +403,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
     logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
 
     // add self locally
-    dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
+    err = dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
     if !brdcst {
-        return nil
+        // If we're not broadcasting, return immediately. But also return the error because,
+        // if something went wrong, we basically failed to do anything.
+        return err
+    }
+    if err != nil {
+        // Otherwise, "local" provides are "best effort".
+        logger.Debugw("local provide failed", "error", err)
     }
 
     closerCtx := ctx
