Skip to content

Commit 4e73e17

Browse files
authored
Merge pull request #211 from vulcanize/v1.10.16-statediff-4.0.0
update sql indexer to use new v4 schema
2 parents 6b74310 + 1e64f48 commit 4e73e17

19 files changed

+673
-577
lines changed

statediff/indexer/database/file/batch_tx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package file
1818

1919
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
2020
type BatchTx struct {
21-
BlockNumber uint64
21+
BlockNumber string
2222

2323
submit func(blockTx *BatchTx, err error) error
2424
}

statediff/indexer/database/file/indexer.go

Lines changed: 90 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math/big"
2424
"os"
2525
"sync"
26+
"sync/atomic"
2627
"time"
2728

2829
"github.com/ipfs/go-cid"
@@ -53,10 +54,12 @@ var (
5354

5455
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
5556
type StateDiffIndexer struct {
56-
fileWriter *SQLWriter
57-
chainConfig *params.ChainConfig
58-
nodeID string
59-
wg *sync.WaitGroup
57+
fileWriter *SQLWriter
58+
chainConfig *params.ChainConfig
59+
nodeID string
60+
wg *sync.WaitGroup
61+
blockNumber string
62+
removedCacheFlag *uint32
6063
}
6164

6265
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
@@ -77,7 +80,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
7780
wg := new(sync.WaitGroup)
7881
w.Loop()
7982
w.upsertNode(config.NodeInfo)
80-
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
8183
return &StateDiffIndexer{
8284
fileWriter: w,
8385
chainConfig: chainConfig,
@@ -92,6 +94,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
9294
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
9395
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
9496
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
97+
sdi.removedCacheFlag = new(uint32)
98+
sdi.blockNumber = block.Number().String()
9599
start, t := time.Now(), time.Now()
96100
blockHash := block.Hash()
97101
blockHashStr := blockHash.String()
@@ -127,7 +131,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
127131
t = time.Now()
128132

129133
blockTx := &BatchTx{
130-
BlockNumber: height,
134+
BlockNumber: sdi.blockNumber,
131135
submit: func(self *BatchTx, err error) error {
132136
tDiff := time.Since(t)
133137
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
@@ -189,7 +193,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
189193
// processHeader write a header IPLD insert SQL stmt to a file
190194
// it returns the headerID
191195
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
192-
sdi.fileWriter.upsertIPLDNode(headerNode)
196+
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode)
193197

194198
var baseFee *string
195199
if header.BaseFee != nil {
@@ -202,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
202206
CID: headerNode.Cid().String(),
203207
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
204208
ParentHash: header.ParentHash.String(),
205-
BlockNumber: header.Number.String(),
209+
BlockNumber: sdi.blockNumber,
206210
BlockHash: headerID,
207211
TotalDifficulty: td.String(),
208212
Reward: reward.String(),
@@ -221,7 +225,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
221225
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
222226
// publish and index uncles
223227
for _, uncleNode := range uncleNodes {
224-
sdi.fileWriter.upsertIPLDNode(uncleNode)
228+
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode)
225229
var uncleReward *big.Int
226230
// in PoA networks uncle reward is 0
227231
if sdi.chainConfig.Clique != nil {
@@ -230,12 +234,13 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64,
230234
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
231235
}
232236
sdi.fileWriter.upsertUncleCID(models.UncleModel{
233-
HeaderID: headerID,
234-
CID: uncleNode.Cid().String(),
235-
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
236-
ParentHash: uncleNode.ParentHash.String(),
237-
BlockHash: uncleNode.Hash().String(),
238-
Reward: uncleReward.String(),
237+
BlockNumber: sdi.blockNumber,
238+
HeaderID: headerID,
239+
CID: uncleNode.Cid().String(),
240+
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
241+
ParentHash: uncleNode.ParentHash.String(),
242+
BlockHash: uncleNode.Hash().String(),
243+
Reward: uncleReward.String(),
239244
})
240245
}
241246
}
@@ -261,10 +266,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
261266
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
262267
for i, receipt := range args.receipts {
263268
for _, logTrieNode := range args.logTrieNodes[i] {
264-
sdi.fileWriter.upsertIPLDNode(logTrieNode)
269+
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode)
265270
}
266271
txNode := args.txNodes[i]
267-
sdi.fileWriter.upsertIPLDNode(txNode)
272+
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode)
268273

269274
// index tx
270275
trx := args.txs[i]
@@ -281,16 +286,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
281286
return fmt.Errorf("error deriving tx sender: %v", err)
282287
}
283288
txModel := models.TxModel{
284-
HeaderID: args.headerID,
285-
Dst: shared.HandleZeroAddrPointer(trx.To()),
286-
Src: shared.HandleZeroAddr(from),
287-
TxHash: txID,
288-
Index: int64(i),
289-
Data: trx.Data(),
290-
CID: txNode.Cid().String(),
291-
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
292-
Type: trx.Type(),
293-
Value: val,
289+
BlockNumber: sdi.blockNumber,
290+
HeaderID: args.headerID,
291+
Dst: shared.HandleZeroAddrPointer(trx.To()),
292+
Src: shared.HandleZeroAddr(from),
293+
TxHash: txID,
294+
Index: int64(i),
295+
Data: trx.Data(),
296+
CID: txNode.Cid().String(),
297+
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
298+
Type: trx.Type(),
299+
Value: val,
294300
}
295301
sdi.fileWriter.upsertTransactionCID(txModel)
296302

