Skip to content

Commit 67732d6

Browse files
committed
feat: add extended routing functions that take functional options and return the closest peers used in the query
1 parent 7855259 commit 67732d6

File tree

2 files changed

+139
-56
lines changed

2 files changed

+139
-56
lines changed

routing.go

Lines changed: 111 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -26,52 +26,53 @@ import (
2626

2727
// Basic Put/Get
2828

29-
// PutValue adds value corresponding to given Key.
30-
// This is the top level "Store" operation of the DHT
31-
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
29+
// PutValueExtended adds value corresponding to given Key.
30+
func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []byte, opts ...routing.Option) ([]peer.ID, error) {
3231
if !dht.enableValues {
33-
return routing.ErrNotSupported
32+
return nil, routing.ErrNotSupported
3433
}
3534

3635
logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key))
3736

3837
// don't even allow local users to put bad values.
3938
if err := dht.Validator.Validate(key, value); err != nil {
40-
return err
39+
return nil, err
4140
}
4241

4342
old, err := dht.getLocal(key)
4443
if err != nil {
4544
// Means something is wrong with the datastore.
46-
return err
45+
return nil, err
4746
}
4847

4948
// Check if we have an old value that's not the same as the new one.
5049
if old != nil && !bytes.Equal(old.GetValue(), value) {
5150
// Check to see if the new one is better.
5251
i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
5352
if err != nil {
54-
return err
53+
return nil, err
5554
}
5655
if i != 0 {
57-
return fmt.Errorf("can't replace a newer value with an older value")
56+
return nil, fmt.Errorf("can't replace a newer value with an older value")
5857
}
5958
}
6059

6160
rec := record.MakePutRecord(key, value)
6261
rec.TimeReceived = u.FormatRFC3339(time.Now())
6362
err = dht.putLocal(key, rec)
6463
if err != nil {
65-
return err
64+
return nil, err
6665
}
6766

6867
pchan, err := dht.GetClosestPeers(ctx, key)
6968
if err != nil {
70-
return err
69+
return nil, err
7170
}
7271

72+
closestPeers := make([]peer.ID, 0, dht.bucketSize)
7373
wg := sync.WaitGroup{}
7474
for p := range pchan {
75+
closestPeers = append(closestPeers, p)
7576
wg.Add(1)
7677
go func(p peer.ID) {
7778
ctx, cancel := context.WithCancel(ctx)
@@ -90,7 +91,15 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
9091
}
9192
wg.Wait()
9293

93-
return nil
94+
return closestPeers, nil
95+
}
96+
97+
// PutValue adds value corresponding to given Key.
98+
// This is the top level "Store" operation of the DHT
99+
100+
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
101+
_, err = dht.PutValueExtended(ctx, key, value, opts...)
102+
return err
94103
}
95104

96105
// RecvdVal stores a value and the peer from which we got the value.
@@ -126,15 +135,15 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op
126135
return best, nil
127136
}
128137

