Skip to content

Commit 42c3b8f

Browse files
committed
misc(p2p/peerstat): remove pruneDeadline + update peer scoring mechanism
1 parent bdcee75 commit 42c3b8f

File tree

7 files changed

+54
-94
lines changed

7 files changed

+54
-94
lines changed

go.mod

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,24 @@ require (
2424
github.com/benbjohnson/clock v1.3.5 // indirect
2525
github.com/beorn7/perks v1.0.1 // indirect
2626
github.com/cespare/xxhash/v2 v2.2.0 // indirect
27+
github.com/containerd/cgroups v1.1.0 // indirect
28+
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
2729
github.com/davecgh/go-spew v1.1.1 // indirect
2830
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
2931
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
32+
github.com/docker/go-units v0.5.0 // indirect
33+
github.com/elastic/gosigar v0.14.2 // indirect
3034
github.com/flynn/noise v1.0.0 // indirect
3135
github.com/francoispqt/gojay v1.2.13 // indirect
3236
github.com/go-logr/logr v1.4.1 // indirect
3337
github.com/go-logr/stdr v1.2.2 // indirect
3438
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
39+
github.com/godbus/dbus/v5 v5.1.0 // indirect
3540
github.com/golang/protobuf v1.5.3 // indirect
3641
github.com/google/gopacket v1.1.19 // indirect
3742
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
3843
github.com/google/uuid v1.5.0 // indirect
44+
github.com/gorilla/websocket v1.5.0 // indirect
3945
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
4046
github.com/huin/goupnp v1.3.0 // indirect
4147
github.com/ipfs/go-cid v0.4.1 // indirect
@@ -73,6 +79,9 @@ require (
7379
github.com/multiformats/go-multistream v0.5.0 // indirect
7480
github.com/multiformats/go-varint v0.0.7 // indirect
7581
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
82+
github.com/opencontainers/runtime-spec v1.1.0 // indirect
83+
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
84+
github.com/pkg/errors v0.9.1 // indirect
7685
github.com/pmezard/go-difflib v1.0.0 // indirect
7786
github.com/prometheus/client_golang v1.14.0 // indirect
7887
github.com/prometheus/client_model v0.4.0 // indirect
@@ -82,7 +91,10 @@ require (
8291
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
8392
github.com/quic-go/quic-go v0.39.4 // indirect
8493
github.com/quic-go/webtransport-go v0.6.0 // indirect
94+
github.com/raulk/go-watchdog v1.3.0 // indirect
8595
github.com/spaolacci/murmur3 v1.1.0 // indirect
96+
go.uber.org/dig v1.17.1 // indirect
97+
go.uber.org/fx v1.20.1 // indirect
8698
go.uber.org/mock v0.3.0 // indirect
8799
go.uber.org/multierr v1.11.0 // indirect
88100
go.uber.org/zap v1.26.0 // indirect

go.sum

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,17 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
120120
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE=
121121
github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8=
122122
github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
123+
github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw=
123124
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
124125
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
125126
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
126127
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
127128
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
128-
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI=
129129
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
130130
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
131131
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
132132
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
133+
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
133134
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
134135
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
135136
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
@@ -152,6 +153,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
152153
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
153154
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
154155
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
156+
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
155157
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
156158
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
157159
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
@@ -698,6 +700,7 @@ github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
698700
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
699701
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
700702
github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg=
703+
github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
701704
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
702705
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
703706
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
@@ -779,6 +782,7 @@ github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1ab
779782
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
780783
github.com/raulk/go-watchdog v1.2.0/go.mod h1:lzSbAl5sh4rtI8tYHU01BWIDzgzqaQLj6RcA1i4mlqI=
781784
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
785+
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
782786
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
783787
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
784788
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -896,8 +900,11 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
896900
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
897901
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
898902
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
903+
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
899904
go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc=
905+
go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
900906
go.uber.org/fx v1.20.1 h1:zVwVQGS8zYvhh9Xxcu4w1M6ESyeMzebzj2NbSayZ4Mk=
907+
go.uber.org/fx v1.20.1/go.mod h1:iSYNbHf2y55acNCwCXKx7LbWb5WG1Bnue5RDXz1OREg=
901908
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
902909
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
903910
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=

p2p/peer_stats.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ type peerStat struct {
1414
sync.RWMutex
1515
peerID peer.ID
1616
// score is the average speed per single request
17-
peerScore float32
18-
// pruneDeadline specifies when disconnected peer will be removed if
19-
// it does not return online.
20-
pruneDeadline time.Time
17+
peerScore int
2118
}
2219

2320
// updateStats recalculates peer.score by averaging the last score
@@ -26,33 +23,28 @@ type peerStat struct {
2623
// by dividing the amount by time, so the result score will represent how many bytes
2724
// were retrieved in 1 millisecond. This value will then be averaged relative to the
2825
// previous peerScore.
29-
func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
30-
p.Lock()
31-
defer p.Unlock()
26+
func (p *peerStat) updateStats(amount uint64, duration time.Duration) int {
27+
if amount == 0 && duration == 0 {
28+
// decrease peerScore by 20% of the peer that failed the request by any reason.
29+
// NOTE: peerScore will not be decreased if the score is less than 100.
30+
p.peerScore -= p.peerScore / 100 * 20
31+
return p.peerScore
32+
}
33+
3234
averageSpeed := float32(amount)
3335
if duration != 0 {
3436
averageSpeed /= float32(duration.Milliseconds())
3537
}
3638
if p.peerScore == 0.0 {
37-
p.peerScore = averageSpeed
38-
return
39+
p.peerScore = int(averageSpeed * 100)
40+
return p.peerScore
3941
}
40-
p.peerScore = (p.peerScore + averageSpeed) / 2
41-
}
42-
43-
// decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason.
44-
// NOTE: decreasing peerScore in one session will not affect its position in queue in another
45-
// session(as we can have multiple sessions running concurrently).
46-
// TODO(vgonkivs): to figure out the better scoring increments/decrements
47-
func (p *peerStat) decreaseScore() {
48-
p.Lock()
49-
defer p.Unlock()
50-
51-
p.peerScore -= p.peerScore / 100 * 20
42+
p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2
43+
return p.peerScore
5244
}
5345

5446
// score reads a peer's latest score from the queue
55-
func (p *peerStat) score() float32 {
47+
func (p *peerStat) score() int {
5648
p.RLock()
5749
defer p.RUnlock()
5850
return p.peerScore
@@ -123,10 +115,6 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
123115
// in case if there are no peer available in current session, it blocks until
124116
// the peer will be pushed in.
125117
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
126-
// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
127-
// As we discussed with @Wondertan there could be 2 possible solutions:
128-
// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
129-
// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
130118
select {
131119
case <-ctx.Done():
132120
return &peerStat{}

p2p/peer_stats_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,6 @@ func Test_StatDecreaseScore(t *testing.T) {
107107
peerScore: 100,
108108
}
109109
// will decrease score by 20%
110-
pStats.decreaseScore()
111-
require.Equal(t, pStats.score(), float32(80.0))
110+
pStats.updateStats(0, 0)
111+
require.Equal(t, pStats.score(), 80)
112112
}

p2p/peer_tracker_test.go

Lines changed: 14 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -9,67 +9,21 @@ import (
99

1010
"github.com/ipfs/go-datastore"
1111
"github.com/ipfs/go-datastore/sync"
12+
"github.com/libp2p/go-libp2p"
13+
"github.com/libp2p/go-libp2p/core/host"
1214
"github.com/libp2p/go-libp2p/core/peer"
15+
"github.com/libp2p/go-libp2p/core/peerstore"
1316
testpeer "github.com/libp2p/go-libp2p/core/test"
1417
"github.com/libp2p/go-libp2p/p2p/net/conngater"
15-
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1618
"github.com/stretchr/testify/assert"
1719
"github.com/stretchr/testify/require"
1820
)
1921

20-
func TestPeerTracker_GC(t *testing.T) {
21-
ctx, cancel := context.WithCancel(context.Background())
22-
t.Cleanup(cancel)
23-
24-
h := createMocknet(t, 1)
25-
26-
gcCycle = time.Millisecond * 200
27-
28-
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
29-
require.NoError(t, err)
30-
31-
pidstore := newDummyPIDStore()
32-
p := newPeerTracker(h[0], connGater, pidstore, nil)
33-
34-
maxAwaitingTime = time.Millisecond
35-
36-
peerlist := generateRandomPeerlist(t, 10)
37-
for i := 0; i < 10; i++ {
38-
p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5}
39-
}
40-
41-
peerlist = generateRandomPeerlist(t, 4)
42-
pid1 := peerlist[2]
43-
pid2 := peerlist[3]
44-
45-
p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()}
46-
p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)}
47-
assert.True(t, len(p.trackedPeers) > 0)
48-
assert.True(t, len(p.disconnectedPeers) > 0)
49-
50-
go p.track()
51-
go p.gc()
52-
53-
time.Sleep(time.Millisecond * 500)
54-
55-
err = p.stop(ctx)
56-
require.NoError(t, err)
57-
58-
require.Len(t, p.trackedPeers, 10)
59-
require.Nil(t, p.disconnectedPeers[pid1])
60-
61-
// ensure good peers were dumped to store
62-
peers, err := pidstore.Load(ctx)
63-
require.NoError(t, err)
64-
require.Equal(t, 10, len(peers))
65-
}
66-
6722
func TestPeerTracker_BlockPeer(t *testing.T) {
6823
h := createMocknet(t, 2)
6924
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
7025
require.NoError(t, err)
71-
p := newPeerTracker(h[0], connGater, nil, nil)
72-
maxAwaitingTime = time.Millisecond
26+
p := newPeerTracker(h[0], connGater, "private", nil, nil)
7327
p.blockPeer(h[1].ID(), errors.New("test"))
7428
require.Len(t, connGater.ListBlockedPeers(), 1)
7529
require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID())
@@ -82,26 +36,25 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
8236
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
8337
require.NoError(t, err)
8438

85-
// mn := createMocknet(t, 10)
86-
mn, err := mocknet.FullMeshConnected(10)
87-
require.NoError(t, err)
39+
hosts := make([]host.Host, 10)
40+
41+
for i := range hosts {
42+
hosts[i], err = libp2p.New()
43+
require.NoError(t, err)
44+
hosts[i].SetStreamHandler(protocolID("private"), nil)
45+
}
8846

8947
// store peers to peerstore
9048
prevSeen := make([]peer.ID, 9)
91-
for i, peer := range mn.Hosts()[1:] {
49+
for i, peer := range hosts[1:] {
50+
hosts[0].Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL)
9251
prevSeen[i] = peer.ID()
93-
94-
// disconnect so they're not already connected on attempt to
95-
// connect
96-
err = mn.DisconnectPeers(mn.Hosts()[i].ID(), peer.ID())
97-
require.NoError(t, err)
9852
}
9953
pidstore := newDummyPIDStore()
10054
// only store 7 peers to pidstore, and use 2 as trusted
10155
err = pidstore.Put(ctx, prevSeen[2:])
10256
require.NoError(t, err)
103-
104-
tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore, nil)
57+
tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)
10558

