Skip to content

Commit 2851c88

Browse files
Routing Table Refresh manager (#601)
* rt refresh refactor
1 parent 08ab423 commit 2851c88

File tree

8 files changed

+506
-644
lines changed

8 files changed

+506
-644
lines changed

dht.go

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@ import (
1616
"github.com/libp2p/go-libp2p-core/peerstore"
1717
"github.com/libp2p/go-libp2p-core/protocol"
1818
"github.com/libp2p/go-libp2p-core/routing"
19-
"go.uber.org/zap"
20-
21-
"go.opencensus.io/tag"
2219

2320
"github.com/libp2p/go-libp2p-kad-dht/metrics"
2421
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
2522
"github.com/libp2p/go-libp2p-kad-dht/providers"
23+
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
2624
kb "github.com/libp2p/go-libp2p-kbucket"
2725
record "github.com/libp2p/go-libp2p-record"
2826
recpb "github.com/libp2p/go-libp2p-record/pb"
@@ -35,6 +33,8 @@ import (
3533
"github.com/multiformats/go-base32"
3634
ma "github.com/multiformats/go-multiaddr"
3735
"github.com/multiformats/go-multihash"
36+
"go.opencensus.io/tag"
37+
"go.uber.org/zap"
3838
)
3939

4040
var (
@@ -71,6 +71,9 @@ type IpfsDHT struct {
7171
// ProviderManager stores & manages the provider records for this Dht peer.
7272
ProviderManager *providers.ProviderManager
7373

74+
// manages Routing Table refresh
75+
rtRefreshManager *rtrefresh.RtRefreshManager
76+
7477
birth time.Time // When this peer started up
7578

7679
Validator record.Validator
@@ -104,11 +107,7 @@ type IpfsDHT struct {
104107
queryPeerFilter QueryFilterFunc
105108
routingTablePeerFilter RouteTableFilterFunc
106109

107-
autoRefresh bool
108-
rtRefreshQueryTimeout time.Duration
109-
rtRefreshInterval time.Duration
110-
triggerRtRefresh chan chan<- error
111-
triggerSelfLookup chan chan<- error
110+
autoRefresh bool
112111

113112
// A set of bootstrap peers to fallback on if all other attempts to fix
114113
// the routing table fail (or, e.g., this is the first time this node is
@@ -122,11 +121,6 @@ type IpfsDHT struct {
122121
// networks).
123122
enableProviders, enableValues bool
124123

125-
// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
126-
// to between two successful query responses from it, failing which,
127-
// we will ping it to see if it's alive.
128-
successfulOutboundQueryGracePeriod time.Duration
129-
130124
fixLowPeersChan chan struct{}
131125
}
132126

@@ -156,14 +150,13 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
156150
if err := cfg.validate(); err != nil {
157151
return nil, err
158152
}
153+
159154
dht, err := makeDHT(ctx, h, cfg)
160155
if err != nil {
161156
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
162157
}
163158

164159
dht.autoRefresh = cfg.routingTable.autoRefresh
165-
dht.rtRefreshInterval = cfg.routingTable.refreshInterval
166-
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
167160

168161
dht.maxRecordAge = cfg.maxRecordAge
169162
dht.enableProviders = cfg.enableProviders
@@ -196,8 +189,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
196189
// handle providers
197190
dht.proc.AddChild(dht.ProviderManager.Process())
198191

199-
dht.startSelfLookup()
200-
dht.startRefreshing()
192+
if err := dht.rtRefreshManager.Start(); err != nil {
193+
return nil, err
194+
}
201195

202196
// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
203197
// since RT membership is decoupled from connectivity
@@ -266,23 +260,44 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
266260
bucketSize: cfg.bucketSize,
267261
alpha: cfg.concurrency,
268262
beta: cfg.resiliency,
269-
triggerRtRefresh: make(chan chan<- error),
270-
triggerSelfLookup: make(chan chan<- error),
271263
queryPeerFilter: cfg.queryPeerFilter,
272264
routingTablePeerFilter: cfg.routingTable.peerFilter,
273265
fixLowPeersChan: make(chan struct{}, 1),
274266
}
275267

268+
var maxLastSuccessfulOutboundThreshold time.Duration
269+
270+
// The threshold is calculated based on the expected amount of time that should pass before we
271+
// query a peer as part of our refresh cycle.
272+
// To grok the Math Wizardry that produced these exact equations, please be patient as a document explaining it will
273+
// be published soon.
274+
if cfg.concurrency < cfg.bucketSize { // (alpha < K)
275+
l1 := math.Log(float64(1) / float64(cfg.bucketSize)) //(Log(1/K))
276+
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
277+
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
278+
} else {
279+
maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
280+
}
281+
276282
// construct routing table
277-
rt, err := makeRoutingTable(dht, cfg)
283+
rt, err := makeRoutingTable(dht, cfg, maxLastSuccessfulOutboundThreshold)
278284
if err != nil {
279285
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
280286
}
281287
dht.routingTable = rt
282288
dht.bootstrapPeers = cfg.bootstrapPeers
283289

290+
// rt refresh manager
291+
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
292+
if err != nil {
293+
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
294+
}
295+
dht.rtRefreshManager = rtRefresh
296+
284297
// create a DHT proc with the given context
285-
dht.proc = goprocessctx.WithContext(ctx)
298+
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
299+
return rtRefresh.Close()
300+
})
286301

287302
// create a tagged context derived from the original context
288303
ctxTags := dht.newContextWithLocalTags(ctx)
@@ -298,19 +313,32 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
298313
return dht, nil
299314
}
300315

301-
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
302-
// The threshold is calculated based on the expected amount of time that should pass before we
303-
// query a peer as part of our refresh cycle.
304-
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
305-
// be published soon.
306-
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
307-
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
308-
maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
316+
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
317+
keyGenFnc := func(cpl uint) (string, error) {
318+
p, err := dht.routingTable.GenRandPeerID(cpl)
319+
return string(p), err
320+
}
309321

322+
queryFnc := func(ctx context.Context, key string) error {
323+
_, err := dht.GetClosestPeers(ctx, key)
324+
return err
325+
}
326+
327+
r, err := rtrefresh.NewRtRefreshManager(
328+
dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
329+
keyGenFnc,
330+
queryFnc,
331+
cfg.routingTable.refreshQueryTimeout,
332+
cfg.routingTable.refreshInterval,
333+
maxLastSuccessfulOutboundThreshold)
334+
335+
return r, err
336+
}
337+
338+
func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
310339
self := kb.ConvertPeerID(dht.host.ID())
311340

