Skip to content

Commit 2339b0a

Browse files
committed
fixes and cli integration for new options
1 parent b3add30 commit 2339b0a

22 files changed

+1009
-139
lines changed

cmd/geth/config.go

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"bufio"
21+
"context"
2122
"errors"
2223
"fmt"
2324
"math/big"
@@ -26,10 +27,7 @@ import (
2627
"time"
2728
"unicode"
2829

29-
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
30-
31-
"github.com/ethereum/go-ethereum/eth/downloader"
32-
"github.com/ethereum/go-ethereum/statediff"
30+
"github.com/naoina/toml"
3331
"gopkg.in/urfave/cli.v1"
3432

3533
"github.com/ethereum/go-ethereum/accounts/external"
@@ -38,13 +36,18 @@ import (
3836
"github.com/ethereum/go-ethereum/accounts/usbwallet"
3937
"github.com/ethereum/go-ethereum/cmd/utils"
4038
"github.com/ethereum/go-ethereum/eth/catalyst"
39+
"github.com/ethereum/go-ethereum/eth/downloader"
4140
"github.com/ethereum/go-ethereum/eth/ethconfig"
4241
"github.com/ethereum/go-ethereum/internal/ethapi"
4342
"github.com/ethereum/go-ethereum/log"
4443
"github.com/ethereum/go-ethereum/metrics"
4544
"github.com/ethereum/go-ethereum/node"
4645
"github.com/ethereum/go-ethereum/params"
47-
"github.com/naoina/toml"
46+
"github.com/ethereum/go-ethereum/statediff"
47+
dumpdb "github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
48+
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
49+
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
50+
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
4851
)
4952

5053
var (
@@ -185,48 +188,82 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
185188
}
186189

187190
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
188-
var dbConfig *sql.Config
191+
var indexerConfig interfaces.Config
192+
var clientName, nodeID string
189193
if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) {
190-
dbConfig = new(sql.Config)
191-
dbConfig.Hostname = ctx.GlobalString(utils.StateDiffDBHostFlag.Name)
192-
dbConfig.Port = ctx.GlobalInt(utils.StateDiffDBPortFlag.Name)
193-
dbConfig.DatabaseName = ctx.GlobalString(utils.StateDiffDBNameFlag.Name)
194-
dbConfig.Username = ctx.GlobalString(utils.StateDiffDBUserFlag.Name)
195-
dbConfig.Password = ctx.GlobalString(utils.StateDiffDBPasswordFlag.Name)
196-
194+
clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
197195
if ctx.GlobalIsSet(utils.StateDiffDBNodeIDFlag.Name) {
198-
dbConfig.ID = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name)
196+
nodeID = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name)
199197
} else {
200198
utils.Fatalf("Must specify node ID for statediff DB output")
201199
}
202200

