core/txpool/blobpool: Avoid restore blob tx again in limbo #32383

Open
wants to merge 9 commits into master
150 changes: 103 additions & 47 deletions core/txpool/blobpool/blobpool.go
@@ -21,6 +21,7 @@ import (
"container/heap"
"errors"
"fmt"
"io"
"math"
"math/big"
"os"
@@ -113,6 +114,64 @@ type blobTxMeta struct {
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
}

type blobTxMetaMarshal struct {
Hash common.Hash
Vhashes []common.Hash

ID uint64
StorageSize uint32
Size uint64

Nonce uint64
CostCap *uint256.Int
ExecTipCap *uint256.Int
ExecFeeCap *uint256.Int
BlobFeeCap *uint256.Int
ExecGas uint64
BlobGas uint64
}

// EncodeRLP encodes the blobTxMeta into the given writer.
func (b *blobTxMeta) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, &blobTxMetaMarshal{
Hash: b.hash,
Vhashes: b.vhashes,
ID: b.id,
StorageSize: b.storageSize,
Size: b.size,
Nonce: b.nonce,
CostCap: b.costCap,
ExecTipCap: b.execTipCap,
ExecFeeCap: b.execFeeCap,
BlobFeeCap: b.blobFeeCap,
ExecGas: b.execGas,
BlobGas: b.blobGas,
})
}

// DecodeRLP decodes the blobTxMeta from the given stream.
func (b *blobTxMeta) DecodeRLP(s *rlp.Stream) error {
var meta blobTxMetaMarshal
if err := s.Decode(&meta); err != nil {
return err
}
b.hash = meta.Hash
b.vhashes = meta.Vhashes
b.id = meta.ID
b.storageSize = meta.StorageSize
b.size = meta.Size
b.nonce = meta.Nonce
b.costCap = meta.CostCap
b.execTipCap = meta.ExecTipCap
b.execFeeCap = meta.ExecFeeCap
b.blobFeeCap = meta.BlobFeeCap
b.execGas = meta.ExecGas
b.blobGas = meta.BlobGas
b.basefeeJumps = dynamicFeeJumps(meta.ExecFeeCap)
b.blobfeeJumps = dynamicFeeJumps(meta.BlobFeeCap)
return nil
}
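For illustration, here is a round-trip through the codec above, written as a hypothetical test in the blobpool package (it must live there because the fields are unexported). The test name and the field values are made up and not part of this PR; the usual rlp, common, uint256 and testing imports are assumed.

// Sketch only: round-trips a metadata record through EncodeRLP/DecodeRLP.
func TestBlobTxMetaRLPRoundtrip(t *testing.T) {
	// Build a record with arbitrary values; the eviction-related fields are
	// deliberately left zero since they are not serialized.
	meta := &blobTxMeta{
		hash:        common.HexToHash("0x01"),
		id:          7,
		storageSize: 131072,
		size:        4096,
		nonce:       3,
		costCap:     uint256.NewInt(1_000_000),
		execTipCap:  uint256.NewInt(2),
		execFeeCap:  uint256.NewInt(100),
		blobFeeCap:  uint256.NewInt(10),
		execGas:     21000,
		blobGas:     131072,
	}
	blob, err := rlp.EncodeToBytes(meta)
	if err != nil {
		t.Fatalf("encode failed: %v", err)
	}
	var decoded blobTxMeta
	if err := rlp.DecodeBytes(blob, &decoded); err != nil {
		t.Fatalf("decode failed: %v", err)
	}
	// basefeeJumps and blobfeeJumps are not stored; DecodeRLP recomputes them
	// from the decoded fee caps.
	if decoded.hash != meta.hash || decoded.id != meta.id || decoded.nonce != meta.nonce {
		t.Fatalf("roundtrip mismatch: got %+v", decoded)
	}
}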

// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
// and assembles a helper struct to track in memory.
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta {
@@ -373,6 +432,16 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
return err
}
}

// Pool initialized, attach the blob limbo to it to track blobs included
// recently but not yet finalized
limbo, err := newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
if err != nil {
p.Close()
return err
}
p.limbo = limbo

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
@@ -399,6 +468,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
}
p.store = store

// If blob txs still exist in the limbo, replace the old entries with new ones that carry only the tx metadata, without the tx content.
if err = p.limbo.setTxMeta(p.store); err != nil {
return err
}

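The limbo changes themselves live in limbo.go and are not shown in this file's diff. Purely as an illustration of what "without the tx content" could mean for a stored record, a limbo entry might look roughly like the type below, with the blob data kept only in the pool's own store and addressed through Meta.id. The field names are an assumption, not taken from this PR.

// Assumed shape of a limbo record after this change: only the metadata is
// persisted alongside the inclusion block; the transaction itself is no
// longer duplicated in the limbo store.
type limboBlob struct {
	TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
	Block  uint64      // Block in which the blob transaction was included
	Meta   *blobTxMeta // Metadata of the included transaction, using the RLP codec above
}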
if len(fails) > 0 {
log.Warn("Dropping invalidated blob transactions", "ids", fails)
dropInvalidMeter.Mark(int64(len(fails)))
@@ -423,14 +497,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head))
}
p.evict = newPriceHeap(basefee, blobfee, p.index)

// Pool initialized, attach the blob limbo to it to track blobs included
// recently but not yet finalized
p.limbo, err = newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
if err != nil {
p.Close()
return err
}
// Set the configured gas tip, triggering a filtering of anything just loaded
basefeeGauge.Update(int64(basefee.Uint64()))
blobfeeGauge.Update(int64(blobfee.Uint64()))
@@ -486,6 +552,12 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
}

meta := newBlobTxMeta(id, tx.Size(), size, tx)

// If the tx metadata is already in limbo, we don't need to add it to the pool.
if p.limbo.existsAndSet(meta) {
return nil
}
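The existsAndSet helper is likewise defined in limbo.go, outside this diff. A minimal sketch of the contract it appears to implement, under the assumption that the limbo keeps a hash-to-entry index and only needs the freshly parsed metadata re-attached after a restart; the field names below are hypothetical.

// Illustrative sketch only: report whether the hash is already tracked by the
// limbo and, if so, remember the freshly parsed metadata so the caller can
// skip indexing the transaction in the pending pool.
func (l *limbo) existsAndSet(meta *blobTxMeta) bool {
	id, ok := l.index[meta.hash] // assumed hash -> limbo item id lookup
	if !ok {
		return false
	}
	l.metas[id] = meta // assumed id -> metadata cache, repopulated on startup
	return true
}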

if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get
@@ -547,9 +619,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
p.stored -= uint64(txs[i].storageSize)
p.lookup.untrack(txs[i])

// Included transactions blobs need to be moved to the limbo
if filled && inclusions != nil {
p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
// If the tx metadata is recorded in the limbo, keep the tx in the store.
if p.offload(addr, txs[i], inclusions) {
ids = ids[:len(ids)-1]
}
}
}
delete(p.index, addr)
@@ -590,7 +664,10 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

// Included transactions blobs need to be moved to the limbo
if inclusions != nil {
p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
// If the tx metadata is recorded in the limbo, keep the tx in the store.
if p.offload(addr, txs[0], inclusions) {
ids = ids[:len(ids)-1]
}
}
txs = txs[1:]
}
@@ -769,26 +846,18 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
// any of it since there's no clear error case. Some errors may be due to coding
// issues, others caused by signers mining MEV stuff or swapping transactions. In
// all cases, the pool needs to continue operating.
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
data, err := p.store.Get(id)
if err != nil {
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
var tx types.Transaction
if err = rlp.DecodeBytes(data, &tx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
block, ok := inclusions[tx.Hash()]
// It returns a boolean indicating whether the tx was successfully offloaded into the limbo.
func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) bool {
block, ok := inclusions[meta.hash]
if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
return
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id)
return false
}
if err := p.limbo.push(&tx, block); err != nil {
if err := p.limbo.push(meta, block); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err)
return
return false
}
return true
}

// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
@@ -832,7 +901,11 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
}
// Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
p.limbo.finalize(p.chain.CurrentFinalBlock())
p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err)
}
})
}
// Reset the price heap for the new set of basefee/blobfee pairs
var (
@@ -986,31 +1059,14 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash)
meta, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return err
}
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.

// Serialize the transaction back into the primary datastore.
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err
}
id, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return err
}

// Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err)
return err
}
p.index[addr] = []*blobTxMeta{meta}