Skip to content

Commit 84565dc

Browse files
lightclient and holiman authored
eth/catalyst: ensure period zero mode leaves no pending txs in pool (#30264)
Closes #29475; replaces #29657 and #30104. Fixes two issues. The first is a deadlock where the txpool attempts to reorg but can't complete because there are no readers left for the new txs subscription. The second is a problem with on-demand mode where txs may be left pending when there are more pending txs than block space. Co-authored-by: Martin Holst Swende <[email protected]>
1 parent 41b3b30 commit 84565dc

File tree

4 files changed

+178
-70
lines changed

4 files changed

+178
-70
lines changed

eth/catalyst/api.go

Lines changed: 4 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
184184
return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai"))
185185
}
186186
}
187-
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false)
187+
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1)
188188
}
189189

190190
// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload
@@ -207,7 +207,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
207207
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called with paris and shanghai payloads"))
208208
}
209209
}
210-
return api.forkchoiceUpdated(update, params, engine.PayloadV2, false)
210+
return api.forkchoiceUpdated(update, params, engine.PayloadV2)
211211
}
212212

213213
// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
@@ -228,10 +228,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
228228
// hash, even if params are wrong. To do this we need to split up
229229
// forkchoiceUpdate into a function that only updates the head and then a
230230
// function that kicks off block construction.
231-
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
231+
return api.forkchoiceUpdated(update, params, engine.PayloadV3)
232232
}
233233

234-
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, simulatorMode bool) (engine.ForkChoiceResponse, error) {
234+
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion) (engine.ForkChoiceResponse, error) {
235235
api.forkchoiceLock.Lock()
236236
defer api.forkchoiceLock.Unlock()
237237

@@ -374,19 +374,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
374374
if api.localBlocks.has(id) {
375375
return valid(&id), nil
376376
}
377-
// If the beacon chain is ran by a simulator, then transaction insertion,
378-
// block insertion and block production will happen without any timing
379-
// delay between them. This will cause flaky simulator executions due to
380-
// the transaction pool running its internal reset operation on a back-
381-
// ground thread. To avoid the racey behavior - in simulator mode - the
382-
// pool will be explicitly blocked on its reset before continuing to the
383-
// block production below.
384-
if simulatorMode {
385-
if err := api.eth.TxPool().Sync(); err != nil {
386-
log.Error("Failed to sync transaction pool", "err", err)
387-
return valid(nil), engine.InvalidPayloadAttributes.With(err)
388-
}
389-
}
390377
payload, err := api.eth.Miner().BuildPayload(args)
391378
if err != nil {
392379
log.Error("Failed to build payload", "err", err)

eth/catalyst/simulated_beacon.go

Lines changed: 51 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import (
2020
"crypto/rand"
2121
"crypto/sha256"
2222
"errors"
23+
"fmt"
2324
"math/big"
2425
"sync"
2526
"time"
@@ -30,6 +31,7 @@ import (
3031
"github.com/ethereum/go-ethereum/core/types"
3132
"github.com/ethereum/go-ethereum/crypto/kzg4844"
3233
"github.com/ethereum/go-ethereum/eth"
34+
"github.com/ethereum/go-ethereum/event"
3335
"github.com/ethereum/go-ethereum/log"
3436
"github.com/ethereum/go-ethereum/node"
3537
"github.com/ethereum/go-ethereum/params"
@@ -41,36 +43,46 @@ const devEpochLength = 32
4143
// withdrawalQueue implements a FIFO queue which holds withdrawals that are
4244
// pending inclusion.
4345
type withdrawalQueue struct {
44-
pending chan *types.Withdrawal
46+
pending types.Withdrawals
47+
mu sync.Mutex
48+
feed event.Feed
49+
subs event.SubscriptionScope
4550
}
4651

52+
type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals }
53+
4754
// add queues a withdrawal for future inclusion.
4855
func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error {
49-
select {
50-
case w.pending <- withdrawal:
51-
break
52-
default:
53-
return errors.New("withdrawal queue full")
54-
}
56+
w.mu.Lock()
57+
w.pending = append(w.pending, withdrawal)
58+
w.mu.Unlock()
59+
60+
w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}})
5561
return nil
5662
}
5763

