Skip to content

Commit 94243b2

Browse files
authored
feat(sensor): track metrics direction label (#737)
* feat(sensor): track metrics direction label * feat: connected time metrics; join API endpoints * fix: docs * fix: invert IsZero
1 parent 99163c7 commit 94243b2

File tree

6 files changed

+150
-81
lines changed

6 files changed

+150
-81
lines changed

cmd/p2p/sensor/api.go

Lines changed: 67 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"net/http"
77
"slices"
8+
"time"
89

910
"github.com/0xPolygon/polygon-cli/p2p"
1011
"github.com/ethereum/go-ethereum/eth/protocols/eth"
@@ -14,17 +15,32 @@ import (
1415
"github.com/rs/zerolog/log"
1516
)
1617

17-
// nodeInfo represents information about the sensor node.
18-
type nodeInfo struct {
19-
ENR string `json:"enr"`
20-
URL string `json:"enode"`
18+
// peerData represents the metrics and connection information for a peer.
19+
// It includes both message counts (items sent/received) and packet counts
20+
// (number of p2p messages), along with connection timing information.
21+
type peerData struct {
22+
Received p2p.MessageCount `json:"received"`
23+
Sent p2p.MessageCount `json:"sent"`
24+
PacketsReceived p2p.MessageCount `json:"packets_received"`
25+
PacketsSent p2p.MessageCount `json:"packets_sent"`
26+
ConnectedAt string `json:"connected_at"`
27+
DurationSeconds float64 `json:"duration_seconds"`
2128
}
2229

23-
// handleAPI sets up the API for interacting with the sensor. The `/peers`
24-
// endpoint returns a list of all peers connected to the sensor, including the
25-
// types and counts of eth packets sent by each peer.
26-
func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
27-
http.HandleFunc("/peers", func(w http.ResponseWriter, r *http.Request) {
30+
// apiData represents all sensor information including node info and peer data.
31+
type apiData struct {
32+
ENR string `json:"enr"`
33+
URL string `json:"enode"`
34+
PeerCount int `json:"peer_count"`
35+
Peers map[string]peerData `json:"peers"`
36+
}
37+
38+
// handleAPI sets up the API for interacting with the sensor. All endpoints
39+
// return information about the sensor node and all connected peers, including
40+
// the types and counts of eth packets sent and received by each peer.
41+
func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec, conns *p2p.Conns) {
42+
mux := http.NewServeMux()
43+
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
2844
if r.Method != http.MethodGet {
2945
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
3046
return
@@ -33,62 +49,74 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
3349
w.Header().Set("Content-Type", "application/json")
3450
w.WriteHeader(http.StatusOK)
3551

36-
peers := make(map[string]p2p.MessageCount)
52+
peers := make(map[string]peerData)
3753
for _, peer := range server.Peers() {
3854
url := peer.Node().URLv4()
39-
peers[url] = getPeerMessages(url, peer.Fullname(), counter)
40-
}
55+
peerID := peer.Node().ID().String()
56+
name := peer.Fullname()
57+
connectedAt := conns.GetPeerConnectedAt(peerID)
58+
if connectedAt.IsZero() {
59+
continue
60+
}
4161

42-
if err := json.NewEncoder(w).Encode(peers); err != nil {
43-
log.Error().Err(err).Msg("Failed to encode peers")
44-
}
45-
})
62+
msgs := peerData{
63+
Received: getPeerMessages(counter, url, name, p2p.MsgReceived, false),
64+
Sent: getPeerMessages(counter, url, name, p2p.MsgSent, false),
65+
PacketsReceived: getPeerMessages(counter, url, name, p2p.MsgReceived, true),
66+
PacketsSent: getPeerMessages(counter, url, name, p2p.MsgSent, true),
67+
ConnectedAt: connectedAt.UTC().Format(time.RFC3339),
68+
DurationSeconds: time.Since(connectedAt).Seconds(),
69+
}
4670

47-
http.HandleFunc("/info", func(w http.ResponseWriter, r *http.Request) {
48-
if r.Method != http.MethodGet {
49-
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
50-
return
71+
peers[url] = msgs
5172
}
5273

53-
info := nodeInfo{
54-
ENR: server.NodeInfo().ENR,
55-
URL: server.Self().URLv4(),
74+
data := apiData{
75+
ENR: server.NodeInfo().ENR,
76+
URL: server.Self().URLv4(),
77+
PeerCount: len(peers),
78+
Peers: peers,
5679
}
5780

58-
if err := json.NewEncoder(w).Encode(info); err != nil {
59-
log.Error().Err(err).Msg("Failed to encode node info")
81+
if err := json.NewEncoder(w).Encode(data); err != nil {
82+
log.Error().Err(err).Msg("Failed to encode sensor data")
6083
}
6184
})
6285

6386
addr := fmt.Sprintf(":%d", inputSensorParams.APIPort)
64-
if err := http.ListenAndServe(addr, nil); err != nil {
87+
if err := http.ListenAndServe(addr, mux); err != nil {
6588
log.Error().Err(err).Msg("Failed to start API handler")
6689
}
6790
}
6891

6992
// getPeerMessages retrieves the count of various types of eth packets sent by a
7093
// peer.
71-
func getPeerMessages(url, name string, counter *prometheus.CounterVec) p2p.MessageCount {
94+
func getPeerMessages(counter *prometheus.CounterVec, url, name string, direction p2p.Direction, isPacket bool) p2p.MessageCount {
7295
return p2p.MessageCount{
73-
BlockHeaders: getCounterValue(new(eth.BlockHeadersPacket), url, name, counter),
74-
BlockBodies: getCounterValue(new(eth.BlockBodiesPacket), url, name, counter),
75-
Blocks: getCounterValue(new(eth.NewBlockPacket), url, name, counter),
76-
BlockHashes: getCounterValue(new(eth.NewBlockHashesPacket), url, name, counter),
77-
BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), url, name, counter),
78-
BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), url, name, counter),
79-
Transactions: getCounterValue(new(eth.TransactionsPacket), url, name, counter) +
80-
getCounterValue(new(eth.PooledTransactionsPacket), url, name, counter),
81-
TransactionHashes: getCounterValue(new(eth.NewPooledTransactionHashesPacket), url, name, counter),
82-
TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), url, name, counter),
96+
BlockHeaders: getCounterValue(new(eth.BlockHeadersPacket), counter, url, name, direction, isPacket),
97+
BlockBodies: getCounterValue(new(eth.BlockBodiesPacket), counter, url, name, direction, isPacket),
98+
Blocks: getCounterValue(new(eth.NewBlockPacket), counter, url, name, direction, isPacket),
99+
BlockHashes: getCounterValue(new(eth.NewBlockHashesPacket), counter, url, name, direction, isPacket),
100+
BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), counter, url, name, direction, isPacket),
101+
BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), counter, url, name, direction, isPacket),
102+
Transactions: getCounterValue(new(eth.TransactionsPacket), counter, url, name, direction, isPacket) +
103+
getCounterValue(new(eth.PooledTransactionsPacket), counter, url, name, direction, isPacket),
104+
TransactionHashes: getCounterValue(new(eth.NewPooledTransactionHashesPacket), counter, url, name, direction, isPacket),
105+
TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), counter, url, name, direction, isPacket),
83106
}
84107
}
85108

