Skip to content

Commit 684f0db

Browse files
rjl493456442fjl
andauthored
core/txpool/blobpool: introduce sidecar conversion for legacy blob transactions (#32656)
This pull request introduces a queue for legacy sidecar conversion to handle transactions that persist after the Osaka fork. Simply dropping these transactions would significantly harm the user experience. To balance usability with system complexity, we have introduced a conversion time window of two hours post Osaka fork. During this period, the system will accept legacy blob transactions and convert them in a background process. After the window, all legacy transactions will be rejected. Notably, all the blob transactions will be validated statically before the conversion, and also all conversion are performed in a single thread, minimize the risk of being DoS. We believe this two hour window provides sufficient time to process in-flight legacy transactions and allows submitters to migrate to the new format. --------- Co-authored-by: Felix Lange <[email protected]>
1 parent fd65f56 commit 684f0db

File tree

5 files changed

+384
-45
lines changed

5 files changed

+384
-45
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 93 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"path/filepath"
2828
"sort"
2929
"sync"
30+
"sync/atomic"
3031
"time"
3132

3233
"github.com/ethereum/go-ethereum/common"
@@ -61,11 +62,11 @@ const (
6162
// small buffer is added to the proof overhead.
6263
txBlobOverhead = uint32(kzg4844.CellProofsPerBlob*len(kzg4844.Proof{}) + 64)
6364

64-
// txMaxSize is the maximum size a single transaction can have, outside
65-
// the included blobs. Since blob transactions are pulled instead of pushed,
66-
// and only a small metadata is kept in ram, the rest is on disk, there is
67-
// no critical limit that should be enforced. Still, capping it to some sane
68-
// limit can never hurt.
65+
// txMaxSize is the maximum size a single transaction can have, including the
66+
// blobs. Since blob transactions are pulled instead of pushed, and only a
67+
// small metadata is kept in ram, the rest is on disk, there is no critical
68+
// limit that should be enforced. Still, capping it to some sane limit can
69+
// never hurt, which is aligned with maxBlobsPerTx constraint enforced internally.
6970
txMaxSize = 1024 * 1024
7071

7172
// maxBlobsPerTx is the maximum number of blobs that a single transaction can
@@ -93,6 +94,11 @@ const (
9394
// storeVersion is the current slotter layout used for the billy.Database
9495
// store.
9596
storeVersion = 1
97+
98+
// conversionTimeWindow defines the period after the Osaka fork during which
99+
// the pool will still accept and convert legacy blob transactions. After this
100+
// window, all legacy blob transactions will be rejected.
101+
conversionTimeWindow = time.Hour * 2
96102
)
97103

98104
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@@ -329,12 +335,13 @@ type BlobPool struct {
329335
stored uint64 // Useful data size of all transactions on disk
330336
limbo *limbo // Persistent data store for the non-finalized blobs
331337

332-
signer types.Signer // Transaction signer to use for sender recovery
333-
chain BlockChain // Chain object to access the state through
338+
signer types.Signer // Transaction signer to use for sender recovery
339+
chain BlockChain // Chain object to access the state through
340+
cQueue *conversionQueue // The queue for performing legacy sidecar conversion
334341

335-
head *types.Header // Current head of the chain
336-
state *state.StateDB // Current state at the head of the chain
337-
gasTip *uint256.Int // Currently accepted minimum gas tip
342+
head atomic.Pointer[types.Header] // Current head of the chain
343+
state *state.StateDB // Current state at the head of the chain
344+
gasTip atomic.Pointer[uint256.Int] // Currently accepted minimum gas tip
338345

339346
lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries
340347
index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
@@ -359,6 +366,7 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
359366
hasPendingAuth: hasPendingAuth,
360367
signer: types.LatestSigner(chain.Config()),
361368
chain: chain,
369+
cQueue: newConversionQueue(), // Deprecate it after the osaka fork
362370
lookup: newLookup(),
363371
index: make(map[common.Address][]*blobTxMeta),
364372
spent: make(map[common.Address]*uint256.Int),
@@ -400,7 +408,8 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
400408
if err != nil {
401409
return err
402410
}
403-
p.head, p.state = head, state
411+
p.head.Store(head)
412+
p.state = state
404413

405414
// Create new slotter for pre-Osaka blob configuration.
406415
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
@@ -440,11 +449,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
440449
p.recheck(addr, nil)
441450
}
442451
var (
443-
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head))
452+
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), head))
444453
blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice)
445454
)
446-
if p.head.ExcessBlobGas != nil {
447-
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head))
455+
if head.ExcessBlobGas != nil {
456+
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), head))
448457
}
449458
p.evict = newPriceHeap(basefee, blobfee, p.index)
450459

@@ -474,6 +483,9 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
474483