129-
// SearchValue searches for the value corresponding to given Key and streams the results.
130-
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
138+
// SearchValueExtended searches for the value corresponding to given Key and streams the results.
139+
func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, <-chan []peer.ID, error) {
131140
if !dht.enableValues {
132-
return nil, routing.ErrNotSupported
141+
return nil, nil, routing.ErrNotSupported
133142
}
134143

135144
var cfg routing.Options
136145
if err := cfg.Apply(opts...); err != nil {
137-
return nil, err
146+
return nil, nil, err
138147
}
139148

140149
responsesNeeded := 0
@@ -146,33 +155,45 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
146155
valCh, lookupRes := dht.getValues(ctx, key, stopCh)
147156

148157
out := make(chan []byte)
158+
peers := make(chan []peer.ID, 1)
149159
go func() {
150160
defer close(out)
161+
defer close(peers)
151162
best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
163+
164+
var l *lookupWithFollowupResult
165+
select {
166+
case l = <-lookupRes:
167+
case <-ctx.Done():
168+
return
169+
}
170+
171+
if l == nil {
172+
return
173+
}
174+
152175
if best == nil || aborted {
176+
peers <- l.peers
153177
return
154178
}
155179

156180
updatePeers := make([]peer.ID, 0, dht.bucketSize)
157-
select {
158-
case l := <-lookupRes:
159-
if l == nil {
160-
return
161-
}
162-
163-
for _, p := range l.peers {
164-
if _, ok := peersWithBest[p]; !ok {
165-
updatePeers = append(updatePeers, p)
166-
}
181+
for _, p := range l.peers {
182+
if _, ok := peersWithBest[p]; !ok {
183+
updatePeers = append(updatePeers, p)
167184
}
168-
case <-ctx.Done():
169-
return
170185
}
171186

172187
dht.updatePeerValues(dht.Context(), key, best, updatePeers)
173188
}()
174189

175-
return out, nil
190+
return out, peers, nil
191+
}
192+
193+
// SearchValue searches for the value corresponding to given Key and streams the results.
194+
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
195+
out, _, err := dht.SearchValueExtended(ctx, key, opts...)
196+
return out, err
176197
}
177198

178199
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
@@ -385,20 +406,20 @@ func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollow
385406
// Some DHTs store values directly, while an indirect store stores pointers to
386407
// locations of the value, similarly to Coral and Mainline DHT.
387408

388-
// Provide makes this node announce that it can provide a value for the given key
389-
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
409+
// ProvideExtended makes this node announce that it can provide a value for the given key
410+
func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst bool, opts ...routing.Option) ([]peer.ID, error) {
390411
if !dht.enableProviders {
391-
return routing.ErrNotSupported
412+
return nil, routing.ErrNotSupported
392413
} else if !key.Defined() {
393-
return fmt.Errorf("invalid cid: undefined")
414+
return nil, fmt.Errorf("invalid cid: undefined")
394415
}
395416
keyMH := key.Hash()
396417
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
397418

398419
// add self locally
399420
dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
400421
if !brdcst {
401-
return nil
422+
return nil, nil
402423
}
403424

404425
closerCtx := ctx
@@ -408,7 +429,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
408429

409430
if timeout < 0 {
410431
// timed out
411-
return context.DeadlineExceeded
432+
return nil, context.DeadlineExceeded
412433
} else if timeout < 10*time.Second {
413434
// Reserve 10% for the final put.
414435
deadline = deadline.Add(-timeout / 10)
@@ -430,16 +451,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
430451
// context is still fine, provide the value to the closest peers
431452
// we managed to find, even if they're not the _actual_ closest peers.
432453
if ctx.Err() != nil {
433-
return ctx.Err()
454+
return nil, ctx.Err()
434455
}
435456
exceededDeadline = true
436457
case nil:
437458
default:
438-
return err
459+
return nil, err
439460
}
440461

462+
closestPeers := make([]peer.ID, 0, dht.bucketSize)
441463
wg := sync.WaitGroup{}
442464
for p := range peers {
465+
closestPeers = append(closestPeers, p)
443466
wg.Add(1)
444467
go func(p peer.ID) {
445468
defer wg.Done()
@@ -452,9 +475,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
452475
}
453476
wg.Wait()
454477
if exceededDeadline {
455-
return context.DeadlineExceeded
478+
return nil, context.DeadlineExceeded
456479
}
457-
return ctx.Err()
480+
return closestPeers, ctx.Err()
481+
}
482+
483+
// Provide makes this node announce that it can provide a value for the given key
484+
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
485+
_, err = dht.ProvideExtended(ctx, key, brdcst)
486+
return err
458487
}
459488

460489
// FindProviders searches until the context expires.
@@ -472,33 +501,49 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn
472501
return providers, nil
473502
}
474503

475-
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
476-
// Peers will be returned on the channel as soon as they are found, even before
477-
// the search query completes. If count is zero then the query will run until it
478-
// completes. Note: not reading from the returned channel may block the query
479-
// from progressing.
480-
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
504+
// FindProvidersAsyncExtended searches until the context expires.
505+
func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid, opts ...routing.Option) (<-chan peer.AddrInfo, <-chan []peer.ID, error) {
481506
if !dht.enableProviders || !key.Defined() {
482-
peerOut := make(chan peer.AddrInfo)
507+
peerOut, closestPeers := make(chan peer.AddrInfo), make(chan []peer.ID)
483508
close(peerOut)
484-
return peerOut
509+
close(closestPeers)
510+
return peerOut, closestPeers, routing.ErrNotSupported
511+
}
512+
513+
var cfg routing.Options
514+
if err := cfg.Apply(opts...); err != nil {
515+
return nil, nil, err
485516
}
486517

518+
count := dhtrouting.GetQuorum(&cfg)
519+
487520
chSize := count
488521
if count == 0 {
489522
chSize = 1
490523
}
491524
peerOut := make(chan peer.AddrInfo, chSize)
525+
closestPeersOut := make(chan []peer.ID, 1)
492526

493527
keyMH := key.Hash()
494528

495529
logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
496-
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
497-
return peerOut
530+
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut, closestPeersOut)
531+
return peerOut, closestPeersOut, nil
532+
}
533+
534+
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
535+
// Peers will be returned on the channel as soon as they are found, even before
536+
// the search query completes. If count is zero then the query will run until it
537+
// completes. Note: not reading from the returned channel may block the query
538+
// from progressing.
539+
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
540+
providers, _, _ := dht.FindProvidersAsyncExtended(ctx, key, dhtrouting.Quorum(count))
541+
return providers
498542
}
499543