58-
// gatherPending returns a number of queued withdrawals up to a maximum count.
59-
func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal {
60-
withdrawals := []*types.Withdrawal{}
61-
for {
62-
select {
63-
case withdrawal := <-w.pending:
64-
withdrawals = append(withdrawals, withdrawal)
65-
if len(withdrawals) == maxCount {
66-
return withdrawals
67-
}
68-
default:
69-
return withdrawals
70-
}
71-
}
64+
// pop dequeues the specified number of withdrawals from the queue.
65+
func (w *withdrawalQueue) pop(count int) types.Withdrawals {
66+
w.mu.Lock()
67+
defer w.mu.Unlock()
68+
69+
count = min(count, len(w.pending))
70+
popped := w.pending[0:count]
71+
w.pending = w.pending[count:]
72+
73+
return popped
74+
}
75+
76+
// subscribe allows a listener to be updated when new withdrawals are added to
77+
// the queue.
78+
func (w *withdrawalQueue) subscribe(ch chan<- newWithdrawalsEvent) event.Subscription {
79+
sub := w.feed.Subscribe(ch)
80+
return w.subs.Track(sub)
7281
}
7382

83+
// SimulatedBeacon drives an Ethereum instance as if it were a real beacon
84+
// client. It can run in period mode where it mines a new block every period
85+
// (seconds) or on every transaction via Commit, Fork and AdjustTime.
7486
type SimulatedBeacon struct {
7587
shutdownCh chan struct{}
7688
eth *eth.Ethereum
@@ -86,10 +98,6 @@ type SimulatedBeacon struct {
8698
}
8799

88100
// NewSimulatedBeacon constructs a new simulated beacon chain.
89-
// Period sets the period in which blocks should be produced.
90-
//
91-
// - If period is set to 0, a block is produced on every transaction.
92-
// via Commit, Fork and AdjustTime.
93101
func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) {
94102
block := eth.BlockChain().CurrentBlock()
95103
current := engine.ForkchoiceStateV1{
@@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
112120
engineAPI: engineAPI,
113121
lastBlockTime: block.Time,
114122
curForkchoiceState: current,
115-
withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)},
116123
}, nil
117124
}
118125

@@ -156,6 +163,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
156163
c.setCurrentState(header.Hash(), *finalizedHash)
157164
}
158165

