Skip to content

core/txpool/blobpool: simplify limbo package #32266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 30 additions & 42 deletions core/txpool/blobpool/limbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type limboBlob struct {
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
Block uint64 // Block in which the blob transaction was included
Tx *types.Transaction
id uint64 // the billy id of transction
}

// limbo is a light, indexed database to temporarily store recently included
Expand All @@ -41,17 +42,14 @@ type limboBlob struct {
//
// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without?
type limbo struct {
store billy.Database // Persistent data store for limboed blobs

index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids
groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks
store billy.Database // Persistent data store for limboed blobs
index map[common.Hash]*limboBlob // Mappings from tx hashes to datastore ids
}

// newLimbo opens and indexes a set of limboed blob transactions.
func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) {
l := &limbo{
index: make(map[common.Hash]uint64),
groups: make(map[uint64]map[uint64]common.Hash),
index: make(map[common.Hash]*limboBlob),
}
// Index all limboed blobs on disk and delete anything unprocessable
var fails []uint64
Expand Down Expand Up @@ -101,12 +99,9 @@ func (l *limbo) parseBlob(id uint64, data []byte) error {
log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id)
return errors.New("duplicate blob")
}
l.index[item.TxHash] = id

if _, ok := l.groups[item.Block]; !ok {
l.groups[item.Block] = make(map[uint64]common.Hash)
}
l.groups[item.Block][id] = item.TxHash
// Delete tx and set id.
item.id, item.Tx = id, nil
l.index[item.TxHash] = item

return nil
}
Expand All @@ -119,17 +114,14 @@ func (l *limbo) finalize(final *types.Header) {
log.Warn("Nil finalized block cannot evict old blobs")
return
}
for block, ids := range l.groups {
if block > final.Number.Uint64() {
for _, item := range l.index {
if item.Block > final.Number.Uint64() {
continue
}
for id, owner := range ids {
if err := l.store.Delete(id); err != nil {
log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err)
}
delete(l.index, owner)
if err := l.store.Delete(item.id); err != nil {
log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err)
}
delete(l.groups, block)
delete(l.index, item.TxHash)
}
}

Expand All @@ -152,21 +144,21 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
// pull retrieves a previously pushed set of blobs back from the limbo, removing
// it at the same time. This method should be used when a previously included blob
// transaction gets reorged out.
func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
func (l *limbo) pull(txhash common.Hash) (*types.Transaction, error) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
id, ok := l.index[tx]
item, ok := l.index[txhash]
if !ok {
log.Trace("Limbo cannot pull non-tracked blobs", "tx", tx)
log.Trace("Limbo cannot pull non-tracked blobs", "tx", txhash)
return nil, errors.New("unseen blob transaction")
}
item, err := l.getAndDrop(id)
tx, err := l.getAndDrop(item.id)
if err != nil {
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err)
return nil, err
}
return item.Tx, nil
return tx, nil
}

// update changes the block number under which a blob transaction is tracked. This
Expand All @@ -180,25 +172,25 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
id, ok := l.index[txhash]
item, ok := l.index[txhash]
if !ok {
log.Trace("Limbo cannot update non-tracked blobs", "tx", txhash)
return
}
// If there was no change in the blob's inclusion block, don't mess around
// with heavy database operations.
if _, ok := l.groups[block][id]; ok {
if item.Block == block {
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
return
}
// Retrieve the old blobs from the data store and write them back with a new
// block number. IF anything fails, there's not much to do, go on.
item, err := l.getAndDrop(id)
tx, err := l.getAndDrop(item.id)
if err != nil {
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err)
return
}
if err := l.setAndIndex(item.Tx, block); err != nil {
if err := l.setAndIndex(tx, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
return
}
Expand All @@ -207,7 +199,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {

// getAndDrop retrieves a blob item from the limbo store and deletes it both from
// the store and indices.
func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
func (l *limbo) getAndDrop(id uint64) (*types.Transaction, error) {
data, err := l.store.Get(id)
if err != nil {
return nil, err
Expand All @@ -217,14 +209,11 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
return nil, err
}
delete(l.index, item.TxHash)
delete(l.groups[item.Block], id)
if len(l.groups[item.Block]) == 0 {
delete(l.groups, item.Block)
}
if err := l.store.Delete(id); err != nil {
return nil, err
}
return item, nil

return item.Tx, nil
}

// setAndIndex assembles a limbo blob database entry and stores it, also updating
Expand All @@ -244,10 +233,9 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
if err != nil {
return err
}
l.index[txhash] = id
if _, ok := l.groups[block]; !ok {
l.groups[block] = make(map[uint64]common.Hash)
}
l.groups[block][id] = txhash
// Delete tx and set id.
item.id, item.Tx = id, nil
l.index[txhash] = item

return nil
}