Skip to content

Commit 39232eb

Browse files
committed
misc(p2p/peerstat): remove pruneDeadline + update peer scoring mechanism
1 parent 10c442b commit 39232eb

File tree

7 files changed

+46
-94
lines changed

7 files changed

+46
-94
lines changed

go.mod

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@ require (
2424
github.com/benbjohnson/clock v1.3.5 // indirect
2525
github.com/beorn7/perks v1.0.1 // indirect
2626
github.com/cespare/xxhash/v2 v2.2.0 // indirect
27+
github.com/containerd/cgroups v1.1.0 // indirect
28+
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
2729
github.com/davecgh/go-spew v1.1.1 // indirect
2830
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
2931
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
32+
github.com/docker/go-units v0.5.0 // indirect
33+
github.com/elastic/gosigar v0.14.2 // indirect
3034
github.com/flynn/noise v1.1.0 // indirect
3135
github.com/francoispqt/gojay v1.2.13 // indirect
3236
github.com/go-logr/logr v1.4.1 // indirect
3337
github.com/go-logr/stdr v1.2.2 // indirect
3438
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
39+
github.com/godbus/dbus/v5 v5.1.0 // indirect
3540
github.com/google/gopacket v1.1.19 // indirect
3641
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
3742
github.com/google/uuid v1.5.0 // indirect
43+
github.com/gorilla/websocket v1.5.1 // indirect
3844
github.com/huin/goupnp v1.3.0 // indirect
3945
github.com/ipfs/go-cid v0.4.1 // indirect
4046
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
@@ -69,6 +75,8 @@ require (
6975
github.com/multiformats/go-multistream v0.5.0 // indirect
7076
github.com/multiformats/go-varint v0.0.7 // indirect
7177
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
78+
github.com/opencontainers/runtime-spec v1.2.0 // indirect
79+
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
7280
github.com/pion/datachannel v1.5.6 // indirect
7381
github.com/pion/dtls/v2 v2.2.11 // indirect
7482
github.com/pion/ice/v2 v2.3.24 // indirect
@@ -85,6 +93,7 @@ require (
8593
github.com/pion/transport/v2 v2.2.5 // indirect
8694
github.com/pion/turn/v2 v2.1.6 // indirect
8795
github.com/pion/webrtc/v3 v3.2.40 // indirect
96+
github.com/pkg/errors v0.9.1 // indirect
8897
github.com/pmezard/go-difflib v1.0.0 // indirect
8998
github.com/prometheus/client_golang v1.19.1 // indirect
9099
github.com/prometheus/client_model v0.6.1 // indirect
@@ -93,7 +102,10 @@ require (
93102
github.com/quic-go/qpack v0.4.0 // indirect
94103
github.com/quic-go/quic-go v0.44.0 // indirect
95104
github.com/quic-go/webtransport-go v0.8.0 // indirect
105+
github.com/raulk/go-watchdog v1.3.0 // indirect
96106
github.com/spaolacci/murmur3 v1.1.0 // indirect
107+
go.uber.org/dig v1.17.1 // indirect
108+
go.uber.org/fx v1.21.1 // indirect
97109
go.uber.org/mock v0.4.0 // indirect
98110
go.uber.org/multierr v1.11.0 // indirect
99111
go.uber.org/zap v1.27.0 // indirect

go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8Nz
126126
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
127127
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
128128
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
129-
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI=
130129
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
131130
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
132131
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=

p2p/peer_stats.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ type peerStat struct {
1414
sync.RWMutex
1515
peerID peer.ID
1616
// score is the average speed per single request
17-
peerScore float32
18-
// pruneDeadline specifies when disconnected peer will be removed if
19-
// it does not return online.
20-
pruneDeadline time.Time
17+
peerScore int
2118
}
2219

2320
// updateStats recalculates peer.score by averaging the last score
@@ -26,33 +23,28 @@ type peerStat struct {
2623
// by dividing the amount by time, so the result score will represent how many bytes
2724
// were retrieved in 1 millisecond. This value will then be averaged relative to the
2825
// previous peerScore.
29-
func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
30-
p.Lock()
31-
defer p.Unlock()
26+
func (p *peerStat) updateStats(amount uint64, duration time.Duration) int {
27+
if amount == 0 && duration == 0 {
28+
// decrease peerScore by 20% of the peer that failed the request by any reason.
29+
// NOTE: peerScore will not be decreased if the score is less than 100.
30+
p.peerScore -= p.peerScore / 100 * 20
31+
return p.peerScore
32+
}
33+
3234
averageSpeed := float32(amount)
3335
if duration != 0 {
3436
averageSpeed /= float32(duration.Milliseconds())
3537
}
3638
if p.peerScore == 0.0 {
37-
p.peerScore = averageSpeed
38-
return
39+
p.peerScore = int(averageSpeed * 100)
40+
return p.peerScore
3941
}
40-
p.peerScore = (p.peerScore + averageSpeed) / 2
41-
}
42-
43-
// decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason.
44-
// NOTE: decreasing peerScore in one session will not affect its position in queue in another
45-
// session(as we can have multiple sessions running concurrently).
46-
// TODO(vgonkivs): to figure out the better scoring increments/decrements
47-
func (p *peerStat) decreaseScore() {
48-
p.Lock()
49-
defer p.Unlock()
50-
51-
p.peerScore -= p.peerScore / 100 * 20
42+
p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2
43+
return p.peerScore
5244
}
5345

5446
// score reads a peer's latest score from the queue
55-
func (p *peerStat) score() float32 {
47+
func (p *peerStat) score() int {
5648
p.RLock()
5749
defer p.RUnlock()
5850
return p.peerScore
@@ -123,10 +115,6 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
123115
// in case if there are no peer available in current session, it blocks until
124116
// the peer will be pushed in.
125117
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
126-
// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
127-
// As we discussed with @Wondertan there could be 2 possible solutions:
128-
// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
129-
// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
130118
select {
131119
case <-ctx.Done():
132120
return &peerStat{}

p2p/peer_stats_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,6 @@ func Test_StatDecreaseScore(t *testing.T) {
107107
peerScore: 100,
108108
}
109109
// will decrease score by 20%
110-
pStats.decreaseScore()
111-
require.Equal(t, pStats.score(), float32(80.0))
110+
pStats.updateStats(0, 0)
111+
require.Equal(t, pStats.score(), 80)
112112
}

p2p/peer_tracker_test.go

Lines changed: 14 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -9,67 +9,21 @@ import (
99

1010
"github.com/ipfs/go-datastore"
1111
"github.com/ipfs/go-datastore/sync"
12+
"github.com/libp2p/go-libp2p"
13+
"github.com/libp2p/go-libp2p/core/host"
1214
"github.com/libp2p/go-libp2p/core/peer"
15+
"github.com/libp2p/go-libp2p/core/peerstore"
1316
testpeer "github.com/libp2p/go-libp2p/core/test"
1417
"github.com/libp2p/go-libp2p/p2p/net/conngater"
15-
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1618
"github.com/stretchr/testify/assert"
1719
"github.com/stretchr/testify/require"
1820
)
1921

20-
func TestPeerTracker_GC(t *testing.T) {
21-
ctx, cancel := context.WithCancel(context.Background())
22-
t.Cleanup(cancel)
23-
24-
h := createMocknet(t, 1)
25-
26-
gcCycle = time.Millisecond * 200
27-
28-
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
29-
require.NoError(t, err)
30-
31-
pidstore := newDummyPIDStore()
32-
p := newPeerTracker(h[0], connGater, pidstore, nil)
33-
34-
maxAwaitingTime = time.Millisecond
35-
36-
peerlist := generateRandomPeerlist(t, 10)
37-
for i := 0; i < 10; i++ {
38-
p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5}
39-
}
40-
41-
peerlist = generateRandomPeerlist(t, 4)
42-
pid1 := peerlist[2]
43-
pid2 := peerlist[3]
44-
45-
p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()}
46-
p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)}
47-
assert.True(t, len(p.trackedPeers) > 0)
48-
assert.True(t, len(p.disconnectedPeers) > 0)
49-
50-
go p.track()
51-
go p.gc()
52-
53-
time.Sleep(time.Millisecond * 500)
54-
55-
err = p.stop(ctx)
56-
require.NoError(t, err)
57-
58-
require.Len(t, p.trackedPeers, 10)
59-
require.Nil(t, p.disconnectedPeers[pid1])
60-
61-
// ensure good peers were dumped to store
62-
peers, err := pidstore.Load(ctx)
63-
require.NoError(t, err)
64-
require.Equal(t, 10, len(peers))
65-
}
66-
6722
func TestPeerTracker_BlockPeer(t *testing.T) {
6823
h := createMocknet(t, 2)
6924
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
7025
require.NoError(t, err)
71-
p := newPeerTracker(h[0], connGater, nil, nil)
72-
maxAwaitingTime = time.Millisecond
26+
p := newPeerTracker(h[0], connGater, "private", nil, nil)
7327
p.blockPeer(h[1].ID(), errors.New("test"))
7428
require.Len(t, connGater.ListBlockedPeers(), 1)
7529
require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID())
@@ -82,26 +36,25 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
8236
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
8337
require.NoError(t, err)
8438

85-
// mn := createMocknet(t, 10)
86-
mn, err := mocknet.FullMeshConnected(10)
87-
require.NoError(t, err)
39+
hosts := make([]host.Host, 10)
40+
41+
for i := range hosts {
42+
hosts[i], err = libp2p.New()
43+
require.NoError(t, err)
44+
hosts[i].SetStreamHandler(protocolID("private"), nil)
45+
}
8846

8947
// store peers to peerstore
9048
prevSeen := make([]peer.ID, 9)
91-
for i, peer := range mn.Hosts()[1:] {
49+
for i, peer := range hosts[1:] {
50+
hosts[0].Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL)
9251
prevSeen[i] = peer.ID()
93-
94-
// disconnect so they're not already connected on attempt to
95-
// connect
96-
err = mn.DisconnectPeers(mn.Hosts()[i].ID(), peer.ID())
97-
require.NoError(t, err)
9852
}
9953
pidstore := newDummyPIDStore()
10054
// only store 7 peers to pidstore, and use 2 as trusted
10155
err = pidstore.Put(ctx, prevSeen[2:])
10256
require.NoError(t, err)
103-
104-
tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore, nil)
57+
tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)
10558

10659
go tracker.track()
10760

p2p/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (s *session[H]) doRequest(
198198
switch err {
199199
case header.ErrNotFound, errEmptyResponse:
200200
logFn = log.Debugw
201-
stat.decreaseScore()
201+
s.peerTracker.updateScore(stat, 0, 0)
202202
default:
203203
s.peerTracker.blockPeer(stat.peerID, err)
204204
}
@@ -234,7 +234,7 @@ func (s *session[H]) doRequest(
234234
span.SetStatus(codes.Ok, "")
235235

236236
// update peer stats
237-
stat.updateStats(size, took)
237+
s.peerTracker.updateScore(stat, size, took)
238238

239239
// ensure that we received the correct amount of headers.
240240
if remainingHeaders > 0 {

p2p/session_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func Test_Validate(t *testing.T) {
2828
ses := newSession(
2929
context.Background(),
3030
nil,
31-
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
31+
&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
3232
"", time.Second, nil,
3333
withValidation(head),
3434
)
@@ -45,7 +45,7 @@ func Test_ValidateFails(t *testing.T) {
4545
ses := newSession(
4646
context.Background(),
4747
nil,
48-
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
48+
&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
4949
"", time.Second, nil,
5050
withValidation(head),
5151
)

0 commit comments

Comments
 (0)