Skip to content

Commit 06e9b7f

Browse files
committed
feat(sensor): track metrics direction label
1 parent c31222b commit 06e9b7f

File tree

4 files changed

+95
-55
lines changed

4 files changed

+95
-55
lines changed

cmd/p2p/sensor/api.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,17 @@ type nodeInfo struct {
2020
URL string `json:"enode"`
2121
}
2222

23+
// peerMessages represents the messages sent and received by a peer.
24+
type peerMessages struct {
25+
Received p2p.MessageCount `json:"received"`
26+
Sent p2p.MessageCount `json:"sent"`
27+
PacketsReceived p2p.MessageCount `json:"packets_received"`
28+
PacketsSent p2p.MessageCount `json:"packets_sent"`
29+
}
30+
2331
// handleAPI sets up the API for interacting with the sensor. The `/peers`
2432
// endpoint returns a list of all peers connected to the sensor, including the
25-
// types and counts of eth packets sent by each peer.
33+
// types and counts of eth packets sent and received by each peer.
2634
func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
2735
http.HandleFunc("/peers", func(w http.ResponseWriter, r *http.Request) {
2836
if r.Method != http.MethodGet {
@@ -33,10 +41,15 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
3341
w.Header().Set("Content-Type", "application/json")
3442
w.WriteHeader(http.StatusOK)
3543

36-
peers := make(map[string]p2p.MessageCount)
44+
peers := make(map[string]peerMessages)
3745
for _, peer := range server.Peers() {
3846
url := peer.Node().URLv4()
39-
peers[url] = getPeerMessages(url, peer.Fullname(), counter)
47+
peers[url] = peerMessages{
48+
Received: getPeerMessages(counter, url, peer.Fullname(), p2p.MsgReceived, false),
49+
Sent: getPeerMessages(counter, url, peer.Fullname(), p2p.MsgSent, false),
50+
PacketsReceived: getPeerMessages(counter, url, peer.Fullname(), p2p.MsgReceived, true),
51+
PacketsSent: getPeerMessages(counter, url, peer.Fullname(), p2p.MsgSent, true),
52+
}
4053
}
4154

4255
if err := json.NewEncoder(w).Encode(peers); err != nil {
@@ -68,27 +81,32 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
6881

6982
// getPeerMessages retrieves the count of various types of eth packets sent by a
7083
// peer.
71-
func getPeerMessages(url, name string, counter *prometheus.CounterVec) p2p.MessageCount {
84+
func getPeerMessages(counter *prometheus.CounterVec, url, name string, direction p2p.Direction, isPacket bool) p2p.MessageCount {
7285
return p2p.MessageCount{
73-
BlockHeaders: getCounterValue(new(eth.BlockHeadersPacket), url, name, counter),
74-
BlockBodies: getCounterValue(new(eth.BlockBodiesPacket), url, name, counter),
75-
Blocks: getCounterValue(new(eth.NewBlockPacket), url, name, counter),
76-
BlockHashes: getCounterValue(new(eth.NewBlockHashesPacket), url, name, counter),
77-
BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), url, name, counter),
78-
BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), url, name, counter),
79-
Transactions: getCounterValue(new(eth.TransactionsPacket), url, name, counter) +
80-
getCounterValue(new(eth.PooledTransactionsPacket), url, name, counter),
81-
TransactionHashes: getCounterValue(new(eth.NewPooledTransactionHashesPacket), url, name, counter),
82-
TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), url, name, counter),
86+
BlockHeaders: getCounterValue(new(eth.BlockHeadersPacket), counter, url, name, direction, isPacket),
87+
BlockBodies: getCounterValue(new(eth.BlockBodiesPacket), counter, url, name, direction, isPacket),
88+
Blocks: getCounterValue(new(eth.NewBlockPacket), counter, url, name, direction, isPacket),
89+
BlockHashes: getCounterValue(new(eth.NewBlockHashesPacket), counter, url, name, direction, isPacket),
90+
BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), counter, url, name, direction, isPacket),
91+
BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), counter, url, name, direction, isPacket),
92+
Transactions: getCounterValue(new(eth.TransactionsPacket), counter, url, name, direction, isPacket) +
93+
getCounterValue(new(eth.PooledTransactionsPacket), counter, url, name, direction, isPacket),
94+
TransactionHashes: getCounterValue(new(eth.NewPooledTransactionHashesPacket), counter, url, name, direction, isPacket),
95+
TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), counter, url, name, direction, isPacket),
8396
}
8497
}
8598