166+
// Because transaction insertion, block insertion, and block production will
167+
// happen without any timing delay between them in simulator mode and the
168+
// transaction pool will be running its internal reset operation on a
169+
// background thread, flaky executions can happen. To avoid the racey
170+
// behavior, the pool will be explicitly blocked on its reset before
171+
// continuing to the block production below.
172+
if err := c.eth.APIBackend.TxPool().Sync(); err != nil {
173+
return fmt.Errorf("failed to sync txpool: %w", err)
174+
}
175+
159176
var random [32]byte
160177
rand.Read(random[:])
161178
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
@@ -164,13 +181,14 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
164181
Withdrawals: withdrawals,
165182
Random: random,
166183
BeaconRoot: &common.Hash{},
167-
}, engine.PayloadV3, true)
184+
}, engine.PayloadV3)
168185
if err != nil {
169186
return err
170187
}
171188
if fcResponse == engine.STATUS_SYNCING {
172189
return errors.New("chain rewind prevented invocation of payload creation")
173190
}
191+
174192
envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true)
175193
if err != nil {
176194
return err
@@ -223,8 +241,7 @@ func (c *SimulatedBeacon) loop() {
223241
case <-c.shutdownCh:
224242
return
225243
case <-timer.C:
226-
withdrawals := c.withdrawals.gatherPending(10)
227-
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
244+
if err := c.sealBlock(c.withdrawals.pop(10), uint64(time.Now().Unix())); err != nil {
228245
log.Warn("Error performing sealing work", "err", err)
229246
} else {
230247
timer.Reset(time.Second * time.Duration(c.period))
@@ -260,7 +277,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {
260277

261278
// Commit seals a block on demand.
262279
func (c *SimulatedBeacon) Commit() common.Hash {
263-
withdrawals := c.withdrawals.gatherPending(10)
280+
withdrawals := c.withdrawals.pop(10)
264281
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
265282
log.Warn("Error performing sealing work", "err", err)
266283
}
@@ -301,16 +318,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
301318
if parent == nil {
302319
return errors.New("parent not found")
303320
}
304-
withdrawals := c.withdrawals.gatherPending(10)
321+
withdrawals := c.withdrawals.pop(10)
305322
return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second))
306323
}
307324

325+
// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the
326+
// stack.
308327
func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) {
309-
api := &api{sim}
310-
if sim.period == 0 {
311-
// mine on demand if period is set to 0
312-
go api.loop()
313-
}
328+
api := newSimulatedBeaconAPI(sim)
314329
stack.RegisterAPIs([]rpc.API{
315330
{
316331
Namespace: "dev",

eth/catalyst/simulated_beacon_api.go

Lines changed: 58 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -18,44 +18,88 @@ package catalyst
1818

1919
import (
2020
"context"
21-
"time"
2221

2322
"github.com/ethereum/go-ethereum/common"
2423
"github.com/ethereum/go-ethereum/core"
2524
"github.com/ethereum/go-ethereum/core/types"
26-
"github.com/ethereum/go-ethereum/log"
2725
)
2826

29-
type api struct {
27+
// simulatedBeaconAPI provides a RPC API for SimulatedBeacon.
28+
type simulatedBeaconAPI struct {
3029
sim *SimulatedBeacon
3130
}
3231

33-
func (a *api) loop() {
32+
// newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a
33+
// buffered commit channel. If period is zero, it starts a goroutine to handle
34+
// new tx events.
35+
func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI {
36+
api := &simulatedBeaconAPI{sim: sim}
37+
if sim.period == 0 {
38+
// mine on demand if period is set to 0
39+
go api.loop()
40+
}
41+
return api
42+
}
43+
44+
// loop is the main loop for the API when it's running in period = 0 mode. It
45+
// ensures that block production is triggered as soon as a new withdrawal or
46+
// transaction is received.
47+
func (a *simulatedBeaconAPI) loop() {
3448
var (
35-
newTxs = make(chan core.NewTxsEvent)
36-
sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
49+
newTxs = make(chan core.NewTxsEvent)
50+
newWxs = make(chan newWithdrawalsEvent)
51+
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
52+
newWxsSub = a.sim.withdrawals.subscribe(newWxs)
53+
doCommit = make(chan struct{}, 1)
3754
)
38-
defer sub.Unsubscribe()
55+
defer newTxsSub.Unsubscribe()
56+
defer newWxsSub.Unsubscribe()
57+
58+
// A background thread which signals to the simulator when to commit
59+
// based on messages over doCommit.
60+
go func() {
61+
for range doCommit {
62+
a.sim.Commit()
63+
a.sim.eth.TxPool().Sync()
64+
65+
// It's worth noting that in case a tx ends up in the pool listed as
66+
// "executable", but for whatever reason the miner does not include it in
67+
// a block -- maybe the miner is enforcing a higher tip than the pool --
68+
// this code will spinloop.
69+
for {
70+
if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 {
71+
break
72+
}
73+
a.sim.Commit()
74+
}
75+
}
76+
}()
3977

4078
for {
4179
select {
4280
case <-a.sim.shutdownCh:
81+
close(doCommit)
4382
return
44-
case w := <-a.sim.withdrawals.pending:
45-
withdrawals := append(a.sim.withdrawals.gatherPending(9), w)
46-
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
47-
log.Warn("Error performing sealing work", "err", err)
83+
case <-newWxs:
84+
select {
85+
case doCommit <- struct{}{}:
86+
default:
4887
}
4988
case <-newTxs:
50-
a.sim.Commit()
89+
select {
90+
case doCommit <- struct{}{}:
91+
default:
92+
}
5193
}
5294
}
5395
}
5496

55-
func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
97+
// AddWithdrawal adds a withdrawal to the pending queue.
98+
func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
5699
return a.sim.withdrawals.add(withdrawal)
57100
}
58101

59-
func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
102+
// SetFeeRecipient sets the fee recipient for block building purposes.
103+
func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
60104
a.sim.setFeeRecipient(feeRecipient)
61105
}

0 commit comments

Comments
 (0)