Skip to content

Commit 7c8fb48

Browse files
authored
Merge pull request #43 from vulcanize/41-statediff-workerpool
statediff: Use a worker pool
2 parents 1dbb138 + 83c3583 commit 7c8fb48

File tree

8 files changed

+178
-36
lines changed

8 files changed

+178
-36
lines changed

cmd/geth/config.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
191191
} else {
192192
utils.Fatalf("Must specify client name for statediff DB output")
193193
}
194+
} else {
195+
if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) {
196+
utils.Fatalf("Must pass DB parameters if enabling statediff write loop")
197+
}
198+
}
199+
params := statediff.ServiceParams{
200+
DBParams: dbParams,
201+
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
202+
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
194203
}
195-
utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name))
204+
utils.RegisterStateDiffService(stack, backend, params)
196205
}
197206

198207
// Configure GraphQL if requested

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ var (
162162
utils.StateDiffDBNodeIDFlag,
163163
utils.StateDiffDBClientNameFlag,
164164
utils.StateDiffWritingFlag,
165+
utils.StateDiffWorkersFlag,
165166
configFileFlag,
166167
}
167168

cmd/geth/usage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
242242
utils.StateDiffDBNodeIDFlag,
243243
utils.StateDiffDBClientNameFlag,
244244
utils.StateDiffWritingFlag,
245+
utils.StateDiffWorkersFlag,
245246
},
246247
},
247248
{

cmd/utils/flags.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,10 @@ var (
746746
Name: "statediff.writing",
747747
Usage: "Activates progressive writing of state diffs to database as new block are synced",
748748
}
749+
StateDiffWorkersFlag = cli.UintFlag{
750+
Name: "statediff.workers",
751+
Usage: "Number of concurrent workers to use during statediff processing (0 = 1)",
752+
}
749753
)
750754

751755
// MakeDataDir retrieves the currently requested data directory, terminating
@@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
17441748
}
17451749

17461750
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
1747-
// dbParams are: Postgres connection URI, Node ID, client name
1748-
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) {
1749-
if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil {
1751+
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) {
1752+
if err := statediff.New(stack, ethServ, params); err != nil {
17501753
Fatalf("Failed to register the Statediff service: %v", err)
17511754
}
17521755
}

statediff/indexer/metrics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@ import (
88
)
99

1010
const (
11-
indexerNamespace = "indexer"
11+
namespace = "statediff"
1212
)
1313

1414
// Build a fully qualified metric name
1515
func metricName(subsystem, name string) string {
1616
if name == "" {
1717
return ""
1818
}
19-
parts := []string{indexerNamespace, name}
19+
parts := []string{namespace, name}
2020
if subsystem != "" {
21-
parts = []string{indexerNamespace, subsystem, name}
21+
parts = []string{namespace, subsystem, name}
2222
}
2323
// Prometheus uses _ but geth metrics uses / and replaces
2424
return strings.Join(parts, "/")
@@ -57,7 +57,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
5757
tTxAndRecProcessing: metrics.NewTimer(),
5858
tStateStoreCodeProcessing: metrics.NewTimer(),
5959
}
60-
subsys := "" // todo
60+
subsys := "indexer"
6161
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
6262
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
6363
reg.Register(metricName(subsys, "receipts"), ctx.receipts)

statediff/metrics.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package statediff
2+
3+
import (
4+
"strings"
5+
6+
"github.com/ethereum/go-ethereum/metrics"
7+
)
8+
9+
const (
10+
namespace = "statediff"
11+
)
12+
13+
// Build a fully qualified metric name
14+
func metricName(subsystem, name string) string {
15+
if name == "" {
16+
return ""
17+
}
18+
parts := []string{namespace, name}
19+
if subsystem != "" {
20+
parts = []string{namespace, subsystem, name}
21+
}
22+
// Prometheus uses _ but geth metrics uses / and replaces
23+
return strings.Join(parts, "/")
24+
}
25+
26+
type statediffMetricsHandles struct {
27+
// Height of latest synced by core.BlockChain
28+
// FIXME
29+
lastSyncHeight metrics.Gauge
30+
// Height of the latest block received from chainEvent channel
31+
lastEventHeight metrics.Gauge
32+
// Height of latest state diff
33+
lastStatediffHeight metrics.Gauge
34+
// Current length of chainEvent channels
35+
serviceLoopChannelLen metrics.Gauge
36+
writeLoopChannelLen metrics.Gauge
37+
}
38+
39+
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
40+
ctx := statediffMetricsHandles{
41+
lastSyncHeight: metrics.NewGauge(),
42+
lastEventHeight: metrics.NewGauge(),
43+
lastStatediffHeight: metrics.NewGauge(),
44+
serviceLoopChannelLen: metrics.NewGauge(),
45+
writeLoopChannelLen: metrics.NewGauge(),
46+
}
47+
subsys := "service"
48+
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
49+
reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight)
50+
reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight)
51+
reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen)
52+
reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen)
53+
return ctx
54+
}

0 commit comments

Comments
 (0)