
Commit cc977a7

minhd-vu and leovct authored
feat!: sensor rpc functionality for broadcasting txs (#708)
* feat: add `--rpc-port` flag
* feat: sensor rpc endpoint to broadcast txs
* chore: clean up
* fix: stuff
* fix: more
* fix: more
* fix: remove processing tx duplication
* docs: validate tx godoc
* fix: err shadow
* fix: use eth.TransactionsMsg
* docs: make gen-doc
* feat: --no-discovery flag
* fix: log when broadcasting txs
* fix: nits
* fix: body defer
* docs: --no-discovery
* fix: api /peers panic
* chore: refactor db creation
* feat!: remove --quick-start and use --static-nodes flag
* fix: remove leftover nodes logic
* fix: remove duplicate var
* fix: flags
* fix: more
* docs: make gen-doc

---------

Co-authored-by: Léo Vincent <[email protected]>
1 parent fa6387b commit cc977a7

12 files changed: +653 -259 lines changed

cmd/p2p/crawl/crawl.go

Lines changed: 8 additions & 8 deletions

@@ -114,15 +114,15 @@ var CrawlCmd = &cobra.Command{
 }
 
 func init() {
-	CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Bootnodes, "bootnodes", "b", "",
+	CrawlCmd.Flags().StringVarP(&inputCrawlParams.Bootnodes, "bootnodes", "b", "",
 		`Comma separated nodes used for bootstrapping. At least one bootnode is
 required, so other nodes in the network can discover each other.`)
-	CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.DiscoveryDNS, "discovery-dns", "", "", `Enable EIP-1459, DNS Discovery to recover node list from given ENRTree`)
+	CrawlCmd.Flags().StringVarP(&inputCrawlParams.DiscoveryDNS, "discovery-dns", "", "", `Enable EIP-1459, DNS Discovery to recover node list from given ENRTree`)
 	CrawlCmd.MarkFlagsMutuallyExclusive("bootnodes", "discovery-dns")
-	CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Timeout, "timeout", "t", "30m0s", "Time limit for the crawl")
-	CrawlCmd.PersistentFlags().IntVarP(&inputCrawlParams.Threads, "parallel", "p", 16, "How many parallel discoveries to attempt")
-	CrawlCmd.PersistentFlags().Uint64VarP(&inputCrawlParams.NetworkID, "network-id", "n", 0, "Filter discovered nodes by this network id")
-	CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Database, "database", "d", "", "Node database for updating and storing client information")
-	CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.RevalidationInterval, "revalidation-interval", "r", "10m", "Time before retrying to connect to a failed peer")
-	CrawlCmd.PersistentFlags().BoolVarP(&inputCrawlParams.OnlyURLs, "only-urls", "u", true, "Only writes the enode URLs to the output")
+	CrawlCmd.Flags().StringVarP(&inputCrawlParams.Timeout, "timeout", "t", "30m0s", "Time limit for the crawl")
+	CrawlCmd.Flags().IntVarP(&inputCrawlParams.Threads, "parallel", "p", 16, "How many parallel discoveries to attempt")
+	CrawlCmd.Flags().Uint64VarP(&inputCrawlParams.NetworkID, "network-id", "n", 0, "Filter discovered nodes by this network id")
+	CrawlCmd.Flags().StringVarP(&inputCrawlParams.Database, "database", "d", "", "Node database for updating and storing client information")
+	CrawlCmd.Flags().StringVarP(&inputCrawlParams.RevalidationInterval, "revalidation-interval", "r", "10m", "Time before retrying to connect to a failed peer")
+	CrawlCmd.Flags().BoolVarP(&inputCrawlParams.OnlyURLs, "only-urls", "u", true, "Only writes the enode URLs to the output")
 }

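Note on the recurring flag change: this commit swaps cobra's PersistentFlags() for Flags() here and in ping.go and nodelist.go below. Persistent flags are inherited by subcommands, while flags registered with Flags() are local to the command that defines them; since these are leaf commands with no subcommands, the local flag set is the tighter scope. A minimal sketch of the distinction, using hypothetical command and flag names rather than the real polygon-cli wiring:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	root := &cobra.Command{Use: "tool"}
	crawl := &cobra.Command{
		Use: "crawl",
		Run: func(cmd *cobra.Command, args []string) {
			timeout, _ := cmd.Flags().GetString("timeout")
			fmt.Println("timeout:", timeout)
		},
	}
	root.AddCommand(crawl)

	// A persistent flag on the root is inherited by the "crawl" subcommand.
	root.PersistentFlags().String("verbosity", "info", "log level for all subcommands")

	// A local flag registered with Flags() applies to "crawl" only and would
	// not be inherited by any subcommands it might gain later.
	crawl.Flags().String("timeout", "30m0s", "time limit for the crawl")

	_ = root.Execute()
}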
cmd/p2p/nodelist/nodelist.go

Lines changed: 13 additions & 9 deletions

@@ -5,14 +5,14 @@ import (
 	"os"
 
 	"github.com/0xPolygon/polygon-cli/p2p/database"
+	"github.com/rs/zerolog/log"
 	"github.com/spf13/cobra"
 )
 
-const jsonIndent = " "
-
 type (
 	nodeListParams struct {
 		ProjectID  string
+		DatabaseID string
 		OutputFile string
 		Limit      int
 	}
@@ -26,24 +26,24 @@ var NodeListCmd = &cobra.Command{
 	Use:   "nodelist [nodes.json]",
 	Short: "Generate a node list to seed a node",
 	Args:  cobra.MinimumNArgs(1),
-	PreRunE: func(cmd *cobra.Command, args []string) (err error) {
+	PreRunE: func(cmd *cobra.Command, args []string) error {
 		inputNodeListParams.OutputFile = args[0]
-		inputNodeListParams.ProjectID, err = cmd.Flags().GetString("project-id")
-		return err
+		return nil
 	},
 	RunE: func(cmd *cobra.Command, args []string) error {
 		ctx := cmd.Context()
 
 		db := database.NewDatastore(cmd.Context(), database.DatastoreOptions{
-			ProjectID: inputNodeListParams.ProjectID,
+			ProjectID:  inputNodeListParams.ProjectID,
+			DatabaseID: inputNodeListParams.DatabaseID,
 		})
 
 		nodes, err := db.NodeList(ctx, inputNodeListParams.Limit)
 		if err != nil {
 			return err
 		}
 
-		bytes, err := json.MarshalIndent(nodes, "", jsonIndent)
+		bytes, err := json.MarshalIndent(nodes, "", " ")
 		if err != nil {
 			return err
 		}
@@ -57,6 +57,10 @@ var NodeListCmd = &cobra.Command{
 }
 
 func init() {
-	NodeListCmd.PersistentFlags().IntVarP(&inputNodeListParams.Limit, "limit", "l", 100, "Number of unique nodes to return")
-	NodeListCmd.PersistentFlags().StringVarP(&inputNodeListParams.ProjectID, "project-id", "p", "", "GCP project ID")
+	NodeListCmd.Flags().IntVarP(&inputNodeListParams.Limit, "limit", "l", 100, "Number of unique nodes to return")
+	NodeListCmd.Flags().StringVarP(&inputNodeListParams.ProjectID, "project-id", "p", "", "GCP project ID")
+	NodeListCmd.Flags().StringVarP(&inputNodeListParams.DatabaseID, "database-id", "d", "", "Datastore database ID")
+	if err := NodeListCmd.MarkFlagRequired("project-id"); err != nil {
+		log.Error().Err(err).Msg("Failed to mark project-id as required flag")
+	}
 }

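A side note on the PreRunE simplification above: because StringVarP binds each flag directly to a field of inputNodeListParams, the value is already populated by the time RunE executes, so the old cmd.Flags().GetString("project-id") round-trip is unnecessary. A rough, self-contained sketch of that binding pattern (hypothetical names, not the actual nodelist wiring):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// params loosely mirrors how nodeListParams receives its flag values.
type params struct {
	ProjectID  string
	DatabaseID string
}

var p params

func main() {
	cmd := &cobra.Command{
		Use: "nodelist [nodes.json]",
		RunE: func(cmd *cobra.Command, args []string) error {
			// By the time RunE runs, cobra has already written the parsed flag
			// values into p; no GetString call is needed.
			fmt.Printf("project=%s database=%s\n", p.ProjectID, p.DatabaseID)
			return nil
		},
	}

	cmd.Flags().StringVarP(&p.ProjectID, "project-id", "p", "", "GCP project ID")
	cmd.Flags().StringVarP(&p.DatabaseID, "database-id", "d", "", "Datastore database ID")

	// MarkFlagRequired only returns an error when the named flag does not
	// exist, which is why the real init function logs rather than aborts.
	if err := cmd.MarkFlagRequired("project-id"); err != nil {
		fmt.Println("failed to mark project-id required:", err)
	}

	_ = cmd.Execute()
}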
cmd/p2p/ping/ping.go

Lines changed: 3 additions & 3 deletions

@@ -127,9 +127,9 @@ can see other messages the peer sends (e.g. blocks, transactions, etc.).`,
 }
 
 func init() {
-	PingCmd.PersistentFlags().StringVarP(&inputPingParams.OutputFile, "output", "o", "", "Write ping results to output file (default stdout)")
-	PingCmd.PersistentFlags().IntVarP(&inputPingParams.Threads, "parallel", "p", 16, "How many parallel pings to attempt")
-	PingCmd.PersistentFlags().BoolVarP(&inputPingParams.Listen, "listen", "l", true,
+	PingCmd.Flags().StringVarP(&inputPingParams.OutputFile, "output", "o", "", "Write ping results to output file (default stdout)")
+	PingCmd.Flags().IntVarP(&inputPingParams.Threads, "parallel", "p", 16, "How many parallel pings to attempt")
+	PingCmd.Flags().BoolVarP(&inputPingParams.Listen, "listen", "l", true,
 		`Keep the connection open and listen to the peer. This only works if the first
 argument is an enode/enr, not a nodes file.`)
 }

cmd/p2p/query/query.go

Lines changed: 1 addition & 1 deletion

@@ -107,6 +107,6 @@ func init() {
 	QueryCmd.Flags().Uint64VarP(&inputQueryParams.StartBlock, "start-block", "s", 0, "Block number to start querying from")
 	QueryCmd.Flags().Uint64VarP(&inputQueryParams.Amount, "amount", "a", 1, "Amount of blocks to query")
 	if err := QueryCmd.MarkFlagRequired("start-block"); err != nil {
-		log.Error().Err(err).Msg("Failed to mark start-block as required persistent flag")
+		log.Error().Err(err).Msg("Failed to mark start-block as required flag")
 	}
 }

cmd/p2p/sensor/api.go

Lines changed: 136 additions & 0 deletions

@@ -0,0 +1,136 @@
+package sensor
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"slices"
+
+	"github.com/0xPolygon/polygon-cli/p2p"
+	"github.com/ethereum/go-ethereum/eth/protocols/eth"
+	ethp2p "github.com/ethereum/go-ethereum/p2p"
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/rs/zerolog/log"
+)
+
+// nodeInfo represents information about the sensor node.
+type nodeInfo struct {
+	ENR string `json:"enr"`
+	URL string `json:"enode"`
+}
+
+// handleAPI sets up the API for interacting with the sensor. The `/peers`
+// endpoint returns a list of all peers connected to the sensor, including the
+// types and counts of eth packets sent by each peer.
+func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
+	http.HandleFunc("/peers", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+
+		peers := make(map[string]p2p.MessageCount)
+		for _, peer := range server.Peers() {
+			url := peer.Node().URLv4()
+			peers[url] = getPeerMessages(url, peer.Fullname(), counter)
+		}
+
+		if err := json.NewEncoder(w).Encode(peers); err != nil {
+			log.Error().Err(err).Msg("Failed to encode peers")
+		}
+	})
+
+	http.HandleFunc("/info", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+
+		info := nodeInfo{
+			ENR: server.NodeInfo().ENR,
+			URL: server.Self().URLv4(),
+		}
+
+		if err := json.NewEncoder(w).Encode(info); err != nil {
+			log.Error().Err(err).Msg("Failed to encode node info")
+		}
+	})
+
+	addr := fmt.Sprintf(":%d", inputSensorParams.APIPort)
+	if err := http.ListenAndServe(addr, nil); err != nil {
+		log.Error().Err(err).Msg("Failed to start API handler")
+	}
+}
+
+// getPeerMessages retrieves the count of various types of eth packets sent by a
+// peer.
+func getPeerMessages(url, name string, counter *prometheus.CounterVec) p2p.MessageCount {
+	return p2p.MessageCount{
+		BlockHeaders:        getCounterValue(new(eth.BlockHeadersPacket), url, name, counter),
+		BlockBodies:         getCounterValue(new(eth.BlockBodiesPacket), url, name, counter),
+		Blocks:              getCounterValue(new(eth.NewBlockPacket), url, name, counter),
+		BlockHashes:         getCounterValue(new(eth.NewBlockHashesPacket), url, name, counter),
+		BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), url, name, counter),
+		BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), url, name, counter),
+		Transactions: getCounterValue(new(eth.TransactionsPacket), url, name, counter) +
+			getCounterValue(new(eth.PooledTransactionsPacket), url, name, counter),
+		TransactionHashes:   getCounterValue(new(eth.NewPooledTransactionHashesPacket), url, name, counter),
+		TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), url, name, counter),
+	}
+}
+
+// getCounterValue retrieves the count of packets for a specific type from the
+// Prometheus counter.
+func getCounterValue(packet eth.Packet, url, name string, counter *prometheus.CounterVec) int64 {
+	metric := &dto.Metric{}
+
+	err := counter.WithLabelValues(packet.Name(), url, name).Write(metric)
+	if err != nil {
+		log.Error().Err(err).Send()
+		return 0
+	}
+
+	return int64(metric.GetCounter().GetValue())
+}
+
+// removePeerMessages removes all the counters of peers that disconnected from
+// the sensor. This prevents the metrics list from infinitely growing.
+func removePeerMessages(counter *prometheus.CounterVec, urls []string) error {
+	families, err := prometheus.DefaultGatherer.Gather()
+	if err != nil {
+		return err
+	}
+
+	var family *dto.MetricFamily
+	for _, f := range families {
+		if f.GetName() == "sensor_messages" {
+			family = f
+			break
+		}
+	}
+
+	// During DNS-discovery or when the server is taking a while to discover
+	// peers and has yet to receive a message, the sensor_messages prometheus
+	// metric may not exist yet.
+	if family == nil {
+		log.Trace().Msg("Could not find sensor_messages metric family")
+		return nil
+	}
+
+	for _, metric := range family.GetMetric() {
+		for _, label := range metric.GetLabel() {
+			url := label.GetValue()
+			if label.GetName() != "url" || slices.Contains(urls, url) {
+				continue
+			}
+
+			counter.DeletePartialMatch(prometheus.Labels{"url": url})
+		}
+	}
+
+	return nil
+}
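For reference, the two endpoints registered by handleAPI can be exercised with any HTTP client once the sensor is running. The sketch below is an assumption-laden example, not part of the commit: it assumes the API listens on localhost:8080 (the real port is whatever inputSensorParams.APIPort resolves to) and decodes /peers loosely, since p2p.MessageCount's JSON field names are not shown in this diff:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// info mirrors the nodeInfo JSON shape returned by the sensor's /info endpoint.
type info struct {
	ENR   string `json:"enr"`
	Enode string `json:"enode"`
}

func main() {
	// Assumed base URL; adjust to wherever the sensor API is listening.
	base := "http://localhost:8080"

	resp, err := http.Get(base + "/info")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var i info
	if err := json.NewDecoder(resp.Body).Decode(&i); err != nil {
		panic(err)
	}
	fmt.Println("sensor enode:", i.Enode)

	// /peers returns a map keyed by peer enode URL; the per-peer message
	// counts are kept as raw JSON here rather than guessing their field names.
	resp, err = http.Get(base + "/peers")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var peers map[string]json.RawMessage
	if err := json.NewDecoder(resp.Body).Decode(&peers); err != nil {
		panic(err)
	}
	fmt.Println("connected peers:", len(peers))
}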

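Likewise, getCounterValue and removePeerMessages lean on two standard client_golang facilities: a counter's current value can be snapshotted by writing it into a dto.Metric, and DeletePartialMatch drops every series whose labels match a given subset. A small standalone sketch of both, with a hypothetical metric name and labels in place of the real sensor_messages counter:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Hypothetical counter mirroring the sensor_messages label layout.
	messages := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_messages",
		Help: "Messages received, labelled by packet type, peer URL, and client name.",
	}, []string{"message", "url", "name"})
	prometheus.MustRegister(messages)

	messages.WithLabelValues("Transactions", "enode://peer-a", "geth").Add(3)

	// Snapshot a single counter value by writing it into a dto.Metric.
	m := &dto.Metric{}
	if err := messages.WithLabelValues("Transactions", "enode://peer-a", "geth").Write(m); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("count:", int64(m.GetCounter().GetValue()))

	// Drop every series for a disconnected peer, whatever the other labels are.
	messages.DeletePartialMatch(prometheus.Labels{"url": "enode://peer-a"})
}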