Skip to content

Commit 616ef86

Browse files
changes to peer mantainence logic
1 parent 82c2333 commit 616ef86

File tree

5 files changed

+25
-8
lines changed

5 files changed

+25
-8
lines changed

dht.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ type IpfsDHT struct {
109109
enableProviders, enableValues bool
110110

111111
// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit on the time duration
112-
// between the current time and the last time a peer was useful to us.
112+
// between the current time and the last time we successfully queried a peer.
113113
maxLastSuccessfulOutboundThreshold float64
114114

115115
fixLowPeersChan chan struct{}
@@ -448,22 +448,37 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
448448

449449
// peerFound signals the routingTable that we've found a peer that
450450
// might support the DHT protocol.
451+
// If we have a connection a peer but no exchange of a query RPC ->
452+
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
453+
// LastUsefulAt=N/A
454+
// If we connect to a peer and exchange a query RPC ->
455+
// LastQueriedAt=time.Now (same reason as above)
456+
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
457+
// If we query a peer we already have in our Routing Table ->
458+
// LastQueriedAt=time.Now()
459+
// LastUsefulAt remains unchanged
460+
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
461+
// Do Nothing.
451462
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
452463
logger.Debugw("peer found", "peer", p)
453464
b, err := dht.validRTPeer(p)
454465
if err != nil {
455466
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
456467
} else if b {
457-
_, err := dht.routingTable.TryAddPeer(p, queryPeer)
468+
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
458469
if err != nil {
459470
// peer not added.
460471
return
461472
}
462473

463-
// If we discovered the peer because of a query, we need to ensure we override the "zero" lastSuccessfulOutboundQuery
474+
// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
464475
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
465-
if queryPeer {
466-
dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
476+
if newlyAdded && queryPeer {
477+
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
478+
} else if queryPeer {
479+
// the peer is already in our RT, but we just successfully queried it and so let's give it a
480+
// bump on the query time so we don't ping it too soon for a liveliness check.
481+
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
467482
}
468483
}
469484
}

dht_bootstrap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (dht *IpfsDHT) startRefreshing() {
128128
// ping Routing Table peers that haven't been hear of/from in the interval they should have been.
129129
for _, ps := range dht.routingTable.GetPeerInfos() {
130130
// ping the peer if it's due for a ping and evict it if the ping fails
131-
if float64(time.Since(ps.LastSuccessfulOutboundQuery)) > dht.maxLastSuccessfulOutboundThreshold {
131+
if float64(time.Since(ps.LastSuccessfulOutboundQueryAt)) > dht.maxLastSuccessfulOutboundThreshold {
132132
livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
133133
if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
134134
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
github.com/libp2p/go-eventbus v0.1.0
1919
github.com/libp2p/go-libp2p v0.7.4
2020
github.com/libp2p/go-libp2p-core v0.5.1
21-
github.com/libp2p/go-libp2p-kbucket v0.3.3
21+
github.com/libp2p/go-libp2p-kbucket v0.3.4-0.20200409173621-ac6d9f25cfc5
2222
github.com/libp2p/go-libp2p-peerstore v0.2.2
2323
github.com/libp2p/go-libp2p-record v0.1.2
2424
github.com/libp2p/go-libp2p-swarm v0.2.3

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMT
232232
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=
233233
github.com/libp2p/go-libp2p-kbucket v0.3.3 h1:V2Zwv6QnCK6Who0iiJW2eUKwdlTYGJ2HnLViaolDOcs=
234234
github.com/libp2p/go-libp2p-kbucket v0.3.3/go.mod h1:IWFdYRBOYzaLEHnvrfzEkr+UcuveCXIoeO8QeFZSI6A=
235+
github.com/libp2p/go-libp2p-kbucket v0.3.4-0.20200409173621-ac6d9f25cfc5 h1:WKHaqIIF9mHebRjTtsOCGPY32Wr2TKOyKnF6KQ9LV8Q=
236+
github.com/libp2p/go-libp2p-kbucket v0.3.4-0.20200409173621-ac6d9f25cfc5/go.mod h1:sDQeCkD6yf/Yq8O+HPTNLyGa0TsJi+CURa6ELp1ppkk=
235237
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
236238
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
237239
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=

query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
174174
}
175175

176176
func (q *query) recordPeerIsValuable(p peer.ID) {
177-
q.dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
177+
q.dht.routingTable.UpdateLastUsefulAt(p, time.Now())
178178
}
179179

180180
func (q *query) recordValuablePeers() {

0 commit comments

Comments
 (0)