Skip to content

Commit 5b0ee8e

Browse files
committed
core, eth, trie: fix data races and merge/review issues
1 parent aa0538d commit 5b0ee8e

27 files changed

+765
-465
lines changed

core/block_processor.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,16 @@ func (sm *BlockProcessor) Process(block *types.Block) (logs vm.Logs, receipts ty
195195
defer sm.mutex.Unlock()
196196

197197
if sm.bc.HasBlock(block.Hash()) {
198-
return nil, nil, &KnownBlockError{block.Number(), block.Hash()}
198+
if _, err := state.New(block.Root(), sm.chainDb); err == nil {
199+
return nil, nil, &KnownBlockError{block.Number(), block.Hash()}
200+
}
199201
}
200-
201-
if !sm.bc.HasBlock(block.ParentHash()) {
202-
return nil, nil, ParentError(block.ParentHash())
202+
if parent := sm.bc.GetBlock(block.ParentHash()); parent != nil {
203+
if _, err := state.New(parent.Root(), sm.chainDb); err == nil {
204+
return sm.processWithParent(block, parent)
205+
}
203206
}
204-
parent := sm.bc.GetBlock(block.ParentHash())
205-
return sm.processWithParent(block, parent)
207+
return nil, nil, ParentError(block.ParentHash())
206208
}
207209

208210
func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs vm.Logs, receipts types.Receipts, err error) {

core/blockchain.go

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package core
1919

2020
import (
21+
crand "crypto/rand"
2122
"errors"
2223
"fmt"
2324
"io"
25+
"math"
2426
"math/big"
25-
"math/rand"
27+
mrand "math/rand"
2628
"runtime"
2729
"sync"
2830
"sync/atomic"
@@ -89,7 +91,8 @@ type BlockChain struct {
8991
procInterrupt int32 // interrupt signaler for block processing
9092
wg sync.WaitGroup
9193

92-
pow pow.PoW
94+
pow pow.PoW
95+
rand *mrand.Rand
9396
}
9497

9598
func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
@@ -112,6 +115,12 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
112115
futureBlocks: futureBlocks,
113116
pow: pow,
114117
}
118+
// Seed a fast but crypto originating random generator
119+
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
120+
if err != nil {
121+
return nil, err
122+
}
123+
bc.rand = mrand.New(mrand.NewSource(seed.Int64()))
115124

116125
bc.genesisBlock = bc.GetBlockByNumber(0)
117126
if bc.genesisBlock == nil {
@@ -178,21 +187,21 @@ func (self *BlockChain) loadLastState() error {
178187
fastTd := self.GetTd(self.currentFastBlock.Hash())
179188

180189
glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash().Bytes()[:4], headerTd)
181-
glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
182190
glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
191+
glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
183192

184193
return nil
185194
}
186195

187-
// SetHead rewind the local chain to a new head entity. In the case of headers,
188-
// everything above the new head will be deleted and the new one set. In the case
189-
// of blocks though, the head may be further rewound if block bodies are missing
190-
// (non-archive nodes after a fast sync).
196+
// SetHead rewinds the local chain to a new head. In the case of headers, everything
197+
// above the new head will be deleted and the new one set. In the case of blocks
198+
// though, the head may be further rewound if block bodies are missing (non-archive
199+
// nodes after a fast sync).
191200
func (bc *BlockChain) SetHead(head uint64) {
192201
bc.mu.Lock()
193202
defer bc.mu.Unlock()
194203

195-
// Figure out the highest known canonical assignment
204+
// Figure out the highest known canonical headers and/or blocks
196205
height := uint64(0)
197206
if bc.currentHeader != nil {
198207
if hh := bc.currentHeader.Number.Uint64(); hh > height {
@@ -266,7 +275,7 @@ func (bc *BlockChain) SetHead(head uint64) {
266275
// FastSyncCommitHead sets the current head block to the one defined by the hash
267276
// irrelevant what the chain contents were prior.
268277
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
269-
// Make sure that both the block as well at it's state trie exists
278+
// Make sure that both the block as well at its state trie exists
270279
block := self.GetBlock(hash)
271280
if block == nil {
272281
return fmt.Errorf("non existent block [%x…]", hash[:4])
@@ -298,7 +307,7 @@ func (self *BlockChain) LastBlockHash() common.Hash {
298307
}
299308

300309
// CurrentHeader retrieves the current head header of the canonical chain. The
301-
// header is retrieved from the chain manager's internal cache.
310+
// header is retrieved from the blockchain's internal cache.
302311
func (self *BlockChain) CurrentHeader() *types.Header {
303312
self.mu.RLock()
304313
defer self.mu.RUnlock()
@@ -307,7 +316,7 @@ func (self *BlockChain) CurrentHeader() *types.Header {
307316
}
308317

309318
// CurrentBlock retrieves the current head block of the canonical chain. The
310-
// block is retrieved from the chain manager's internal cache.
319+
// block is retrieved from the blockchain's internal cache.
311320
func (self *BlockChain) CurrentBlock() *types.Block {
312321
self.mu.RLock()
313322
defer self.mu.RUnlock()
@@ -316,7 +325,7 @@ func (self *BlockChain) CurrentBlock() *types.Block {
316325
}
317326

318327
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
319-
// chain. The block is retrieved from the chain manager's internal cache.
328+
// chain. The block is retrieved from the blockchain's internal cache.
320329
func (self *BlockChain) CurrentFastBlock() *types.Block {
321330
self.mu.RLock()
322331
defer self.mu.RUnlock()
@@ -353,7 +362,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
353362
bc.mu.Lock()
354363
defer bc.mu.Unlock()
355364

356-
// Prepare the genesis block and reinitialize the chain
365+
// Prepare the genesis block and reinitialise the chain
357366
if err := WriteTd(bc.chainDb, genesis.Hash(), genesis.Difficulty()); err != nil {
358367
glog.Fatalf("failed to write genesis block TD: %v", err)
359368
}
@@ -403,7 +412,7 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
403412
// insert injects a new head block into the current block chain. This method
404413
// assumes that the block is indeed a true head. It will also reset the head
405414
// header and the head fast sync block to this very same block to prevent them
406-
// from diverging on a different header chain.
415+
// from pointing to a possibly old canonical chain (i.e. side chain by now).
407416
//
408417
// Note, this function assumes that the `mu` mutex is held!
409418
func (bc *BlockChain) insert(block *types.Block) {
@@ -625,10 +634,10 @@ const (
625634

626635
// writeHeader writes a header into the local chain, given that its parent is
627636
// already known. If the total difficulty of the newly inserted header becomes
628-
// greater than the old known TD, the canonical chain is re-routed.
637+
// greater than the current known TD, the canonical chain is re-routed.
629638
//
630639
// Note: This method is not concurrent-safe with inserting blocks simultaneously
631-
// into the chain, as side effects caused by reorganizations cannot be emulated
640+
// into the chain, as side effects caused by reorganisations cannot be emulated
632641
// without the real blocks. Hence, writing headers directly should only be done
633642
// in two scenarios: pure-header mode of operation (light clients), or properly
634643
// separated header/block phases (non-archive clients).
@@ -678,10 +687,9 @@ func (self *BlockChain) writeHeader(header *types.Header) error {
678687
return nil
679688
}
680689

681-
// InsertHeaderChain will attempt to insert the given header chain in to the
682-
// local chain, possibly creating a fork. If an error is returned, it will
683-
// return the index number of the failing header as well an error describing
684-
// what went wrong.
690+
// InsertHeaderChain attempts to insert the given header chain in to the local
691+
// chain, possibly creating a reorg. If an error is returned, it will return the
692+
// index number of the failing header as well an error describing what went wrong.
685693
//
686694
// The verify parameter can be used to fine tune whether nonce verification
687695
// should be done or not. The reason behind the optional check is because some
@@ -702,7 +710,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
702710
// Generate the list of headers that should be POW verified
703711
verify := make([]bool, len(chain))
704712
for i := 0; i < len(verify)/checkFreq; i++ {
705-
index := i*checkFreq + rand.Intn(checkFreq)
713+
index := i*checkFreq + self.rand.Intn(checkFreq)
706714
if index >= len(verify) {
707715
index = len(verify) - 1
708716
}
@@ -766,10 +774,6 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
766774
pending.Wait()
767775

768776
// If anything failed, report
769-
if atomic.LoadInt32(&self.procInterrupt) == 1 {
770-
glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
771-
return 0, nil
772-
}
773777
if failed > 0 {
774778
for i, err := range errs {
775779
if err != nil {
@@ -807,6 +811,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
807811
// Rollback is designed to remove a chain of links from the database that aren't
808812
// certain enough to be valid.
809813
func (self *BlockChain) Rollback(chain []common.Hash) {
814+
self.mu.Lock()
815+
defer self.mu.Unlock()
816+
810817
for i := len(chain) - 1; i >= 0; i-- {
811818
hash := chain[i]
812819

@@ -905,6 +912,12 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
905912
glog.Fatal(errs[index])
906913
return
907914
}
915+
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
916+
errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
917+
atomic.AddInt32(&failed, 1)
918+
glog.Fatal(errs[index])
919+
return
920+
}
908921
atomic.AddInt32(&stats.processed, 1)
909922
}
910923
}
@@ -920,17 +933,17 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
920933
pending.Wait()
921934

922935
// If anything failed, report
923-
if atomic.LoadInt32(&self.procInterrupt) == 1 {
924-
glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
925-
return 0, nil
926-
}
927936
if failed > 0 {
928937
for i, err := range errs {
929938
if err != nil {
930939
return i, err
931940
}
932941
}
933942
}
943+
if atomic.LoadInt32(&self.procInterrupt) == 1 {
944+
glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
945+
return 0, nil
946+
}
934947
// Update the head fast sync block if better
935948
self.mu.Lock()
936949
head := blockChain[len(errs)-1]

core/blockchain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ func makeBlockChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.B
452452

453453
func chm(genesis *types.Block, db ethdb.Database) *BlockChain {
454454
var eventMux event.TypeMux
455-
bc := &BlockChain{chainDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}}
455+
bc := &BlockChain{chainDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}, rand: rand.New(rand.NewSource(0))}
456456
bc.headerCache, _ = lru.New(100)
457457
bc.bodyCache, _ = lru.New(100)
458458
bc.bodyRLPCache, _ = lru.New(100)

core/chain_util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts)
394394
bloomDat, _ := db.Get(key)
395395
bloom := types.BytesToBloom(bloomDat)
396396
for _, receipt := range receipts {
397-
for _, log := range receipt.Logs() {
397+
for _, log := range receipt.Logs {
398398
bloom.Add(log.Address.Big())
399399
}
400400
}

core/chain_util_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -345,15 +345,15 @@ func TestMipmapBloom(t *testing.T) {
345345
db, _ := ethdb.NewMemDatabase()
346346

347347
receipt1 := new(types.Receipt)
348-
receipt1.SetLogs(vm.Logs{
348+
receipt1.Logs = vm.Logs{
349349
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
350350
&vm.Log{Address: common.BytesToAddress([]byte("address"))},
351-
})
351+
}
352352
receipt2 := new(types.Receipt)
353-
receipt2.SetLogs(vm.Logs{
353+
receipt2.Logs = vm.Logs{
354354
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
355355
&vm.Log{Address: common.BytesToAddress([]byte("address1"))},
356-
})
356+
}
357357

358358
WriteMipmapBloom(db, 1, types.Receipts{receipt1})
359359
WriteMipmapBloom(db, 2, types.Receipts{receipt2})
@@ -368,15 +368,15 @@ func TestMipmapBloom(t *testing.T) {
368368
// reset
369369
db, _ = ethdb.NewMemDatabase()
370370
receipt := new(types.Receipt)
371-
receipt.SetLogs(vm.Logs{
371+
receipt.Logs = vm.Logs{
372372
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
373-
})
373+
}
374374
WriteMipmapBloom(db, 999, types.Receipts{receipt1})
375375

376376
receipt = new(types.Receipt)
377-
receipt.SetLogs(vm.Logs{
377+
receipt.Logs = vm.Logs{
378378
&vm.Log{Address: common.BytesToAddress([]byte("test 1"))},
379-
})
379+
}
380380
WriteMipmapBloom(db, 1000, types.Receipts{receipt})
381381

382382
bloom := GetMipmapBloom(db, 1000, 1000)
@@ -403,22 +403,22 @@ func TestMipmapChain(t *testing.T) {
403403
defer db.Close()
404404

405405
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)})
406-
chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) {
406+
chain, receipts := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) {
407407
var receipts types.Receipts
408408
switch i {
409409
case 1:
410410
receipt := types.NewReceipt(nil, new(big.Int))
411-
receipt.SetLogs(vm.Logs{
411+
receipt.Logs = vm.Logs{
412412
&vm.Log{
413413
Address: addr,
414414
Topics: []common.Hash{hash1},
415415
},
416-
})
416+
}
417417
gen.AddUncheckedReceipt(receipt)
418418
receipts = types.Receipts{receipt}
419419
case 1000:
420420
receipt := types.NewReceipt(nil, new(big.Int))
421-
receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}})
421+
receipt.Logs = vm.Logs{&vm.Log{Address: addr2}}
422422
gen.AddUncheckedReceipt(receipt)
423423
receipts = types.Receipts{receipt}
424424

@@ -431,15 +431,15 @@ func TestMipmapChain(t *testing.T) {
431431
}
432432
WriteMipmapBloom(db, uint64(i+1), receipts)
433433
})
434-
for _, block := range chain {
434+
for i, block := range chain {
435435
WriteBlock(db, block)
436436
if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
437437
t.Fatalf("failed to insert block number: %v", err)
438438
}
439439
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
440440
t.Fatalf("failed to insert block number: %v", err)
441441
}
442-
if err := PutBlockReceipts(db, block, block.Receipts()); err != nil {
442+
if err := PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
443443
t.Fatal("error writing block receipts:", err)
444444
}
445445
}

core/state/sync.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,13 @@ import (
2626
"github.com/ethereum/go-ethereum/trie"
2727
)
2828

29-
// StateSync is the main state synchronisation scheduler, which provides yet the
29+
// StateSync is the main state synchronisation scheduler, which provides yet the
3030
// unknown state hashes to retrieve, accepts node data associated with said hashes
3131
// and reconstructs the state database step by step until all is done.
3232
type StateSync trie.TrieSync
3333

3434
// NewStateSync create a new state trie download scheduler.
3535
func NewStateSync(root common.Hash, database ethdb.Database) *StateSync {
36-
// Pre-declare the result syncer t
3736
var syncer *trie.TrieSync
3837

3938
callback := func(leaf []byte, parent common.Hash) error {

core/state/sync_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type testAccount struct {
3838
func makeTestState() (ethdb.Database, common.Hash, []*testAccount) {
3939
// Create an empty state
4040
db, _ := ethdb.NewMemDatabase()
41-
state := New(common.Hash{}, db)
41+
state, _ := New(common.Hash{}, db)
4242

4343
// Fill it with some arbitrary data
4444
accounts := []*testAccount{}
@@ -68,7 +68,7 @@ func makeTestState() (ethdb.Database, common.Hash, []*testAccount) {
6868
// checkStateAccounts cross references a reconstructed state with an expected
6969
// account array.
7070
func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) {
71-
state := New(root, db)
71+
state, _ := New(root, db)
7272
for i, acc := range accounts {
7373

7474
if balance := state.GetBalance(acc.address); balance.Cmp(acc.balance) != 0 {

0 commit comments

Comments
 (0)