Skip to content

Commit 547b4c7

Browse files
authored
Merge pull request #302 from bane-labs/fix-persist
dbft: untie BlockQueue from worker
2 parents 1acd823 + b8f10ef commit 547b4c7

File tree

5 files changed

+44
-135
lines changed

5 files changed

+44
-135
lines changed

consensus/dbft/blockqueue.go

Lines changed: 22 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,26 @@
11
package dbft
22

33
import (
4-
"errors"
54
"fmt"
6-
"github.com/ethereum/go-ethereum/core/state"
7-
"sync"
85

9-
"github.com/ethereum/go-ethereum/common"
6+
"github.com/ethereum/go-ethereum/core"
7+
"github.com/ethereum/go-ethereum/core/state"
108
"github.com/ethereum/go-ethereum/core/types"
9+
"github.com/ethereum/go-ethereum/event"
1110
"github.com/ethereum/go-ethereum/log"
1211
)
1312

14-
const (
15-
// blockQueueCap is the number of tasks blockQueue can fit at once. It's OK for
16-
// the blockQueue not to have a proper task for the newly-created block, and
17-
// normally a single task is expected to be present in blockQueue. But we still
18-
// need blockQueueCap restriction for the case of endless change views.
19-
blockQueueCap = 100
20-
21-
clearAllMatchingTasks = -1
22-
)
23-
24-
// blockQueue is an entity that collects sealed blocks from dBFT and routs these
25-
// blocks to a proper place (either to miner or directly to chain).
13+
// blockQueue is an entity that collects sealed blocks from dBFT and stores these
14+
// blocks in the chain.
2615
type blockQueue struct {
27-
chain ChainHeaderWriter
28-
tasksLock sync.RWMutex
29-
tasks map[common.Hash]task
30-
}
31-
32-
// task holds information about miner sealing task.
33-
type task struct {
34-
height uint64
35-
resCh chan<- *types.Block
36-
cancelCh <-chan struct{}
16+
chain ChainHeaderWriter
17+
mux *event.TypeMux
3718
}
3819

3920
// newBlockQueue creates an instance of blockQueue. It's not ready for usage until
4021
// an instance of ChainHeaderWriter is properly set.
4122
func newBlockQueue() *blockQueue {
42-
return &blockQueue{
43-
tasks: make(map[common.Hash]task),
44-
}
23+
return &blockQueue{}
4524
}
4625

4726
// SetChain initializes ChainHeaderWriter instanse needed for proper blockQueue
@@ -50,54 +29,15 @@ func (bq *blockQueue) SetChain(chain ChainHeaderWriter) {
5029
bq.chain = chain
5130
}
5231

