Skip to content

Commit d5d910e

Browse files
authored
Merge pull request #14323 from fjl/ethash-verify-headers-fix
consensus/ethash: simplify concurrency in VerifyHeaders
2 parents 5e29f4b + b275895 commit d5d910e

File tree

2 files changed

+52
-85
lines changed

2 files changed

+52
-85
lines changed

consensus/ethash/consensus.go

Lines changed: 48 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"math/big"
2424
"runtime"
25-
"sync/atomic"
2625
"time"
2726

2827
"github.com/ethereum/go-ethereum/common"
@@ -46,7 +45,6 @@ var (
4645
// codebase, inherently breaking if the engine is swapped out. Please put common
4746
// error types into the consensus package.
4847
var (
49-
errInvalidChain = errors.New("invalid header chain")
5048
errLargeBlockTime = errors.New("timestamp too big")
5149
errZeroBlockTime = errors.New("timestamp equals parent's")
5250
errTooManyUncles = errors.New("too many uncles")
@@ -90,111 +88,80 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He
9088
// a results channel to retrieve the async verifications.
9189
func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
9290
// If we're running a full engine faking, accept any input as valid
93-
if ethash.fakeFull {
91+
if ethash.fakeFull || len(headers) == 0 {
9492
abort, results := make(chan struct{}), make(chan error, len(headers))
9593
for i := 0; i < len(headers); i++ {
9694
results <- nil
9795
}
9896
return abort, results
9997
}
98+
10099
// Spawn as many workers as allowed threads
101100
workers := runtime.GOMAXPROCS(0)
102101
if len(headers) < workers {
103102
workers = len(headers)
104103
}
105-
// Create a task channel and spawn the verifiers
106-
type result struct {
107-
index int
108-
err error
109-
}
110-
inputs := make(chan int, workers)
111-
outputs := make(chan result, len(headers))
112104

113-
var badblock uint64
105+
// Create a task channel and spawn the verifiers
106+
var (
107+
inputs = make(chan int)
108+
done = make(chan int, workers)
109+
errors = make([]error, len(headers))
110+
abort = make(chan struct{})
111+
)
114112
for i := 0; i < workers; i++ {
115113
go func() {
116114
for index := range inputs {
117-
// If we've found a bad block already before this, stop validating
118-
if bad := atomic.LoadUint64(&badblock); bad != 0 && bad <= headers[index].Number.Uint64() {
119-
outputs <- result{index: index, err: errInvalidChain}
120-
continue
121-
}
122-
// We need to look up the first parent
123-
var parent *types.Header
124-
if index == 0 {
125-
parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
126-
} else if headers[index-1].Hash() == headers[index].ParentHash {
127-
parent = headers[index-1]
128-
}
129-
// Ensure the validation is useful and execute it
130-
var failure error
131-
switch {
132-
case chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()-1) != nil:
133-
outputs <- result{index: index, err: nil}
134-
case parent == nil:
135-
failure = consensus.ErrUnknownAncestor
136-
outputs <- result{index: index, err: failure}
137-
default:
138-
failure = ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
139-
outputs <- result{index: index, err: failure}
140-
}
141-
// If a validation failure occurred, mark subsequent blocks invalid
142-
if failure != nil {
143-
number := headers[index].Number.Uint64()
144-
if prev := atomic.LoadUint64(&badblock); prev == 0 || prev > number {
145-
// This two step atomic op isn't thread-safe in that `badblock` might end
146-
// up slightly higher than the block number of the first failure (if many
147-
// workers try to write at the same time), but it's fine as we're mostly
148-
// interested to avoid large useless work, we don't care about 1-2 extra
149-
// runs. Doing "full thread safety" would involve mutexes, which would be
150-
// a noticeable sync overhead on the fast spinning worker routines.
151-
atomic.StoreUint64(&badblock, number)
152-
}
153-
}
115+
errors[index] = ethash.verifyHeaderWorker(chain, headers, seals, index)
116+
done <- index
154117
}
155118
}()
156119
}
157-
// Feed item indices to the workers until done, sorting and feeding the results to the caller
158-
dones := make([]bool, len(headers))
159-
errors := make([]error, len(headers))
160-
161-
abort := make(chan struct{})
162-
returns := make(chan error, len(headers))
163120

121+
errorsOut := make(chan error, len(headers))
164122
go func() {
165123
defer close(inputs)
166-
167-
input, output := 0, 0
168-
for i := 0; i < len(headers)*2; i++ {
169-
var res result
170-
171-
// If there are tasks left, push to workers
172-
if input < len(headers) {
173-
select {
174-
case inputs <- input:
175-
input++
176-
continue
177-
case <-abort:
178-
return
179-
case res = <-outputs:
124+
var (
125+
in, out = 0, 0
126+
checked = make([]bool, len(headers))
127+
inputs = inputs
128+
)
129+
for {
130+
select {
131+
case inputs <- in:
132+
if in++; in == len(headers) {
133+
// Reached end of headers. Stop sending to workers.
134+
inputs = nil
180135
}
181-
} else {
182-
// Otherwise keep waiting for results
183-
select {
184-
case <-abort:
185-
return
186-
case res = <-outputs:
136+
case index := <-done:
137+
for checked[index] = true; checked[out]; out++ {
138+
errorsOut <- errors[out]
139+
if out == len(headers)-1 {
140+
return
141+
}
187142
}
188-
}
189-
// A result arrived, save and propagate if next
190-
dones[res.index], errors[res.index] = true, res.err
191-
for output < len(headers) && dones[output] {
192-
returns <- errors[output]
193-
output++
143+
case <-abort:
144+
return
194145
}
195146
}
196147
}()
197-
return abort, returns
148+
return abort, errorsOut
149+
}
150+
151+
func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error {
152+
var parent *types.Header
153+
if index == 0 {
154+
parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
155+
} else if headers[index-1].Hash() == headers[index].ParentHash {
156+
parent = headers[index-1]
157+
}
158+
if parent == nil {
159+
return consensus.ErrUnknownAncestor
160+
}
161+
if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
162+
return nil // known block
163+
}
164+
return ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
198165
}
199166

200167
// VerifyUncles verifies that the given block's uncles conform to the consensus

core/dao_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
6262
gspec.MustCommit(db)
6363
bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
6464

65-
blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1))
65+
blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
6666
for j := 0; j < len(blocks)/2; j++ {
6767
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
6868
}
@@ -83,7 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
8383
gspec.MustCommit(db)
8484
bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
8585

86-
blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1))
86+
blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
8787
for j := 0; j < len(blocks)/2; j++ {
8888
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
8989
}
@@ -105,7 +105,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
105105
gspec.MustCommit(db)
106106
bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
107107

108-
blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1))
108+
blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
109109
for j := 0; j < len(blocks)/2; j++ {
110110
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
111111
}
@@ -121,7 +121,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
121121
gspec.MustCommit(db)
122122
bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
123123

124-
blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1))
124+
blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
125125
for j := 0; j < len(blocks)/2; j++ {
126126
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
127127
}

0 commit comments

Comments
 (0)