Skip to content

Commit bdcee75

Browse files
committed
feat(p2p/peerTracker): remove gc and add subscriptions
1 parent 6c285ed commit bdcee75

File tree

3 files changed

+149
-122
lines changed

3 files changed

+149
-122
lines changed

p2p/exchange.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]](
8080
}
8181
}
8282

83+
id := protocolID(params.networkID)
8384
ex := &Exchange[H]{
8485
host: host,
85-
protocolID: protocolID(params.networkID),
86-
peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
86+
protocolID: id,
87+
peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics),
8788
Params: params,
8889
metrics: metrics,
8990
}
@@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
9899
ex.ctx, ex.cancel = context.WithCancel(context.Background())
99100
log.Infow("client: starting client", "protocol ID", ex.protocolID)
100101

101-
go ex.peerTracker.gc()
102102
go ex.peerTracker.track()
103103

104104
// bootstrap the peerTracker with trusted peers as well as previously seen

p2p/exchange_test.go

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88

99
"github.com/ipfs/go-datastore"
1010
"github.com/ipfs/go-datastore/sync"
11+
"github.com/libp2p/go-libp2p"
1112
libhost "github.com/libp2p/go-libp2p/core/host"
1213
"github.com/libp2p/go-libp2p/core/network"
1314
"github.com/libp2p/go-libp2p/core/peer"
1415
"github.com/libp2p/go-libp2p/core/peerstore"
1516
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
1617
"github.com/libp2p/go-libp2p/p2p/net/conngater"
18+
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
1719
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1820
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
1921
"github.com/stretchr/testify/assert"
@@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
242244
require.NoError(t, err)
243245
servers[index].Start(context.Background()) //nolint:errcheck
244246
exchange.peerTracker.peerLk.Lock()
245-
exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()}
247+
exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{}
246248
exchange.peerTracker.peerLk.Unlock()
247249
}
248250

@@ -262,30 +264,38 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
262264
// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
263265
// from another peer with lower score after receiving header.ErrNotFound
264266
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
265-
hosts := createMocknet(t, 3)
267+
hosts := quicHosts(t, 3)
268+
266269
// create client + server(it does not have needed headers)
267270
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
271+
272+
serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
273+
tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore}
274+
275+
hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
276+
hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90)
277+
268278
// create one more server(with more headers in the store)
269279
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
270-
hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
280+
hosts[2], tmServerSideStore,
271281
WithNetworkID[ServerParameters](networkID),
272282
)
273283
require.NoError(t, err)
274284
require.NoError(t, serverSideEx.Start(context.Background()))
275285
t.Cleanup(func() {
276286
serverSideEx.Stop(context.Background()) //nolint:errcheck
277287
})
288+
278289
exchg.peerTracker.peerLk.Lock()
279-
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
290+
exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
280291
exchg.peerTracker.peerLk.Unlock()
281-
282292
h, err := store.GetByHeight(context.Background(), 5)
283293
require.NoError(t, err)
284294

285295
_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
286296
require.NoError(t, err)
287297
// ensure that peerScore for the second peer is changed
288-
newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
298+
newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID())
289299
require.NotEqual(t, 20, newPeerScore)
290300
}
291301

@@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) {
464474
func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
465475
// create blankhost because mocknet does not support deadlines
466476
swarm0 := swarm.GenSwarm(t)
467-
host0 := blankhost.NewBlankHost(swarm0)
477+
mngr, err := connmgr.NewConnManager(0, 50)
478+
require.NoError(t, err)
479+
host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr))
468480
swarm1 := swarm.GenSwarm(t)
469481
host1 := blankhost.NewBlankHost(swarm1)
470482
swarm2 := swarm.GenSwarm(t)
@@ -495,24 +507,25 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
495507
t.Cleanup(func() {
496508
serverSideEx.Stop(context.Background()) //nolint:errcheck
497509
})
498-
prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
510+
prevScore := score(t, exchg.host, host1.ID())
499511
exchg.peerTracker.peerLk.Lock()
500-
exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
512+
host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100)
513+
exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{}
501514
exchg.peerTracker.peerLk.Unlock()
502515

503516
gen, err := store.GetByHeight(context.Background(), 1)
504517
require.NoError(t, err)
505518

506519
_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
507520
require.NoError(t, err)
508-
newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
521+
newPeerScore := score(t, exchg.host, host1.ID())
509522
assert.NotEqual(t, newPeerScore, prevScore)
510523
}
511524

512525
// TestExchange_RequestPartialRange enusres in case of receiving a partial response
513526
// from server, Exchange will re-request remaining headers from another peer
514527
func TestExchange_RequestPartialRange(t *testing.T) {
515-
hosts := createMocknet(t, 3)
528+
hosts := quicHosts(t, 3)
516529
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
517530

518531
// create one more server(with more headers in the store)
@@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
523536
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
524537
t.Cleanup(cancel)
525538

539+
hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
540+
526541
require.NoError(t, err)
527542
require.NoError(t, serverSideEx.Start(ctx))
528543
exchg.peerTracker.peerLk.Lock()
529-
prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
530-
prevScoreBefore2 := 50
531-
// reducing peerScore of the second server, so our exchange will request host[1] first.
532-
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
544+
prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
545+
prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
546+
exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
533547
exchg.peerTracker.peerLk.Unlock()
534548

535549
gen, err := store.GetByHeight(context.Background(), 1)
@@ -540,8 +554,8 @@ func TestExchange_RequestPartialRange(t *testing.T) {
540554
require.NoError(t, err)
541555

542556
exchg.peerTracker.peerLk.Lock()
543-
prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
544-
prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
557+
prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
558+
prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
545559
exchg.peerTracker.peerLk.Unlock()
546560

547561
assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
@@ -561,7 +575,6 @@ func createP2PExAndServer(
561575
host, tpeer libhost.Host,
562576
) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
563577
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
564-
565578
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
566579
WithNetworkID[ServerParameters](networkID),
567580
)
@@ -582,7 +595,7 @@ func createP2PExAndServer(
582595
time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer
583596

584597
ex.peerTracker.peerLk.Lock()
585-
ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
598+
ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
586599
ex.peerTracker.peerLk.Unlock()
587600

588601
t.Cleanup(func() {
@@ -595,12 +608,15 @@ func createP2PExAndServer(
595608

596609
func quicHosts(t *testing.T, n int) []libhost.Host {
597610
hosts := make([]libhost.Host, n)
611+
var err error
598612
for i := range hosts {
599-
swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
600-
hosts[i] = blankhost.NewBlankHost(swrm)
613+
require.NoError(t, err)
614+
hosts[i], err = libp2p.New()
601615
for _, host := range hosts[:i] {
602616
hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
603617
host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
618+
err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
619+
require.NoError(t, err)
604620
}
605621
}
606622

@@ -647,3 +663,15 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.D
647663
time.Sleep(t.timeout)
648664
return nil, header.ErrNoHead
649665
}
666+
667+
func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
668+
time.Sleep(t.timeout)
669+
return t.Store.GetRange(ctx, from, to)
670+
}
671+
672+
func score(t *testing.T, h libhost.Host, id peer.ID) int {
673+
t.Helper()
674+
tags := h.ConnManager().GetTagInfo(id)
675+
tag, _ := tags.Tags[string(protocolID(networkID))]
676+
return tag
677+
}

0 commit comments

Comments
 (0)