Skip to content

Commit d3a0f5c

Browse files
committed
feat: add support for controlling the seed peers used in queries
1 parent 67732d6 commit d3a0f5c

File tree

3 files changed

+54
-11
lines changed

3 files changed

+54
-11
lines changed

lookup.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ import (
1717
// If the context is canceled, this function will return the context error along
1818
// with the closest K peers it has found so far.
1919
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
20+
return dht.GetClosestPeersSeeded(ctx, key, nil, true)
21+
}
22+
23+
// GetClosestPeersSeeded is the Kademlia 'node lookup' operation
24+
func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID, useRTPeers bool) (<-chan peer.ID, error) {
2025
if key == "" {
2126
return nil, fmt.Errorf("can't lookup empty key")
2227
}
@@ -45,6 +50,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
4550
return peers, err
4651
},
4752
func() bool { return false },
53+
seedPeers, useRTPeers,
4854
)
4955

5056
if err != nil {

query.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ type lookupWithFollowupResult struct {
7676
//
7777
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
7878
// lookup that have not already been successfully queried.
79-
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
79+
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
8080
// run the query
81-
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
81+
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers, useRTPeers)
8282
if err != nil {
8383
return nil, err
8484
}
@@ -145,10 +145,23 @@ processFollowUp:
145145
return lookupRes, nil
146146
}
147147

148-
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
148+
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, manuallySeededPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
149149
// pick the K closest peers to the key in our Routing table.
150150
targetKadID := kb.ConvertKey(target)
151-
seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
151+
152+
seedPeerSet := peer.NewSet()
153+
for _, p := range manuallySeededPeers {
154+
seedPeerSet.Add(p)
155+
}
156+
157+
if manuallySeededPeers == nil || useRTPeers {
158+
RTSeedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
159+
for _, p := range RTSeedPeers {
160+
seedPeerSet.Add(p)
161+
}
162+
}
163+
164+
seedPeers := seedPeerSet.Peers()
152165
if len(seedPeers) == 0 {
153166
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
154167
Type: routing.QueryError,

routing.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by
3434

3535
logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key))
3636

37+
var cfg routing.Options
38+
if err := cfg.Apply(opts...); err != nil {
39+
return nil, err
40+
}
41+
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
42+
3743
// don't even allow local users to put bad values.
3844
if err := dht.Validator.Validate(key, value); err != nil {
3945
return nil, err
@@ -64,7 +70,7 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by
6470
return nil, err
6571
}
6672

67-
pchan, err := dht.GetClosestPeers(ctx, key)
73+
pchan, err := dht.GetClosestPeersSeeded(ctx, key, seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers)
6874
if err != nil {
6975
return nil, err
7076
}
@@ -151,8 +157,10 @@ func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts ..
151157
responsesNeeded = dhtrouting.GetQuorum(&cfg)
152158
}
153159

160+
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
161+
154162
stopCh := make(chan struct{})
155-
valCh, lookupRes := dht.getValues(ctx, key, stopCh)
163+
valCh, lookupRes := dht.getValues(ctx, key, seedPeerOpts, stopCh)
156164

157165
out := make(chan []byte)
158166
peers := make(chan []peer.ID, 1)
@@ -226,7 +234,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
226234

227235
queryCtx, cancel := context.WithCancel(ctx)
228236
defer cancel()
229-
valCh, _ := dht.getValues(queryCtx, key, nil)
237+
valCh, _ := dht.getValues(queryCtx, key, dhtrouting.SeedPeersOptions{UseRTPeers: true}, nil)
230238

231239
out := make([]RecvdVal, 0, nvals)
232240
for val := range valCh {
@@ -304,7 +312,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
304312
}
305313
}
306314

307-
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
315+
func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
308316
valCh := make(chan RecvdVal, 1)
309317
lookupResCh := make(chan *lookupWithFollowupResult, 1)
310318

@@ -380,6 +388,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
380388
return false
381389
}
382390
},
391+
seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
383392
)
384393

385394
if err != nil {
@@ -416,6 +425,12 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo
416425
keyMH := key.Hash()
417426
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
418427

428+
var cfg routing.Options
429+
if err := cfg.Apply(opts...); err != nil {
430+
return nil, err
431+
}
432+
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
433+
419434
// add self locally
420435
dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
421436
if !brdcst {
@@ -444,7 +459,7 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo
444459
}
445460

446461
var exceededDeadline bool
447-
peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
462+
peers, err := dht.GetClosestPeersSeeded(closerCtx, string(keyMH), seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers)
448463
switch err {
449464
case context.DeadlineExceeded:
450465
// If the _inner_ deadline has been exceeded but the _outer_
@@ -516,6 +531,7 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid,
516531
}
517532

518533
count := dhtrouting.GetQuorum(&cfg)
534+
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
519535

520536
chSize := count
521537
if count == 0 {
@@ -527,7 +543,7 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid,
527543
keyMH := key.Hash()
528544

529545
logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
530-
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut, closestPeersOut)
546+
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, seedPeerOpts, peerOut, closestPeersOut)
531547
return peerOut, closestPeersOut, nil
532548
}
533549

@@ -541,7 +557,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
541557
return providers
542558
}
543559

544-
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) {
560+
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, seedPeerOpts dhtrouting.SeedPeersOptions, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) {
545561
defer close(peerOut)
546562
defer close(closestPeersOut)
547563

@@ -620,6 +636,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
620636
func() bool {
621637
return !findAll && ps.Size() >= count
622638
},
639+
seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
623640
)
624641

625642
if lookupRes != nil {
@@ -639,6 +656,12 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro
639656

640657
logger.Debugw("finding peer", "peer", id)
641658

659+
var cfg routing.Options
660+
if err := cfg.Apply(opts...); err != nil {
661+
return peer.AddrInfo{}, nil, err
662+
}
663+
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
664+
642665
// Check if were already connected to them
643666
if pi := dht.FindLocal(id); pi.ID != "" {
644667
return pi, nil, nil
@@ -670,6 +693,7 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro
670693
func() bool {
671694
return dht.host.Network().Connectedness(id) == network.Connected
672695
},
696+
seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
673697
)
674698

675699
if err != nil {

0 commit comments

Comments
 (0)