diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 948ecd14c3a..4c01de3dded 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -549,7 +549,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // Included transactions blobs need to be moved to the limbo if filled && inclusions != nil { - p.offload(addr, txs[i].nonce, txs[i].id, inclusions) + p.offload(addr, txs[i], inclusions) } } delete(p.index, addr) @@ -566,9 +566,13 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids) dropFilledMeter.Mark(int64(len(ids))) } - for _, id := range ids { - if err := p.store.Delete(id); err != nil { - log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) + + // If the txs were recorded in the limbo, we don't delete them. + if !(filled && inclusions != nil) { + for _, id := range ids { + if err := p.store.Delete(id); err != nil { + log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) + } } } return @@ -590,16 +594,19 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // Included transactions blobs need to be moved to the limbo if inclusions != nil { - p.offload(addr, txs[0].nonce, txs[0].id, inclusions) + p.offload(addr, txs[0], inclusions) } txs = txs[1:] } log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs)) dropOverlappedMeter.Mark(int64(len(ids))) - for _, id := range ids { - if err := p.store.Delete(id); err != nil { - log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) + // If the txs were recorded in the limbo, we don't delete them. + if inclusions == nil { + for _, id := range ids { + if err := p.store.Delete(id); err != nil { + log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) + } } } p.index[addr] = txs @@ -769,23 +776,13 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // any of it since there's no clear error case. Some errors may be due to coding // issues, others caused by signers mining MEV stuff or swapping transactions. In // all cases, the pool needs to continue operating. -func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) { - data, err := p.store.Get(id) - if err != nil { - log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) - return - } - var tx types.Transaction - if err = rlp.DecodeBytes(data, &tx); err != nil { - log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) - return - } - block, ok := inclusions[tx.Hash()] +func (p *BlobPool) offload(addr common.Address, blobTxMeta *blobTxMeta, inclusions map[common.Hash]uint64) { + block, ok := inclusions[blobTxMeta.hash] if !ok { - log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id) + log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", blobTxMeta.nonce, "id", blobTxMeta.id) return } - if err := p.limbo.push(&tx, block); err != nil { + if err := p.limbo.push(blobTxMeta, block); err != nil { log.Warn("Failed to offload blob tx into limbo", "err", err) return } @@ -831,8 +828,13 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { } } // Flush out any blobs from limbo that are older than the latest finality + // and also delete the txs from the store. if p.chain.Config().IsCancun(p.head.Number, p.head.Time) { - p.limbo.finalize(p.chain.CurrentFinalBlock()) + p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) { + if err := p.store.Delete(id); err != nil { + log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err) + } + }) } // Reset the price heap for the new set of basefee/blobfee pairs var ( @@ -986,31 +988,14 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]* func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Retrieve the associated blob from the limbo. Without the blobs, we cannot // add the transaction back into the pool as it is not mineable. - tx, err := p.limbo.pull(txhash) + meta, err := p.limbo.pull(txhash) if err != nil { log.Error("Blobs unavailable, dropping reorged tx", "err", err) return err } - // TODO: seems like an easy optimization here would be getting the serialized tx - // from limbo instead of re-serializing it here. - - // Serialize the transaction back into the primary datastore. - blob, err := rlp.EncodeToBytes(tx) - if err != nil { - log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) - return err - } - id, err := p.store.Put(blob) - if err != nil { - log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) - return err - } - - // Update the indices and metrics - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { if err := p.reserver.Hold(addr); err != nil { - log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) + log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err) return err } p.index[addr] = []*blobTxMeta{meta} diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 99d1b4ad6b3..07265f30d6c 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -30,28 +30,25 @@ import ( // to which it belongs as well as the block number in which it was included for // finality eviction. type limboBlob struct { - TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs - Block uint64 // Block in which the blob transaction was included - Tx *types.Transaction + Block uint64 // Block in which the blob transaction was included + BlobTxMeta *blobTxMeta } // limbo is a light, indexed database to temporarily store recently included // blobs until they are finalized. The purpose is to support small reorgs, which // would require pulling back up old blobs (which aren't part of the chain). -// -// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without? type limbo struct { store billy.Database // Persistent data store for limboed blobs - index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids - groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks + index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids + limbos map[uint64]*limboBlob // Mappings from datastore ids to limboBlobs } // newLimbo opens and indexes a set of limboed blob transactions. func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) { l := &limbo{ index: make(map[common.Hash]uint64), - groups: make(map[uint64]map[uint64]common.Hash), + limbos: make(map[uint64]*limboBlob), } // Index all limboed blobs on disk and delete anything unprocessable var fails []uint64 @@ -94,56 +91,55 @@ func (l *limbo) parseBlob(id uint64, data []byte) error { log.Error("Failed to decode blob limbo entry", "id", id, "err", err) return err } - if _, ok := l.index[item.TxHash]; ok { + txHash := item.BlobTxMeta.hash + if _, ok := l.index[txHash]; ok { // This path is impossible, unless due to a programming error a blob gets // inserted into the limbo which was already part of if. Recover gracefully // by ignoring this data entry. - log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id) + log.Error("Dropping duplicate blob limbo entry", "owner", txHash, "id", id) return errors.New("duplicate blob") } - l.index[item.TxHash] = id - - if _, ok := l.groups[item.Block]; !ok { - l.groups[item.Block] = make(map[uint64]common.Hash) - } - l.groups[item.Block][id] = item.TxHash + l.index[txHash] = id + l.limbos[id] = item return nil } // finalize evicts all blobs belonging to a recently finalized block or older. -func (l *limbo) finalize(final *types.Header) { +func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) { // Just in case there's no final block yet (network not yet merged, weird // restart, sethead, etc), fail gracefully. if final == nil { log.Warn("Nil finalized block cannot evict old blobs") return } - for block, ids := range l.groups { - if block > final.Number.Uint64() { + for id, item := range l.limbos { + if item.Block > final.Number.Uint64() { continue } - for id, owner := range ids { - if err := l.store.Delete(id); err != nil { - log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err) - } - delete(l.index, owner) + // Delete limbo metadata. + if err := l.store.Delete(id); err != nil { + log.Error("Failed to drop finalized blob", "block", item.Block, "id", id, "err", err) + } + delete(l.index, item.BlobTxMeta.hash) + delete(l.limbos, id) + if fn != nil { // Delete blob tx if fn is not null. + fn(item.BlobTxMeta.id, item.BlobTxMeta.hash) } - delete(l.groups, block) } } // push stores a new blob transaction into the limbo, waiting until finality for // it to be automatically evicted. -func (l *limbo) push(tx *types.Transaction, block uint64) error { +func (l *limbo) push(metaData *blobTxMeta, block uint64) error { // If the blobs are already tracked by the limbo, consider it a programming // error. There's not much to do against it, but be loud. - if _, ok := l.index[tx.Hash()]; ok { - log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash()) + if _, ok := l.index[metaData.hash]; ok { + log.Error("Limbo cannot push already tracked blobs", "tx", metaData.hash) return errors.New("already tracked blob transaction") } - if err := l.setAndIndex(tx, block); err != nil { - log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err) + if err := l.setAndIndex(metaData, block); err != nil { + log.Error("Failed to set and index limboed blobs", "tx", metaData.hash, "err", err) return err } return nil @@ -152,7 +148,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error { // pull retrieves a previously pushed set of blobs back from the limbo, removing // it at the same time. This method should be used when a previously included blob // transaction gets reorged out. -func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) { +func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. @@ -166,7 +162,10 @@ func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) { log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err) return nil, err } - return item.Tx, nil + meta := item.BlobTxMeta + meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) + meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) + return meta, nil } // update changes the block number under which a blob transaction is tracked. This @@ -187,7 +186,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) { } // If there was no change in the blob's inclusion block, don't mess around // with heavy database operations. - if _, ok := l.groups[block][id]; ok { + if item, ok := l.limbos[id]; ok && item.Block == block { log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block) return } @@ -198,7 +197,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) { log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err) return } - if err := l.setAndIndex(item.Tx, block); err != nil { + if err := l.setAndIndex(item.BlobTxMeta, block); err != nil { log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err) return } @@ -208,19 +207,9 @@ func (l *limbo) update(txhash common.Hash, block uint64) { // getAndDrop retrieves a blob item from the limbo store and deletes it both from // the store and indices. func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) { - data, err := l.store.Get(id) - if err != nil { - return nil, err - } - item := new(limboBlob) - if err = rlp.DecodeBytes(data, item); err != nil { - return nil, err - } - delete(l.index, item.TxHash) - delete(l.groups[item.Block], id) - if len(l.groups[item.Block]) == 0 { - delete(l.groups, item.Block) - } + item := l.limbos[id] + delete(l.index, item.BlobTxMeta.hash) + delete(l.limbos, id) if err := l.store.Delete(id); err != nil { return nil, err } @@ -229,12 +218,10 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) { // setAndIndex assembles a limbo blob database entry and stores it, also updating // the in-memory indices. -func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { - txhash := tx.Hash() +func (l *limbo) setAndIndex(metaData *blobTxMeta, block uint64) error { item := &limboBlob{ - TxHash: txhash, - Block: block, - Tx: tx, + Block: block, + BlobTxMeta: metaData, } data, err := rlp.EncodeToBytes(item) if err != nil { @@ -244,10 +231,7 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { if err != nil { return err } - l.index[txhash] = id - if _, ok := l.groups[block]; !ok { - l.groups[block] = make(map[uint64]common.Hash) - } - l.groups[block][id] = txhash + l.index[item.BlobTxMeta.hash] = id + l.limbos[id] = item return nil }