Skip to content

Commit 7ae6615

Browse files
committed
improvements
1 parent c5112c3 commit 7ae6615

File tree

5 files changed

+74
-60
lines changed

5 files changed

+74
-60
lines changed

p2p/exchange.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
9999
ex.ctx, ex.cancel = context.WithCancel(context.Background())
100100
log.Infow("client: starting client", "protocol ID", ex.protocolID)
101101

102-
go ex.peerTracker.track()
102+
err := ex.peerTracker.track()
103+
if err != nil {
104+
return err
105+
}
103106

104107
// bootstrap the peerTracker with trusted peers as well as previously seen
105108
// peers if provided.
@@ -150,9 +153,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
150153
// their Head and verify against the given trusted header.
151154
useTrackedPeers := !reqParams.TrustedHead.IsZero()
152155
if useTrackedPeers {
153-
trackedPeers := ex.peerTracker.getPeers(maxUntrustedHeadRequests)
156+
trackedPeers := ex.peerTracker.peers(maxUntrustedHeadRequests)
154157
if len(trackedPeers) > 0 {
155-
peers = trackedPeers
158+
peers = transform(trackedPeers, func(p *peerStat) peer.ID {
159+
return p.peerID
160+
})
156161
log.Debugw("requesting head from tracked peers", "amount", len(peers))
157162
}
158163
}

p2p/helpers.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,13 @@ func convertStatusCodeToError(code p2p_pb.StatusCode) error {
124124
return fmt.Errorf("unknown status code %d", code)
125125
}
126126
}
127+
128+
// transform applies a provided function to each element of the input slice,
129+
// producing a new slice with the results of the function.
130+
func transform[T, U any](ts []T, f func(T) U) []U {
131+
us := make([]U, len(ts))
132+
for i := range ts {
133+
us[i] = f(ts[i])
134+
}
135+
return us
136+
}

p2p/peer_tracker.go

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ type peerTracker struct {
2323

2424
peerLk sync.RWMutex
2525
// trackedPeers contains active peers that we can request to.
26-
// we cache the peer once they disconnect,
2726
// so we can guarantee that peerQueue will only contain active peers
2827
trackedPeers map[libpeer.ID]struct{}
2928

@@ -101,72 +100,59 @@ func (p *peerTracker) connectToPeer(ctx context.Context, peer libpeer.ID) {
101100
}
102101
}
103102