475484
// Close closes down the underlying persistent store.
476485
func (p *BlobPool) Close() error {
486+
// Terminate the conversion queue
487+
p.cQueue.close()
488+
477489
var errs []error
478490
if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
479491
if err := p.limbo.Close(); err != nil {
@@ -832,7 +844,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
832844
log.Error("Failed to reset blobpool state", "err", err)
833845
return
834846
}
835-
p.head = newHead
847+
p.head.Store(newHead)
836848
p.state = statedb
837849

838850
// Run the reorg between the old and new head and figure out which accounts
@@ -855,7 +867,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
855867
}
856868
}
857869
// Flush out any blobs from limbo that are older than the latest finality
858-
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
870+
if p.chain.Config().IsCancun(newHead.Number, newHead.Time) {
859871
p.limbo.finalize(p.chain.CurrentFinalBlock())
860872
}
861873
// Reset the price heap for the new set of basefee/blobfee pairs
@@ -1056,14 +1068,15 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
10561068
defer p.lock.Unlock()
10571069

10581070
// Store the new minimum gas tip
1059-
old := p.gasTip
1060-
p.gasTip = uint256.MustFromBig(tip)
1071+
old := p.gasTip.Load()
1072+
newTip := uint256.MustFromBig(tip)
1073+
p.gasTip.Store(newTip)
10611074

