Skip to content

Commit a4d3ca0

Browse files
authored
feat(p2p): metrics for Exchange (#125)
Includes: * headRequestTime * responseSize * responseTime * trackerPeersNum * disconnectedPeersNum
1 parent 09c272d commit a4d3ca0

11 files changed

+251
-84
lines changed

p2p/exchange.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type Exchange[H header.Header[H]] struct {
4242

4343
trustedPeers func() peer.IDSlice
4444
peerTracker *peerTracker
45-
metrics *metrics
45+
metrics *exchangeMetrics
4646

4747
Params ClientParameters
4848
}
@@ -63,7 +63,7 @@ func NewExchange[H header.Header[H]](
6363
return nil, err
6464
}
6565

66-
var metrics *metrics
66+
var metrics *exchangeMetrics
6767
if params.metrics {
6868
var err error
6969
metrics, err = newExchangeMetrics()
@@ -75,7 +75,7 @@ func NewExchange[H header.Header[H]](
7575
ex := &Exchange[H]{
7676
host: host,
7777
protocolID: protocolID(params.networkID),
78-
peerTracker: newPeerTracker(host, gater, params.pidstore),
78+
peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
7979
Params: params,
8080
metrics: metrics,
8181
}
@@ -102,7 +102,8 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
102102
// cancel the session if it exists
103103
ex.cancel()
104104
// stop the peerTracker
105-
return ex.peerTracker.stop(ctx)
105+
err := ex.peerTracker.stop(ctx)
106+
return errors.Join(err, ex.metrics.Close())
106107
}
107108

108109
// Head requests the latest Header from trusted peers.
@@ -114,14 +115,15 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
114115
log.Debug("requesting head")
115116

116117
reqCtx := ctx
118+
startTime := time.Now()
117119
if deadline, ok := ctx.Deadline(); ok {
118120
// allocate 90% of caller's set deadline for requests
119121
// and give leftover to determine the bestHead from gathered responses
120122
// this avoids DeadlineExceeded error when any of the peers are unresponsive
121-
now := time.Now()
122-
sub := deadline.Sub(now) * 9 / 10
123+
124+
sub := deadline.Sub(startTime) * 9 / 10
123125
var cancel context.CancelFunc
124-
reqCtx, cancel = context.WithDeadline(ctx, now.Add(sub))
126+
reqCtx, cancel = context.WithDeadline(ctx, startTime.Add(sub))
125127
defer cancel()
126128
}
127129

@@ -184,6 +186,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
184186
}(from)
185187
}
186188

189+
headType := headTypeTrusted
190+
if useTrackedPeers {
191+
headType = headTypeUntrusted
192+
}
193+
187194
headers := make([]H, 0, len(peers))
188195
for range peers {
189196
select {
@@ -192,12 +199,27 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
192199
headers = append(headers, h)
193200
}
194201
case <-ctx.Done():
202+
status := headStatusCanceled
203+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
204+
status = headStatusTimeout
205+
}
206+
207+
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status)
195208
return zero, ctx.Err()
196209
case <-ex.ctx.Done():
210+
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled)
197211
return zero, ex.ctx.Err()
198212
}
199213
}
200-
return bestHead[H](headers)
214+
215+
head, err := bestHead[H](headers)
216+
if err != nil {
217+
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders)
218+
return zero, err
219+
}
220+
221+
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk)
222+
return head, nil
201223
}
202224