8699
// getCounterValue retrieves the count of packets for a specific type from the
87100
// Prometheus counter.
88-
func getCounterValue(packet eth.Packet, url, name string, counter *prometheus.CounterVec) int64 {
101+
func getCounterValue(packet eth.Packet, counter *prometheus.CounterVec, url, name string, direction p2p.Direction, isPacket bool) int64 {
89102
metric := &dto.Metric{}
90103

91-
err := counter.WithLabelValues(packet.Name(), url, name).Write(metric)
104+
messageName := packet.Name()
105+
if isPacket {
106+
messageName += p2p.PacketSuffix
107+
}
108+
109+
err := counter.WithLabelValues(messageName, url, name, string(direction)).Write(metric)
92110
if err != nil {
93111
log.Error().Err(err).Send()
94112
return 0

cmd/p2p/sensor/sensor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ var SensorCmd = &cobra.Command{
186186
msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{
187187
Namespace: "sensor",
188188
Name: "messages",
189-
Help: "The number and type of messages the sensor has received",
190-
}, []string{"message", "url", "name"})
189+
Help: "The number and type of messages the sensor has sent and received",
190+
}, []string{"message", "url", "name", "direction"})
191191

192192
// Create peer connection manager for broadcasting transactions
193193
conns := p2p.NewConns()

p2p/log.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@ import (
44
"sync/atomic"
55
)
66

7+
// Direction represents the direction of a message (sent or received).
8+
type Direction string
9+
10+
const (
11+
// MsgReceived represents messages received from peers.
12+
MsgReceived Direction = "received"
13+
// MsgSent represents messages sent to peers.
14+
MsgSent Direction = "sent"
15+
16+
// PacketSuffix is appended to message names to create packet count metrics.
17+
PacketSuffix = "Packet"
18+
)
19+
720
// MessageCount is used to help the outer goroutine to receive summary of the
821
// number and type of messages that were sent. This is used for distributed
922
// logging. It can be used to count the different types of messages received

p2p/protocol.go

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func (c *conn) statusExchange(packet *eth.StatusPacket) error {
183183
errc := make(chan error, 2)
184184

185185
go func() {
186+
c.countMsgSent((&eth.StatusPacket{}).Name(), 1)
186187
errc <- ethp2p.Send(c.rw, eth.StatusMsg, &packet)
187188
}()
188189

@@ -207,9 +208,21 @@ func (c *conn) statusExchange(packet *eth.StatusPacket) error {
207208
return nil
208209
}
209210

210-
// AddCount increments the prometheus counter for this connection with the given message name and count.
211-
func (c *conn) AddCount(messageName string, count float64) {
212-
c.counter.WithLabelValues(messageName, c.node.URLv4(), c.peer.Fullname()).Add(count)
211+
// countMsg increments the prometheus counter for this connection with the given direction, message name, and count.
212+
func (c *conn) countMsg(direction Direction, messageName string, count float64) {
213+
c.counter.WithLabelValues(messageName, c.node.URLv4(), c.peer.Fullname(), string(direction)).Add(count)
214+
}
215+
216+
// countMsgReceived increments the prometheus counter for received messages.
217+
func (c *conn) countMsgReceived(messageName string, count float64) {
218+
c.countMsg(MsgReceived, messageName, count)
219+
c.countMsg(MsgReceived, messageName+PacketSuffix, 1)
220+
}
221+
222+
// countMsgSent increments the prometheus counter for sent messages.
223+
func (c *conn) countMsgSent(messageName string, count float64) {
224+
c.countMsg(MsgSent, messageName, count)
225+
c.countMsg(MsgSent, messageName+PacketSuffix, 1)
213226
}
214227

215228
func (c *conn) readStatus(packet *eth.StatusPacket) error {
@@ -260,6 +273,7 @@ func (c *conn) getBlockData(hash common.Hash) error {
260273
},
261274
}
262275

276+
c.countMsgSent(headersRequest.Name(), 1)
263277
if err := ethp2p.Send(c.rw, eth.GetBlockHeadersMsg, headersRequest); err != nil {
264278
return err
265279
}
@@ -284,6 +298,7 @@ func (c *conn) getBlockData(hash common.Hash) error {
284298
GetBlockBodiesRequest: []common.Hash{hash},
285299
}
286300

301+
c.countMsgSent(bodiesRequest.Name(), 1)
287302
return ethp2p.Send(c.rw, eth.GetBlockBodiesMsg, bodiesRequest)
288303
}
289304

@@ -320,7 +335,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
320335

321336
tfs := time.Now()
322337

323-
c.AddCount(packet.Name(), float64(len(packet)))
338+
c.countMsgReceived(packet.Name(), float64(len(packet)))
324339

325340
// Collect unique hashes for database write.
326341
uniqueHashes := make([]common.Hash, 0, len(packet))
@@ -389,7 +404,7 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
389404

390405
tfs := time.Now()
391406

392-
c.AddCount(txs.Name(), float64(len(txs)))
407+
c.countMsgReceived(txs.Name(), float64(len(txs)))
393408

394409
c.db.WriteTransactions(ctx, c.node, txs, tfs)
395410

@@ -402,13 +417,11 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error {
402417
return err
403418
}
404419

405-
c.AddCount(request.Name(), 1)
420+
c.countMsgReceived(request.Name(), 1)
406421

407-
return ethp2p.Send(
408-
c.rw,
409-
eth.BlockHeadersMsg,
410-
&eth.BlockHeadersPacket{RequestId: request.RequestId},
411-
)
422+
response := &eth.BlockHeadersPacket{RequestId: request.RequestId}
423+
c.countMsgSent(response.Name(), 0)
424+
return ethp2p.Send(c.rw, eth.BlockHeadersMsg, response)
412425
}
413426

414427
func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
@@ -420,7 +433,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
420433
tfs := time.Now()
421434

422435
headers := packet.BlockHeadersRequest
423-
c.AddCount(packet.Name(), float64(len(headers)))
436+
c.countMsgReceived(packet.Name(), float64(len(headers)))
424437

425438
for _, header := range headers {
426439
if err := c.getParentBlock(ctx, header); err != nil {
@@ -438,13 +451,11 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error {
438451
return err
439452
}
440453

441-
c.AddCount(request.Name(), float64(len(request.GetBlockBodiesRequest)))
454+
c.countMsgReceived(request.Name(), float64(len(request.GetBlockBodiesRequest)))
442455

443-
return ethp2p.Send(
444-
c.rw,
445-
eth.BlockBodiesMsg,
446-
&eth.BlockBodiesPacket{RequestId: request.RequestId},
447-
)
456+
response := &eth.BlockBodiesPacket{RequestId: request.RequestId}
457+
c.countMsgSent(response.Name(), 0)
458+
return ethp2p.Send(c.rw, eth.BlockBodiesMsg, response)
448459
}
449460

450461
func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
@@ -459,7 +470,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
459470
return nil
460471
}
461472

462-
c.AddCount(packet.Name(), float64(len(packet.BlockBodiesResponse)))
473+
c.countMsgReceived(packet.Name(), float64(len(packet.BlockBodiesResponse)))
463474

464475
var hash *common.Hash
465476
for e := c.requests.Front(); e != nil; e = e.Next() {
@@ -490,7 +501,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
490501

491502
tfs := time.Now()
492503

493-
c.AddCount(block.Name(), 1)
504+
c.countMsgReceived(block.Name(), 1)
494505

495506
// Set the head block if newer.
496507
c.headMutex.Lock()
@@ -520,12 +531,11 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error {
520531
return err
521532
}
522533

523-
c.AddCount(request.Name(), float64(len(request.GetPooledTransactionsRequest)))
534+
c.countMsgReceived(request.Name(), float64(len(request.GetPooledTransactionsRequest)))
524535

525-
return ethp2p.Send(
526-
c.rw,
527-
eth.PooledTransactionsMsg,
528-
&eth.PooledTransactionsPacket{RequestId: request.RequestId})
536+
response := &eth.PooledTransactionsPacket{RequestId: request.RequestId}
537+
c.countMsgSent(response.Name(), 0)
538+
return ethp2p.Send(c.rw, eth.PooledTransactionsMsg, response)
529539
}
530540

531541
func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) error {
@@ -544,17 +554,15 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
544554
return errors.New("protocol version not found")
545555
}
546556

