Skip to content

Commit a175c22

Browse files
committed
Disassociate block number from the indexer object
1 parent d629c99 commit a175c22

File tree

4 files changed

+129
-112
lines changed

4 files changed

+129
-112
lines changed

statediff/indexer/database/dump/batch_tx.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
3232
type BatchTx struct {
33-
BlockNumber uint64
33+
BlockNumber string
3434
dump io.Writer
3535
quit chan struct{}
3636
iplds chan models.IPLDModel
@@ -68,15 +68,17 @@ func (tx *BatchTx) cache() {
6868

6969
func (tx *BatchTx) cacheDirect(key string, value []byte) {
7070
tx.iplds <- models.IPLDModel{
71-
Key: key,
72-
Data: value,
71+
BlockNumber: tx.BlockNumber,
72+
Key: key,
73+
Data: value,
7374
}
7475
}
7576

7677
func (tx *BatchTx) cacheIPLD(i node.Node) {
7778
tx.iplds <- models.IPLDModel{
78-
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
79-
Data: i.RawData(),
79+
BlockNumber: tx.BlockNumber,
80+
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
81+
Data: i.RawData(),
8082
}
8183
}
8284

@@ -87,8 +89,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
8789
}
8890
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
8991
tx.iplds <- models.IPLDModel{
90-
Key: prefixedKey,
91-
Data: raw,
92+
BlockNumber: tx.BlockNumber,
93+
Key: prefixedKey,
94+
Data: raw,
9295
}
9396
return c.String(), prefixedKey, err
9497
}

statediff/indexer/database/dump/indexer.go

Lines changed: 68 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
102102
t = time.Now()
103103

104104
blockTx := &BatchTx{
105-
BlockNumber: height,
105+
BlockNumber: block.Number().String(),
106106
dump: sdi.dump,
107107
iplds: make(chan models.IPLDModel),
108108
quit: make(chan struct{}),
@@ -146,7 +146,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
146146
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
147147
t = time.Now()
148148
// Publish and index uncles
149-
err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
149+
err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
150150
if err != nil {
151151
return nil, err
152152
}
@@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
206206
}
207207

208208
// processUncles publishes and indexes uncle IPLDs in Postgres
209-
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
209+
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
210210
// publish and index uncles
211211
for _, uncleNode := range uncleNodes {
212212
tx.cacheIPLD(uncleNode)
@@ -215,15 +215,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
215215
if sdi.chainConfig.Clique != nil {
216216
uncleReward = big.NewInt(0)
217217
} else {
218-
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
218+
uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
219219
}
220220
uncle := models.UncleModel{
221-
HeaderID: headerID,
222-
CID: uncleNode.Cid().String(),
223-
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
224-
ParentHash: uncleNode.ParentHash.String(),
225-
BlockHash: uncleNode.Hash().String(),
226-
Reward: uncleReward.String(),
221+
BlockNumber: blockNumber.String(),
222+
HeaderID: headerID,
223+
CID: uncleNode.Cid().String(),
224+
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
225+
ParentHash: uncleNode.ParentHash.String(),
226+
BlockHash: uncleNode.Hash().String(),
227+
Reward: uncleReward.String(),
227228
}
228229
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil {
229230
return err
@@ -274,16 +275,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
274275
return fmt.Errorf("error deriving tx sender: %v", err)
275276
}
276277
txModel := models.TxModel{
277-
HeaderID: args.headerID,
278-
Dst: shared.HandleZeroAddrPointer(trx.To()),
279-
Src: shared.HandleZeroAddr(from),
280-
TxHash: trxID,
281-
Index: int64(i),
282-
Data: trx.Data(),
283-
CID: txNode.Cid().String(),
284-
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
285-
Type: trx.Type(),
286-
Value: val,
278+
BlockNumber: args.blockNumber.String(),
279+
HeaderID: args.headerID,
280+
Dst: shared.HandleZeroAddrPointer(trx.To()),
281+
Src: shared.HandleZeroAddr(from),
282+
TxHash: trxID,
283+
Index: int64(i),
284+
Data: trx.Data(),
285+
CID: txNode.Cid().String(),
286+
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
287+
Type: trx.Type(),
288+
Value: val,
287289
}
288290
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
289291
return err
@@ -296,6 +298,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
296298
storageKeys[k] = storageKey.Hex()
297299
}
298300
accessListElementModel := models.AccessListElementModel{
301+
BlockNumber: args.blockNumber.String(),
299302
TxID: trxID,
300303
Index: int64(j),
301304
Address: accessListElement.Address.Hex(),
@@ -319,6 +322,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
319322
}
320323

