Skip to content

Commit f7792c3

Browse files
committed
core: add Indexer interface and index server logic
1 parent 078a5ec commit f7792c3

File tree

4 files changed

+845
-1
lines changed

4 files changed

+845
-1
lines changed

core/blockchain.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ type BlockChain struct {
308308
blockProcCounter int32
309309
scope event.SubscriptionScope
310310
genesisBlock *types.Block
311+
indexServers indexServers
311312

312313
// This mutex synchronizes chain write operations.
313314
// Readers don't need to take it, they can just read the database.
@@ -540,9 +541,15 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine,
540541
log.Info("Failed to setup size tracker", "err", err)
541542
}
542543
}
544+
bc.indexServers.init(bc)
543545
return bc, nil
544546
}
545547

548+
// RegisterIndexer registers a new indexer to the chain.
549+
func (bc *BlockChain) RegisterIndexer(indexer Indexer, name string) {
550+
bc.indexServers.register(indexer, name)
551+
}
552+
546553
func (bc *BlockChain) setupSnapshot() {
547554
// Short circuit if the chain is established with path scheme, as the
548555
// state snapshot has been integrated into path database natively.
@@ -655,6 +662,7 @@ func (bc *BlockChain) loadLastState() error {
655662
if head := rawdb.ReadFinalizedBlockHash(bc.db); head != (common.Hash{}) {
656663
if block := bc.GetBlockByHash(head); block != nil {
657664
bc.currentFinalBlock.Store(block.Header())
665+
bc.indexServers.setFinalBlock(block.NumberU64())
658666
headFinalizedBlockGauge.Update(int64(block.NumberU64()))
659667
bc.currentSafeBlock.Store(block.Header())
660668
headSafeBlockGauge.Update(int64(block.NumberU64()))
@@ -702,6 +710,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
702710
return errors.New("unexpected database tail")
703711
}
704712
bc.historyPrunePoint.Store(predefinedPoint)
713+
bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
705714
return nil
706715

707716
case history.KeepPostMerge:
@@ -723,6 +732,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
723732
return errors.New("unexpected database tail")
724733
}
725734
bc.historyPrunePoint.Store(predefinedPoint)
735+
bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
726736
return nil
727737

728738
default:
@@ -785,9 +795,11 @@ func (bc *BlockChain) SetFinalized(header *types.Header) {
785795
if header != nil {
786796
rawdb.WriteFinalizedBlockHash(bc.db, header.Hash())
787797
headFinalizedBlockGauge.Update(int64(header.Number.Uint64()))
798+
bc.indexServers.setFinalBlock(header.Number.Uint64())
788799
} else {
789800
rawdb.WriteFinalizedBlockHash(bc.db, common.Hash{})
790801
headFinalizedBlockGauge.Update(0)
802+
bc.indexServers.setFinalBlock(0)
791803
}
792804
}
793805

@@ -1094,6 +1106,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
10941106
bc.receiptsCache.Purge()
10951107
bc.blockCache.Purge()
10961108
bc.txLookupCache.Purge()
1109+
bc.indexServers.revert(bc.CurrentBlock())
10971110

10981111
// Clear safe block, finalized block if needed
10991112
if safe := bc.CurrentSafeBlock(); safe != nil && head < safe.Number.Uint64() {
@@ -1150,6 +1163,7 @@ func (bc *BlockChain) Reset() error {
11501163
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
11511164
// specified genesis state.
11521165
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
1166+
bc.indexServers.revert(genesis.Header())
11531167
// Dump the entire block chain and purge the caches
11541168
if err := bc.SetHead(0); err != nil {
11551169
return err
@@ -1166,6 +1180,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
11661180
log.Crit("Failed to write genesis block", "err", err)
11671181
}
11681182
bc.writeHeadBlock(genesis)
1183+
bc.indexServers.broadcast(genesis.Header())
11691184

11701185
// Last update all in-memory chain markers
11711186
bc.genesisBlock = genesis
@@ -1282,6 +1297,7 @@ func (bc *BlockChain) stopWithoutSaving() {
12821297
// Stop stops the blockchain service. If any imports are currently in progress
12831298
// it will abort them using the procInterrupt.
12841299
func (bc *BlockChain) Stop() {
1300+
bc.indexServers.stop()
12851301
bc.stopWithoutSaving()
12861302

12871303
// Ensure that the entirety of the state snapshot is journaled to disk.
@@ -1583,6 +1599,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
15831599
}
15841600
}
15851601
bc.writeHeadBlock(block)
1602+
bc.indexServers.broadcast(block.Header())
15861603
return nil
15871604
}
15881605

@@ -1598,7 +1615,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
15981615
// should be written atomically. BlockBatch is used for containing all components.
15991616
blockBatch := bc.db.NewBatch()
16001617
rawdb.WriteBlock(blockBatch, block)
1601-
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
1618+
blockHash := block.Hash()
1619+
bc.blockCache.Add(blockHash, block)
1620+
rawdb.WriteReceipts(blockBatch, blockHash, block.NumberU64(), receipts)
1621+
if receipts != nil {
1622+
bc.receiptsCache.Add(blockHash, receipts)
1623+
} else {
1624+
bc.receiptsCache.Add(blockHash, []*types.Receipt{})
1625+
}
16021626
rawdb.WritePreimages(blockBatch, statedb.Preimages())
16031627
if err := blockBatch.Write(); err != nil {
16041628
log.Crit("Failed to write block into disk", "err", err)
@@ -1689,6 +1713,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
16891713

16901714
// Set new head.
16911715
bc.writeHeadBlock(block)
1716+
bc.indexServers.broadcast(block.Header())
16921717

16931718
bc.chainFeed.Send(ChainEvent{
16941719
Header: block.Header(),
@@ -1760,10 +1785,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
17601785
}
17611786

17621787
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
1788+
bc.indexServers.setBlockProcessing(true)
17631789
bc.blockProcFeed.Send(true)
17641790
}
17651791
defer func() {
17661792
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
1793+
bc.indexServers.setBlockProcessing(false)
17671794
bc.blockProcFeed.Send(false)
17681795
}
17691796
}()
@@ -2426,6 +2453,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
24262453
return errInvalidNewChain
24272454
}
24282455
}
2456+
bc.indexServers.revert(commonBlock)
24292457
// Ensure the user sees large reorgs
24302458
if len(oldChain) > 0 && len(newChain) > 0 {
24312459
logFn := log.Info
@@ -2523,6 +2551,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
25232551
}
25242552
// Update the head block
25252553
bc.writeHeadBlock(block)
2554+
bc.indexServers.broadcast(block.Header())
25262555
}
25272556
if len(rebirthLogs) > 0 {
25282557
bc.logsFeed.Send(rebirthLogs)
@@ -2598,6 +2627,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
25982627
}
25992628
}
26002629
bc.writeHeadBlock(head)
2630+
bc.indexServers.broadcast(head.Header())
26012631

26022632
// Emit events
26032633
receipts, logs := bc.collectReceiptsAndLogs(head, false)

core/blockchain_reader.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,14 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
266266
if !ok {
267267
return nil
268268
}
269+
return bc.GetReceipts(hash, number)
270+
}
271+
272+
// GetReceipts retrieves the receipts for all transactions in a given block.
273+
func (bc *BlockChain) GetReceipts(hash common.Hash, number uint64) types.Receipts {
274+
if receipts, ok := bc.receiptsCache.Get(hash); ok {
275+
return receipts
276+
}
269277
header := bc.GetHeader(hash, number)
270278
if header == nil {
271279
return nil

0 commit comments

Comments
 (0)