Skip to content

Commit 08275cf

Browse files
committed
chore(sensor): track active goroutines per peer
1 parent d4b7039 commit 08275cf

File tree

8 files changed

+117
-62
lines changed

8 files changed

+117
-62
lines changed

cmd/p2p/sensor/api.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ import (
1919
// It includes both message counts (items sent/received) and packet counts
2020
// (number of p2p messages), along with connection timing information.
2121
type peerData struct {
22-
Received p2p.MessageCount `json:"received"`
23-
Sent p2p.MessageCount `json:"sent"`
24-
PacketsReceived p2p.MessageCount `json:"packets_received"`
25-
PacketsSent p2p.MessageCount `json:"packets_sent"`
26-
ConnectedAt string `json:"connected_at"`
27-
DurationSeconds float64 `json:"duration_seconds"`
22+
Received p2p.MessageCount `json:"received"`
23+
Sent p2p.MessageCount `json:"sent"`
24+
PacketsReceived p2p.MessageCount `json:"packets_received"`
25+
PacketsSent p2p.MessageCount `json:"packets_sent"`
26+
ConnectedAt string `json:"connected_at"`
27+
DurationSeconds float64 `json:"duration_seconds"`
28+
ActiveGoroutines int64 `json:"active_goroutines"`
2829
}
2930

3031
// apiData represents all sensor information including node info and peer data.
@@ -60,12 +61,13 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec, conns *p2p
6061
}
6162

6263
msgs := peerData{
63-
Received: getPeerMessages(counter, url, name, p2p.MsgReceived, false),
64-
Sent: getPeerMessages(counter, url, name, p2p.MsgSent, false),
65-
PacketsReceived: getPeerMessages(counter, url, name, p2p.MsgReceived, true),
66-
PacketsSent: getPeerMessages(counter, url, name, p2p.MsgSent, true),
67-
ConnectedAt: connectedAt.UTC().Format(time.RFC3339),
68-
DurationSeconds: time.Since(connectedAt).Seconds(),
64+
Received: getPeerMessages(counter, url, name, p2p.MsgReceived, false),
65+
Sent: getPeerMessages(counter, url, name, p2p.MsgSent, false),
66+
PacketsReceived: getPeerMessages(counter, url, name, p2p.MsgReceived, true),
67+
PacketsSent: getPeerMessages(counter, url, name, p2p.MsgSent, true),
68+
ConnectedAt: connectedAt.UTC().Format(time.RFC3339),
69+
DurationSeconds: time.Since(connectedAt).Seconds(),
70+
ActiveGoroutines: conns.GetPeerActiveGoroutines(peerID),
6971
}
7072

7173
peers[url] = msgs

cmd/p2p/sensor/sensor.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ var SensorCmd = &cobra.Command{
191191
Help: "The number and type of messages the sensor has sent and received",
192192
}, []string{"message", "url", "name", "direction"})
193193

194+
goroutinesGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
195+
Namespace: "sensor",
196+
Name: "peer_goroutines",
197+
Help: "The number of active goroutines per peer for database writes",
198+
}, []string{"url", "name"})
199+
194200
// Create peer connection manager for broadcasting transactions
195201
conns := p2p.NewConns()
196202