203-
if ctx.GlobalIsSet(utils.StateDiffDBClientNameFlag.Name) {
204-
dbConfig.ClientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
205-
} else {
206-
utils.Fatalf("Must specify client name for statediff DB output")
207-
}
208-
209-
if ctx.GlobalIsSet(utils.StateDiffDBMinConns.Name) {
210-
dbConfig.MinConns = ctx.GlobalInt(utils.StateDiffDBMinConns.Name)
211-
}
212-
if ctx.GlobalIsSet(utils.StateDiffDBMaxConns.Name) {
213-
dbConfig.MaxConns = ctx.GlobalInt(utils.StateDiffDBMaxConns.Name)
214-
}
215-
if ctx.GlobalIsSet(utils.StateDiffDBMaxIdleConns.Name) {
216-
dbConfig.MaxIdle = ctx.GlobalInt(utils.StateDiffDBMaxIdleConns.Name)
217-
}
218-
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnLifetime.Name) {
219-
dbConfig.MaxConnLifetime = ctx.GlobalDuration(utils.StateDiffDBMaxConnLifetime.Name) * time.Second
220-
}
221-
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnIdleTime.Name) {
222-
dbConfig.MaxConnIdleTime = ctx.GlobalDuration(utils.StateDiffDBMaxConnIdleTime.Name) * time.Second
201+
dbTypeStr := ctx.GlobalString(utils.StateDiffDBTypeFlag.Name)
202+
dbType, err := shared.ResolveDBType(dbTypeStr)
203+
if err != nil {
204+
utils.Fatalf("%v", err)
223205
}
224-
if ctx.GlobalIsSet(utils.StateDiffDBConnTimeout.Name) {
225-
dbConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffDBConnTimeout.Name) * time.Second
206+
switch dbType {
207+
case shared.POSTGRES:
208+
driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name)
209+
driverType, err := postgres.ResolveDriverType(driverTypeStr)
210+
if err != nil {
211+
utils.Fatalf("%v", err)
212+
}
213+
pgConfig := postgres.Config{
214+
Hostname: ctx.GlobalString(utils.StateDiffDBHostFlag.Name),
215+
Port: ctx.GlobalInt(utils.StateDiffDBPortFlag.Name),
216+
DatabaseName: ctx.GlobalString(utils.StateDiffDBNameFlag.Name),
217+
Username: ctx.GlobalString(utils.StateDiffDBUserFlag.Name),
218+
Password: ctx.GlobalString(utils.StateDiffDBPasswordFlag.Name),
219+
ID: nodeID,
220+
ClientName: clientName,
221+
Driver: driverType,
222+
}
223+
if ctx.GlobalIsSet(utils.StateDiffDBMinConns.Name) {
224+
pgConfig.MinConns = ctx.GlobalInt(utils.StateDiffDBMinConns.Name)
225+
}
226+
if ctx.GlobalIsSet(utils.StateDiffDBMaxConns.Name) {
227+
pgConfig.MaxConns = ctx.GlobalInt(utils.StateDiffDBMaxConns.Name)
228+
}
229+
if ctx.GlobalIsSet(utils.StateDiffDBMaxIdleConns.Name) {
230+
pgConfig.MaxIdle = ctx.GlobalInt(utils.StateDiffDBMaxIdleConns.Name)
231+
}
232+
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnLifetime.Name) {
233+
pgConfig.MaxConnLifetime = ctx.GlobalDuration(utils.StateDiffDBMaxConnLifetime.Name) * time.Second
234+
}
235+
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnIdleTime.Name) {
236+
pgConfig.MaxConnIdleTime = ctx.GlobalDuration(utils.StateDiffDBMaxConnIdleTime.Name) * time.Second
237+
}
238+
if ctx.GlobalIsSet(utils.StateDiffDBConnTimeout.Name) {
239+
pgConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffDBConnTimeout.Name) * time.Second
240+
}
241+
indexerConfig = pgConfig
242+
case shared.DUMP:
243+
dumpTypeStr := ctx.GlobalString(utils.StateDiffDBDumpDst.Name)
244+
dumpType, err := dumpdb.ResolveDumpType(dumpTypeStr)
245+
if err != nil {
246+
utils.Fatalf("%v", err)
247+
}
248+
switch dumpType {
249+
case dumpdb.STDERR:
250+
indexerConfig = dumpdb.Config{Dump: os.Stdout}
251+
case dumpdb.STDOUT:
252+
indexerConfig = dumpdb.Config{Dump: os.Stderr}
253+
case dumpdb.DISCARD:
254+
indexerConfig = dumpdb.Config{Dump: dumpdb.NewDiscardWriterCloser()}
255+
default:
256+
utils.Fatalf("unrecognized dump destination: %s", dumpType)
257+
}
258+
default:
259+
utils.Fatalf("unrecognized database type: %s", dbType)
226260
}
227261
}
228-
p := statediff.ServiceParams{
229-
DBParams: dbConfig,
262+
p := statediff.Config{
263+
IndexerConfig: indexerConfig,
264+
ID: nodeID,
265+
ClientName: clientName,
266+
Context: context.Background(),
230267
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
231268
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
232269
}

cmd/geth/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ var (
149149
utils.GpoIgnoreGasPriceFlag,
150150
utils.MinerNotifyFullFlag,
151151
utils.StateDiffFlag,
152+
utils.StateDiffDBTypeFlag,
153+
utils.StateDiffDBDriverTypeFlag,
154+
utils.StateDiffDBDumpDst,
152155
utils.StateDiffDBNameFlag,
153156
utils.StateDiffDBPasswordFlag,
154157
utils.StateDiffDBUserFlag,

cmd/geth/usage.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ var AppHelpFlagGroups = []flags.FlagGroup{
225225
Name: "STATE DIFF",
226226
Flags: []cli.Flag{
227227
utils.StateDiffFlag,
228+
utils.StateDiffDBTypeFlag,
229+
utils.StateDiffDBDriverTypeFlag,
230+
utils.StateDiffDBDumpDst,
228231
utils.StateDiffDBNameFlag,
229232
utils.StateDiffDBPasswordFlag,
230233
utils.StateDiffDBUserFlag,

cmd/utils/flags.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,21 @@ var (
786786
Name: "statediff",
787787
Usage: "Enables the processing of state diffs between each block",
788788
}
789+
StateDiffDBTypeFlag = cli.StringFlag{
790+
Name: "statediff.db.type",
791+
Usage: "Statediff database type",
792+
Value: "postgres",
793+
}
794+
StateDiffDBDriverTypeFlag = cli.StringFlag{
795+
Name: "statediff.db.driver",
796+
Usage: "Statediff database driver type",
797+
Value: "pgx",
798+
}
799+
StateDiffDBDumpDst = cli.StringFlag{
800+
Name: "statediff.dump.dst",
801+
Usage: "Statediff database dump destination (default is stdout)",
802+
Value: "stdout",
803+
}
789804
StateDiffDBHostFlag = cli.StringFlag{
790805
Name: "statediff.db.host",
791806
Usage: "Statediff database hostname/ip",
@@ -840,14 +855,16 @@ var (
840855
StateDiffDBClientNameFlag = cli.StringFlag{
841856
Name: "statediff.db.clientname",
842857
Usage: "Client name to use when writing state diffs to database",
858+
Value: "go-ethereum",
843859
}
844860
StateDiffWritingFlag = cli.BoolFlag{
845861
Name: "statediff.writing",
846862
Usage: "Activates progressive writing of state diffs to database as new block are synced",
847863
}
848864
StateDiffWorkersFlag = cli.UintFlag{
849865
Name: "statediff.workers",
850-
Usage: "Number of concurrent workers to use during statediff processing (0 = 1)",
866+
Usage: "Number of concurrent workers to use during statediff processing (default 1)",
867+
Value: 1,
851868
}
852869
)
853870

@@ -1804,7 +1821,7 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
18041821
}
18051822

18061823
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
1807-
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.ServiceParams) {
1824+
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.Config) {
18081825
if err := statediff.New(stack, ethServ, cfg, params); err != nil {
18091826
Fatalf("Failed to register the Statediff service: %v", err)
18101827
}

statediff/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ This service introduces a CLI flag namespace `statediff`
7979
`--statediff` flag is used to turn on the service
8080
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
8181
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
82+
`--statediff.db.type` is the type of database we write out to (current options: postgres and dump)
83+
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
84+
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
8285
`--statediff.db.host` is the hostname/ip to dial to connect to the database
8386
`--statediff.db.port` is the port to dial to connect to the database
8487
`--statediff.db.name` is the name of the database to connect to

statediff/indexer/database/dump/batch_tx.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,22 @@ import (
3030

3131
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
3232
type BatchTx struct {
33-
dump io.Writer
34-
quit chan struct{}
35-
iplds chan models.IPLDModel
36-
ipldCache models.IPLDBatch
33+
BlockNumber uint64
34+
dump io.Writer
35+
quit chan struct{}
36+
iplds chan models.IPLDModel
37+
ipldCache models.IPLDBatch
3738

38-
close func(blockTx *BatchTx, err error) error
39+
submit func(blockTx *BatchTx, err error) error
3940
}
4041

4142
// Submit satisfies indexer.AtomicTx
4243
func (tx *BatchTx) Submit(err error) error {
43-
return tx.close(tx, err)
44+
return tx.submit(tx, err)
4445
}
4546

4647
func (tx *BatchTx) flush() error {
47-
if _, err := fmt.Fprintf(tx.dump, "%+v", tx.ipldCache); err != nil {
48+
if _, err := fmt.Fprintf(tx.dump, "%+v\r\n", tx.ipldCache); err != nil {
4849
return err
4950
}
5051
tx.ipldCache = models.IPLDBatch{}

statediff/indexer/database/dump/config.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,63 @@
1717
package dump
1818

1919
import (
20+
"fmt"
2021
"io"
22+
"strings"
2123

2224
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
2325
)
2426

27+
// DumpType to explicitly type the dump destination
28+
type DumpType string
29+
30+
const (
31+
STDOUT = "Stdout"
32+
STDERR = "Stderr"
33+
DISCARD = "Discard"
34+
UNKNOWN = "Unknown"
35+
)
36+
37+
// ResolveDumpType resolves the dump type for the provided string
38+
func ResolveDumpType(str string) (DumpType, error) {
39+
switch strings.ToLower(str) {
40+
case "stdout", "out", "std out":
41+
return STDOUT, nil
42+
case "stderr", "err", "std err":
43+
return STDERR, nil
44+
case "discard", "void", "devnull", "dev null":
45+
return DISCARD, nil
46+
default:
47+
return UNKNOWN, fmt.Errorf("unrecognized dump type: %s", str)
48+
}
49+
}
50+
51+
// Config for data dump
2552
type Config struct {
2653
Dump io.WriteCloser
2754
}
2855

56+
// Type satisfies interfaces.Config
2957
func (c Config) Type() shared.DBType {
3058
return shared.DUMP
3159
}
60+
61+
// NewDiscardWriterCloser returns a discardWrapper wrapping io.Discard
62+
func NewDiscardWriterCloser() io.WriteCloser {
63+
return discardWrapper{blackhole: io.Discard}
64+
}
65+
66+
// discardWrapper wraps io.Discard with io.Closer
67+
type discardWrapper struct {
68+
blackhole io.Writer
69+
}
70+
71+
// Write satisfies io.Writer
72+
func (dw discardWrapper) Write(b []byte) (int, error) {
73+
return dw.blackhole.Write(b)
74+
}
75+
76+
// Close satisfies io.Closer
77+
func (dw discardWrapper) Close() error {
78+
return nil
79+
}

0 commit comments

Comments
 (0)