10659
go tracker.track()
10760

p2p/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (s *session[H]) doRequest(
197197
switch err {
198198
case header.ErrNotFound, errEmptyResponse:
199199
logFn = log.Debugw
200-
stat.decreaseScore()
200+
s.peerTracker.updateScore(stat, 0, 0)
201201
default:
202202
s.peerTracker.blockPeer(stat.peerID, err)
203203
}
@@ -233,7 +233,7 @@ func (s *session[H]) doRequest(
233233
span.SetStatus(codes.Ok, "")
234234

235235
// update peer stats
236-
stat.updateStats(size, duration)
236+
s.peerTracker.updateScore(stat, size, duration)
237237

238238
// ensure that we received the correct amount of headers.
239239
if remainingHeaders > 0 {

p2p/session_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func Test_Validate(t *testing.T) {
2828
ses := newSession(
2929
context.Background(),
3030
nil,
31-
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
31+
&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
3232
"", time.Second, nil,
3333
withValidation(head),
3434
)
@@ -45,7 +45,7 @@ func Test_ValidateFails(t *testing.T) {
4545
ses := newSession(
4646
context.Background(),
4747
nil,
48-
&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
48+
&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
4949
"", time.Second, nil,
5050
withValidation(head),
5151
)

0 commit comments

Comments
 (0)