@@ -278,7 +284,13 @@ var SensorCmd = &cobra.Command{
278284

279285
urls := []string{}
280286
for _, peer := range server.Peers() {
281-
urls = append(urls, peer.Node().URLv4())
287+
url := peer.Node().URLv4()
288+
peerID := peer.Node().ID().String()
289+
name := peer.Fullname()
290+
urls = append(urls, url)
291+
292+
activeGoroutines := conns.GetPeerActiveGoroutines(peerID)
293+
goroutinesGauge.WithLabelValues(url, name).Set(float64(activeGoroutines))
282294
}
283295

284296
if err := removePeerMessages(msgCounter, urls); err != nil {

p2p/conns.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,16 @@ func (c *Conns) GetPeerConnectedAt(peerID string) time.Time {
9191

9292
return time.Time{}
9393
}
94+
95+
// GetPeerActiveGoroutines returns the number of active goroutines for a peer by their ID.
96+
// Returns 0 if the peer is not found.
97+
func (c *Conns) GetPeerActiveGoroutines(peerID string) int64 {
98+
c.mu.RLock()
99+
defer c.mu.RUnlock()
100+
101+
if cn, ok := c.conns[peerID]; ok {
102+
return cn.ActiveGoroutines()
103+
}
104+
105+
return 0
106+
}

p2p/database/database.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,36 @@ import (
1212
"github.com/ethereum/go-ethereum/p2p/enode"
1313
)
1414

15+
// GoroutineTracker is an interface for tracking goroutine spawns.
16+
type GoroutineTracker interface {
17+
// TrackGoroutine wraps a function to track its goroutine.
18+
TrackGoroutine(f func())
19+
}
20+
1521
// Database represents a database solution to write block and transaction data
1622
// to. To use another database solution, just implement these methods and
1723
// update the sensor to use the new connection.
1824
type Database interface {
1925
// WriteBlock will write the both the block and block event to the database
2026
// if ShouldWriteBlocks and ShouldWriteBlockEvents return true, respectively.
21-
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)
27+
WriteBlock(context.Context, GoroutineTracker, *enode.Node, *types.Block, *big.Int, time.Time)
2228

2329
// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
2430
// returns true.
25-
WriteBlockHeaders(context.Context, []*types.Header, time.Time)
31+
WriteBlockHeaders(context.Context, GoroutineTracker, []*types.Header, time.Time)
2632

2733
// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
2834
// returns true.
29-
WriteBlockHashes(context.Context, *enode.Node, []common.Hash, time.Time)
35+
WriteBlockHashes(context.Context, GoroutineTracker, *enode.Node, []common.Hash, time.Time)
3036

3137
// WriteBlockBody will write the block bodies if ShouldWriteBlocks returns
3238
// true.
33-
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, time.Time)
39+
WriteBlockBody(context.Context, GoroutineTracker, *eth.BlockBody, common.Hash, time.Time)
3440

3541
// WriteTransactions will write the both the transaction and transaction
3642
// event to the database if ShouldWriteTransactions and
3743
// ShouldWriteTransactionEvents return true, respectively.
38-
WriteTransactions(context.Context, *enode.Node, []*types.Transaction, time.Time)
44+
WriteTransactions(context.Context, GoroutineTracker, *enode.Node, []*types.Transaction, time.Time)
3945

4046
// WritePeers will write the connected peers to the database.
4147
WritePeers(context.Context, []*p2p.Peer, time.Time)

p2p/database/datastore.go

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -153,43 +153,45 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
153153
}
154154

155155
// WriteBlock writes the block and the block event to datastore.
156-
func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
156+
func (d *Datastore) WriteBlock(ctx context.Context, tracker GoroutineTracker, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
157157
if d.client == nil {
158158
return
159159
}
160160

161161
if d.ShouldWriteBlockEvents() {
162-
d.jobs <- struct{}{}
163-
go func() {
162+
tracker.TrackGoroutine(func() {
163+
d.jobs <- struct{}{}
164164
d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
165165
<-d.jobs
166-
}()
166+
})
167167
}
168168

169169
if d.ShouldWriteBlocks() {
170-
d.jobs <- struct{}{}
171-
go func() {
170+
tracker.TrackGoroutine(func() {
171+
d.jobs <- struct{}{}
172172
d.writeBlock(ctx, block, td, tfs)
173173
<-d.jobs
174-
}()
174+
})
175175
}
176176
}
177177

178178
// WriteBlockHeaders will write the block headers to datastore. It will not
179179
// write block events because headers will only be sent to the sensor when
180180
// requested. The block events will be written when the hash is received
181181
// instead.
182-
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
182+
func (d *Datastore) WriteBlockHeaders(ctx context.Context, tracker GoroutineTracker, headers []*types.Header, tfs time.Time) {
183183
if d.client == nil || !d.ShouldWriteBlocks() {
184184
return
185185
}
186186

187187
for _, h := range headers {
188-
d.jobs <- struct{}{}
189-
go func(header *types.Header) {
190-
d.writeBlockHeader(ctx, header, tfs)
191-
<-d.jobs
192-
}(h)
188+
tracker.TrackGoroutine(func(header *types.Header) func() {
189+
return func() {
190+
d.jobs <- struct{}{}
191+
d.writeBlockHeader(ctx, header, tfs)
192+
<-d.jobs
193+
}
194+
}(h))
193195
}
194196
}
195197

@@ -198,43 +200,43 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head
198200
// requested. The block events will be written when the hash is received
199201
// instead. It will write the uncles and transactions to datastore if they
200202
// don't already exist.
201-
func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
203+
func (d *Datastore) WriteBlockBody(ctx context.Context, tracker GoroutineTracker, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
202204
if d.client == nil || !d.ShouldWriteBlocks() {
203205
return
204206
}
205207

206-
d.jobs <- struct{}{}
207-
go func() {
208+
tracker.TrackGoroutine(func() {
209+
d.jobs <- struct{}{}
208210
d.writeBlockBody(ctx, body, hash, tfs)
209211
<-d.jobs
210-
}()
212+
})
211213
}
212214