32+
// SetMux initializes mux instanse needed for proper blockQueue functioning.
33+
func (bq *blockQueue) SetMux(mux *event.TypeMux) {
34+
bq.mux = mux
35+
}
36+
5337
// PutBlock routs block either to miner or (if there's no suitable sealing task)
5438
// directly to blockchain. No block verification is performed, it is assumed that
5539
// provided block is sealed and valid.
5640
func (bq *blockQueue) PutBlock(b *types.Block, state *state.StateDB, receipts []*types.Receipt) error {
57-
h := WorkerSealHash(b.Header())
58-
59-
bq.tasksLock.Lock()
60-
task, ok := bq.tasks[h]
61-
62-
bq.clearStaleTasks(b.NumberU64(), clearAllMatchingTasks)
63-
64-
if ok {
65-
var (
66-
err error
67-
readByMiner bool
68-
)
69-
select {
70-
case <-task.cancelCh:
71-
case task.resCh <- b:
72-
readByMiner = true
73-
default:
74-
err = errors.New("sealing result is not read by miner, trying to insert block in chain manually")
75-
}
76-
delete(bq.tasks, h)
77-
78-
if readByMiner {
79-
bq.tasksLock.Unlock()
80-
return nil
81-
}
82-
83-
if err != nil {
84-
log.Warn(err.Error(),
85-
"number", b.Number(),
86-
"seal hash", h.String(),
87-
"hash", b.Hash().String(),
88-
)
89-
}
90-
}
91-
bq.tasksLock.Unlock()
92-
93-
// If we're here then we're OK with that, it just means that:
94-
// 1) either dBFT received some extra commits and trying to
95-
// send already constructed block one more time
96-
// 2) or worker has received block with the same index via network. Then
97-
// we still need to save the block in case it has different hash.
98-
// 3) or we're not a primary node in this consensus round and thus,
99-
// worker's task differs from the dBFT's proposal. In this case we
100-
// need to try to insert block right into chain.
10141
hash := b.Hash()
10242
if bq.chain.HasBlock(hash, b.NumberU64()) {
10343
return nil
@@ -110,9 +50,14 @@ func (bq *blockQueue) PutBlock(b *types.Block, state *state.StateDB, receipts []
11050
return fmt.Errorf("failed to insert block into chain: %w", err)
11151
}
11252
log.Info("Successfully inserted new block", "number", b.Number(), "hash", hash)
53+
54+
// Broadcast the block and announce chain insertion event
55+
bq.mux.Post(core.NewMinedBlockEvent{Block: b})
56+
11357
return nil
11458
}
11559

60+
currH := bq.chain.CurrentBlock().Number
11661
// Insert state directly if we have one.
11762
var logs []*types.Log
11863
for i, receipt := range receipts {
@@ -129,7 +74,7 @@ func (bq *blockQueue) PutBlock(b *types.Block, state *state.StateDB, receipts []
12974
logs = append(logs, receipt.Logs...)
13075
}
13176
// Commit block and state to database.
132-
_, err := bq.chain.WriteBlockAndSetHead(b, receipts, logs, state, true)
77+
_, err := bq.chain.WriteBlockAndSetHead(b, receipts, logs, state, b.Number().Cmp(currH) > 0)
13378
if err != nil {
13479
log.Error("Failed to write block to chain and set head",
13580
"number", b.NumberU64(),
@@ -138,51 +83,8 @@ func (bq *blockQueue) PutBlock(b *types.Block, state *state.StateDB, receipts []
13883
}
13984
log.Info("Successfully wrote new block with state", "number", b.Number(), "hash", hash)
14085

141-
return nil
142-
}
143-
144-
// ClearStaleTasks removes all stale tasks up to the specified height (including
145-
// the height itself).
146-
func (bq *blockQueue) ClearStaleTasks(till uint64) {
147-
bq.tasksLock.Lock()
148-
defer bq.tasksLock.Unlock()
149-
150-
bq.clearStaleTasks(till, clearAllMatchingTasks)
151-
}
152-
153-
// clearStaleTasks removes all stale tasks up to the specified height (including
154-
// the height itself). It doesn't hold tasksLock, so it's the caller's responsibility.
155-
func (bq *blockQueue) clearStaleTasks(till uint64, count int) {
156-
for h, task := range bq.tasks {
157-
if task.height <= till {
158-
delete(bq.tasks, h)
159-
if count != clearAllMatchingTasks {
160-
count--
161-
if count <= 0 {
162-
break
163-
}
164-
}
165-
}
166-
}
167-
}
168-
169-
// SubmitTask adds subsequent miner task to the blockqueue instance.
170-
func (bq *blockQueue) SubmitTask(sealHash common.Hash, number uint64, resCh chan<- *types.Block, cancelCh <-chan struct{}) {
171-
bq.tasksLock.Lock()
172-
defer bq.tasksLock.Unlock()
86+
// Broadcast the block and announce chain insertion event
87+
bq.mux.Post(core.NewMinedBlockEvent{Block: b})
17388

174-
// We're OK with the fact that capacity is reached, remove random outdated seal
175-
// task (it's likely won't be completed, and if it will, then the block will be
176-
// inserted to the chain directly).
177-
if len(bq.tasks) == blockQueueCap {
178-
bq.clearStaleTasks(number, 1)
179-
}
180-
181-
// Do not check the existing task with the same hash. It could happen that new
182-
// sealing task has the same hash after ChangeView sealing proposal initialisation.
183-
bq.tasks[sealHash] = task{
184-
height: number,
185-
resCh: resCh,
186-
cancelCh: cancelCh,
187-
}
89+
return nil
18890
}

consensus/dbft/dbft.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,8 @@ func New(config *params.DBFTConfig, _ ethdb.Database) (*DBFT, error) {
356356
log.Warn("error on enqueue block", "error", err.Error())
357357
}
358358
}
359+
360+
c.postBlock(res.Header())
359361
}),
360362
dbft.WithNewBlockFromContext[common.Hash](func(ctx *dbft.Context[common.Hash]) dbft.Block[common.Hash] {
361363
prepareReq := ctx.PreparationPayloads[ctx.PrimaryIndex]
@@ -714,6 +716,7 @@ func (c *DBFT) WithRequestTxs(f func(hashed []common.Hash)) {
714716
// the ongoing node sync process.
715717
func (c *DBFT) WithMux(mux *event.TypeMux) {
716718
c.mux = mux
719+
c.blockQueue.SetMux(mux)
717720

718721
go c.syncWatcher()
719722
}
@@ -765,9 +768,7 @@ func (c *DBFT) WithTxPool(pool txPool) {
765768

766769
// postBlock is a callback that updates latest accepted block data and resets
767770
// last proposal data. It must be called every time new block arrives from chain
768-
// or from consensus. It must be called strictly after new block persist since
769-
// NextConsensus calculation depends on the storage state. It also clears all
770-
// BlockQueue tasks up to the accepted block height.
771+
// or from consensus.
771772
func (c *DBFT) postBlock(h *types.Header) {
772773
num := h.Number.Uint64()
773774
if c.lastIndex < num {
@@ -776,8 +777,6 @@ func (c *DBFT) postBlock(h *types.Header) {
776777
c.lastBlockHash = h.Hash()
777778
c.lastBlockSealHash = HonestSealHash(h)
778779
c.lastBlockExtra = h.Extra
779-
780-
c.blockQueue.ClearStaleTasks(num)
781780
}
782781
}
783782

@@ -1194,9 +1193,6 @@ func (c *DBFT) Seal(chain consensus.ChainHeaderReader, b *types.Block, results c
11941193
c.lastProposal = b
11951194
c.lastProposalLock.Unlock()
11961195

1197-
sealHash := c.SealHash(b.Header())
1198-
c.blockQueue.SubmitTask(sealHash, b.NumberU64(), results, stop)
1199-
12001196
return nil
12011197
}
12021198

eth/protocols/dbft/handler_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ func TestHandling(t *testing.T) {
2929
key, _ = crypto.GenerateKey()
3030
addr = crypto.PubkeyToAddress(key.PublicKey)
3131
bc = &testBC{height: 10}
32-
s1 = New(bc, nil, func(u uint64, address common.Address) bool { return true })
33-
s2 = New(bc, nil, func(u uint64, address common.Address) bool { return true })
34-
s3 = New(bc, nil, func(u uint64, address common.Address) bool { return true })
32+
s1 = New(bc, nil, func(u uint64, address common.Address) error { return nil })
33+
s2 = New(bc, nil, func(u uint64, address common.Address) error { return nil })
34+
s3 = New(bc, nil, func(u uint64, address common.Address) error { return nil })
3535
p1 = p2p.NewPeer(enode.ID{1}, "peer1", nil)
3636
p2 = p2p.NewPeer(enode.ID{2}, "peer2", nil)
3737
p3 = p2p.NewPeer(enode.ID{3}, "peer3", nil)

eth/protocols/dbft/pool.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package dbft
66
import (
77
"container/list"
88
"errors"
9+
"fmt"
910
"sync"
1011

1112
"github.com/ethereum/go-ethereum/common"
@@ -96,7 +97,7 @@ func (p *Pool) verify(m *Message) (bool, error) {
9697
if errors.Is(err, ErrSyncing) {
9798
return false, nil
9899
}
99-
return false, err
100+
return false, fmt.Errorf("%w: %w", errDisallowedSender, err)
100101
}
101102
return true, nil
102103
}

eth/protocols/dbft/pool_test.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ func TestAddGet(t *testing.T) {
2222
m := someMessage(t, 100, bc.goodAddrs[0], bc.badKey)
2323
p.testAdd(t, false, invalidSig, m)
2424
})
25+
t.Run("syncing", func(t *testing.T) {
26+
bc.syncing = true
27+
m := bc.badMessage(t, 100)
28+
p.testAdd(t, false, nil, m)
29+
bc.syncing = false
30+
})
2531
t.Run("disallowed sender", func(t *testing.T) {
2632
m := bc.badMessage(t, 100)
2733
p.testAdd(t, false, errDisallowedSender, m)
@@ -106,6 +112,7 @@ type testChain struct {
106112
badKey *ecdsa.PrivateKey
107113
goodAddrs []common.Address
108114
badAddr common.Address
115+
syncing bool
109116
}
110117

111118
var errVerification = errors.New("verification failed")
@@ -120,13 +127,16 @@ func newTestChain() *testChain {
120127
badAddr: crypto.PubkeyToAddress(bk.PublicKey),
121128
}
122129
}
123-
func (c *testChain) IsAddressAllowed(u common.Address) bool {
130+
func (c *testChain) IsAddressAllowed(u common.Address) error {
131+
if c.syncing {
132+
return ErrSyncing
133+
}
124134
for i := range c.goodAddrs {
125135
if u == c.goodAddrs[i] {
126-
return true
136+
return nil
127137
}
128138
}
129-
return false
139+
return errors.New("address not allowed")
130140
}
131141
func (c *testChain) BlockHeight() uint64 { return c.height }
132142

0 commit comments

Comments
 (0)