321324
rctModel := &models.ReceiptModel{
325+
BlockNumber: args.blockNumber.String(),
322326
TxID: trxID,
323327
Contract: contract,
324328
ContractHash: contractHash,
@@ -348,16 +352,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
348352
}
349353

350354
logDataSet[idx] = &models.LogsModel{
351-
ReceiptID: trxID,
352-
Address: l.Address.String(),
353-
Index: int64(l.Index),
354-
Data: l.Data,
355-
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
356-
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
357-
Topic0: topicSet[0],
358-
Topic1: topicSet[1],
359-
Topic2: topicSet[2],
360-
Topic3: topicSet[3],
355+
BlockNumber: args.blockNumber.String(),
356+
ReceiptID: trxID,
357+
Address: l.Address.String(),
358+
Index: int64(l.Index),
359+
Data: l.Data,
360+
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
361+
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
362+
Topic0: topicSet[0],
363+
Topic1: topicSet[1],
364+
Topic2: topicSet[2],
365+
Topic3: topicSet[3],
361366
}
362367
}
363368

@@ -379,33 +384,35 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
379384
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
380385
tx, ok := batch.(*BatchTx)
381386
if !ok {
382-
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
387+
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
383388
}
384389
// publish the state node
385390
var stateModel models.StateNodeModel
386391
if stateNode.NodeType == sdtypes.Removed {
387392
// short circuit if it is a Removed node
388393
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
389394
stateModel = models.StateNodeModel{
390-
HeaderID: headerID,
391-
Path: stateNode.Path,
392-
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
393-
CID: shared.RemovedNodeStateCID,
394-
MhKey: shared.RemovedNodeMhKey,
395-
NodeType: stateNode.NodeType.Int(),
395+
BlockNumber: tx.BlockNumber,
396+
HeaderID: headerID,
397+
Path: stateNode.Path,
398+
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
399+
CID: shared.RemovedNodeStateCID,
400+
MhKey: shared.RemovedNodeMhKey,
401+
NodeType: stateNode.NodeType.Int(),
396402
}
397403
} else {
398404
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
399405
if err != nil {
400406
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
401407
}
402408
stateModel = models.StateNodeModel{
403-
HeaderID: headerID,
404-
Path: stateNode.Path,
405-
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
406-
CID: stateCIDStr,
407-
MhKey: stateMhKey,
408-
NodeType: stateNode.NodeType.Int(),
409+
BlockNumber: tx.BlockNumber,
410+
HeaderID: headerID,
411+
Path: stateNode.Path,
412+
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
413+
CID: stateCIDStr,
414+
MhKey: stateMhKey,
415+
NodeType: stateNode.NodeType.Int(),
409416
}
410417
}
411418

@@ -428,6 +435,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
428435
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
429436
}
430437
accountModel := models.StateAccountModel{
438+
BlockNumber: tx.BlockNumber,
431439
HeaderID: headerID,
432440
StatePath: stateNode.Path,
433441
Balance: account.Balance.String(),
@@ -446,13 +454,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
446454
// short circuit if it is a Removed node
447455
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
448456
storageModel := models.StorageNodeModel{
449-
HeaderID: headerID,
450-
StatePath: stateNode.Path,
451-
Path: storageNode.Path,
452-
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
453-
CID: shared.RemovedNodeStorageCID,
454-
MhKey: shared.RemovedNodeMhKey,
455-
NodeType: storageNode.NodeType.Int(),
457+
BlockNumber: tx.BlockNumber,
458+
HeaderID: headerID,
459+
StatePath: stateNode.Path,
460+
Path: storageNode.Path,
461+
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
462+
CID: shared.RemovedNodeStorageCID,
463+
MhKey: shared.RemovedNodeMhKey,
464+
NodeType: storageNode.NodeType.Int(),
456465
}
457466
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
458467
return err
@@ -464,13 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
464473
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
465474
}
466475
storageModel := models.StorageNodeModel{
467-
HeaderID: headerID,
468-
StatePath: stateNode.Path,
469-
Path: storageNode.Path,
470-
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
471-
CID: storageCIDStr,
472-
MhKey: storageMhKey,
473-
NodeType: storageNode.NodeType.Int(),
476+
BlockNumber: tx.BlockNumber,
477+
HeaderID: headerID,
478+
StatePath: stateNode.Path,
479+
Path: storageNode.Path,
480+
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
481+
CID: storageCIDStr,
482+
MhKey: storageMhKey,
483+
NodeType: storageNode.NodeType.Int(),
474484
}
475485
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
476486
return err
@@ -484,7 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
484494
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
485495
tx, ok := batch.(*BatchTx)
486496
if !ok {
487-
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
497+
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
488498
}
489499
// codec doesn't matter since db key is multihash-based
490500
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)

0 commit comments

Comments
 (0)