Skip to content

Commit dd36053

Browse files
authored
Fix worker panic bug and add some info level logs (#107)
* Fix worker panic bug and add some info level logs * Changed to use ClosePreconfResultFn to avoid recover * Revert some unnecessary changes * add preconf tests * revert txpool log * Add cost in preconf api * fix close timing * add PreconfResponse buffer * Optimize the pre-confirmation queue performance from a maximum of 200ms+ to 2ms+ * Lower some preconf log levels
1 parent 16d3b9d commit dd36053

File tree

15 files changed

+146
-97
lines changed

15 files changed

+146
-97
lines changed

core/events.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ type NewPreconfTxEvent struct {
3737

3838
// NewPreconfTxRequestEvent is posted when a preconf transaction request enters the transaction pool.
3939
type NewPreconfTxRequest struct {
40-
Tx *types.Transaction
41-
PreconfResult chan<- *PreconfResponse
40+
Tx *types.Transaction
41+
PreconfResult chan<- *PreconfResponse
42+
ClosePreconfResultFn func()
4243
}
4344

4445
type PreconfResponse struct {

core/txpool/txpool.go

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ func (pool *TxPool) Locals() []common.Address {
627627

628628
// PendingPreconfTxs retrieves the preconf transactions and the pending transactions.
629629
func (pool *TxPool) PendingPreconfTxs(enforceTips bool) ([]*types.Transaction, map[common.Address]types.Transactions) {
630+
defer preconf.MetricsPreconfTxPoolFilterCost(time.Now())
630631
pool.mu.Lock()
631632
defer pool.mu.Unlock()
632633

@@ -676,36 +677,36 @@ func (pool *TxPool) extractPreconfTxsFromPending(pending map[common.Address]type
676677

677678
// removes the preconf transaction from the pending map, maintaining the order.
678679
preconfTxs := make([]*types.Transaction, 0)
679-
for _, preconfTx := range pool.preconfTxs.Transactions() {
680-
from, _ := types.Sender(pool.signer, preconfTx)
680+
for _, preconfTx := range pool.preconfTxs.TxEntries() {
681+
preconfTxHash := preconfTx.Tx.Hash()
681682

682683
// Get the slice of transactions for the target address
683-
txs, exists := pending[from]
684+
txs, exists := pending[preconfTx.From]
684685

685686
// If the transaction isn't in pending map but it's expected to be there,
686687
// show the error log.
687688
if !exists || len(txs) == 0 {
688-
log.Error("Missing transaction in pending map, please report the issue", "hash", preconfTx.Hash())
689-
pool.preconfTxs.Remove(preconfTx.Hash()) // remove it prevent log always print
689+
log.Error("Missing transaction in pending map, please report the issue", "hash", preconfTxHash)
690+
pool.preconfTxs.Remove(preconfTxHash) // remove it prevent log always print
690691
continue
691692
}
692693

693694
// Create a new slice to hold the transactions that are not deleted
694695
var newTxs []*types.Transaction
695696
for _, tx := range txs {
696-
if tx.Hash() != preconfTx.Hash() {
697+
if tx.Hash() != preconfTxHash {
697698
newTxs = append(newTxs, tx) // Only keep the transactions that are not to be deleted
698699
}
699700
}
700701

701702
// Update the slice in the map
702703
if len(newTxs) == 0 {
703-
delete(pending, from) // If the slice is empty, delete the entry for that address
704+
delete(pending, preconfTx.From) // If the slice is empty, delete the entry for that address
704705
} else {
705-
pending[from] = newTxs // Replace with the new slice
706+
pending[preconfTx.From] = newTxs // Replace with the new slice
706707
}
707708

708-
preconfTxs = append(preconfTxs, preconfTx)
709+
preconfTxs = append(preconfTxs, preconfTx.Tx)
709710
}
710711
return preconfTxs
711712
}
@@ -1279,15 +1280,13 @@ func (pool *TxPool) addPreconfTx(tx *types.Transaction) {
12791280
return
12801281
}
12811282

1282-
pool.preconfTxs.Add(tx)
1283+
pool.preconfTxs.Add(from, tx)
12831284

12841285
// handle preconf txs
12851286
pool.handlePreconfTxs([]*types.Transaction{tx})
12861287
}
12871288

12881289
func (pool *TxPool) handlePreconfTxs(news []*types.Transaction) {
1289-
defer preconf.MetricsPreconfTxPoolHandleCost(time.Now())
1290-
12911290
for _, tx := range news {
12921291
txHash := tx.Hash()
12931292
if !pool.preconfTxs.Contains(txHash) {
@@ -1296,25 +1295,28 @@ func (pool *TxPool) handlePreconfTxs(news []*types.Transaction) {
12961295
}
12971296

12981297
// send preconf request event
1299-
result := make(chan *core.PreconfResponse)
1298+
result := make(chan *core.PreconfResponse, 1) // buffer 1 to avoid worker blocking
13001299
pool.preconfTxRequestFeed.Send(core.NewPreconfTxRequest{
13011300
Tx: tx,
13021301
PreconfResult: result,
1302+
ClosePreconfResultFn: func() {
1303+
close(result)
1304+
},
13031305
})
1304-
log.Trace("txpool sent preconf tx request", "tx", txHash)
1306+
log.Debug("txpool sent preconf tx request", "tx", txHash)
13051307

13061308
// avoid race condition
13071309
tx := tx
13081310
// goroutine to avoid blocking
13091311
go func() {
1310-
defer close(result)
1312+
defer preconf.MetricsPreconfTxPoolHandleCost(time.Now())
13111313

1312-
// wait for preconf ready, no need to wait again after it's ready, as it only needs to wait once when the service restarts
1313-
// wait for 10s, if not ready, skip
1314+
// If preconfReadyCh is not closed, it means this is a preconf tx restored from journal after system restart.
1315+
// In this case, we don't need to execute preconfirmation again to avoid resource contention with worker.
13141316
select {
13151317
case <-pool.preconfReadyCh:
1316-
case <-time.After(time.Minute):
1317-
log.Error("preconf txs not ready, skip handle", "tx", txHash)
1318+
default:
1319+
log.Info("preconf txs not ready, skip handle", "tx", txHash)
13181320
return
13191321
}
13201322

@@ -1326,10 +1328,11 @@ func (pool *TxPool) handlePreconfTxs(news []*types.Transaction) {
13261328
// timeout
13271329
timeout := time.NewTimer(pool.config.Preconf.PreconfTimeout)
13281330
defer timeout.Stop()
1331+
now := time.Now()
13291332
// wait for miner.worker preconf response
13301333
select {
13311334
case response := <-result:
1332-
log.Trace("txpool received preconf tx response", "tx", txHash)
1335+
log.Trace("txpool received preconf tx response", "tx", txHash, "duration", time.Since(now))
13331336
if response.Err == nil {
13341337
event.Status = core.PreconfStatusSuccess
13351338
} else {
@@ -1347,7 +1350,7 @@ func (pool *TxPool) handlePreconfTxs(news []*types.Transaction) {
13471350
}
13481351
case <-timeout.C:
13491352
event.Status = core.PreconfStatusFailed
1350-
event.Reason = "preconf timeout"
1353+
event.Reason = fmt.Sprintf("preconf timeout, over %s timeout", time.Since(now))
13511354
}
13521355

13531356
// send preconf event
@@ -1359,7 +1362,7 @@ func (pool *TxPool) handlePreconfTxs(news []*types.Transaction) {
13591362
log.Trace("preconf success", "tx", txHash)
13601363
if pool.journal != nil {
13611364
if err := pool.journal.insert(tx); err != nil {
1362-
log.Warn("Failed to journal preconf success transaction", "tx", txHash, "err", err)
1365+
log.Error("Failed to journal preconf success transaction", "tx", txHash, "err", err)
13631366
} else {
13641367
log.Trace("preconf success transaction journaled", "tx", txHash)
13651368
}

core/txpool/txpool_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2557,6 +2557,10 @@ func TestExtractPreconfTxsFromPending(t *testing.T) {
25572557
tx4 := types.NewTransaction(0, addr3, big.NewInt(400), 21000, big.NewInt(1), nil)
25582558
tx4, _ = types.SignTx(tx4, signer, key3)
25592559

2560+
from1, _ := types.Sender(signer, tx1)
2561+
from2, _ := types.Sender(signer, tx2)
2562+
from3, _ := types.Sender(signer, tx3)
2563+
from4, _ := types.Sender(signer, tx4)
25602564
// Create a test TxPool
25612565
createTxPool := func() *TxPool {
25622566
pool := &TxPool{
@@ -2570,7 +2574,7 @@ func TestExtractPreconfTxsFromPending(t *testing.T) {
25702574
preconfTxs: preconf.NewFIFOTxSet(),
25712575
}
25722576
// Add pre-confirmed transaction
2573-
pool.preconfTxs.Add(tx1)
2577+
pool.preconfTxs.Add(from1, tx1)
25742578
return pool
25752579
}
25762580

@@ -2636,9 +2640,9 @@ func TestExtractPreconfTxsFromPending(t *testing.T) {
26362640
pool.config.Preconf.AllPreconfs = true
26372641

26382642
// Add all transactions to the pre-confirmed set
2639-
pool.preconfTxs.Add(tx2)
2640-
pool.preconfTxs.Add(tx3)
2641-
pool.preconfTxs.Add(tx4)
2643+
pool.preconfTxs.Add(from2, tx2)
2644+
pool.preconfTxs.Add(from3, tx3)
2645+
pool.preconfTxs.Add(from4, tx4)
26422646

26432647
pending := make(map[common.Address]types.Transactions)
26442648
pending[addr1] = types.Transactions{tx1, tx2}

eth/api_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ func (b *EthAPIBackend) sendTxWithPreconf(ctx context.Context, tx *types.Transac
353353
}
354354
case <-ctx.Done():
355355
log.Trace("preconf tx event not received", "tx", txHash, "err", ctx.Err())
356-
return nil, errors.New("preconf tx event not received")
356+
return nil, fmt.Errorf("preconf tx event not received, txHash: %s, err: %w", txHash, ctx.Err())
357357
}
358358
}
359359
}

internal/ethapi/api.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"github.com/ethereum/go-ethereum/log"
4949
"github.com/ethereum/go-ethereum/p2p"
5050
"github.com/ethereum/go-ethereum/params"
51+
"github.com/ethereum/go-ethereum/preconf"
5152
"github.com/ethereum/go-ethereum/rlp"
5253
"github.com/ethereum/go-ethereum/rpc"
5354
)
@@ -2161,6 +2162,8 @@ func (s *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil.B
21612162
// SendRawTransactionWithPreconf will add the signed preconf transaction to the transaction pool and return the preconf result.
21622163
// The sender is responsible for signing the transaction and using the correct nonce.
21632164
func (s *TransactionAPI) SendRawTransactionWithPreconf(ctx context.Context, input hexutil.Bytes) (*core.NewPreconfTxEvent, error) {
2165+
defer preconf.MetricsPreconfAPIHandleCost(time.Now())
2166+
21642167
tx := new(types.Transaction)
21652168
if err := tx.UnmarshalBinary(input); err != nil {
21662169
return nil, err
@@ -2177,6 +2180,9 @@ func (s *TransactionAPI) SendRawTransactionWithPreconf(ctx context.Context, inpu
21772180
return nil, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
21782181
}
21792182

2183+
now := time.Now()
2184+
log.Trace("ethapi sendRawTransactionWithPreconf", "tx", tx.Hash())
2185+
21802186
// Send the transaction with preconf
21812187
result, err := s.b.SendTxWithPreconf(ctx, tx)
21822188
if err != nil {
@@ -2192,9 +2198,9 @@ func (s *TransactionAPI) SendRawTransactionWithPreconf(ctx context.Context, inpu
21922198

21932199
if tx.To() == nil {
21942200
addr := crypto.CreateAddress(from, tx.Nonce())
2195-
log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
2201+
log.Info("Submitted preconf contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
21962202
} else {
2197-
log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
2203+
log.Info("Submitted preconf transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value(), "duration", time.Since(now))
21982204
}
21992205

22002206
return result, nil

miner/preconf_checker.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func NewPreconfChecker(chainConfig *params.ChainConfig, chain *core.BlockChain,
7575

7676
func (c *preconfChecker) loop() {
7777
if !c.minerConfig.EnablePreconfChecker {
78-
log.Debug("preconf checker is disabled, skip loop")
78+
log.Info("preconf checker is disabled, skip loop")
7979
return
8080
}
8181
for {
@@ -298,7 +298,7 @@ func (c *preconfChecker) precheck() error {
298298
// The distance between envblock.number and (engine_sync_target.number or unsafe_l2.number) should not exceed 6.
299299
// Here's an explanation for why it's 6: a deposit transaction from L1 includes at least 1(l1.header.number-1 rather than l1.header.number-2) L1 block,
300300
// which corresponds to 6 L2 blocks. Therefore, we can have up to 6 blocks in advance to ensure that preconfirmations are not affected by deposit transactions.
301-
if envBlockNumber-engineSyncTargetBlockNumber > c.minerConfig.EthToleranceBlock() || envBlockNumber-unsafeL2BlockNumber > 6 {
301+
if envBlockNumber-engineSyncTargetBlockNumber > c.minerConfig.PreconfBufferBlock || envBlockNumber-unsafeL2BlockNumber > c.minerConfig.PreconfBufferBlock {
302302
// When there are a large number of preconfirmation transactions in the queue, it may cause the future 6 blocks to be
303303
// filled with preconfirmation transactions. At this point, stop new preconfirmation transactions from entering,
304304
// because there may be unblocked deposit transactions in future blocks, which cannot be predicted at this time.
@@ -356,15 +356,15 @@ func (c *preconfChecker) PausePreconf() chan<- []*types.Transaction {
356356
}
357357
c.unSealedPreconfTxsCh = make(chan []*types.Transaction, 1) // buffer 1 to avoid worker block
358358

359-
log.Trace("pause preconf")
359+
log.Debug("pause preconf")
360360
return c.unSealedPreconfTxsCh
361361
}
362362

363363
func (c *preconfChecker) UnpausePreconf(env *environment, preconfReady func()) {
364364
defer c.mu.Unlock()
365365
c.env = env
366366
c.envUpdatedAt = time.Now()
367-
log.Trace("unpause preconf", "env.header.Number", env.header.Number.Int64(), "env.gasPool", env.gasPool.Gas(), "envUpdatedAt", c.envUpdatedAt)
367+
log.Debug("unpause preconf", "env.header.Number", env.header.Number.Int64(), "env.gasPool", env.gasPool.Gas(), "envUpdatedAt", c.envUpdatedAt)
368368
// reset env
369369
c.env.header.Number = new(big.Int).Add(c.env.header.Number, common.Big1)
370370
c.env.gasPool.SetGas(c.env.header.GasLimit)
@@ -402,4 +402,6 @@ func (c *preconfChecker) UnpausePreconf(env *environment, preconfReady func()) {
402402

403403
// notify txpool that preconf is ready
404404
preconfReady()
405+
406+
log.Info("ready to preconf", "env.header.Number", env.header.Number.Int64(), "env.gasPool", env.gasPool.Gas(), "envUpdatedAt", c.envUpdatedAt, "deposit_txs", len(c.depositTxs), "unsealed_preconf_txs", len(unsealedPreconfTxs))
405407
}

miner/worker.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -653,19 +653,24 @@ func (w *worker) mainLoop() {
653653
}
654654
}
655655
case ev := <-w.preconfTxRequestCh:
656-
log.Trace("worker received preconf tx request", "tx", ev.Tx.Hash())
656+
now := time.Now()
657+
log.Debug("worker received preconf tx request", "tx", ev.Tx.Hash())
658+
657659
receipt, err := w.preconfChecker.Preconf(ev.Tx)
658660
if err != nil {
659661
// Not fatal, just warn to the log
660662
log.Warn("preconf failed", "tx", ev.Tx.Hash(), "err", err)
661663
}
662-
// Prevent panic caused by writing after ev.PreconfResult is closed
664+
log.Trace("worker preconf tx executed", "tx", ev.Tx.Hash(), "duration", time.Since(now))
665+
663666
select {
664667
case ev.PreconfResult <- &core.PreconfResponse{Receipt: receipt, Err: err}:
665-
log.Trace("worker sent preconf tx response", "tx", ev.Tx.Hash())
668+
log.Debug("worker sent preconf tx response", "tx", ev.Tx.Hash(), "duration", time.Since(now))
666669
case <-time.After(time.Second):
667670
log.Warn("preconf tx response timeout, preconf result is closed?", "tx", ev.Tx.Hash())
668671
}
672+
ev.ClosePreconfResultFn()
673+
669674
case ev := <-w.txsCh:
670675
if w.chainConfig.Optimism != nil && !w.config.RollupComputePendingBlock {
671676
continue // don't update the pending-block snapshot if we are not computing the pending block
@@ -1255,6 +1260,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
12551260
// Split the pending transactions into locals and remotes
12561261
// Fill the block with all available pending transactions.
12571262
preconfTxs, pending := w.eth.TxPool().PendingPreconfTxs(true)
1263+
log.Debug("find preconf txs to fill into block", "count", len(preconfTxs))
12581264

12591265
var unsealedPreconfTxs []*types.Transaction
12601266
if len(preconfTxs) > 0 {

0 commit comments

Comments
 (0)