Skip to content

Commit 645dfd9

Browse files
committed
core: changed interrupt strategy
Removed chain manager's select/channel approach when checking for interrupts. Now using an atomic int32 instead which checked for every block processed.
1 parent 90c4493 commit 645dfd9

File tree

1 file changed

+101
-102
lines changed

1 file changed

+101
-102
lines changed

core/chain_manager.go

Lines changed: 101 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"runtime"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/ethereum/go-ethereum/common"
@@ -100,9 +101,10 @@ type ChainManager struct {
100101
cache *BlockCache
101102
futureBlocks *BlockCache
102103

103-
quit chan struct{}
104-
procInterupt chan struct{} // interupt signaler for block processing
105-
wg sync.WaitGroup
104+
quit chan struct{}
105+
// procInterrupt must be atomically called
106+
procInterrupt int32 // interrupt signaler for block processing
107+
wg sync.WaitGroup
106108

107109
pow pow.PoW
108110
}
@@ -114,7 +116,6 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
114116
genesisBlock: GenesisBlock(42, stateDb),
115117
eventMux: mux,
116118
quit: make(chan struct{}),
117-
procInterupt: make(chan struct{}),
118119
cache: NewBlockCache(blockCacheLimit),
119120
pow: pow,
120121
}
@@ -518,7 +519,7 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
518519