312341
rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
313-
dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
314342
cmgr := dht.host.ConnManager()
315343

316344
rt.PeerAdded = func(p peer.ID) {
@@ -397,10 +425,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
397425
}
398426

399427
if dht.autoRefresh {
400-
select {
401-
case dht.triggerRtRefresh <- nil:
402-
default:
403-
}
428+
dht.rtRefreshManager.RefreshNoWait()
404429
}
405430
}
406431

@@ -520,8 +545,8 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
520545
// might support the DHT protocol.
521546
// If we have a connection to a peer but no exchange of a query RPC ->
522547
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveness check)
523-
// LastUsefulAt=N/A
524-
// If we connect to a peer and exchange a query RPC ->
548+
// LastUsefulAt=0
549+
// If we connect to a peer and then exchange a query RPC ->
525550
// LastQueriedAt=time.Now (same reason as above)
526551
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
527552
// If we query a peer we already have in our Routing Table ->
@@ -542,12 +567,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
542567
// peer not added.
543568
return
544569
}
545-
546-
// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
547-
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
548-
if newlyAdded && queryPeer {
549-
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
550-
} else if queryPeer {
570+
if !newlyAdded && queryPeer {
551571
// the peer is already in our RT, but we just successfully queried it and so let's give it a
552572
// bump on the query time so we don't ping it too soon for a liveness check.
553573
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())

0 commit comments

Comments
 (0)