500-
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
544+
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) {
501545
defer close(peerOut)
546+
defer close(closestPeersOut)
502547

503548
findAll := count == 0
504549
var ps *peer.Set
@@ -577,22 +622,26 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
577622
},
578623
)
579624

625+
if lookupRes != nil {
626+
closestPeersOut <- lookupRes.peers
627+
}
628+
580629
if err == nil && ctx.Err() == nil {
581630
dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
582631
}
583632
}
584633

585-
// FindPeer searches for a peer with given ID.
586-
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
634+
// FindPeerExtended searches for a peer with given ID.
635+
func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...routing.Option) (peer.AddrInfo, []peer.ID, error) {
587636
if err := id.Validate(); err != nil {
588-
return peer.AddrInfo{}, err
637+
return peer.AddrInfo{}, nil, err
589638
}
590639

591640
logger.Debugw("finding peer", "peer", id)
592641

593642
// Check if were already connected to them
594643
if pi := dht.FindLocal(id); pi.ID != "" {
595-
return pi, nil
644+
return pi, nil, nil
596645
}
597646

598647
lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
@@ -624,7 +673,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
624673
)
625674

626675
if err != nil {
627-
return peer.AddrInfo{}, err
676+
return peer.AddrInfo{}, nil, err
628677
}
629678

630679
dialedPeerDuringQuery := false
@@ -642,8 +691,14 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
642691
// to the peer.
643692
connectedness := dht.host.Network().Connectedness(id)
644693
if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
645-
return dht.peerstore.PeerInfo(id), nil
694+
return dht.peerstore.PeerInfo(id), lookupRes.peers, nil
646695
}
647696

648-
return peer.AddrInfo{}, routing.ErrNotFound
697+
return peer.AddrInfo{}, lookupRes.peers, routing.ErrNotFound
698+
}
699+
700+
// FindPeer searches for a peer with given ID.
701+
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
702+
pid, _, err := dht.FindPeerExtended(ctx, id)
703+
return pid, err
649704
}

routing/options.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package routing
22

33
import (
4+
"github.com/libp2p/go-libp2p-core/peer"
45
"github.com/libp2p/go-libp2p-core/routing"
56
)
67

@@ -30,3 +31,30 @@ func GetQuorum(opts *routing.Options) int {
3031
}
3132
return responsesNeeded
3233
}
34+
35+
type seedPeersOptionKey struct{}
36+
type SeedPeersOptions struct {
37+
SeedPeers []peer.ID
38+
UseRTPeers bool
39+
}
40+
41+
// SeedPeers is a DHT option that tells the DHT which peers it should use to seed a DHT query.
42+
//
43+
// Default: Use BucketSize closest peers to the target that are in the routing table
44+
func SeedPeers(seedPeers []peer.ID, useRoutingTablePeers bool) routing.Option {
45+
return func(opts *routing.Options) error {
46+
if opts.Other == nil {
47+
opts.Other = make(map[interface{}]interface{}, 1)
48+
}
49+
opts.Other[seedPeersOptionKey{}] = SeedPeersOptions{SeedPeers: seedPeers, UseRTPeers: useRoutingTablePeers}
50+
return nil
51+
}
52+
}
53+
54+
func GetSeedPeers(opts *routing.Options) SeedPeersOptions {
55+
seedPeersOpts, ok := opts.Other[seedPeersOptionKey{}].(SeedPeersOptions)
56+
if !ok {
57+
seedPeersOpts = SeedPeersOptions{UseRTPeers: true}
58+
}
59+
return seedPeersOpts
60+
}

0 commit comments

Comments
 (0)