104-
func (p *peerTracker) track() {
105-
defer func() {
106-
p.done <- struct{}{}
107-
}()
103+
// track creates subscriptions for different types of libp2p.Events to efficiently handle peers.
104+
func (p *peerTracker) track() error {
105+
evtBus := p.host.EventBus()
108106

109-
connSubs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
107+
connSubs, err := evtBus.Subscribe(&event.EvtPeerConnectednessChanged{})
110108
if err != nil {
111109
log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err)
112-
return
110+
return err
113111
}
114112

115-
identifySub, err := p.host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
113+
identifySub, err := evtBus.Subscribe(&event.EvtPeerIdentificationCompleted{})
116114
if err != nil {
117115
log.Errorw("subscribing to EvtPeerIdentificationCompleted", "err", err)
118-
return
116+
return err
119117
}
120118

121-
protocolSub, err := p.host.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
119+
protocolSub, err := evtBus.Subscribe(&event.EvtPeerProtocolsUpdated{})
122120
if err != nil {
123121
log.Errorw("subscribing to EvtPeerProtocolsUpdated", "err", err)
124-
return
122+
return err
125123
}
126124

127-
for {
128-
select {
129-
case <-p.ctx.Done():
130-
err = connSubs.Close()
131-
errors.Join(err, identifySub.Close(), protocolSub.Close())
132-
if err != nil {
133-
log.Errorw("closing subscriptions", "err", err)
125+
go func() {
126+
for {
127+
select {
128+
case <-p.ctx.Done():
129+
if err := closeSubscriptions(connSubs, identifySub, protocolSub); err != nil {
130+
log.Errorw("closing subscriptions", "err", err)
131+
}
132+
p.done <- struct{}{}
133+
return
134+
case connSubscription := <-connSubs.Out():
135+
ev := connSubscription.(event.EvtPeerConnectednessChanged)
136+
if network.NotConnected == ev.Connectedness {
137+
p.disconnected(ev.Peer)
138+
}
139+
case identSubscription := <-identifySub.Out():
140+
ev := identSubscription.(event.EvtPeerIdentificationCompleted)
141+
if slices.Contains(ev.Protocols, p.protocolID) {
142+
p.connected(ev.Peer)
143+
}
144+
case protocolSubscription := <-protocolSub.Out():
145+
ev := protocolSubscription.(event.EvtPeerProtocolsUpdated)
146+
if slices.Contains(ev.Removed, p.protocolID) {
147+
p.disconnected(ev.Peer)
148+
}
149+
if slices.Contains(ev.Added, p.protocolID) {
150+
p.connected(ev.Peer)
151+
}
134152
}
135-
return
136-
case connSubscription := <-connSubs.Out():
137-
ev := connSubscription.(event.EvtPeerConnectednessChanged)
138-
if network.NotConnected == ev.Connectedness {
139-
p.disconnected(ev.Peer)
140-
}
141-
case identSubscription := <-identifySub.Out():
142-
ev := identSubscription.(event.EvtPeerIdentificationCompleted)
143-
if slices.Contains(ev.Protocols, p.protocolID) {
144-
p.connected(ev.Peer)
145-
}
146-
case protocolSubscription := <-protocolSub.Out():
147-
ev := protocolSubscription.(event.EvtPeerProtocolsUpdated)
148-
if slices.Contains(ev.Removed, p.protocolID) {
149-
p.disconnected(ev.Peer)
150-
break
151-
}
152-
p.connected(ev.Peer)
153-
}
154-
}
155-
}
156-
157-
// getPeers returns the tracker's currently tracked peers up to the `max`.
158-
func (p *peerTracker) getPeers(max int) []libpeer.ID {
159-
p.peerLk.RLock()
160-
defer p.peerLk.RUnlock()
161-
162-
peers := make([]libpeer.ID, 0, max)
163-
for peer := range p.trackedPeers {
164-
peers = append(peers, peer)
165-
if len(peers) == max {
166-
break
167153
}
168-
}
169-
return peers
154+
}()
155+
return nil
170156
}
171157

172158
func (p *peerTracker) connected(pID libpeer.ID) {
@@ -215,17 +201,21 @@ func (p *peerTracker) disconnected(pID libpeer.ID) {
215201
p.metrics.peersDisconnected(1)
216202
}
217203

218-
func (p *peerTracker) peers() []*peerStat {
204+
// peers returns the tracker's currently tracked peers up to the `max`.
205+
func (p *peerTracker) peers(max int) []*peerStat {
219206
p.peerLk.RLock()
220207
defer p.peerLk.RUnlock()
221208

222-
peers := make([]*peerStat, 0)
209+
peers := make([]*peerStat, 0, max)
223210
for peerID := range p.trackedPeers {
224211
score := 0
225212
if info := p.host.ConnManager().GetTagInfo(peerID); info != nil {
226213
score = info.Tags[string(p.protocolID)]
227214
}
228215
peers = append(peers, &peerStat{peerID: peerID, peerScore: score})
216+
if len(peers) == max {
217+
break
218+
}
229219
}
230220
return peers
231221
}
@@ -296,3 +286,11 @@ func (p *peerTracker) updateScore(stats *peerStat, size uint64, duration time.Du
296286
score := stats.updateStats(size, duration)
297287
p.host.ConnManager().TagPeer(stats.peerID, string(p.protocolID), score)
298288
}
289+
290+
func closeSubscriptions(subs ...event.Subscription) error {
291+
var err error
292+
for _, sub := range subs {
293+
err = errors.Join(err, sub.Close())
294+
}
295+
return err
296+
}

p2p/peer_tracker_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,14 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
5656
require.NoError(t, err)
5757
tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)
5858

59-
go tracker.track()
59+
err = tracker.track()
60+
require.NoError(t, err)
6061

6162
err = tracker.bootstrap(prevSeen[:2])
6263
require.NoError(t, err)
6364

6465
assert.Eventually(t, func() bool {
65-
return len(tracker.getPeers(7)) > 0
66+
return len(tracker.peers(7)) > 0
6667
}, time.Millisecond*500, time.Millisecond*100)
6768
}
6869

p2p/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func newSession[H header.Header[H]](
7272
metrics: metrics,
7373
}
7474

75-
peers := peerTracker.peers()
75+
peers := peerTracker.peers(len(peerTracker.trackedPeers))
7676
if len(peers) == 0 {
7777
return nil, errors.New("empty peer tracker")
7878
}

0 commit comments

Comments
 (0)