@@ -27,6 +27,7 @@ import (
2727 "path/filepath"
2828 "sort"
2929 "sync"
30+ "sync/atomic"
3031 "time"
3132
3233 "github.com/ethereum/go-ethereum/common"
@@ -61,11 +62,11 @@ const (
6162 // small buffer is added to the proof overhead.
6263 txBlobOverhead = uint32 (kzg4844 .CellProofsPerBlob * len (kzg4844.Proof {}) + 64 )
6364
64- // txMaxSize is the maximum size a single transaction can have, outside
65- // the included blobs. Since blob transactions are pulled instead of pushed,
66- // and only a small metadata is kept in ram, the rest is on disk, there is
67- // no critical limit that should be enforced. Still, capping it to some sane
68- // limit can never hurt.
65+ // txMaxSize is the maximum size a single transaction can have, including the
66+ // blobs. Since blob transactions are pulled instead of pushed, and only a
67+ // small metadata is kept in ram, the rest is on disk, there is no critical
68+ // limit that should be enforced. Still, capping it to some sane limit can
69+ // never hurt, which is aligned with maxBlobsPerTx constraint enforced internally .
6970 txMaxSize = 1024 * 1024
7071
7172 // maxBlobsPerTx is the maximum number of blobs that a single transaction can
@@ -93,6 +94,11 @@ const (
9394 // storeVersion is the current slotter layout used for the billy.Database
9495 // store.
9596 storeVersion = 1
97+
98+ // conversionTimeWindow defines the period after the Osaka fork during which
99+ // the pool will still accept and convert legacy blob transactions. After this
100+ // window, all legacy blob transactions will be rejected.
101+ conversionTimeWindow = time .Hour * 2
96102)
97103
98104// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@@ -329,12 +335,13 @@ type BlobPool struct {
329335 stored uint64 // Useful data size of all transactions on disk
330336 limbo * limbo // Persistent data store for the non-finalized blobs
331337
332- signer types.Signer // Transaction signer to use for sender recovery
333- chain BlockChain // Chain object to access the state through
338+ signer types.Signer // Transaction signer to use for sender recovery
339+ chain BlockChain // Chain object to access the state through
340+ cQueue * conversionQueue // The queue for performing legacy sidecar conversion
334341
335- head * types.Header // Current head of the chain
336- state * state.StateDB // Current state at the head of the chain
337- gasTip * uint256.Int // Currently accepted minimum gas tip
342+ head atomic. Pointer [ types.Header ] // Current head of the chain
343+ state * state.StateDB // Current state at the head of the chain
344+ gasTip atomic. Pointer [ uint256.Int ] // Currently accepted minimum gas tip
338345
339346 lookup * lookup // Lookup table mapping blobs to txs and txs to billy entries
340347 index map [common.Address ][]* blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
@@ -359,6 +366,7 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
359366 hasPendingAuth : hasPendingAuth ,
360367 signer : types .LatestSigner (chain .Config ()),
361368 chain : chain ,
369+ cQueue : newConversionQueue (), // Deprecate it after the osaka fork
362370 lookup : newLookup (),
363371 index : make (map [common.Address ][]* blobTxMeta ),
364372 spent : make (map [common.Address ]* uint256.Int ),
@@ -400,7 +408,8 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
400408 if err != nil {
401409 return err
402410 }
403- p .head , p .state = head , state
411+ p .head .Store (head )
412+ p .state = state
404413
405414 // Create new slotter for pre-Osaka blob configuration.
406415 slotter := newSlotter (eip4844 .LatestMaxBlobsPerBlock (p .chain .Config ()))
@@ -440,11 +449,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
440449 p .recheck (addr , nil )
441450 }
442451 var (
443- basefee = uint256 .MustFromBig (eip1559 .CalcBaseFee (p .chain .Config (), p . head ))
452+ basefee = uint256 .MustFromBig (eip1559 .CalcBaseFee (p .chain .Config (), head ))
444453 blobfee = uint256 .NewInt (params .BlobTxMinBlobGasprice )
445454 )
446- if p . head .ExcessBlobGas != nil {
447- blobfee = uint256 .MustFromBig (eip4844 .CalcBlobFee (p .chain .Config (), p . head ))
455+ if head .ExcessBlobGas != nil {
456+ blobfee = uint256 .MustFromBig (eip4844 .CalcBlobFee (p .chain .Config (), head ))
448457 }
449458 p .evict = newPriceHeap (basefee , blobfee , p .index )
450459
@@ -474,6 +483,9 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
474483
475484// Close closes down the underlying persistent store.
476485func (p * BlobPool ) Close () error {
486+ // Terminate the conversion queue
487+ p .cQueue .close ()
488+
477489 var errs []error
478490 if p .limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
479491 if err := p .limbo .Close (); err != nil {
@@ -832,7 +844,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
832844 log .Error ("Failed to reset blobpool state" , "err" , err )
833845 return
834846 }
835- p .head = newHead
847+ p .head . Store ( newHead )
836848 p .state = statedb
837849
838850 // Run the reorg between the old and new head and figure out which accounts
@@ -855,7 +867,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
855867 }
856868 }
857869 // Flush out any blobs from limbo that are older than the latest finality
858- if p .chain .Config ().IsCancun (p . head . Number , p . head .Time ) {
870+ if p .chain .Config ().IsCancun (newHead . Number , newHead .Time ) {
859871 p .limbo .finalize (p .chain .CurrentFinalBlock ())
860872 }
861873 // Reset the price heap for the new set of basefee/blobfee pairs
@@ -1056,14 +1068,15 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
10561068 defer p .lock .Unlock ()
10571069
10581070 // Store the new minimum gas tip
1059- old := p .gasTip
1060- p .gasTip = uint256 .MustFromBig (tip )
1071+ old := p .gasTip .Load ()
1072+ newTip := uint256 .MustFromBig (tip )
1073+ p .gasTip .Store (newTip )
10611074
10621075 // If the min miner fee increased, remove transactions below the new threshold
1063- if old == nil || p . gasTip .Cmp (old ) > 0 {
1076+ if old == nil || newTip .Cmp (old ) > 0 {
10641077 for addr , txs := range p .index {
10651078 for i , tx := range txs {
1066- if tx .execTipCap .Cmp (p . gasTip ) < 0 {
1079+ if tx .execTipCap .Cmp (newTip ) < 0 {
10671080 // Drop the offending transaction
10681081 var (
10691082 ids = []uint64 {tx .id }
@@ -1123,10 +1136,10 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error {
11231136 Config : p .chain .Config (),
11241137 Accept : 1 << types .BlobTxType ,
11251138 MaxSize : txMaxSize ,
1126- MinTip : p .gasTip .ToBig (),
1139+ MinTip : p .gasTip .Load (). ToBig (),
11271140 MaxBlobCount : maxBlobsPerTx ,
11281141 }
1129- return txpool .ValidateTransaction (tx , p .head , p .signer , opts )
1142+ return txpool .ValidateTransaction (tx , p .head . Load () , p .signer , opts )
11301143}
11311144
11321145// checkDelegationLimit determines if the tx sender is delegated or has a
@@ -1164,10 +1177,10 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {
11641177
11651178// validateTx checks whether a transaction is valid according to the consensus
11661179// rules and adheres to some heuristic limits of the local node (price and size).
1180+ //
1181+ // This function assumes the static validation has been performed already and
1182+ // only runs the stateful checks with lock protection.
11671183func (p * BlobPool ) validateTx (tx * types.Transaction ) error {
1168- if err := p .ValidateTxBasics (tx ); err != nil {
1169- return err
1170- }
11711184 // Ensure the transaction adheres to the stateful pool filters (nonce, balance)
11721185 stateOpts := & txpool.ValidationOptionsWithState {
11731186 State : p .state ,
@@ -1444,17 +1457,67 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int {
14441457 return available
14451458}
14461459
1460+ // preCheck performs the static validation upon the provided txs and converts
1461+ // the legacy sidecars if Osaka fork has been activated with a short time window.
1462+ //
1463+ // This function is pure static and lock free.
1464+ func (p * BlobPool ) preCheck (txs []* types.Transaction ) ([]* types.Transaction , []error ) {
1465+ var (
1466+ head = p .head .Load ()
1467+ isOsaka = p .chain .Config ().IsOsaka (head .Number , head .Time )
1468+ deadline time.Time
1469+ )
1470+ if isOsaka {
1471+ deadline = time .Unix (int64 (* p .chain .Config ().OsakaTime ), 0 ).Add (conversionTimeWindow )
1472+ }
1473+ var errs []error
1474+ for _ , tx := range txs {
1475+ // Validate the transaction statically at first to avoid unnecessary
1476+ // conversion. This step doesn't require lock protection.
1477+ if err := p .ValidateTxBasics (tx ); err != nil {
1478+ errs = append (errs , err )
1479+ continue
1480+ }
1481+ // Before the Osaka fork, reject the blob txs with cell proofs
1482+ if ! isOsaka {
1483+ if tx .BlobTxSidecar ().Version == types .BlobSidecarVersion0 {
1484+ errs = append (errs , nil )
1485+ } else {
1486+ errs = append (errs , errors .New ("cell proof is not supported yet" ))
1487+ }
1488+ continue
1489+ }
1490+ // After the Osaka fork, reject the legacy blob txs if the conversion
1491+ // time window is passed.
1492+ if tx .BlobTxSidecar ().Version == types .BlobSidecarVersion1 {
1493+ errs = append (errs , nil )
1494+ continue
1495+ }
1496+ if head .Time > uint64 (deadline .Unix ()) {
1497+ errs = append (errs , errors .New ("legacy blob tx is not supported" ))
1498+ continue
1499+ }
1500+ // Convert the legacy sidecar after Osaka fork. This could be a long
1501+ // procedure which takes a few seconds, even minutes if there is a long
1502+ // queue. Fortunately it will only block the routine of the source peer
1503+ // announcing the tx, without affecting other parts.
1504+ errs = append (errs , p .cQueue .convert (tx ))
1505+ }
1506+ return txs , errs
1507+ }
1508+
14471509// Add inserts a set of blob transactions into the pool if they pass validation (both
14481510// consensus validity and pool restrictions).
1449- //
1450- // Note, if sync is set the method will block until all internal maintenance
1451- // related to the add is finished. Only use this during tests for determinism.
14521511func (p * BlobPool ) Add (txs []* types.Transaction , sync bool ) []error {
14531512 var (
1454- errs = make ( []error , len ( txs ))
1513+ errs []error
14551514 adds = make ([]* types.Transaction , 0 , len (txs ))
14561515 )
1516+ txs , errs = p .preCheck (txs )
14571517 for i , tx := range txs {
1518+ if errs [i ] != nil {
1519+ continue
1520+ }
14581521 errs [i ] = p .add (tx )
14591522 if errs [i ] == nil {
14601523 adds = append (adds , tx .WithoutBlobTxSidecar ())
@@ -1949,7 +2012,7 @@ func (p *BlobPool) Clear() {
19492012 p .spent = make (map [common.Address ]* uint256.Int )
19502013
19512014 var (
1952- basefee = uint256 .MustFromBig (eip1559 .CalcBaseFee (p .chain .Config (), p .head ))
2015+ basefee = uint256 .MustFromBig (eip1559 .CalcBaseFee (p .chain .Config (), p .head . Load () ))
19532016 blobfee = uint256 .NewInt (params .BlobTxMinBlobGasprice )
19542017 )
19552018 p .evict = newPriceHeap (basefee , blobfee , p .index )
0 commit comments