Skip to content

Commit 5da7e89

Browse files
committed
refactor routing to be more configurable. start extraction of routing to
new package.
1 parent 3d294c7 commit 5da7e89

File tree

11 files changed

+646
-278
lines changed

11 files changed

+646
-278
lines changed

dual/dual_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
record "github.com/libp2p/go-libp2p-record"
1616
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
1717
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
18+
19+
dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
1820
)
1921

2022
var wancid, lancid cid.Cid
@@ -263,7 +265,7 @@ func TestSearchValue(t *testing.T) {
263265

264266
_ = wan.PutValue(ctx, "/v/hello", []byte("valid"))
265267

266-
valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0))
268+
valCh, err := d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0))
267269
if err != nil {
268270
t.Fatal(err)
269271
}
@@ -291,7 +293,7 @@ func TestSearchValue(t *testing.T) {
291293
t.Error(err)
292294
}
293295

294-
valCh, err = d.SearchValue(ctx, "/v/hello", dht.Quorum(0))
296+
valCh, err = d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0))
295297
if err != nil {
296298
t.Fatal(err)
297299
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/multiformats/go-multiaddr-net v0.1.5
3232
github.com/multiformats/go-multihash v0.0.13
3333
github.com/multiformats/go-multistream v0.1.1
34+
github.com/pkg/errors v0.9.1
3435
github.com/stretchr/testify v1.5.1
3536
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
3637
go.opencensus.io v0.22.3

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza
272272
github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs=
273273
github.com/libp2p/go-reuseport-transport v0.0.3 h1:zzOeXnTooCkRvoH+bSXEfXhn76+LAiwoneM0gnXjF2M=
274274
github.com/libp2p/go-reuseport-transport v0.0.3/go.mod h1:Spv+MPft1exxARzP2Sruj2Wb5JSyHNncjf1Oi2dEbzM=
275+
github.com/libp2p/go-sockaddr v0.0.2 h1:tCuXfpA9rq7llM/v834RKc/Xvovy/AqM9kHvTV/jY/Q=
275276
github.com/libp2p/go-sockaddr v0.0.2/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
276277
github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14=
277278
github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg=

lookup.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ func (lk loggableKeyBytes) String() string {
7272
// If the context is canceled, this function will return the context error along
7373
// with the closest K peers it has found so far.
7474
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
75+
return dht.GetClosestPeersSeeded(ctx, key, nil, true)
76+
}
77+
78+
// GetClosestPeersSeeded is the Kademlia 'node lookup' operation
79+
func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID, useRTPeers bool) (<-chan peer.ID, error) {
7580
if key == "" {
7681
return nil, fmt.Errorf("can't lookup empty key")
7782
}
@@ -101,6 +106,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
101106
return peers, err
102107
},
103108
func() bool { return false },
109+
seedPeers, useRTPeers,
104110
)
105111

106112
if err != nil {

query.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ type lookupWithFollowupResult struct {
7777
//
7878
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
7979
// lookup that have not already been successfully queried.
80-
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
80+
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
8181
// run the query
82-
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
82+
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers, useRTPeers)
8383
if err != nil {
8484
return nil, err
8585
}
@@ -137,10 +137,23 @@ processFollowUp:
137137
return lookupRes, nil
138138
}
139139

140-
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
140+
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, manuallySeededPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
141141
// pick the K closest peers to the key in our Routing table.
142142
targetKadID := kb.ConvertKey(target)
143-
seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
143+
144+
seedPeerSet := peer.NewSet()
145+
for _, p := range manuallySeededPeers {
146+
seedPeerSet.Add(p)
147+
}
148+
149+
if manuallySeededPeers == nil || useRTPeers {
150+
RTSeedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
151+
for _, p := range RTSeedPeers {
152+
seedPeerSet.Add(p)
153+
}
154+
}
155+
156+
seedPeers := seedPeerSet.Peers()
144157
if len(seedPeers) == 0 {
145158
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
146159
Type: routing.QueryError,

0 commit comments

Comments
 (0)