213215
// WriteBlockHashes will write the block events to datastore.
214-
func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
216+
func (d *Datastore) WriteBlockHashes(ctx context.Context, tracker GoroutineTracker, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
215217
if d.client == nil || !d.ShouldWriteBlockEvents() || len(hashes) == 0 {
216218
return
217219
}
218220

219-
d.jobs <- struct{}{}
220-
go func() {
221+
tracker.TrackGoroutine(func() {
222+
d.jobs <- struct{}{}
221223
d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, tfs)
222224
<-d.jobs
223-
}()
225+
})
224226
}
225227

226228
// WriteTransactions will write the transactions and transaction events to datastore.
227-
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
229+
func (d *Datastore) WriteTransactions(ctx context.Context, tracker GoroutineTracker, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
228230
if d.client == nil {
229231
return
230232
}
231233

232234
if d.ShouldWriteTransactions() {
233-
d.jobs <- struct{}{}
234-
go func() {
235+
tracker.TrackGoroutine(func() {
236+
d.jobs <- struct{}{}
235237
d.writeTransactions(ctx, txs, tfs)
236238
<-d.jobs
237-
}()
239+
})
238240
}
239241

240242
if d.ShouldWriteTransactionEvents() {
@@ -243,11 +245,11 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs
243245
hashes = append(hashes, tx.Hash())
244246
}
245247

246-
d.jobs <- struct{}{}
247-
go func() {
248+
tracker.TrackGoroutine(func() {
249+
d.jobs <- struct{}{}
248250
d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs)
249251
<-d.jobs
250-
}()
252+
})
251253
}
252254
}
253255

p2p/database/json.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func (j *JSONDatabase) Write(v any) {
129129
}
130130

131131
// WriteBlock writes the block and the block event as JSON.
132-
func (j *JSONDatabase) WriteBlock(_ context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
132+
func (j *JSONDatabase) WriteBlock(_ context.Context, _ GoroutineTracker, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
133133
j.writeBlockEvent(peer, block, tfs)
134134
j.writeBlock(block, td, tfs)
135135
}
@@ -179,7 +179,7 @@ func (j *JSONDatabase) writeBlock(block *types.Block, td *big.Int, tfs time.Time
179179
}
180180

181181
// WriteBlockHeaders writes the block headers as JSON.
182-
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
182+
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, _ GoroutineTracker, headers []*types.Header, tfs time.Time) {
183183
if !j.ShouldWriteBlocks() {
184184
return
185185
}
@@ -207,7 +207,7 @@ func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.H
207207
}
208208

209209
// WriteBlockHashes writes the block events as JSON.
210-
func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
210+
func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, _ GoroutineTracker, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
211211
if !j.ShouldWriteBlockEvents() || len(hashes) == 0 || peer == nil {
212212
return
213213
}
@@ -226,7 +226,7 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h
226226
}
227227

228228
// WriteBlockBody writes the block body as JSON.
229-
func (j *JSONDatabase) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
229+
func (j *JSONDatabase) WriteBlockBody(ctx context.Context, _ GoroutineTracker, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
230230
if !j.ShouldWriteBlocks() {
231231
return
232232
}
@@ -244,7 +244,7 @@ func (j *JSONDatabase) WriteBlockBody(ctx context.Context, body *eth.BlockBody,
244244
}
245245

246246
// WriteTransactions writes the transactions and transaction events as JSON.
247-
func (j *JSONDatabase) WriteTransactions(_ context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
247+
func (j *JSONDatabase) WriteTransactions(_ context.Context, _ GoroutineTracker, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
248248
j.writeTxs(txs, tfs)
249249
j.writeTxEvents(peer, txs, tfs)
250250
}

p2p/database/nodb.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,23 @@ func NoDatabase() Database {
2222
}
2323

2424
// WriteBlock does nothing.
25-
func (n *nodb) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
25+
func (n *nodb) WriteBlock(ctx context.Context, _ GoroutineTracker, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
2626
}
2727

2828
// WriteBlockHeaders does nothing.
29-
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
29+
func (n *nodb) WriteBlockHeaders(ctx context.Context, _ GoroutineTracker, headers []*types.Header, tfs time.Time) {
3030
}
3131

3232
// WriteBlockHashes does nothing.
33-
func (n *nodb) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
33+
func (n *nodb) WriteBlockHashes(ctx context.Context, _ GoroutineTracker, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
3434
}
3535

3636
// WriteBlockBody does nothing.
37-
func (n *nodb) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
37+
func (n *nodb) WriteBlockBody(ctx context.Context, _ GoroutineTracker, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
3838
}
3939

4040
// WriteTransactions does nothing.
41-
func (n *nodb) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
41+
func (n *nodb) WriteTransactions(ctx context.Context, _ GoroutineTracker, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
4242
}
4343

4444
// WritePeers does nothing.

0 commit comments

Comments
 (0)