@@ -301,6 +307,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
301307
storageKeys[k] = storageKey.Hex()
302308
}
303309
accessListElementModel := models.AccessListElementModel{
310+
BlockNumber: sdi.blockNumber,
304311
TxID: txID,
305312
Index: int64(j),
306313
Address: accessListElement.Address.Hex(),
@@ -322,6 +329,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
322329
}
323330

324331
rctModel := &models.ReceiptModel{
332+
BlockNumber: sdi.blockNumber,
325333
TxID: txID,
326334
Contract: contract,
327335
ContractHash: contractHash,
@@ -349,25 +357,26 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
349357
}
350358

351359
logDataSet[idx] = &models.LogsModel{
352-
ReceiptID: txID,
353-
Address: l.Address.String(),
354-
Index: int64(l.Index),
355-
Data: l.Data,
356-
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
357-
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
358-
Topic0: topicSet[0],
359-
Topic1: topicSet[1],
360-
Topic2: topicSet[2],
361-
Topic3: topicSet[3],
360+
BlockNumber: sdi.blockNumber,
361+
ReceiptID: txID,
362+
Address: l.Address.String(),
363+
Index: int64(l.Index),
364+
Data: l.Data,
365+
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
366+
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
367+
Topic0: topicSet[0],
368+
Topic1: topicSet[1],
369+
Topic2: topicSet[2],
370+
Topic3: topicSet[3],
362371
}
363372
}
364373
sdi.fileWriter.upsertLogCID(logDataSet)
365374
}
366375

367376
// publish trie nodes, these aren't indexed directly
368377
for i, n := range args.txTrieNodes {
369-
sdi.fileWriter.upsertIPLDNode(n)
370-
sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i])
378+
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n)
379+
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i])
371380
}
372381

