Skip to content

Commit 90c4493

Browse files
committed
eth, core: interupt the chain processing on stop
Added an additional channel which is used to interupt the chain manager when it's processing blocks.
1 parent e2c2d8e commit 90c4493

File tree

2 files changed

+105
-95
lines changed

2 files changed

+105
-95
lines changed

core/chain_manager.go

Lines changed: 104 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,9 @@ type ChainManager struct {
100100
cache *BlockCache
101101
futureBlocks *BlockCache
102102

103-
quit chan struct{}
104-
wg sync.WaitGroup
103+
quit chan struct{}
104+
procInterupt chan struct{} // interupt signaler for block processing
105+
wg sync.WaitGroup
105106

106107
pow pow.PoW
107108
}
@@ -113,6 +114,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
113114
genesisBlock: GenesisBlock(42, stateDb),
114115
eventMux: mux,
115116
quit: make(chan struct{}),
117+
procInterupt: make(chan struct{}),
116118
cache: NewBlockCache(blockCacheLimit),
117119
pow: pow,
118120
}
@@ -516,6 +518,7 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
516518

517519
func (bc *ChainManager) Stop() {
518520
close(bc.quit)
521+
close(bc.procInterupt)
519522

520523
bc.wg.Wait()
521524

@@ -568,119 +571,126 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
568571
defer close(nonceQuit)
569572

570573
txcount := 0
574+
done:
571575
for i, block := range chain {
572-
bstart := time.Now()
573-
// Wait for block i's nonce to be verified before processing
574-
// its state transition.
575-
for !nonceChecked[i] {
576-
r := <-nonceDone
577-
nonceChecked[r.i] = true
578-
if !r.valid {
579-
block := chain[r.i]
580-
return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
576+
select {
577+
case <-self.procInterupt:
578+
glog.V(logger.Debug).Infoln("Premature abort during chain processing")
579+
break done
580+
default:
581+
bstart := time.Now()
582+
// Wait for block i's nonce to be verified before processing
583+
// its state transition.
584+
for !nonceChecked[i] {
585+
r := <-nonceDone
586+
nonceChecked[r.i] = true
587+
if !r.valid {
588+
block := chain[r.i]
589+
return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
590+
}
581591
}
582-
}
583592

584-
if BadHashes[block.Hash()] {
585-
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
586-
blockErr(block, err)
587-
return i, err
588-
}
589-
590-
// Setting block.Td regardless of error (known for example) prevents errors down the line
591-
// in the protocol handler
592-
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
593-
594-
// Call in to the block processor and check for errors. It's likely that if one block fails
595-
// all others will fail too (unless a known block is returned).
596-
logs, err := self.processor.Process(block)
597-
if err != nil {
598-
if IsKnownBlockErr(err) {
599-
stats.ignored++
600-
continue
593+
if BadHashes[block.Hash()] {
594+
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
595+
blockErr(block, err)
596+
return i, err
601597
}
602598

603-
if err == BlockFutureErr {
604-
// Allow up to MaxFuture second in the future blocks. If this limit
605-
// is exceeded the chain is discarded and processed at a later time
606-
// if given.
607-
if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
608-
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
599+
// Setting block.Td regardless of error (known for example) prevents errors down the line
600+
// in the protocol handler
601+
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
602+
603+
// Call in to the block processor and check for errors. It's likely that if one block fails
604+
// all others will fail too (unless a known block is returned).
605+
logs, err := self.processor.Process(block)
606+
if err != nil {
607+
if IsKnownBlockErr(err) {
608+
stats.ignored++
609+
continue
609610
}
610611

611-
block.SetQueued(true)
612-
self.futureBlocks.Push(block)
613-
stats.queued++
614-
continue
615-
}
612+
if err == BlockFutureErr {
613+
// Allow up to MaxFuture second in the future blocks. If this limit
614+
// is exceeded the chain is discarded and processed at a later time
615+
// if given.
616+
if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
617+
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
618+
}
616619

617-
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
618-
block.SetQueued(true)
619-
self.futureBlocks.Push(block)
620-
stats.queued++
621-
continue
622-
}
620+
block.SetQueued(true)
621+
self.futureBlocks.Push(block)
622+
stats.queued++
623+
continue
624+
}
623625

624-
blockErr(block, err)
626+
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
627+
block.SetQueued(true)
628+
self.futureBlocks.Push(block)
629+
stats.queued++
630+
continue
631+
}
625632

626-
return i, err
627-
}
633+
blockErr(block, err)
628634

629-
txcount += len(block.Transactions())
630-
631-
cblock := self.currentBlock
632-
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
633-
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
634-
if block.Td.Cmp(self.Td()) > 0 {
635-
// chain fork
636-
if block.ParentHash() != cblock.Hash() {
637-
// during split we merge two different chains and create the new canonical chain
638-
err := self.merge(cblock, block)
639-
if err != nil {
640-
return i, err
635+
return i, err
636+
}
637+
638+
txcount += len(block.Transactions())
639+
640+
cblock := self.currentBlock
641+
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
642+
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
643+
if block.Td.Cmp(self.Td()) > 0 {
644+
// chain fork
645+
if block.ParentHash() != cblock.Hash() {
646+
// during split we merge two different chains and create the new canonical chain
647+
err := self.merge(cblock, block)
648+
if err != nil {
649+
return i, err
650+
}
651+
652+
queue[i] = ChainSplitEvent{block, logs}
653+
queueEvent.splitCount++
641654
}
642655

643-
queue[i] = ChainSplitEvent{block, logs}
644-
queueEvent.splitCount++
645-
}
656+
self.mu.Lock()
657+
self.setTotalDifficulty(block.Td)
658+
self.insert(block)
659+
self.mu.Unlock()
646660

647-
self.mu.Lock()
648-
self.setTotalDifficulty(block.Td)
649-
self.insert(block)
650-
self.mu.Unlock()
661+
jsonlogger.LogJson(&logger.EthChainNewHead{
662+
BlockHash: block.Hash().Hex(),
663+
BlockNumber: block.Number(),
664+
ChainHeadHash: cblock.Hash().Hex(),
665+
BlockPrevHash: block.ParentHash().Hex(),
666+
})
651667

652-
jsonlogger.LogJson(&logger.EthChainNewHead{
653-
BlockHash: block.Hash().Hex(),
654-
BlockNumber: block.Number(),
655-
ChainHeadHash: cblock.Hash().Hex(),
656-
BlockPrevHash: block.ParentHash().Hex(),
657-
})
668+
self.setTransState(state.New(block.Root(), self.stateDb))
669+
self.txState.SetState(state.New(block.Root(), self.stateDb))
658670

659-
self.setTransState(state.New(block.Root(), self.stateDb))
660-
self.txState.SetState(state.New(block.Root(), self.stateDb))
671+
queue[i] = ChainEvent{block, block.Hash(), logs}
672+
queueEvent.canonicalCount++
661673

662-
queue[i] = ChainEvent{block, block.Hash(), logs}
663-
queueEvent.canonicalCount++
674+
if glog.V(logger.Debug) {
675+
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
676+
}
677+
} else {
678+
if glog.V(logger.Detail) {
679+
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
680+
}
664681

665-
if glog.V(logger.Debug) {
666-
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
667-
}
668-
} else {
669-
if glog.V(logger.Detail) {
670-
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
682+
queue[i] = ChainSideEvent{block, logs}
683+
queueEvent.sideCount++
671684
}
685+
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
686+
// not in the canonical chain.
687+
self.write(block)
688+
// Delete from future blocks
689+
self.futureBlocks.Delete(block.Hash())
672690

673-
queue[i] = ChainSideEvent{block, logs}
674-
queueEvent.sideCount++
675-
}
676-
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
677-
// not in the canonical chain.
678-
self.write(block)
679-
// Delete from future blocks
680-
self.futureBlocks.Delete(block.Hash())
681-
682-
stats.processed++
691+
stats.processed++
683692

693+
}
684694
}
685695

686696
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {

eth/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,8 @@ func (self *Ethereum) AddPeer(nodeURL string) error {
527527

528528
func (s *Ethereum) Stop() {
529529
s.net.Stop()
530-
s.protocolManager.Stop()
531530
s.chainManager.Stop()
531+
s.protocolManager.Stop()
532532
s.txPool.Stop()
533533
s.eventMux.Stop()
534534
if s.whisper != nil {

0 commit comments

Comments
 (0)