Skip to content

Commit 1dbb138

Browse files
authored
Merge pull request #39 from vulcanize/statediff-prometheus
Statediff Prometheus metrics refactor
2 parents 68aa1de + 7506968 commit 1dbb138

File tree

6 files changed

+169
-315
lines changed

6 files changed

+169
-315
lines changed

statediff/indexer/indexer.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
// You should have received a copy of the GNU Affero General Public License
1515
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17+
// This package provides an interface for pushing and indexing IPLD objects into a Postgres database
18+
// Metrics for reporting processing and connection stats are defined in ./metrics.go
1719
package indexer
1820

1921
import (
@@ -26,6 +28,7 @@ import (
2628
"github.com/ethereum/go-ethereum/core/types"
2729
"github.com/ethereum/go-ethereum/crypto"
2830
"github.com/ethereum/go-ethereum/log"
31+
"github.com/ethereum/go-ethereum/metrics"
2932
"github.com/ethereum/go-ethereum/params"
3033
"github.com/ethereum/go-ethereum/rlp"
3134

@@ -36,16 +39,21 @@ import (
3639
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
3740
"github.com/ethereum/go-ethereum/statediff/indexer/models"
3841
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
39-
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
4042
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
4143
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
4244
)
4345

46+
var (
47+
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
48+
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
49+
)
50+
4451
// Indexer interface to allow substitution of mocks for testing
4552
type Indexer interface {
4653
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
4754
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
4855
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
56+
ReportDBMetrics(delay time.Duration, quit <-chan bool)
4957
}
5058

5159
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
@@ -70,6 +78,25 @@ type BlockTx struct {
7078
Close func() error
7179
}
7280

81+
// Reporting function to run as goroutine
82+
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
83+
if !metrics.Enabled {
84+
return
85+
}
86+
ticker := time.NewTicker(delay)
87+
go func() {
88+
for {
89+
select {
90+
case <-ticker.C:
91+
dbMetrics.Update(sdi.dbWriter.db.Stats())
92+
case <-quit:
93+
ticker.Stop()
94+
return
95+
}
96+
}
97+
}()
98+
}
99+
73100
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
74101
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
75102
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
@@ -109,12 +136,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
109136
panic(p)
110137
} else {
111138
tDiff := time.Now().Sub(t)
112-
prom.SetTimeMetric("t_state_store_code_processing", tDiff)
139+
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
113140
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
114141
t = time.Now()
115142
err = tx.Commit()
116143
tDiff = time.Now().Sub(t)
117-
prom.SetTimeMetric("t_postgres_commit", tDiff)
144+
indexerMetrics.tPostgresCommit.Update(tDiff)
118145
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
119146
}
120147
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String())
@@ -123,7 +150,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
123150
},
124151
}
125152
tDiff := time.Now().Sub(t)
126-
prom.SetTimeMetric("t_free_postgres", tDiff)
153+
indexerMetrics.tFreePostgres.Update(tDiff)
154+
127155
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
128156
t = time.Now()
129157

@@ -133,15 +161,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
133161
return nil, err
134162
}
135163
tDiff = time.Now().Sub(t)
136-
prom.SetTimeMetric("t_header_processing", tDiff)
164+
indexerMetrics.tHeaderProcessing.Update(tDiff)
137165
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
138166
t = time.Now()
139167
// Publish and index uncles
140168
if err := sdi.processUncles(tx, headerID, height, uncleNodes); err != nil {
141169
return nil, err
142170
}
143171
tDiff = time.Now().Sub(t)
144-
prom.SetTimeMetric("t_uncle_processing", tDiff)
172+
indexerMetrics.tUncleProcessing.Update(tDiff)
145173
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
146174
t = time.Now()
147175
// Publish and index receipts and txs
@@ -158,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
158186
return nil, err
159187
}
160188
tDiff = time.Now().Sub(t)
161-
prom.SetTimeMetric("t_tx_receipt_processing", tDiff)
189+
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
162190
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
163191
t = time.Now()
164192

