Skip to content

Commit 1acd823

Browse files
authored
Merge pull request #298 from bane-labs/fix-extensible
Skip dBFT extensible verification during first node sync
2 parents 7717358 + 8370816 commit 1acd823

File tree

6 files changed

+83
-28
lines changed

6 files changed

+83
-28
lines changed

consensus/dbft/dbft.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ type DBFT struct {
185185
// downloader events. Downloader events are used to track miner's state since
186186
// miner work may be temporary suspended due to the node sync.
187187
mux *event.TypeMux
188-
// syncing indicates whether the node is still syncing.
188+
// syncing indicates whether the node is still syncing. This variable is updated
189+
// irrespectively from the engine activity, and thus, may be relied on even when
190+
// dBFT engine is not started.
189191
syncing atomic.Bool
190192

191193
config *params.DBFTConfig // Consensus engine configuration parameters
@@ -712,6 +714,47 @@ func (c *DBFT) WithRequestTxs(f func(hashed []common.Hash)) {
712714
// the ongoing node sync process.
713715
func (c *DBFT) WithMux(mux *event.TypeMux) {
714716
c.mux = mux
717+
718+
go c.syncWatcher()
719+
}
720+
721+
// syncWatcher is a standalone loop aimed to be active irrespectively of dBFT engine
722+
// activity. It tracks the first chain sync attempt till its end.
723+
func (c *DBFT) syncWatcher() {
724+
downloaderEvents := c.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
725+
defer func() {
726+
if !downloaderEvents.Closed() {
727+
downloaderEvents.Unsubscribe()
728+
}
729+
}()
730+
dlEventCh := downloaderEvents.Chan()
731+
732+
events:
733+
for {
734+
select {
735+
case <-c.quit:
736+
break events
737+
case ev := <-dlEventCh:
738+
if ev == nil {
739+
// Unsubscription done, stop listening.
740+
dlEventCh = nil
741+
break events
742+
}
743+
switch ev.Data.(type) {
744+
case downloader.StartEvent:
745+
c.syncing.Store(true)
746+
747+
case downloader.FailedEvent:
748+
c.syncing.Store(false)
749+
750+
case downloader.DoneEvent:
751+
c.syncing.Store(false)
752+
753+
// Stop reacting to downloader events.
754+
downloaderEvents.Unsubscribe()
755+
}
756+
}
757+
}
715758
}
716759

717760
// WithTxPool initializes transaction pool API for DBFT interactions with memory pool
@@ -1244,7 +1287,7 @@ func (c *DBFT) eventLoop() {
12441287
// been broadcasted the events are unregistered and the loop is exited. This to
12451288
// prevent a major security vuln where external parties can DOS you with blocks
12461289
// and halt your dBFT operation for as long as the DOS continues.
1247-
downloaderEvents := c.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
1290+
downloaderEvents := c.mux.Subscribe(downloader.DoneEvent{}, downloader.FailedEvent{})
12481291
defer func() {
12491292
if !downloaderEvents.Closed() {
12501293
downloaderEvents.Unsubscribe()
@@ -1296,7 +1339,7 @@ events:
12961339
case tx := <-c.txs:
12971340
c.dbft.OnTransaction(&Transaction{Tx: tx})
12981341
case b := <-c.chainHeadEvents:
1299-
err := c.handleChainBlock(b.Block.Header())
1342+
err := c.handleChainBlock(b.Block.Header(), true)
13001343
if err != nil {
13011344
log.Warn("Failed to handle chain block",
13021345
"index", b.Block.NumberU64(),
@@ -1318,14 +1361,9 @@ events:
13181361
continue
13191362
}
13201363
switch ev.Data.(type) {
1321-
case downloader.StartEvent:
1322-
c.syncing.Store(true)
1323-
13241364
case downloader.FailedEvent:
1325-
c.syncing.Store(false)
1326-
13271365
latest := c.chain.CurrentHeader()
1328-
err := c.handleChainBlock(latest)
1366+
err := c.handleChainBlock(latest, false)
13291367
if err != nil {
13301368
log.Warn("Failed to handle latest chain block",
13311369
"index", latest.Number.Uint64(),
@@ -1334,13 +1372,11 @@ events:
13341372
}
13351373

13361374
case downloader.DoneEvent:
1337-
c.syncing.Store(false)
1338-
13391375
// Stop reacting to downloader events.
13401376
downloaderEvents.Unsubscribe()
13411377

13421378
latest := c.chain.CurrentHeader()
1343-
err := c.handleChainBlock(latest)
1379+
err := c.handleChainBlock(latest, false)
13441380
if err != nil {
13451381
log.Warn("Failed to handle latest chain block",
13461382
"index", latest.Number.Uint64(),
@@ -1361,7 +1397,7 @@ events:
13611397
}
13621398
}
13631399
if latestBlock.Block != nil {
1364-
err := c.handleChainBlock(latestBlock.Block.Header())
1400+
err := c.handleChainBlock(latestBlock.Block.Header(), true)
13651401
if err != nil {
13661402
log.Warn("Failed to handle latest chain block",
13671403
"index", latestBlock.Block.NumberU64(),
@@ -1410,6 +1446,10 @@ func (c *DBFT) OnPayload(cp *dbftproto.Message) error {
14101446
log.Debug("Skip dBFT payload handling: dbft is inactive or not started yet", "hash", cp.Hash())
14111447
return nil
14121448
}
1449+
if c.syncing.Load() {
1450+
log.Debug("Skip dBFT payload handling due to sync", "hash", cp.Hash())
1451+
return nil
1452+
}
14131453

14141454
p := payloadFromMessage(cp)
14151455
// decode payload data into message
@@ -1477,15 +1517,21 @@ func (c *DBFT) validatePayload(p *Payload) error {
14771517

14781518
// IsExtensibleAllowed determines if address is allowed to send extensible payloads
14791519
// (only consensus payloads for now) at the specified height.
1480-
func (c *DBFT) IsExtensibleAllowed(h uint64, u common.Address) bool {
1520+
func (c *DBFT) IsExtensibleAllowed(h uint64, u common.Address) error {
1521+
// Can't verify extensible sender if the node has an outdated state.
1522+
if c.syncing.Load() {
1523+
return dbftproto.ErrSyncing
1524+
}
14811525
// Only validators are included into extensible whitelist for now.
14821526
validators, err := c.getValidators(&h, nil, nil)
14831527
if err != nil {
1484-
return false
1528+
return fmt.Errorf("failed to get validators: %w", err)
14851529
}
14861530
n := sort.Search(len(validators), func(i int) bool { return validators[i].Cmp(u) >= 0 })
1487-
res := n < len(validators)
1488-
return res
1531+
if n >= len(validators) {
1532+
return fmt.Errorf("address is not a validator")
1533+
}
1534+
return nil
14891535
}
14901536

14911537
func (c *DBFT) newPayload(ctx *dbft.Context[common.Hash], t dbft.MessageType, msg any) dbft.ConsensusPayload[common.Hash] {
@@ -1503,11 +1549,11 @@ func (c *DBFT) newPayload(ctx *dbft.Context[common.Hash], t dbft.MessageType, ms
15031549
return cp
15041550
}
15051551

1506-
func (c *DBFT) handleChainBlock(h *types.Header) error {
1552+
func (c *DBFT) handleChainBlock(h *types.Header, checkForSync bool) error {
15071553
// A short path if miner is not active and the node is in the process of block
15081554
// sync. In this case dBFT can't react properly on the newcoming blocks since no
15091555
// sealing task is expected from miner.
1510-
if c.syncing.Load() {
1556+
if checkForSync && c.syncing.Load() {
15111557
log.Info("Skipping dBFT block callback due to sync",
15121558
"block index", h.Number.Int64(),
15131559
"dbft index", c.dbft.BlockIndex,

eth/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
276276
var (
277277
bft *dbft.DBFT
278278
onPayload func(*dbftproto.Message) error
279-
isExtensibleAllowed func(uint64, common.Address) bool
279+
isExtensibleAllowed func(uint64, common.Address) error
280280
)
281281
switch t := eth.engine.(type) {
282282
case *dbft.DBFT:

eth/protocols/dbft/handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ const (
3434
maxMessageSize = 4 * 1024 * 1024
3535
)
3636

37+
// ErrSyncing is returned when operation can't be performed due to the fact that
38+
// the node is in the process of chain sync.
39+
var ErrSyncing = errors.New("node is syncing")
40+
3741
var (
3842
errMsgTooLarge = errors.New("message too long")
3943
errDecode = errors.New("invalid message")

eth/protocols/dbft/ledger.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import (
1010

1111
type ledger struct {
1212
bc BlockChainAPI
13-
isExtensibleAllowed func(height uint64, addr common.Address) bool
13+
isExtensibleAllowed func(height uint64, addr common.Address) error
1414
}
1515

16-
func newLedger(bc BlockChainAPI, isExtensibleAllowed func(uint64, common.Address) bool) *ledger {
16+
func newLedger(bc BlockChainAPI, isExtensibleAllowed func(uint64, common.Address) error) *ledger {
1717
return &ledger{
1818
bc: bc,
1919
isExtensibleAllowed: isExtensibleAllowed,
@@ -24,6 +24,6 @@ func (l *ledger) BlockHeight() uint64 {
2424
return uint64(l.bc.BlockNumber())
2525
}
2626

27-
func (l *ledger) IsAddressAllowed(addr common.Address) bool {
27+
func (l *ledger) IsAddressAllowed(addr common.Address) error {
2828
return l.isExtensibleAllowed(l.BlockHeight(), addr)
2929
}

eth/protocols/dbft/pool.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// Ledger is enough of Blockchain to satisfy Pool.
1515
type Ledger interface {
1616
BlockHeight() uint64
17-
IsAddressAllowed(common.Address) bool
17+
IsAddressAllowed(common.Address) error
1818
}
1919

2020
// Pool represents a pool of extensible payloads.
@@ -90,8 +90,13 @@ func (p *Pool) verify(m *Message) (bool, error) {
9090
}
9191
return false, errInvalidHeight
9292
}
93-
if !p.chain.IsAddressAllowed(m.Sender) {
94-
return false, errDisallowedSender
93+
err = p.chain.IsAddressAllowed(m.Sender)
94+
if err != nil {
95+
// There's no reliable way to check sender for syncing node.
96+
if errors.Is(err, ErrSyncing) {
97+
return false, nil
98+
}
99+
return false, err
95100
}
96101
return true, nil
97102
}
@@ -120,7 +125,7 @@ func (p *Pool) RemoveStale(index uint64) {
120125
old := elem
121126
elem = elem.Next()
122127

123-
if m.ValidBlockEnd <= index || !p.chain.IsAddressAllowed(m.Sender) {
128+
if m.ValidBlockEnd <= index || p.chain.IsAddressAllowed(m.Sender) != nil {
124129
delete(p.verified, h)
125130
lst.Remove(old)
126131
continue

eth/protocols/dbft/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Service struct {
3636
}
3737

3838
// New creates a new instance of [Service].
39-
func New(bc BlockChainAPI, onPayload func(*Message) error, isExtensibleAllowed func(uint64, common.Address) bool) *Service {
39+
func New(bc BlockChainAPI, onPayload func(*Message) error, isExtensibleAllowed func(uint64, common.Address) error) *Service {
4040
poolLedger := newLedger(bc, isExtensibleAllowed)
4141
return &Service{
4242
bc: bc,

0 commit comments

Comments
 (0)