Skip to content

Commit da93bdc

Browse files
committed
add reporting loop for db metrics
1 parent 300dae6 commit da93bdc

File tree

3 files changed

+49
-24
lines changed

3 files changed

+49
-24
lines changed

statediff/indexer/indexer.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,17 @@ import (
4141
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
4242
)
4343

44-
var indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
44+
var (
45+
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
46+
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
47+
)
4548

4649
// Indexer interface to allow substitution of mocks for testing
4750
type Indexer interface {
4851
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
4952
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
5053
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
54+
ReportDBMetrics(delay time.Duration, quit <-chan bool)
5155
}
5256

5357
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
@@ -72,6 +76,25 @@ type BlockTx struct {
7276
Close func() error
7377
}
7478

79+
// Reporting function to run as goroutine
80+
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
81+
if !metrics.Enabled {
82+
return
83+
}
84+
ticker := time.NewTicker(delay)
85+
go func() {
86+
for {
87+
select {
88+
case <-ticker.C:
89+
dbMetrics.Update(sdi.dbWriter.db.Stats())
90+
case <-quit:
91+
ticker.Stop()
92+
return
93+
}
94+
}
95+
}()
96+
}
97+
7598
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
7699
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
77100
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {

statediff/indexer/metrics.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
7070
return ctx
7171
}
7272

73-
type dbMetrics struct {
73+
type dbMetricsHandles struct {
7474
// Maximum number of open connections to the database
7575
maxOpen metrics.Gauge
7676
// The number of established connections both in use and idle
@@ -82,43 +82,43 @@ type dbMetrics struct {
8282
// The total number of connections waited for
8383
waitedFor metrics.Counter
8484
// The total time blocked waiting for a new connection
85-
blockedSeconds metrics.Counter
85+
blockedMilliseconds metrics.Counter
8686
// The total number of connections closed due to SetMaxIdleConns
8787
closedMaxIdle metrics.Counter
8888
// The total number of connections closed due to SetConnMaxLifetime
8989
closedMaxLifetime metrics.Counter
9090
}
9191

92-
func RegisterDBMetrics(reg metrics.Registry) dbMetrics {
93-
ctx := dbMetrics{
94-
maxOpen: metrics.NewGauge(),
95-
open: metrics.NewGauge(),
96-
inUse: metrics.NewGauge(),
97-
idle: metrics.NewGauge(),
98-
waitedFor: metrics.NewCounter(),
99-
blockedSeconds: metrics.NewCounter(),
100-
closedMaxIdle: metrics.NewCounter(),
101-
closedMaxLifetime: metrics.NewCounter(),
92+
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
93+
ctx := dbMetricsHandles{
94+
maxOpen: metrics.NewGauge(),
95+
open: metrics.NewGauge(),
96+
inUse: metrics.NewGauge(),
97+
idle: metrics.NewGauge(),
98+
waitedFor: metrics.NewCounter(),
99+
blockedMilliseconds: metrics.NewCounter(),
100+
closedMaxIdle: metrics.NewCounter(),
101+
closedMaxLifetime: metrics.NewCounter(),
102102
}
103103
subsys := "connections"
104-
reg.Register(metricName(subsys, "max_open_desc"), ctx.maxOpen)
105-
reg.Register(metricName(subsys, "open_desc"), ctx.open)
106-
reg.Register(metricName(subsys, "in_use_desc"), ctx.inUse)
107-
reg.Register(metricName(subsys, "idle_desc"), ctx.idle)
108-
reg.Register(metricName(subsys, "waited_for_desc"), ctx.waitedFor)
109-
reg.Register(metricName(subsys, "blocked_seconds_desc"), ctx.blockedSeconds)
110-
reg.Register(metricName(subsys, "closed_max_idle_desc"), ctx.closedMaxIdle)
111-
reg.Register(metricName(subsys, "closed_max_lifetime_desc"), ctx.closedMaxLifetime)
104+
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
105+
reg.Register(metricName(subsys, "open"), ctx.open)
106+
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
107+
reg.Register(metricName(subsys, "idle"), ctx.idle)
108+
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
109+
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
110+
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
111+
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
112112
return ctx
113113
}
114114

115-
func (met *dbMetrics) Update(stats sql.DBStats) {
115+
func (met *dbMetricsHandles) Update(stats sql.DBStats) {
116116
met.maxOpen.Update(int64(stats.MaxOpenConnections))
117117
met.open.Update(int64(stats.OpenConnections))
118118
met.inUse.Update(int64(stats.InUse))
119119
met.idle.Update(int64(stats.Idle))
120120
met.waitedFor.Inc(int64(stats.WaitCount))
121-
met.blockedSeconds.Inc(int64(stats.WaitDuration.Seconds()))
121+
met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds()))
122122
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
123123
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
124124
}

statediff/service.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strconv"
2323
"sync"
2424
"sync/atomic"
25+
"time"
2526

2627
"github.com/ethereum/go-ethereum/common"
2728
"github.com/ethereum/go-ethereum/core"
@@ -73,7 +74,7 @@ type IService interface {
7374
// Main event loop for processing state diffs
7475
Loop(chainEventCh chan core.ChainEvent)
7576
// Method to subscribe to receive state diff processing output
76-
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
77+
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
7778
// Method to unsubscribe from state diff processing
7879
Unsubscribe(id rpc.ID) error
7980
// Method to get state diff object at specific block
@@ -414,6 +415,7 @@ func (sds *Service) Start() error {
414415
if sds.enableWriteLoop {
415416
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
416417
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
418+
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
417419
}
418420

419421
return nil

0 commit comments

Comments
 (0)