519520
func (bc *ChainManager) Stop() {
520521
close(bc.quit)
521-
close(bc.procInterupt)
522+
atomic.StoreInt32(&bc.procInterrupt, 1)
522523

523524
bc.wg.Wait()
524525

@@ -571,126 +572,124 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
571572
defer close(nonceQuit)
572573

573574
txcount := 0
574-
done:
575575
for i, block := range chain {
576-
select {
577-
case <-self.procInterupt:
576+
if atomic.LoadInt32(&self.procInterrupt) == 1 {
578577
glog.V(logger.Debug).Infoln("Premature abort during chain processing")
579-
break done
580-
default:
581-
bstart := time.Now()
582-
// Wait for block i's nonce to be verified before processing
583-
// its state transition.
584-
for !nonceChecked[i] {
585-
r := <-nonceDone
586-
nonceChecked[r.i] = true
587-
if !r.valid {
588-
block := chain[r.i]
589-
return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
590-
}
591-
}
578+
break
579+
}
592580

593-
if BadHashes[block.Hash()] {
594-
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
595-
blockErr(block, err)
596-
return i, err
581+
bstart := time.Now()
582+
// Wait for block i's nonce to be verified before processing
583+
// its state transition.
584+
for !nonceChecked[i] {
585+
r := <-nonceDone
586+
nonceChecked[r.i] = true
587+
if !r.valid {
588+
block := chain[r.i]
589+
return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
597590
}
591+
}
598592

599-
// Setting block.Td regardless of error (known for example) prevents errors down the line
600-
// in the protocol handler
601-
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
602-
603-
// Call in to the block processor and check for errors. It's likely that if one block fails
604-
// all others will fail too (unless a known block is returned).
605-
logs, err := self.processor.Process(block)
606-
if err != nil {
607-
if IsKnownBlockErr(err) {
608-
stats.ignored++
609-
continue
610-
}
611-
612-
if err == BlockFutureErr {
613-
// Allow up to MaxFuture second in the future blocks. If this limit
614-
// is exceeded the chain is discarded and processed at a later time
615-
// if given.
616-
if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
617-
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
618-
}
593+
if BadHashes[block.Hash()] {
594+
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
595+
blockErr(block, err)
596+
return i, err
597+
}
619598

620-
block.SetQueued(true)
621-
self.futureBlocks.Push(block)
622-
stats.queued++
623-
continue
624-
}
599+
// Setting block.Td regardless of error (known for example) prevents errors down the line
600+
// in the protocol handler
601+
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
602+
603+
// Call in to the block processor and check for errors. It's likely that if one block fails
604+
// all others will fail too (unless a known block is returned).
605+
logs, err := self.processor.Process(block)
606+
if err != nil {
607+
if IsKnownBlockErr(err) {
608+
stats.ignored++
609+
continue
610+
}
625611

626-
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
627-
block.SetQueued(true)
628-
self.futureBlocks.Push(block)
629-
stats.queued++
630-
continue
612+
if err == BlockFutureErr {
613+
// Allow up to MaxFuture second in the future blocks. If this limit
614+
// is exceeded the chain is discarded and processed at a later time
615+
// if given.
616+
if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
617+
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
631618
}
632619

633-
blockErr(block, err)
620+
block.SetQueued(true)
621+
self.futureBlocks.Push(block)
622+
stats.queued++
623+
continue
624+
}
634625

635-
return i, err
626+
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
627+
block.SetQueued(true)
628+
self.futureBlocks.Push(block)
629+
stats.queued++
630+
continue
636631
}
637632

638-
txcount += len(block.Transactions())
639-
640-
cblock := self.currentBlock
641-
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
642-
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
643-
if block.Td.Cmp(self.Td()) > 0 {
644-
// chain fork
645-
if block.ParentHash() != cblock.Hash() {
646-
// during split we merge two different chains and create the new canonical chain
647-
err := self.merge(cblock, block)
648-
if err != nil {
649-
return i, err
650-
}
633+
blockErr(block, err)
634+
635+
return i, err
636+
}
651637

652-
queue[i] = ChainSplitEvent{block, logs}
653-
queueEvent.splitCount++
638+
txcount += len(block.Transactions())
639+
640+
cblock := self.currentBlock
641+
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
642+
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
643+
if block.Td.Cmp(self.Td()) > 0 {
644+
// chain fork
645+
if block.ParentHash() != cblock.Hash() {
646+
// during split we merge two different chains and create the new canonical chain
647+
err := self.merge(cblock, block)
648+
if err != nil {
649+
return i, err
654650
}
655651

656-
self.mu.Lock()
657-
self.setTotalDifficulty(block.Td)
658-
self.insert(block)
659-
self.mu.Unlock()
652+
queue[i] = ChainSplitEvent{block, logs}
653+
queueEvent.splitCount++
654+
}
660655

661-
jsonlogger.LogJson(&logger.EthChainNewHead{
662-
BlockHash: block.Hash().Hex(),
663-
BlockNumber: block.Number(),
664-
ChainHeadHash: cblock.Hash().Hex(),
665-
BlockPrevHash: block.ParentHash().Hex(),
666-
})
656+
self.mu.Lock()
657+
self.setTotalDifficulty(block.Td)
658+
self.insert(block)
659+
self.mu.Unlock()
667660

668-
self.setTransState(state.New(block.Root(), self.stateDb))
669-
self.txState.SetState(state.New(block.Root(), self.stateDb))
661+
jsonlogger.LogJson(&logger.EthChainNewHead{
662+
BlockHash: block.Hash().Hex(),
663+
BlockNumber: block.Number(),
664+
ChainHeadHash: cblock.Hash().Hex(),
665+
BlockPrevHash: block.ParentHash().Hex(),
666+
})
670667

671-
queue[i] = ChainEvent{block, block.Hash(), logs}
672-
queueEvent.canonicalCount++
668+
self.setTransState(state.New(block.Root(), self.stateDb))
669+
self.txState.SetState(state.New(block.Root(), self.stateDb))
673670

674-
if glog.V(logger.Debug) {
675-
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
676-
}
677-
} else {
678-
if glog.V(logger.Detail) {
679-
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
680-
}
671+
queue[i] = ChainEvent{block, block.Hash(), logs}
672+
queueEvent.canonicalCount++
681673

682-
queue[i] = ChainSideEvent{block, logs}
683-
queueEvent.sideCount++
674+
if glog.V(logger.Debug) {
675+
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
676+
}
677+
} else {
678+
if glog.V(logger.Detail) {
679+
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
684680
}
685-
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
686-
// not in the canonical chain.
687-
self.write(block)
688-
// Delete from future blocks
689-
self.futureBlocks.Delete(block.Hash())
690-
691-
stats.processed++
692681

682+
queue[i] = ChainSideEvent{block, logs}
683+
queueEvent.sideCount++
693684
}
685+
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
686+
// not in the canonical chain.
687+
self.write(block)
688+
// Delete from future blocks
689+
self.futureBlocks.Delete(block.Hash())
690+
691+
stats.processed++
692+
694693
}
695694

696695
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {

0 commit comments

Comments
 (0)