373382
return nil
@@ -377,30 +386,34 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
377386
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
378387
// publish the state node
379388
if stateNode.NodeType == sdtypes.Removed {
380-
// short circuit if it is a Removed node
381-
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
389+
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
390+
atomic.StoreUint32(sdi.removedCacheFlag, 1)
391+
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
392+
}
382393
stateModel := models.StateNodeModel{
383-
HeaderID: headerID,
384-
Path: stateNode.Path,
385-
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
386-
CID: shared.RemovedNodeStateCID,
387-
MhKey: shared.RemovedNodeMhKey,
388-
NodeType: stateNode.NodeType.Int(),
394+
BlockNumber: sdi.blockNumber,
395+
HeaderID: headerID,
396+
Path: stateNode.Path,
397+
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
398+
CID: shared.RemovedNodeStateCID,
399+
MhKey: shared.RemovedNodeMhKey,
400+
NodeType: stateNode.NodeType.Int(),
389401
}
390402
sdi.fileWriter.upsertStateCID(stateModel)
391403
return nil
392404
}
393-
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
405+
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
394406
if err != nil {
395407
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
396408
}
397409
stateModel := models.StateNodeModel{
398-
HeaderID: headerID,
399-
Path: stateNode.Path,
400-
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
401-
CID: stateCIDStr,
402-
MhKey: stateMhKey,
403-
NodeType: stateNode.NodeType.Int(),
410+
BlockNumber: sdi.blockNumber,
411+
HeaderID: headerID,
412+
Path: stateNode.Path,
413+
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
414+
CID: stateCIDStr,
415+
MhKey: stateMhKey,
416+
NodeType: stateNode.NodeType.Int(),
404417
}
405418
// index the state node
406419
sdi.fileWriter.upsertStateCID(stateModel)
@@ -418,6 +431,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
418431
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
419432
}
420433
accountModel := models.StateAccountModel{
434+
BlockNumber: sdi.blockNumber,
421435
HeaderID: headerID,
422436
StatePath: stateNode.Path,
423437
Balance: account.Balance.String(),
@@ -430,32 +444,36 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
430444
// if there are any storage nodes associated with this node, publish and index them
431445
for _, storageNode := range stateNode.StorageNodes {
432446
if storageNode.NodeType == sdtypes.Removed {
433-
// short circuit if it is a Removed node
434-
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
447+
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
448+
atomic.StoreUint32(sdi.removedCacheFlag, 1)
449+
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
450+
}
435451
storageModel := models.StorageNodeModel{
436-
HeaderID: headerID,
437-
StatePath: stateNode.Path,
438-
Path: storageNode.Path,
439-
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
440-
CID: shared.RemovedNodeStorageCID,
441-
MhKey: shared.RemovedNodeMhKey,
442-
NodeType: storageNode.NodeType.Int(),
452+
BlockNumber: sdi.blockNumber,
453+
HeaderID: headerID,
454+
StatePath: stateNode.Path,
455+
Path: storageNode.Path,
456+
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
457+
CID: shared.RemovedNodeStorageCID,
458+
MhKey: shared.RemovedNodeMhKey,
459+
NodeType: storageNode.NodeType.Int(),
443460
}
444461
sdi.fileWriter.upsertStorageCID(storageModel)
445462
continue
446463
}
447-
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
464+
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
448465
if err != nil {
449466
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
450467
}
451468
storageModel := models.StorageNodeModel{
452-
HeaderID: headerID,
453-
StatePath: stateNode.Path,
454-
Path: storageNode.Path,
455-
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
456-
CID: storageCIDStr,
457-
MhKey: storageMhKey,
458-
NodeType: storageNode.NodeType.Int(),
469+
BlockNumber: sdi.blockNumber,
470+
HeaderID: headerID,
471+
StatePath: stateNode.Path,
472+
Path: storageNode.Path,
473+
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
474+
CID: storageCIDStr,
475+
MhKey: storageMhKey,
476+
NodeType: storageNode.NodeType.Int(),
459477
}
460478
sdi.fileWriter.upsertStorageCID(storageModel)
461479
}
@@ -470,7 +488,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
470488
if err != nil {
471489
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
472490
}
473-
sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
491+
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code)
474492
return nil
475493
}
476494

statediff/indexer/database/file/indexer_legacy_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
3434
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
3535
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
36-
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
3736
)
3837

3938
var (
@@ -71,7 +70,7 @@ func setupLegacy(t *testing.T) {
7170
require.NoError(t, err)
7271
}
7372

74-
test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
73+
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
7574

7675
connStr := postgres.DefaultConfig.DbConnectionString()
7776

@@ -123,10 +122,10 @@ func TestFileIndexerLegacy(t *testing.T) {
123122
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
124123
require.NoError(t, err)
125124

126-
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
127-
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
128-
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
129-
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockBlock.Coinbase().String())
125+
require.Equal(t, legacyHeaderCID.String(), header.CID)
126+
require.Equal(t, legacyData.MockBlock.Difficulty().String(), header.TD)
127+
require.Equal(t, "5000000000000011250", header.Reward)
128+
require.Equal(t, legacyData.MockBlock.Coinbase().String(), header.Coinbase)
130129
require.Nil(t, legacyData.MockHeader.BaseFee)
131130
})
132131
}

0 commit comments

Comments
 (0)