10621075
// If the min miner fee increased, remove transactions below the new threshold
1063-
if old == nil || p.gasTip.Cmp(old) > 0 {
1076+
if old == nil || newTip.Cmp(old) > 0 {
10641077
for addr, txs := range p.index {
10651078
for i, tx := range txs {
1066-
if tx.execTipCap.Cmp(p.gasTip) < 0 {
1079+
if tx.execTipCap.Cmp(newTip) < 0 {
10671080
// Drop the offending transaction
10681081
var (
10691082
ids = []uint64{tx.id}
@@ -1123,10 +1136,10 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error {
11231136
Config: p.chain.Config(),
11241137
Accept: 1 << types.BlobTxType,
11251138
MaxSize: txMaxSize,
1126-
MinTip: p.gasTip.ToBig(),
1139+
MinTip: p.gasTip.Load().ToBig(),
11271140
MaxBlobCount: maxBlobsPerTx,
11281141
}
1129-
return txpool.ValidateTransaction(tx, p.head, p.signer, opts)
1142+
return txpool.ValidateTransaction(tx, p.head.Load(), p.signer, opts)
11301143
}
11311144

11321145
// checkDelegationLimit determines if the tx sender is delegated or has a
@@ -1164,10 +1177,10 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {
11641177

11651178
// validateTx checks whether a transaction is valid according to the consensus
11661179
// rules and adheres to some heuristic limits of the local node (price and size).
1180+
//
1181+
// This function assumes the static validation has been performed already and
1182+
// only runs the stateful checks with lock protection.
11671183
func (p *BlobPool) validateTx(tx *types.Transaction) error {
1168-
if err := p.ValidateTxBasics(tx); err != nil {
1169-
return err
1170-
}
11711184
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
11721185
stateOpts := &txpool.ValidationOptionsWithState{
11731186
State: p.state,
@@ -1444,17 +1457,67 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int {
14441457
return available
14451458
}
14461459

1460+
// preCheck performs the static validation upon the provided txs and converts
1461+
// the legacy sidecars if Osaka fork has been activated with a short time window.
1462+
//
1463+
// This function is pure static and lock free.
1464+
func (p *BlobPool) preCheck(txs []*types.Transaction) ([]*types.Transaction, []error) {
1465+
var (
1466+
head = p.head.Load()
1467+
isOsaka = p.chain.Config().IsOsaka(head.Number, head.Time)
1468+
deadline time.Time
1469+
)
1470+
if isOsaka {
1471+
deadline = time.Unix(int64(*p.chain.Config().OsakaTime), 0).Add(conversionTimeWindow)
1472+
}
1473+
var errs []error
1474+
for _, tx := range txs {
1475+
// Validate the transaction statically at first to avoid unnecessary
1476+
// conversion. This step doesn't require lock protection.
1477+
if err := p.ValidateTxBasics(tx); err != nil {
1478+
errs = append(errs, err)
1479+
continue
1480+
}
1481+
// Before the Osaka fork, reject the blob txs with cell proofs
1482+
if !isOsaka {
1483+
if tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
1484+
errs = append(errs, nil)
1485+
} else {
1486+
errs = append(errs, errors.New("cell proof is not supported yet"))
1487+
}
1488+
continue
1489+
}
1490+
// After the Osaka fork, reject the legacy blob txs if the conversion
1491+
// time window is passed.
1492+
if tx.BlobTxSidecar().Version == types.BlobSidecarVersion1 {
1493+
errs = append(errs, nil)
1494+
continue
1495+
}
1496+
if head.Time > uint64(deadline.Unix()) {
1497+
errs = append(errs, errors.New("legacy blob tx is not supported"))
1498+
continue
1499+
}
1500+
// Convert the legacy sidecar after Osaka fork. This could be a long
1501+
// procedure which takes a few seconds, even minutes if there is a long
1502+
// queue. Fortunately it will only block the routine of the source peer
1503+
// announcing the tx, without affecting other parts.
1504+
errs = append(errs, p.cQueue.convert(tx))
1505+
}
1506+
return txs, errs
1507+
}
1508+
14471509
// Add inserts a set of blob transactions into the pool if they pass validation (both
14481510
// consensus validity and pool restrictions).
1449-
//
1450-
// Note, if sync is set the method will block until all internal maintenance
1451-
// related to the add is finished. Only use this during tests for determinism.
14521511
func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
14531512
var (
1454-
errs = make([]error, len(txs))
1513+
errs []error
14551514
adds = make([]*types.Transaction, 0, len(txs))
14561515
)
1516+
txs, errs = p.preCheck(txs)
14571517
for i, tx := range txs {
1518+
if errs[i] != nil {
1519+
continue
1520+
}
14581521
errs[i] = p.add(tx)
14591522
if errs[i] == nil {
14601523
adds = append(adds, tx.WithoutBlobTxSidecar())
@@ -1949,7 +2012,7 @@ func (p *BlobPool) Clear() {
19492012
p.spent = make(map[common.Address]*uint256.Int)
19502013

19512014
var (
1952-
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head))
2015+
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head.Load()))
19532016
blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice)
19542017
)
19552018
p.evict = newPriceHeap(basefee, blobfee, p.index)

core/txpool/blobpool/blobpool_test.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ type testBlockChain struct {
8888
statedb *state.StateDB
8989

9090
blocks map[uint64]*types.Block
91+
92+
blockTime *uint64
93+
}
94+
95+
func (bc *testBlockChain) setHeadTime(time uint64) {
96+
bc.blockTime = &time
9197
}
9298

9399
func (bc *testBlockChain) Config() *params.ChainConfig {
@@ -105,6 +111,10 @@ func (bc *testBlockChain) CurrentBlock() *types.Header {
105111
blockTime = *bc.config.CancunTime + 1
106112
gasLimit = uint64(30_000_000)
107113
)
114+
if bc.blockTime != nil {
115+
blockTime = *bc.blockTime
116+
}
117+
108118
lo := new(big.Int)
109119
hi := new(big.Int).Mul(big.NewInt(5714), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil))
110120

@@ -1748,8 +1758,8 @@ func TestAdd(t *testing.T) {
17481758
// Add each transaction one by one, verifying the pool internals in between
17491759
for j, add := range tt.adds {
17501760
signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx)
1751-
if err := pool.add(signed); !errors.Is(err, add.err) {
1752-
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, err, add.err)
1761+
if errs := pool.Add([]*types.Transaction{signed}, true); !errors.Is(errs[0], add.err) {
1762+
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, errs[0], add.err)
17531763
}
17541764
if add.err == nil {
17551765
size, exist := pool.lookup.sizeOfTx(signed.Hash())
@@ -1796,8 +1806,14 @@ func TestAdd(t *testing.T) {
17961806
}
17971807
}
17981808

1799-
// Tests adding transactions with legacy sidecars are correctly rejected.
1809+
// Tests that transactions with legacy sidecars are accepted within the
1810+
// conversion window but rejected after it has passed.
18001811
func TestAddLegacyBlobTx(t *testing.T) {
1812+
testAddLegacyBlobTx(t, true) // conversion window has not yet passed
1813+
testAddLegacyBlobTx(t, false) // conversion window passed
1814+
}
1815+
1816+
func testAddLegacyBlobTx(t *testing.T, accept bool) {
18011817
var (
18021818
key1, _ = crypto.GenerateKey()
18031819
key2, _ = crypto.GenerateKey()
@@ -1817,6 +1833,15 @@ func TestAddLegacyBlobTx(t *testing.T) {
18171833
blobfee: uint256.NewInt(105),
18181834
statedb: statedb,
18191835
}
1836+
var timeDiff uint64
1837+
if accept {
1838+
timeDiff = uint64(conversionTimeWindow.Seconds()) - 1
1839+
} else {
1840+
timeDiff = uint64(conversionTimeWindow.Seconds()) + 1
1841+
}
1842+
time := *params.MergedTestChainConfig.OsakaTime + timeDiff
1843+
chain.setHeadTime(time)
1844+
18201845
pool := New(Config{Datadir: t.TempDir()}, chain, nil)
18211846
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
18221847
t.Fatalf("failed to create blob pool: %v", err)
@@ -1826,12 +1851,15 @@ func TestAddLegacyBlobTx(t *testing.T) {
18261851
var (
18271852
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
18281853
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion0)
1829-
tx3 = makeMultiBlobTx(1, 1, 800, 70, 6, 12, key2, types.BlobSidecarVersion1)
1854+
txs = []*types.Transaction{tx1, tx2}
18301855
)
1831-
errs := pool.Add([]*types.Transaction{tx1, tx2, tx3}, true)
1856+
errs := pool.Add(txs, true)
18321857
for _, err := range errs {
1833-
if err == nil {
1834-
t.Fatalf("expected tx add to fail")
1858+
if accept && err != nil {
1859+
t.Fatalf("expected tx add to succeed, %v", err)
1860+
}
1861+
if !accept && err == nil {
1862+
t.Fatal("expected tx add to fail")
18351863
}
18361864
}
18371865
verifyPoolInternals(t, pool)

0 commit comments

Comments
 (0)