203225
// GetByHeight performs a request for the Header at the given
@@ -230,7 +252,7 @@ func (ex *Exchange[H]) GetRangeByHeight(
230252
to uint64,
231253
) ([]H, error) {
232254
session := newSession[H](
233-
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from),
255+
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
234256
)
235257
defer session.close()
236258
// we request the next header height that we don't have: `fromHead`+1
@@ -307,7 +329,7 @@ func (ex *Exchange[H]) request(
307329
) ([]H, error) {
308330
log.Debugw("requesting peer", "peer", to)
309331
responses, size, duration, err := sendMessage(ctx, ex.host, to, ex.protocolID, req)
310-
ex.metrics.observeResponse(ctx, size, duration, err)
332+
ex.metrics.response(ctx, size, duration, err)
311333
if err != nil {
312334
log.Debugw("err sending request", "peer", to, "err", err)
313335
return nil, err

p2p/exchange_metrics.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package p2p
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync/atomic"
7+
"time"
8+
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/metric"
12+
)
13+
14+
var meter = otel.Meter("header/p2p")
15+
16+
const (
17+
failedKey = "failed"
18+
headerReceivedKey = "num_headers_received"
19+
headTypeKey = "request_type"
20+
headTypeTrusted = "trusted_request"
21+
headTypeUntrusted = "untrusted_request"
22+
headStatusKey = "status"
23+
headStatusOk = "ok"
24+
headStatusTimeout = "timeout"
25+
headStatusCanceled = "canceled"
26+
headStatusNoHeaders = "no_headers"
27+
)
28+
29+
type exchangeMetrics struct {
30+
headRequestTimeInst metric.Float64Histogram
31+
responseSizeInst metric.Int64Histogram
32+
responseTimeInst metric.Float64Histogram
33+
34+
trackerPeersNum atomic.Int64
35+
trackedPeersNumInst metric.Int64ObservableGauge
36+
trackedPeersNumReg metric.Registration
37+
38+
disconnectedPeersNum atomic.Int64
39+
disconnectedPeersNumInst metric.Int64ObservableGauge
40+
disconnectedPeersNumReg metric.Registration
41+
42+
blockedPeersNum atomic.Int64
43+
blockedPeersNumInst metric.Int64ObservableGauge
44+
blockedPeersNumReg metric.Registration
45+
}
46+
47+
func newExchangeMetrics() (m *exchangeMetrics, err error) {
48+
m = new(exchangeMetrics)
49+
m.headRequestTimeInst, err = meter.Float64Histogram(
50+
"hdr_p2p_exch_clnt_head_time_hist",
51+
metric.WithDescription("exchange client head request time in seconds"),
52+
)
53+
if err != nil {
54+
return nil, err
55+
}
56+
m.responseSizeInst, err = meter.Int64Histogram(
57+
"hdr_p2p_exch_clnt_resp_size_hist",
58+
metric.WithDescription("exchange client header response size in bytes"),
59+
)
60+
if err != nil {
61+
return nil, err
62+
}
63+
m.responseTimeInst, err = meter.Float64Histogram(
64+
"hdr_p2p_exch_clnt_resp_time_hist",
65+
metric.WithDescription("exchange client response time in seconds"),
66+
)
67+
if err != nil {
68+
return nil, err
69+
}
70+
m.trackedPeersNumInst, err = meter.Int64ObservableGauge(
71+
"hdr_p2p_exch_clnt_trck_peer_num_gauge",
72+
metric.WithDescription("exchange client tracked peers number"),
73+
)
74+
if err != nil {
75+
return nil, err
76+
}
77+
m.trackedPeersNumReg, err = meter.RegisterCallback(m.observeTrackedPeers, m.trackedPeersNumInst)
78+
if err != nil {
79+
return nil, err
80+
}
81+
m.disconnectedPeersNumInst, err = meter.Int64ObservableGauge(
82+
"hdr_p2p_exch_clnt_disconn_peer_num_gauge",
83+
metric.WithDescription("exchange client tracked disconnected peers number"),
84+
)
85+
if err != nil {
86+
return nil, err
87+
}
88+
m.disconnectedPeersNumReg, err = meter.RegisterCallback(m.observeDisconnectedPeers, m.disconnectedPeersNumInst)
89+
if err != nil {
90+
return nil, err
91+
}
92+
m.blockedPeersNumInst, err = meter.Int64ObservableGauge(
93+
"hdr_p2p_exch_clnt_block_peer_num_gauge",
94+
metric.WithDescription("exchange client blocked peers number"),
95+
)
96+
if err != nil {
97+
return nil, err
98+
}
99+
m.blockedPeersNumReg, err = meter.RegisterCallback(m.observeBlockedPeers, m.blockedPeersNumInst)
100+
if err != nil {
101+
return nil, err
102+
}
103+
return m, nil
104+
}
105+
106+
func (m *exchangeMetrics) head(ctx context.Context, duration time.Duration, headersReceived int, tp, status string) {
107+
m.observe(ctx, func(ctx context.Context) {
108+
m.headRequestTimeInst.Record(ctx,
109+
duration.Seconds(),
110+
metric.WithAttributes(
111+
attribute.Int(headerReceivedKey, headersReceived),
112+
attribute.String(headTypeKey, tp),
113+
attribute.String(headStatusKey, status),
114+
),
115+
)
116+
})
117+
}
118+
119+
func (m *exchangeMetrics) response(ctx context.Context, size uint64, duration time.Duration, err error) {
120+
m.observe(ctx, func(ctx context.Context) {
121+
m.responseSizeInst.Record(ctx,
122+
int64(size),
123+
metric.WithAttributes(attribute.Bool(failedKey, err != nil)),
124+
)
125+
m.responseTimeInst.Record(ctx,
126+
duration.Seconds(),
127+
metric.WithAttributes(attribute.Bool(failedKey, err != nil)),
128+
)
129+
})
130+
}
131+
132+
func (m *exchangeMetrics) peersTracked(num int) {
133+
m.observe(context.Background(), func(context.Context) {
134+
m.trackerPeersNum.Add(int64(num))
135+
})
136+
}
137+
138+
func (m *exchangeMetrics) peersDisconnected(num int) {
139+
m.observe(context.Background(), func(context.Context) {
140+
m.disconnectedPeersNum.Add(int64(num))
141+
})
142+
}
143+
144+
func (m *exchangeMetrics) peerBlocked() {
145+
m.observe(context.Background(), func(ctx context.Context) {
146+
m.blockedPeersNum.Add(1)
147+
})
148+
}
149+
150+
func (m *exchangeMetrics) observeTrackedPeers(_ context.Context, obs metric.Observer) error {
151+
obs.ObserveInt64(m.trackedPeersNumInst, m.trackerPeersNum.Load())
152+
return nil
153+
}
154+
155+
func (m *exchangeMetrics) observeDisconnectedPeers(_ context.Context, obs metric.Observer) error {
156+
obs.ObserveInt64(m.disconnectedPeersNumInst, m.disconnectedPeersNum.Load())
157+
return nil
158+
}
159+
160+
func (m *exchangeMetrics) observeBlockedPeers(_ context.Context, obs metric.Observer) error {
161+
obs.ObserveInt64(m.blockedPeersNumInst, m.blockedPeersNum.Load())
162+
return nil
163+
}
164+
165+
func (m *exchangeMetrics) observe(ctx context.Context, observeFn func(context.Context)) {
166+
if m == nil {
167+
return
168+
}
169+
if ctx.Err() != nil {
170+
ctx = context.Background()
171+
}
172+
173+
observeFn(ctx)
174+
}
175+
176+
func (m *exchangeMetrics) Close() (err error) {
177+
if m == nil {
178+
return nil
179+
}
180+
181+
err = errors.Join(err, m.trackedPeersNumReg.Unregister())
182+
err = errors.Join(err, m.disconnectedPeersNumReg.Unregister())
183+
return err
184+
}

p2p/helpers.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func sendMessage(
5151
to peer.ID,
5252
protocol protocol.ID,
5353
req *p2p_pb.HeaderRequest,
54-
) ([]*p2p_pb.HeaderResponse, uint64, uint64, error) {
54+
) ([]*p2p_pb.HeaderResponse, uint64, time.Duration, error) {
5555
startTime := time.Now()
5656
stream, err := host.NewStream(ctx, to, protocol)
5757
if err != nil {
@@ -94,8 +94,6 @@ func sendMessage(
9494
headers = append(headers, resp)
9595
}
9696

97-
duration := time.Since(startTime).Milliseconds()
98-
9997
// we allow the server side to explicitly close the connection
10098
// if it does not have the requested range.
10199
// In this case, server side will send us a response with ErrNotFound status code inside
@@ -114,7 +112,7 @@ func sendMessage(
114112
// reset stream in case of an error
115113
stream.Reset() //nolint:errcheck
116114
}
117-
return headers, totalRespLn, uint64(duration), err
115+
return headers, totalRespLn, time.Since(startTime), err
118116
}
119117

120118
// convertStatusCodeToError converts passed status code into an error.

p2p/metrics.go

Lines changed: 0 additions & 49 deletions
This file was deleted.

p2p/peer_stats.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ type peerStat struct {
2626
// by dividing the amount by time, so the result score will represent how many bytes
2727
// were retrieved in 1 millisecond. This value will then be averaged relative to the
2828
// previous peerScore.
29-
func (p *peerStat) updateStats(amount uint64, time uint64) {
29+
func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
3030
p.Lock()
3131
defer p.Unlock()
3232
averageSpeed := float32(amount)
33-
if time != 0 {
34-
averageSpeed /= float32(time)
33+
if duration != 0 {
34+
averageSpeed /= float32(duration.Milliseconds())
3535
}
3636
if p.peerScore == 0.0 {
3737
p.peerScore = averageSpeed

0 commit comments

Comments
 (0)