86109
// getCounterValue retrieves the count of packets for a specific type from the
87110
// Prometheus counter.
88-
func getCounterValue(packet eth.Packet, url, name string, counter *prometheus.CounterVec) int64 {
111+
func getCounterValue(packet eth.Packet, counter *prometheus.CounterVec, url, name string, direction p2p.Direction, isPacket bool) int64 {
89112
metric := &dto.Metric{}
90113

91-
err := counter.WithLabelValues(packet.Name(), url, name).Write(metric)
114+
messageName := packet.Name()
115+
if isPacket {
116+
messageName += p2p.PacketSuffix
117+
}
118+
119+
err := counter.WithLabelValues(messageName, url, name, string(direction)).Write(metric)
92120
if err != nil {
93121
log.Error().Err(err).Send()
94122
return 0

cmd/p2p/sensor/rpc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ func handleRPC(conns *p2p.Conns, networkID uint64) {
4545
// Use network ID as chain ID for signature validation
4646
chainID := new(big.Int).SetUint64(networkID)
4747

48-
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
48+
mux := http.NewServeMux()
49+
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
4950
if r.Method != http.MethodPost {
5051
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
5152
return
@@ -85,7 +86,7 @@ func handleRPC(conns *p2p.Conns, networkID uint64) {
8586

8687
addr := fmt.Sprintf(":%d", inputSensorParams.RPCPort)
8788
log.Info().Str("addr", addr).Msg("Starting JSON-RPC server")
88-
if err := http.ListenAndServe(addr, nil); err != nil {
89+
if err := http.ListenAndServe(addr, mux); err != nil {
8990
log.Error().Err(err).Msg("Failed to start RPC server")
9091
}
9192
}

cmd/p2p/sensor/sensor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ var SensorCmd = &cobra.Command{
186186
msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{
187187
Namespace: "sensor",
188188
Name: "messages",
189-
Help: "The number and type of messages the sensor has received",
190-
}, []string{"message", "url", "name"})
189+
Help: "The number and type of messages the sensor has sent and received",
190+
}, []string{"message", "url", "name", "direction"})
191191

192192
// Create peer connection manager for broadcasting transactions
193193
conns := p2p.NewConns()
@@ -258,7 +258,7 @@ var SensorCmd = &cobra.Command{
258258
go handlePrometheus()
259259
}
260260

261-
go handleAPI(&server, msgCounter)
261+
go handleAPI(&server, msgCounter, conns)
262262

263263
// Start the RPC server for receiving transactions
264264
go handleRPC(conns, inputSensorParams.NetworkID)

p2p/conns.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package p2p
22

33
import (
44
"sync"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/core/types"
78
"github.com/ethereum/go-ethereum/eth/protocols/eth"
@@ -77,3 +78,16 @@ func (c *Conns) Nodes() []*enode.Node {
7778

7879
return nodes
7980
}
81+
82+
// GetPeerConnectedAt returns the connection time for a peer by their ID.
83+
// Returns zero time if the peer is not found.
84+
func (c *Conns) GetPeerConnectedAt(peerID string) time.Time {
85+
c.mu.RLock()
86+
defer c.mu.RUnlock()
87+
88+
if cn, ok := c.conns[peerID]; ok {
89+
return cn.connectedAt
90+
}
91+
92+
return time.Time{}
93+
}

p2p/log.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@ import (
44
"sync/atomic"
55
)
66

7+
// Direction represents the direction of a message (sent or received).
8+
type Direction string
9+
10+
const (
11+
// MsgReceived represents messages received from peers.
12+
MsgReceived Direction = "received"
13+
// MsgSent represents messages sent to peers.
14+
MsgSent Direction = "sent"
15+
16+
// PacketSuffix is appended to message names to create packet count metrics.
17+
PacketSuffix = "Packet"
18+
)
19+
720
// MessageCount is used to help the outer goroutine to receive summary of the
821
// number and type of messages that were sent. This is used for distributed
922
// logging. It can be used to count the different types of messages received

0 commit comments

Comments
 (0)