Skip to content

Commit 2eebeb5

Browse files
authored
Merge pull request #152 from vulcanize/schema_updates
Updates
2 parents 9a67034 + bb788f7 commit 2eebeb5

32 files changed

+1371
-379
lines changed

cmd/geth/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/ethereum/go-ethereum/params"
4646
"github.com/ethereum/go-ethereum/statediff"
4747
dumpdb "github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
48+
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
4849
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
4950
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
5051
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
@@ -204,6 +205,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
204205
utils.Fatalf("%v", err)
205206
}
206207
switch dbType {
208+
case shared.FILE:
209+
indexerConfig = file.Config{
210+
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
211+
}
207212
case shared.POSTGRES:
208213
driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name)
209214
driverType, err := postgres.ResolveDriverType(driverTypeStr)

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ var (
167167
utils.StateDiffDBClientNameFlag,
168168
utils.StateDiffWritingFlag,
169169
utils.StateDiffWorkersFlag,
170+
utils.StateDiffFilePath,
170171
configFileFlag,
171172
utils.CatalystFlag,
172173
}

cmd/geth/usage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
243243
utils.StateDiffDBClientNameFlag,
244244
utils.StateDiffWritingFlag,
245245
utils.StateDiffWorkersFlag,
246+
utils.StateDiffFilePath,
246247
},
247248
},
248249
{

cmd/utils/flags.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ var (
788788
}
789789
StateDiffDBTypeFlag = cli.StringFlag{
790790
Name: "statediff.db.type",
791-
Usage: "Statediff database type",
791+
Usage: "Statediff database type (current options: postgres, file, dump)",
792792
Value: "postgres",
793793
}
794794
StateDiffDBDriverTypeFlag = cli.StringFlag{
@@ -852,6 +852,10 @@ var (
852852
Name: "statediff.db.nodeid",
853853
Usage: "Node ID to use when writing state diffs to database",
854854
}
855+
StateDiffFilePath = cli.StringFlag{
856+
Name: "statediff.file.path",
857+
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
858+
}
855859
StateDiffDBClientNameFlag = cli.StringFlag{
856860
Name: "statediff.db.clientname",
857861
Usage: "Client name to use when writing state diffs to database",

statediff/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ This service introduces a CLI flag namespace `statediff`
7979
`--statediff` flag is used to turn on the service
8080
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
8181
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
82-
`--statediff.db.type` is the type of database we write out to (current options: postgres and dump)
82+
`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
8383
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
8484
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
8585
`--statediff.db.host` is the hostname/ip to dial to connect to the database
@@ -95,6 +95,7 @@ This service introduces a CLI flag namespace `statediff`
9595
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
9696
`--statediff.db.nodeid` is the node id to use in the Postgres database
9797
`--statediff.db.clientname` is the client name to use in the Postgres database
98+
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
9899

99100
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)
100101

statediff/builder.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,14 @@ import (
2323
"bytes"
2424
"fmt"
2525

26-
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
27-
28-
types2 "github.com/ethereum/go-ethereum/statediff/types"
29-
3026
"github.com/ethereum/go-ethereum/common"
3127
"github.com/ethereum/go-ethereum/core/state"
3228
"github.com/ethereum/go-ethereum/core/types"
3329
"github.com/ethereum/go-ethereum/crypto"
3430
"github.com/ethereum/go-ethereum/log"
3531
"github.com/ethereum/go-ethereum/rlp"
32+
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
33+
types2 "github.com/ethereum/go-ethereum/statediff/types"
3634
"github.com/ethereum/go-ethereum/trie"
3735
)
3836

statediff/indexer/constructor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/ethereum/go-ethereum/params"
2424
"github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
25+
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
2526
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
2627
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
2728
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
@@ -32,10 +33,17 @@ import (
3233
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface
3334
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) {
3435
switch config.Type() {
36+
case shared.FILE:
37+
fc, ok := config.(file.Config)
38+
if !ok {
39+
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
40+
}
41+
fc.NodeInfo = nodeInfo
42+
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
3543
case shared.POSTGRES:
3644
pgc, ok := config.(postgres.Config)
3745
if !ok {
38-
return nil, fmt.Errorf("ostgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
46+
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
3947
}
4048
var err error
4149
var driver sql.Driver

statediff/indexer/database/dump/indexer.go

Lines changed: 63 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
136136
t = time.Now()
137137

138138
// Publish and index header, collect headerID
139-
var headerID int64
139+
var headerID string
140140
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
141141
if err != nil {
142142
return nil, err
@@ -181,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
181181

182182
// processHeader publishes and indexes a header IPLD in Postgres
183183
// it returns the headerID
184-
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
184+
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
185185
tx.cacheIPLD(headerNode)
186186

187187
var baseFee *int64
@@ -190,12 +190,13 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
190190
*baseFee = header.BaseFee.Int64()
191191
}
192192

193+
headerID := header.Hash().String()
193194
mod := models.HeaderModel{
194195
CID: headerNode.Cid().String(),
195196
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
196197
ParentHash: header.ParentHash.String(),
197198
BlockNumber: header.Number.String(),
198-
BlockHash: header.Hash().String(),
199+
BlockHash: headerID,
199200
TotalDifficulty: td.String(),
200201
Reward: reward.String(),
201202
Bloom: header.Bloom.Bytes(),
@@ -207,11 +208,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
207208
BaseFee: baseFee,
208209
}
209210
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
210-
return 0, err
211+
return headerID, err
211212
}
212213

213214
// processUncles publishes and indexes uncle IPLDs in Postgres
214-
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
215+
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
215216
// publish and index uncles
216217
for _, uncleNode := range uncleNodes {
217218
tx.cacheIPLD(uncleNode)
@@ -223,6 +224,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
223224
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
224225
}
225226
uncle := models.UncleModel{
227+
HeaderID: headerID,
226228
CID: uncleNode.Cid().String(),
227229
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
228230
ParentHash: uncleNode.ParentHash.String(),
@@ -238,7 +240,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
238240

239241
// processArgs bundles arguments to processReceiptsAndTxs
240242
type processArgs struct {
241-
headerID int64
243+
headerID string
242244
blockNumber *big.Int
243245
receipts types.Receipts
244246
txs types.Transactions
@@ -263,59 +265,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
263265
tx.cacheIPLD(txNode)
264266

265267
// Indexing
266-
// extract topic and contract data from the receipt for indexing
267-
mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses
268-
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
269-
for idx, l := range receipt.Logs {
270-
topicSet := make([]string, 4)
271-
for ti, topic := range l.Topics {
272-
topicSet[ti] = topic.Hex()
273-
}
274-
275-
if !args.logLeafNodeCIDs[i][idx].Defined() {
276-
return fmt.Errorf("invalid log cid")
277-
}
278-
279-
mappedContracts[l.Address.String()] = true
280-
logDataSet[idx] = &models.LogsModel{
281-
Address: l.Address.String(),
282-
Index: int64(l.Index),
283-
Data: l.Data,
284-
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
285-
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
286-
Topic0: topicSet[0],
287-
Topic1: topicSet[1],
288-
Topic2: topicSet[2],
289-
Topic3: topicSet[3],
290-
}
291-
}
292-
// these are the contracts seen in the logs
293-
logContracts := make([]string, 0, len(mappedContracts))
294-
for addr := range mappedContracts {
295-
logContracts = append(logContracts, addr)
296-
}
297-
// this is the contract address if this receipt is for a contract creation tx
298-
contract := shared.HandleZeroAddr(receipt.ContractAddress)
299-
var contractHash string
300-
if contract != "" {
301-
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
302-
}
303-
// index tx first so that the receipt can reference it by FK
268+
// index tx
304269
trx := args.txs[i]
270+
trxID := trx.Hash().String()
305271
// derive sender for the tx that corresponds with this receipt
306272
from, err := types.Sender(signer, trx)
307273
if err != nil {
308274
return fmt.Errorf("error deriving tx sender: %v", err)
309275
}
310276
txModel := models.TxModel{
311-
Dst: shared.HandleZeroAddrPointer(trx.To()),
312-
Src: shared.HandleZeroAddr(from),
313-
TxHash: trx.Hash().String(),
314-
Index: int64(i),
315-
Data: trx.Data(),
316-
CID: txNode.Cid().String(),
317-
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
318-
Type: trx.Type(),
277+
HeaderID: args.headerID,
278+
Dst: shared.HandleZeroAddrPointer(trx.To()),
279+
Src: shared.HandleZeroAddr(from),
280+
TxHash: trxID,
281+
Index: int64(i),
282+
Data: trx.Data(),
283+
CID: txNode.Cid().String(),
284+
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
285+
Type: trx.Type(),
319286
}
320287
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
321288
return err
@@ -328,6 +295,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
328295
storageKeys[k] = storageKey.Hex()
329296
}
330297
accessListElementModel := models.AccessListElementModel{
298+
TxID: trxID,
331299
Index: int64(j),
332300
Address: accessListElement.Address.Hex(),
333301
StorageKeys: storageKeys,
@@ -337,12 +305,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
337305
}
338306
}
339307

308+
// this is the contract address if this receipt is for a contract creation tx
309+
contract := shared.HandleZeroAddr(receipt.ContractAddress)
310+
var contractHash string
311+
if contract != "" {
312+
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
313+
}
314+
340315
// index the receipt
341316
if !args.rctLeafNodeCIDs[i].Defined() {
342317
return fmt.Errorf("invalid receipt leaf node cid")
343318
}
344319

345320
rctModel := &models.ReceiptModel{
321+
TxID: trxID,
346322
Contract: contract,
347323
ContractHash: contractHash,
348324
LeafCID: args.rctLeafNodeCIDs[i].String(),
@@ -359,6 +335,31 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
359335
return err
360336
}
361337

338+
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
339+
for idx, l := range receipt.Logs {
340+
topicSet := make([]string, 4)
341+
for ti, topic := range l.Topics {
342+
topicSet[ti] = topic.Hex()
343+
}
344+
345+
if !args.logLeafNodeCIDs[i][idx].Defined() {
346+
return fmt.Errorf("invalid log cid")
347+
}
348+
349+
logDataSet[idx] = &models.LogsModel{
350+
ReceiptID: trxID,
351+
Address: l.Address.String(),
352+
Index: int64(l.Index),
353+
Data: l.Data,
354+
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
355+
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
356+
Topic0: topicSet[0],
357+
Topic1: topicSet[1],
358+
Topic2: topicSet[2],
359+
Topic3: topicSet[3],
360+
}
361+
}
362+
362363
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil {
363364
return err
364365
}
@@ -374,7 +375,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
374375
}
375376

376377
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
377-
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error {
378+
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
378379
tx, ok := batch.(*BatchTx)
379380
if !ok {
380381
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@@ -384,6 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
384385
// short circuit if it is a Removed node
385386
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
386387
stateModel := models.StateNodeModel{
388+
HeaderID: headerID,
387389
Path: stateNode.Path,
388390
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
389391
CID: shared.RemovedNodeStateCID,
@@ -398,6 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
398400
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
399401
}
400402
stateModel := models.StateNodeModel{
403+
HeaderID: headerID,
401404
Path: stateNode.Path,
402405
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
403406
CID: stateCIDStr,
@@ -422,6 +425,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
422425
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
423426
}
424427
accountModel := models.StateAccountModel{
428+
HeaderID: headerID,
429+
StatePath: stateNode.Path,
425430
Balance: account.Balance.String(),
426431
Nonce: account.Nonce,
427432
CodeHash: account.CodeHash,
@@ -437,6 +442,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
437442
// short circuit if it is a Removed node
438443
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
439444
storageModel := models.StorageNodeModel{
445+
HeaderID: headerID,
446+
StatePath: stateNode.Path,
440447
Path: storageNode.Path,
441448
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
442449
CID: shared.RemovedNodeStorageCID,
@@ -453,6 +460,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
453460
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
454461
}
455462
storageModel := models.StorageNodeModel{
463+
HeaderID: headerID,
464+
StatePath: stateNode.Path,
456465
Path: storageNode.Path,
457466
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
458467
CID: storageCIDStr,
@@ -482,7 +491,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
482491
return nil
483492
}
484493

485-
// Close satisfied io.Closer
494+
// Close satisfies io.Closer
486495
func (sdi *StateDiffIndexer) Close() error {
487496
return sdi.dump.Close()
488497
}

0 commit comments

Comments
 (0)