From 21f5158249f3278f90d12816c4dfa49f8179d0d6 Mon Sep 17 00:00:00 2001 From: maskpp Date: Wed, 23 Jul 2025 23:26:32 +0800 Subject: [PATCH 1/8] simplify limbo logic --- core/txpool/blobpool/limbo.go | 71 ++++++++++++++--------------------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 99d1b4ad6b3..5165aa5700c 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -33,6 +33,7 @@ type limboBlob struct { TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs Block uint64 // Block in which the blob transaction was included Tx *types.Transaction + id uint64 // the billy id of transction } // limbo is a light, indexed database to temporarily store recently included @@ -41,17 +42,14 @@ type limboBlob struct { // // TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without? type limbo struct { - store billy.Database // Persistent data store for limboed blobs - - index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids - groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks + store billy.Database // Persistent data store for limboed blobs + index map[common.Hash]*limboBlob // Mappings from tx hashes to datastore ids } // newLimbo opens and indexes a set of limboed blob transactions. func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) { l := &limbo{ - index: make(map[common.Hash]uint64), - groups: make(map[uint64]map[uint64]common.Hash), + index: make(map[common.Hash]*limboBlob), } // Index all limboed blobs on disk and delete anything unprocessable var fails []uint64 @@ -101,12 +99,9 @@ func (l *limbo) parseBlob(id uint64, data []byte) error { log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id) return errors.New("duplicate blob") } - l.index[item.TxHash] = id - - if _, ok := l.groups[item.Block]; !ok { - l.groups[item.Block] = make(map[uint64]common.Hash) - } - l.groups[item.Block][id] = item.TxHash + // Delete tx and set id. + item.id, item.Tx = id, nil + l.index[item.TxHash] = item return nil } @@ -119,17 +114,11 @@ func (l *limbo) finalize(final *types.Header) { log.Warn("Nil finalized block cannot evict old blobs") return } - for block, ids := range l.groups { - if block > final.Number.Uint64() { + for _, item := range l.index { + if item.Block > final.Number.Uint64() { continue } - for id, owner := range ids { - if err := l.store.Delete(id); err != nil { - log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err) - } - delete(l.index, owner) - } - delete(l.groups, block) + delete(l.index, item.TxHash) } } @@ -152,21 +141,21 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error { // pull retrieves a previously pushed set of blobs back from the limbo, removing // it at the same time. This method should be used when a previously included blob // transaction gets reorged out. -func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) { +func (l *limbo) pull(txhash common.Hash) (*types.Transaction, error) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. - id, ok := l.index[tx] + item, ok := l.index[txhash] if !ok { - log.Trace("Limbo cannot pull non-tracked blobs", "tx", tx) + log.Trace("Limbo cannot pull non-tracked blobs", "tx", txhash) return nil, errors.New("unseen blob transaction") } - item, err := l.getAndDrop(id) + tx, err := l.getAndDrop(item.id) if err != nil { - log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err) + log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err) return nil, err } - return item.Tx, nil + return tx, nil } // update changes the block number under which a blob transaction is tracked. This @@ -180,25 +169,25 @@ func (l *limbo) update(txhash common.Hash, block uint64) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. - id, ok := l.index[txhash] + item, ok := l.index[txhash] if !ok { log.Trace("Limbo cannot update non-tracked blobs", "tx", txhash) return } // If there was no change in the blob's inclusion block, don't mess around // with heavy database operations. - if _, ok := l.groups[block][id]; ok { + if item.Block == block { log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block) return } // Retrieve the old blobs from the data store and write them back with a new // block number. IF anything fails, there's not much to do, go on. - item, err := l.getAndDrop(id) + tx, err := l.getAndDrop(item.id) if err != nil { - log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err) + log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err) return } - if err := l.setAndIndex(item.Tx, block); err != nil { + if err := l.setAndIndex(tx, block); err != nil { log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err) return } @@ -207,7 +196,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) { // getAndDrop retrieves a blob item from the limbo store and deletes it both from // the store and indices. -func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) { +func (l *limbo) getAndDrop(id uint64) (*types.Transaction, error) { data, err := l.store.Get(id) if err != nil { return nil, err @@ -217,14 +206,11 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) { return nil, err } delete(l.index, item.TxHash) - delete(l.groups[item.Block], id) - if len(l.groups[item.Block]) == 0 { - delete(l.groups, item.Block) - } if err := l.store.Delete(id); err != nil { return nil, err } - return item, nil + + return item.Tx, nil } // setAndIndex assembles a limbo blob database entry and stores it, also updating @@ -244,10 +230,9 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { if err != nil { return err } - l.index[txhash] = id - if _, ok := l.groups[block]; !ok { - l.groups[block] = make(map[uint64]common.Hash) - } - l.groups[block][id] = txhash + // Delete tx and set id. + item.id, item.Tx = id, nil + l.index[txhash] = item + return nil } From 918aa15cd3f5624173b4dd5a7dd2b46156152049 Mon Sep 17 00:00:00 2001 From: maskpp Date: Thu, 24 Jul 2025 09:47:27 +0800 Subject: [PATCH 2/8] delete item --- core/txpool/blobpool/limbo.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 5165aa5700c..75c1880536b 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -118,6 +118,9 @@ func (l *limbo) finalize(final *types.Header) { if item.Block > final.Number.Uint64() { continue } + if err := l.store.Delete(item.id); err != nil { + log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err) + } delete(l.index, item.TxHash) } } From 524ba61b04fcede92f4361efc40287c76f785669 Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 10 Aug 2025 15:16:27 +0800 Subject: [PATCH 3/8] upgrade blobpool --- core/txpool/blobpool/blobpool.go | 148 +++++++++++++++++++++---------- core/txpool/blobpool/limbo.go | 110 ++++++++++++++--------- 2 files changed, 168 insertions(+), 90 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 948ecd14c3a..d8655bf274c 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -21,6 +21,7 @@ import ( "container/heap" "errors" "fmt" + "io" "math" "math/big" "os" @@ -113,6 +114,64 @@ type blobTxMeta struct { evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces } +type blobTxMetaMarshal struct { + Hash common.Hash + Vhashes []common.Hash + + ID uint64 + StorageSize uint32 + Size uint64 + + Nonce uint64 + CostCap *uint256.Int + ExecTipCap *uint256.Int + ExecFeeCap *uint256.Int + BlobFeeCap *uint256.Int + ExecGas uint64 + BlobGas uint64 +} + +// EncodeRLP encodes the blobTxMeta into the given writer. +func (b *blobTxMeta) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, blobTxMetaMarshal{ + Hash: b.hash, + Vhashes: b.vhashes, + ID: b.id, + StorageSize: b.storageSize, + Size: b.size, + Nonce: b.nonce, + CostCap: b.costCap, + ExecTipCap: b.execTipCap, + ExecFeeCap: b.execFeeCap, + BlobFeeCap: b.blobFeeCap, + ExecGas: b.execGas, + BlobGas: b.blobGas, + }) +} + +// DecodeRLP decodes the blobTxMeta from the given stream. +func (b *blobTxMeta) DecodeRLP(s *rlp.Stream) error { + var meta blobTxMetaMarshal + if err := s.Decode(&meta); err != nil { + return err + } + b.hash = meta.Hash + b.vhashes = meta.Vhashes + b.id = meta.ID + b.storageSize = meta.StorageSize + b.size = meta.Size + b.nonce = meta.Nonce + b.costCap = meta.CostCap + b.execTipCap = meta.ExecTipCap + b.execFeeCap = meta.ExecFeeCap + b.blobFeeCap = meta.BlobFeeCap + b.execGas = meta.ExecGas + b.blobGas = meta.BlobGas + b.basefeeJumps = dynamicFeeJumps(meta.ExecFeeCap) + b.blobfeeJumps = dynamicFeeJumps(meta.BlobFeeCap) + return nil +} + // newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // and assembles a helper struct to track in memory. func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { @@ -373,6 +432,16 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser return err } } + + // Pool initialized, attach the blob limbo to it to track blobs included + // recently but not yet finalized + limbo, err := newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config())) + if err != nil { + p.Close() + return err + } + p.limbo = limbo + // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -399,6 +468,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser } p.store = store + // If still exit blob txs in limbo, try to repair them. This is needed. + if err = p.limbo.tryRepair(p.store); err != nil { + return err + } + if len(fails) > 0 { log.Warn("Dropping invalidated blob transactions", "ids", fails) dropInvalidMeter.Mark(int64(len(fails))) @@ -423,14 +497,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head)) } p.evict = newPriceHeap(basefee, blobfee, p.index) - - // Pool initialized, attach the blob limbo to it to track blobs included - // recently but not yet finalized - p.limbo, err = newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config())) - if err != nil { - p.Close() - return err - } // Set the configured gas tip, triggering a filtering of anything just loaded basefeeGauge.Update(int64(basefee.Uint64())) blobfeeGauge.Update(int64(blobfee.Uint64())) @@ -486,6 +552,10 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { } meta := newBlobTxMeta(id, tx.Size(), size, tx) + if p.limbo.existsAndSet(meta) { + return nil + } + if p.lookup.exists(meta.hash) { // This path is only possible after a crash, where deleted items are not // removed via the normal shutdown-startup procedure and thus may get @@ -547,9 +617,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.stored -= uint64(txs[i].storageSize) p.lookup.untrack(txs[i]) - // Included transactions blobs need to be moved to the limbo if filled && inclusions != nil { - p.offload(addr, txs[i].nonce, txs[i].id, inclusions) + // If the tx metadata is recorded by limbo, we don't need to delete the tx from db. + if p.offload(addr, txs[i], inclusions) { + ids = ids[:len(ids)-1] + } } } delete(p.index, addr) @@ -590,7 +662,10 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // Included transactions blobs need to be moved to the limbo if inclusions != nil { - p.offload(addr, txs[0].nonce, txs[0].id, inclusions) + // If the tx metadata is recorded by limbo, we don't need to delete the tx from db. + if p.offload(addr, txs[0], inclusions) { + ids = ids[:len(ids)-1] + } } txs = txs[1:] } @@ -769,26 +844,18 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // any of it since there's no clear error case. Some errors may be due to coding // issues, others caused by signers mining MEV stuff or swapping transactions. In // all cases, the pool needs to continue operating. -func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) { - data, err := p.store.Get(id) - if err != nil { - log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) - return - } - var tx types.Transaction - if err = rlp.DecodeBytes(data, &tx); err != nil { - log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) - return - } - block, ok := inclusions[tx.Hash()] +// Return bool type to let caller know whether the tx is offloaded to limbo successfully. +func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) bool { + block, ok := inclusions[meta.hash] if !ok { - log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id) - return + log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id) + return false } - if err := p.limbo.push(&tx, block); err != nil { + if err := p.limbo.push(meta, block); err != nil { log.Warn("Failed to offload blob tx into limbo", "err", err) - return + return false } + return true } // Reset implements txpool.SubPool, allowing the blob pool's internal state to be @@ -832,7 +899,11 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { } // Flush out any blobs from limbo that are older than the latest finality if p.chain.Config().IsCancun(p.head.Number, p.head.Time) { - p.limbo.finalize(p.chain.CurrentFinalBlock()) + p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) { + if err := p.store.Delete(id); err != nil { + log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err) + } + }) } // Reset the price heap for the new set of basefee/blobfee pairs var ( @@ -986,31 +1057,14 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]* func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Retrieve the associated blob from the limbo. Without the blobs, we cannot // add the transaction back into the pool as it is not mineable. - tx, err := p.limbo.pull(txhash) + meta, err := p.limbo.pull(txhash) if err != nil { log.Error("Blobs unavailable, dropping reorged tx", "err", err) return err } - // TODO: seems like an easy optimization here would be getting the serialized tx - // from limbo instead of re-serializing it here. - - // Serialize the transaction back into the primary datastore. - blob, err := rlp.EncodeToBytes(tx) - if err != nil { - log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) - return err - } - id, err := p.store.Put(blob) - if err != nil { - log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) - return err - } - - // Update the indices and metrics - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { if err := p.reserver.Hold(addr); err != nil { - log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) + log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err) return err } p.index[addr] = []*blobTxMeta{meta} diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 75c1880536b..a581b457109 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -30,10 +30,12 @@ import ( // to which it belongs as well as the block number in which it was included for // finality eviction. type limboBlob struct { - TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs - Block uint64 // Block in which the blob transaction was included - Tx *types.Transaction - id uint64 // the billy id of transction + TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs + Block uint64 // Block in which the blob transaction was included + Tx *types.Transaction `rlp:"omitempty"` + // Optional blob transaction metadata. + TxMeta *blobTxMeta `rlp:"omitempty"` + id uint64 // the billy id of limboBlob } // limbo is a light, indexed database to temporarily store recently included @@ -100,14 +102,53 @@ func (l *limbo) parseBlob(id uint64, data []byte) error { return errors.New("duplicate blob") } // Delete tx and set id. - item.id, item.Tx = id, nil + item.id = id l.index[item.TxHash] = item return nil } +// existsAndSet checks whether a blob transaction is already tracked by the limbo. +func (l *limbo) existsAndSet(meta *blobTxMeta) bool { + if item := l.index[meta.hash]; item != nil { + if item.Tx != nil { + item.TxMeta, item.Tx = meta, nil + } + return true + } + return false +} + +// tryRepair attempts to repair the limbo by re-encoding all transactions that are +// currently in the limbo, but not yet stored in the database. This is useful +// when the limbo is created from a previous state, and the transactions are not +// yet stored in the database. The method will re-encode all transactions and +// store them in the database, updating the in-memory indices at the same time. +func (l *limbo) tryRepair(store billy.Database) error { + for _, item := range l.index { + if item.Tx == nil { + continue + } + tx := item.Tx + // Transaction permitted into the pool from a nonce and cost perspective, + // insert it into the database and update the indices + blob, err := rlp.EncodeToBytes(tx) + if err != nil { + log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) + return err + } + id, err := store.Put(blob) + if err != nil { + return err + } + meta := newBlobTxMeta(id, tx.Size(), store.Size(id), tx) + item.TxMeta, item.Tx = meta, nil + } + return nil +} + // finalize evicts all blobs belonging to a recently finalized block or older. -func (l *limbo) finalize(final *types.Header) { +func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) { // Just in case there's no final block yet (network not yet merged, weird // restart, sethead, etc), fail gracefully. if final == nil { @@ -122,20 +163,24 @@ func (l *limbo) finalize(final *types.Header) { log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err) } delete(l.index, item.TxHash) + if fn != nil { + meta := item.TxMeta + fn(meta.id, meta.hash) + } } } // push stores a new blob transaction into the limbo, waiting until finality for // it to be automatically evicted. -func (l *limbo) push(tx *types.Transaction, block uint64) error { +func (l *limbo) push(meta *blobTxMeta, block uint64) error { // If the blobs are already tracked by the limbo, consider it a programming // error. There's not much to do against it, but be loud. - if _, ok := l.index[tx.Hash()]; ok { - log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash()) + if _, ok := l.index[meta.hash]; ok { + log.Error("Limbo cannot push already tracked blobs", "tx", meta.hash) return errors.New("already tracked blob transaction") } - if err := l.setAndIndex(tx, block); err != nil { - log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err) + if err := l.setAndIndex(meta, block); err != nil { + log.Error("Failed to set and index limboed blobs", "tx", meta.hash, "err", err) return err } return nil @@ -144,7 +189,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error { // pull retrieves a previously pushed set of blobs back from the limbo, removing // it at the same time. This method should be used when a previously included blob // transaction gets reorged out. -func (l *limbo) pull(txhash common.Hash) (*types.Transaction, error) { +func (l *limbo) pull(txhash common.Hash) (*blobTxMeta, error) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. @@ -153,12 +198,10 @@ func (l *limbo) pull(txhash common.Hash) (*types.Transaction, error) { log.Trace("Limbo cannot pull non-tracked blobs", "tx", txhash) return nil, errors.New("unseen blob transaction") } - tx, err := l.getAndDrop(item.id) - if err != nil { - log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err) + if err := l.store.Delete(item.id); err != nil { return nil, err } - return tx, nil + return item.TxMeta, nil } // update changes the block number under which a blob transaction is tracked. This @@ -185,45 +228,26 @@ func (l *limbo) update(txhash common.Hash, block uint64) { } // Retrieve the old blobs from the data store and write them back with a new // block number. IF anything fails, there's not much to do, go on. - tx, err := l.getAndDrop(item.id) - if err != nil { - log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err) + if err := l.store.Delete(item.id); err != nil { + log.Error("Failed to drop old limboed blobs", "tx", txhash, "err", err) return } - if err := l.setAndIndex(tx, block); err != nil { + if err := l.setAndIndex(item.TxMeta, block); err != nil { log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err) return } log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block) } -// getAndDrop retrieves a blob item from the limbo store and deletes it both from -// the store and indices. -func (l *limbo) getAndDrop(id uint64) (*types.Transaction, error) { - data, err := l.store.Get(id) - if err != nil { - return nil, err - } - item := new(limboBlob) - if err = rlp.DecodeBytes(data, item); err != nil { - return nil, err - } - delete(l.index, item.TxHash) - if err := l.store.Delete(id); err != nil { - return nil, err - } - - return item.Tx, nil -} - // setAndIndex assembles a limbo blob database entry and stores it, also updating // the in-memory indices. -func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { - txhash := tx.Hash() +func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error { + txhash := meta.hash item := &limboBlob{ TxHash: txhash, Block: block, - Tx: tx, + TxMeta: meta, + Tx: nil, } data, err := rlp.EncodeToBytes(item) if err != nil { @@ -234,7 +258,7 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { return err } // Delete tx and set id. - item.id, item.Tx = id, nil + item.id = id l.index[txhash] = item return nil From c13ba75bfb9e01d1400b1a252999518ac5e98c13 Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 10 Aug 2025 15:24:16 +0800 Subject: [PATCH 4/8] upgrade blobpool --- core/txpool/blobpool/limbo.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index a581b457109..fde523be912 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -142,7 +142,7 @@ func (l *limbo) tryRepair(store billy.Database) error { return err } meta := newBlobTxMeta(id, tx.Size(), store.Size(id), tx) - item.TxMeta, item.Tx = meta, nil + l.existsAndSet(meta) } return nil } @@ -247,7 +247,7 @@ func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error { TxHash: txhash, Block: block, TxMeta: meta, - Tx: nil, + Tx: nil, // The tx is stored in the blob database, not here. } data, err := rlp.EncodeToBytes(item) if err != nil { From d959ad18bcbf6982cac3f25f4d0d189b0b06541d Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 10 Aug 2025 20:05:34 +0800 Subject: [PATCH 5/8] update the slotSizeFn method --- core/txpool/blobpool/limbo.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index fde523be912..04e541c20b7 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -41,8 +41,6 @@ type limboBlob struct { // limbo is a light, indexed database to temporarily store recently included // blobs until they are finalized. The purpose is to support small reorgs, which // would require pulling back up old blobs (which aren't part of the chain). -// -// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without? type limbo struct { store billy.Database // Persistent data store for limboed blobs index map[common.Hash]*limboBlob // Mappings from tx hashes to datastore ids @@ -60,7 +58,13 @@ func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) { fails = append(fails, id) } } - store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, newSlotter(maxBlobsPerTransaction), index) + store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, func() (size uint32, done bool) { + // 8*6: Total size of uint64. + // 4: The size of uint32. + // 32*4: The max total size of big.Int. + // 32*(maxBlobsPerTransaction+2): The total size of hashes. + return 8*6 + 4 + 32*4 + 32*uint32(maxBlobsPerTransaction+2), true + }, index) if err != nil { return nil, err } From 9fac6aebbfd6abb94e61ebdaebb7693bbddba678 Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 10 Aug 2025 20:28:39 +0800 Subject: [PATCH 6/8] add comments --- core/txpool/blobpool/blobpool.go | 8 +++++--- core/txpool/blobpool/limbo.go | 22 +++++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index d8655bf274c..dd765466597 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -133,7 +133,7 @@ type blobTxMetaMarshal struct { // EncodeRLP encodes the blobTxMeta into the given writer. func (b *blobTxMeta) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, blobTxMetaMarshal{ + return rlp.Encode(w, &blobTxMetaMarshal{ Hash: b.hash, Vhashes: b.vhashes, ID: b.id, @@ -468,8 +468,8 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser } p.store = store - // If still exit blob txs in limbo, try to repair them. This is needed. - if err = p.limbo.tryRepair(p.store); err != nil { + // If still exit blob txs in limbo, delete the old reset a new one without tx content. + if err = p.limbo.setTxMeta(p.store); err != nil { return err } @@ -552,6 +552,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { } meta := newBlobTxMeta(id, tx.Size(), size, tx) + + // If the tx metadata is already in limbo, we don't need to add it to the pool. if p.limbo.existsAndSet(meta) { return nil } diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 04e541c20b7..88e163271ca 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -32,10 +32,9 @@ import ( type limboBlob struct { TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs Block uint64 // Block in which the blob transaction was included - Tx *types.Transaction `rlp:"omitempty"` - // Optional blob transaction metadata. - TxMeta *blobTxMeta `rlp:"omitempty"` - id uint64 // the billy id of limboBlob + Tx *types.Transaction `rlp:"omitempty"` // After this commitment the Tx field is optional, as the TxMeta contains the tx metadata. + TxMeta *blobTxMeta `rlp:"omitempty"` // Optional blob transaction metadata. + id uint64 // the billy id of limboBlob } // limbo is a light, indexed database to temporarily store recently included @@ -115,7 +114,7 @@ func (l *limbo) parseBlob(id uint64, data []byte) error { // existsAndSet checks whether a blob transaction is already tracked by the limbo. func (l *limbo) existsAndSet(meta *blobTxMeta) bool { if item := l.index[meta.hash]; item != nil { - if item.Tx != nil { + if item.TxMeta == nil { item.TxMeta, item.Tx = meta, nil } return true @@ -123,12 +122,12 @@ func (l *limbo) existsAndSet(meta *blobTxMeta) bool { return false } -// tryRepair attempts to repair the limbo by re-encoding all transactions that are +// setTxMeta attempts to repair the limbo by re-encoding all transactions that are // currently in the limbo, but not yet stored in the database. This is useful // when the limbo is created from a previous state, and the transactions are not // yet stored in the database. The method will re-encode all transactions and // store them in the database, updating the in-memory indices at the same time. -func (l *limbo) tryRepair(store billy.Database) error { +func (l *limbo) setTxMeta(store billy.Database) error { for _, item := range l.index { if item.Tx == nil { continue @@ -146,7 +145,12 @@ func (l *limbo) tryRepair(store billy.Database) error { return err } meta := newBlobTxMeta(id, tx.Size(), store.Size(id), tx) - l.existsAndSet(meta) + if _, err := l.pull(meta.hash); err != nil { + return err + } + if err := l.push(meta, item.Block); err != nil { + return err + } } return nil } @@ -251,7 +255,7 @@ func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error { TxHash: txhash, Block: block, TxMeta: meta, - Tx: nil, // The tx is stored in the blob database, not here. + Tx: nil, // The tx already stored in the blob database, not here. } data, err := rlp.EncodeToBytes(item) if err != nil { From 666bff4d013a1538f3d34c866618ff83f7bebed2 Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 10 Aug 2025 21:11:38 +0800 Subject: [PATCH 7/8] add drop method --- core/txpool/blobpool/limbo.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 88e163271ca..5383b8062c3 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -145,9 +145,11 @@ func (l *limbo) setTxMeta(store billy.Database) error { return err } meta := newBlobTxMeta(id, tx.Size(), store.Size(id), tx) - if _, err := l.pull(meta.hash); err != nil { + // Delete the old item which hash blob tx content. + if err := l.drop(meta.hash); err != nil { return err } + // Set the new one which has blob tx metadata. if err := l.push(meta, item.Block); err != nil { return err } @@ -167,10 +169,9 @@ func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.H if item.Block > final.Number.Uint64() { continue } - if err := l.store.Delete(item.id); err != nil { + if err := l.drop(item.TxHash); err != nil { log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err) } - delete(l.index, item.TxHash) if fn != nil { meta := item.TxMeta fn(meta.id, meta.hash) @@ -206,7 +207,7 @@ func (l *limbo) pull(txhash common.Hash) (*blobTxMeta, error) { log.Trace("Limbo cannot pull non-tracked blobs", "tx", txhash) return nil, errors.New("unseen blob transaction") } - if err := l.store.Delete(item.id); err != nil { + if err := l.drop(item.TxHash); err != nil { return nil, err } return item.TxMeta, nil @@ -234,10 +235,8 @@ func (l *limbo) update(txhash common.Hash, block uint64) { log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block) return } - // Retrieve the old blobs from the data store and write them back with a new - // block number. IF anything fails, there's not much to do, go on. - if err := l.store.Delete(item.id); err != nil { - log.Error("Failed to drop old limboed blobs", "tx", txhash, "err", err) + if err := l.drop(txhash); err != nil { + log.Error("Failed to drop old limboed metadata", "tx", txhash, "err", err) return } if err := l.setAndIndex(item.TxMeta, block); err != nil { @@ -247,6 +246,20 @@ func (l *limbo) update(txhash common.Hash, block uint64) { log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block) } +// drop removes the blob metadata from the limbo. +func (l *limbo) drop(txhash common.Hash) error { + if item, ok := l.index[txhash]; ok { + // Retrieve the old blobs from the data store and write them back with a new + // block number. IF anything fails, there's not much to do, go on. + if err := l.store.Delete(item.id); err != nil { + log.Error("Failed to drop old limboed blobs", "tx", txhash, "err", err) + return err + } + delete(l.index, txhash) + } + return nil +} + // setAndIndex assembles a limbo blob database entry and stores it, also updating // the in-memory indices. func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error { From ca028c8ee14720337b81835d0e39c583fc4d5e55 Mon Sep 17 00:00:00 2001 From: maskpp Date: Sun, 10 Aug 2025 21:12:55 +0800 Subject: [PATCH 8/8] update comment --- core/txpool/blobpool/blobpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index dd765466597..81a18e5ac31 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -620,7 +620,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.lookup.untrack(txs[i]) if filled && inclusions != nil { - // If the tx metadata is recorded by limbo, we don't need to delete the tx from db. + // If the tx metadata is recorded by limbo, keep the tx in the db. if p.offload(addr, txs[i], inclusions) { ids = ids[:len(ids)-1] } @@ -664,7 +664,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // Included transactions blobs need to be moved to the limbo if inclusions != nil { - // If the tx metadata is recorded by limbo, we don't need to delete the tx from db. + // If the tx metadata is recorded by limbo, keep the tx in the db. if p.offload(addr, txs[0], inclusions) { ids = ids[:len(ids)-1] }