statediff/indexer/metrics.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package indexer
2+
3+
import (
4+
"database/sql"
5+
"strings"
6+
7+
"github.com/ethereum/go-ethereum/metrics"
8+
)
9+
10+
const (
11+
indexerNamespace = "indexer"
12+
)
13+
14+
// Build a fully qualified metric name
15+
func metricName(subsystem, name string) string {
16+
if name == "" {
17+
return ""
18+
}
19+
parts := []string{indexerNamespace, name}
20+
if subsystem != "" {
21+
parts = []string{indexerNamespace, subsystem, name}
22+
}
23+
// Prometheus uses _ but geth metrics uses / and replaces
24+
return strings.Join(parts, "/")
25+
}
26+
27+
type indexerMetricsHandles struct {
28+
// The total number of processed blocks
29+
blocks metrics.Counter
30+
// The total number of processed transactions
31+
transactions metrics.Counter
32+
// The total number of processed receipts
33+
receipts metrics.Counter
34+
// Time spent waiting for free postgres tx
35+
tFreePostgres metrics.Timer
36+
// Postgres transaction commit duration
37+
tPostgresCommit metrics.Timer
38+
// Header processing time
39+
tHeaderProcessing metrics.Timer
40+
// Uncle processing time
41+
tUncleProcessing metrics.Timer
42+
// Tx and receipt processing time
43+
tTxAndRecProcessing metrics.Timer
44+
// State, storage, and code combined processing time
45+
tStateStoreCodeProcessing metrics.Timer
46+
}
47+
48+
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
49+
ctx := indexerMetricsHandles{
50+
blocks: metrics.NewCounter(),
51+
transactions: metrics.NewCounter(),
52+
receipts: metrics.NewCounter(),
53+
tFreePostgres: metrics.NewTimer(),
54+
tPostgresCommit: metrics.NewTimer(),
55+
tHeaderProcessing: metrics.NewTimer(),
56+
tUncleProcessing: metrics.NewTimer(),
57+
tTxAndRecProcessing: metrics.NewTimer(),
58+
tStateStoreCodeProcessing: metrics.NewTimer(),
59+
}
60+
subsys := "" // todo
61+
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
62+
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
63+
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
64+
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
65+
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
66+
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
67+
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
68+
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
69+
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
70+
return ctx
71+
}
72+
73+
type dbMetricsHandles struct {
74+
// Maximum number of open connections to the database
75+
maxOpen metrics.Gauge
76+
// The number of established connections both in use and idle
77+
open metrics.Gauge
78+
// The number of connections currently in use
79+
inUse metrics.Gauge
80+
// The number of idle connections
81+
idle metrics.Gauge
82+
// The total number of connections waited for
83+
waitedFor metrics.Counter
84+
// The total time blocked waiting for a new connection
85+
blockedMilliseconds metrics.Counter
86+
// The total number of connections closed due to SetMaxIdleConns
87+
closedMaxIdle metrics.Counter
88+
// The total number of connections closed due to SetConnMaxLifetime
89+
closedMaxLifetime metrics.Counter
90+
}
91+
92+
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
93+
ctx := dbMetricsHandles{
94+
maxOpen: metrics.NewGauge(),
95+
open: metrics.NewGauge(),
96+
inUse: metrics.NewGauge(),
97+
idle: metrics.NewGauge(),
98+
waitedFor: metrics.NewCounter(),
99+
blockedMilliseconds: metrics.NewCounter(),
100+
closedMaxIdle: metrics.NewCounter(),
101+
closedMaxLifetime: metrics.NewCounter(),
102+
}
103+
subsys := "connections"
104+
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
105+
reg.Register(metricName(subsys, "open"), ctx.open)
106+
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
107+
reg.Register(metricName(subsys, "idle"), ctx.idle)
108+
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
109+
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
110+
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
111+
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
112+
return ctx
113+
}
114+
115+
func (met *dbMetricsHandles) Update(stats sql.DBStats) {
116+
met.maxOpen.Update(int64(stats.MaxOpenConnections))
117+
met.open.Update(int64(stats.OpenConnections))
118+
met.inUse.Update(int64(stats.InUse))
119+
met.idle.Update(int64(stats.Idle))
120+
met.waitedFor.Inc(int64(stats.WaitCount))
121+
met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds()))
122+
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
123+
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
124+
}

statediff/indexer/prom/db_stats_collector.go

Lines changed: 0 additions & 146 deletions
This file was deleted.

0 commit comments

Comments
 (0)