547-
c.AddCount(name, float64(len(hashes)))
557+
c.countMsgReceived(name, float64(len(hashes)))
548558

549559
if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() {
550560
return nil
551561
}
552562

553-
return ethp2p.Send(
554-
c.rw,
555-
eth.GetPooledTransactionsMsg,
556-
&eth.GetPooledTransactionsPacket{GetPooledTransactionsRequest: hashes},
557-
)
563+
request := &eth.GetPooledTransactionsPacket{GetPooledTransactionsRequest: hashes}
564+
c.countMsgSent(request.Name(), float64(len(hashes)))
565+
return ethp2p.Send(c.rw, eth.GetPooledTransactionsMsg, request)
558566
}
559567

560568
func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) error {
@@ -565,7 +573,7 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err
565573

566574
tfs := time.Now()
567575

568-
c.AddCount(packet.Name(), float64(len(packet.PooledTransactionsResponse)))
576+
c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse)))
569577

570578
c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs)
571579

@@ -577,9 +585,10 @@ func (c *conn) handleGetReceipts(msg ethp2p.Msg) error {
577585
if err := msg.Decode(&request); err != nil {
578586
return err
579587
}
580-
return ethp2p.Send(
581-
c.rw,
582-
eth.ReceiptsMsg,
583-
&eth.ReceiptsPacket{RequestId: request.RequestId},
584-
)
588+
589+
c.countMsgReceived(request.Name(), float64(len(request.GetReceiptsRequest)))
590+
591+
response := &eth.ReceiptsPacket{RequestId: request.RequestId}
592+
c.countMsgSent(response.Name(), 0)
593+
return ethp2p.Send(c.rw, eth.ReceiptsMsg, response)
585594
}

0 commit comments

Comments
 (0)