
Commit a5203fb

fjl, rjl493456442, and lightclient authored and committed
core/txpool/blobpool: fork boundary conversion 3 (ethereum#32716)
This implements the conversion of existing blob transactions to the new proof version. Conversion is triggered at the Osaka fork boundary. The conversion is designed to be idempotent, and may be triggered multiple times in case of a reorg around the fork boundary. This change is the last missing piece that completes our strategy for the blobpool conversion. After the Osaka fork:

- new transactions will be converted on-the-fly upon entry to the pool
- reorged transactions will be converted while being reinjected
- (this change) existing transactions will be converted in the background

---------

Co-authored-by: Gary Rong <[email protected]>
Co-authored-by: lightclient <[email protected]>
1 parent e45b113 commit a5203fb
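
For orientation, the fork-boundary trigger described above can be sketched in a few lines. This is illustrative only, not part of the commit; it mirrors the condition added to BlobPool.Reset in the diff below, assuming go-ethereum's params and types packages:

	// crossedIntoOsaka reports whether a head reset moved from a pre-Osaka
	// head to a post-Osaka one, i.e. the point where the bulk conversion
	// should be kicked off.
	func crossedIntoOsaka(config *params.ChainConfig, oldHead, newHead *types.Header) bool {
		// A reorg may cross the boundary several times; that is safe,
		// because the conversion is idempotent: sidecars already carrying
		// version 1 are skipped by the per-transaction guard.
		return !config.IsOsaka(oldHead.Number, oldHead.Time) &&
			config.IsOsaka(newHead.Number, newHead.Time)
	}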

File tree

4 files changed, +431 -21 lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 169 additions & 1 deletion
@@ -21,10 +21,12 @@ import (
 	"container/heap"
 	"errors"
 	"fmt"
+	"maps"
 	"math"
 	"math/big"
 	"os"
 	"path/filepath"
+	"slices"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -337,7 +339,7 @@ type BlobPool struct {

 	signer types.Signer     // Transaction signer to use for sender recovery
 	chain  BlockChain       // Chain object to access the state through
-	cQueue *conversionQueue // The queue for performing legacy sidecar conversion
+	cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)

 	head  atomic.Pointer[types.Header] // Current head of the chain
 	state *state.StateDB               // Current state at the head of the chain
@@ -883,6 +885,172 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
 	basefeeGauge.Update(int64(basefee.Uint64()))
 	blobfeeGauge.Update(int64(blobfee.Uint64()))
 	p.updateStorageMetrics()
+
+	// Perform the conversion logic at the fork boundary
+	if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) {
+		// Deep copy all indexed transaction metadata.
+		var (
+			ids = make(map[common.Address]map[uint64]uint64)
+			txs = make(map[common.Address]map[uint64]common.Hash)
+		)
+		for sender, list := range p.index {
+			ids[sender] = make(map[uint64]uint64)
+			txs[sender] = make(map[uint64]common.Hash)
+			for _, m := range list {
+				ids[sender][m.nonce] = m.id
+				txs[sender][m.nonce] = m.hash
+			}
+		}
+		// Initiate the background conversion thread.
+		p.cQueue.launchBillyConversion(func() {
+			p.convertLegacySidecars(ids, txs)
+		})
+	}
+}
+
+// compareAndSwap checks if the specified transaction is still tracked in the
+// pool and replaces its metadata accordingly. It should only be used in the
+// fork-boundary bulk conversion. If it fails for some reason, the subsequent
+// txs won't be dropped, for simplicity, as we assume failure is very unlikely
+// to happen.
+//
+// The returned flag indicates whether the replacement succeeded.
+func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	newId, err := p.store.Put(blob)
+	if err != nil {
+		log.Error("Failed to store transaction", "hash", hash, "err", err)
+		return false
+	}
+	newSize := uint64(len(blob))
+	newStorageSize := p.store.Size(newId)
+
+	// Terminate the procedure if the transaction was already evicted. The
+	// newly added blob should be removed before return.
+	if !p.lookup.update(hash, newId, newSize) {
+		if derr := p.store.Delete(newId); derr != nil {
+			log.Error("Failed to delete the dangling blob tx", "err", derr)
+		} else {
+			log.Warn("Deleted the dangling blob tx", "id", newId)
+		}
+		return false
+	}
+	// Update the metadata of the blob transaction
+	for _, meta := range p.index[address] {
+		if meta.hash == hash {
+			meta.id = newId
+			meta.version = types.BlobSidecarVersion1
+			meta.storageSize = newStorageSize
+			meta.size = newSize
+
+			p.stored += uint64(newStorageSize)
+			p.stored -= uint64(oldStorageSize)
+			break
+		}
+	}
+	if err := p.store.Delete(oldID); err != nil {
+		log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err)
+	}
+	return true
+}
+
+// convertLegacySidecar fetches transaction data from the store and performs an
+// on-the-fly conversion. This function is intended for use only during the
+// Osaka fork transition period.
+//
+// The returned flag indicates whether the replacement succeeded.
+func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool {
+	start := time.Now()
+
+	// Retrieve the legacy blob transaction from the underlying store with the
+	// read lock held, preventing any potential data race around the slot
+	// specified by the id.
+	p.lock.RLock()
+	data, err := p.store.Get(id)
+	if err != nil {
+		p.lock.RUnlock()
+		// The transaction may have been evicted concurrently; safe to skip conversion.
+		log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err)
+		return false
+	}
+	oldStorageSize := p.store.Size(id)
+	p.lock.RUnlock()
+
+	// Decode the transaction; failure is not expected, so report the error
+	// loudly if possible. If the blob transaction in this slot is corrupted,
+	// leave it in the store; it will be dropped during the next pool
+	// initialization.
+	var tx types.Transaction
+	if err = rlp.DecodeBytes(data, &tx); err != nil {
+		log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err)
+		return false
+	}
+
+	// Skip conversion if the transaction does not match the expected hash, or if it was
+	// already converted. This can occur if the original transaction was evicted from the
+	// pool and the slot was reused by a new one.
+	if tx.Hash() != hash {
+		log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash())
+		return false
+	}
+	sc := tx.BlobTxSidecar()
+	if sc.Version >= types.BlobSidecarVersion1 {
+		log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id)
+		return false
+	}
+
+	// Perform the sidecar conversion; failure is not expected, so report the
+	// error loudly if possible.
+	if err := tx.BlobTxSidecar().ToV1(); err != nil {
+		log.Error("Failed to convert blob transaction", "hash", hash, "err", err)
+		return false
+	}
+
+	// Encode the converted transaction; failure is not expected, so report
+	// the error loudly if possible.
+	blob, err := rlp.EncodeToBytes(&tx)
+	if err != nil {
+		log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err)
+		return false
+	}
+
+	// Replace the legacy blob transaction with the converted format.
+	if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) {
+		log.Error("Failed to replace the legacy transaction", "hash", hash)
+		return false
+	}
+	log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
+	return true
+}
+
+// convertLegacySidecars converts all given transactions to sidecar version 1.
+//
+// If any of them fails to be converted, the subsequent transactions will still
+// be processed, as we assume the failure is very unlikely to happen. If it does
+// happen, those transactions will be stuck in the pool until eviction.
+func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) {
+	var (
+		start   = time.Now()
+		success int
+		failure int
+	)
+	for addr, list := range txs {
+		// Transactions evicted from the pool must be contiguous; if the
+		// transactions somehow end up gapped, they will be discarded.
+		nonces := slices.Collect(maps.Keys(list))
+		slices.Sort(nonces)
+
+		// Convert the txs in nonce order
+		for _, nonce := range nonces {
+			if p.convertLegacySidecar(addr, list[nonce], ids[addr][nonce]) {
+				success++
+			} else {
+				failure++
+			}
+		}
+	}
+	log.Info("Completed blob transaction conversion", "discarded", failure, "injected", success, "elapsed", common.PrettyDuration(time.Since(start)))
 }

 // reorg assembles all the transactors and missing transactions between an old
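
A note on the shape of convertLegacySidecars above: the background goroutine walks the ids/txs snapshot deep-copied in Reset rather than p.index itself, so it never races with pool mutations, and every swap is revalidated against the live pool inside compareAndSwap. Below is a standalone sketch of that snapshot-and-sort pattern with the maps/slices helpers this commit imports; it is illustrative, not the pool code:

	package main

	import (
		"fmt"
		"maps"
		"slices"
	)

	func main() {
		// Live, nonce-keyed data that a concurrent writer may mutate.
		live := map[uint64]string{7: "tx7", 5: "tx5", 6: "tx6"}

		// Snapshot first: the worker iterates the copy, so concurrent
		// evictions or replacements cannot invalidate the walk.
		snapshot := maps.Clone(live)

		// Collect and sort the nonces so processing runs in nonce order,
		// matching how convertLegacySidecars orders per-account work.
		nonces := slices.Collect(maps.Keys(snapshot))
		slices.Sort(nonces)

		for _, n := range nonces {
			fmt.Println(n, snapshot[n]) // 5 tx5, then 6 tx6, then 7 tx7
		}
	}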

core/txpool/blobpool/blobpool_test.go

Lines changed: 181 additions & 0 deletions
@@ -2098,6 +2098,185 @@ func TestGetBlobs(t *testing.T) {
 	pool.Close()
 }

+// TestSidecarConversion verifies that after the Osaka fork, all legacy
+// sidecars in the pool are successfully converted to v1 sidecars.
+func TestSidecarConversion(t *testing.T) {
+	// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
+
+	// Create a temporary folder for the persistent backend
+	storage := t.TempDir()
+	os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
+
+	var (
+		preOsakaTxs  = make(types.Transactions, 10)
+		postOsakaTxs = make(types.Transactions, 3)
+		keys         = make([]*ecdsa.PrivateKey, len(preOsakaTxs)+len(postOsakaTxs))
+		addrs        = make([]common.Address, len(preOsakaTxs)+len(postOsakaTxs))
+		statedb, _   = state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
+	)
+	for i := range keys {
+		keys[i], _ = crypto.GenerateKey()
+		addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey)
+		statedb.AddBalance(addrs[i], uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
+	}
+	for i := range preOsakaTxs {
+		preOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 2, 0, keys[i], types.BlobSidecarVersion0)
+	}
+	for i := range postOsakaTxs {
+		if i == 0 {
+			// First has a v0 sidecar.
+			postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion0)
+			continue
+		}
+		postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion1)
+	}
+	statedb.Commit(0, true, false)
+
+	// Test plan:
+	// 1) Create a bunch of v0 sidecar txs and add them to the pool before Osaka.
+	// 2) Pass in a new Osaka header to activate the conversion thread.
+	// 3) Continue adding both v0 and v1 transactions to the pool.
+	// 4) Verify that as additional blocks come in, transactions involved in the
+	//    migration are correctly discarded.
+
+	config := &params.ChainConfig{
+		ChainID:            big.NewInt(1),
+		LondonBlock:        big.NewInt(0),
+		BerlinBlock:        big.NewInt(0),
+		CancunTime:         newUint64(0),
+		PragueTime:         newUint64(0),
+		OsakaTime:          newUint64(1),
+		BlobScheduleConfig: params.DefaultBlobSchedule,
+	}
+	chain := &testBlockChain{
+		config:  config,
+		basefee: uint256.NewInt(1050),
+		blobfee: uint256.NewInt(105),
+		statedb: statedb,
+		blocks:  make(map[uint64]*types.Block),
+	}
+
+	// Create 3 blocks:
+	// - the current block, before Osaka
+	// - the first block after Osaka
+	// - another post-Osaka block with several transactions in it
+	header0 := chain.CurrentBlock()
+	header0.Time = 0
+	chain.blocks[0] = types.NewBlockWithHeader(header0)
+
+	header1 := chain.CurrentBlock()
+	header1.Number = big.NewInt(1)
+	header1.Time = 1
+	chain.blocks[1] = types.NewBlockWithHeader(header1)
+
+	header2 := chain.CurrentBlock()
+	header2.Time = 2
+	header2.Number = big.NewInt(2)
+
+	// Make a copy of one of the pre-Osaka transactions and convert it to v1 here
+	// so that we can add it to the pool later and ensure a duplicate is not added
+	// by the conversion queue.
+	tx := preOsakaTxs[len(preOsakaTxs)-1]
+	sc := *tx.BlobTxSidecar() // copy sidecar
+	sc.ToV1()
+	tx = tx.WithBlobTxSidecar(&sc)
+
+	block2 := types.NewBlockWithHeader(header2).WithBody(types.Body{Transactions: append(postOsakaTxs, tx)})
+	chain.blocks[2] = block2
+
+	pool := New(Config{Datadir: storage}, chain, nil)
+	if err := pool.Init(1, header0, newReserver()); err != nil {
+		t.Fatalf("failed to create blob pool: %v", err)
+	}
+
+	errs := pool.Add(preOsakaTxs, true)
+	for i, err := range errs {
+		if err != nil {
+			t.Errorf("failed to insert blob tx from %s: %s", addrs[i], errs[i])
+		}
+	}
+
+	// Kick off migration.
+	pool.Reset(header0, header1)
+
+	// Add the v0 sidecar tx, but don't block so we can keep doing other stuff
+	// while it converts the sidecar.
+	addDone := make(chan struct{})
+	go func() {
+		pool.Add(types.Transactions{postOsakaTxs[0]}, false)
+		close(addDone)
+	}()
+
+	// Add the post-Osaka v1 sidecar txs.
+	errs = pool.Add(postOsakaTxs[1:], false)
+	for _, err := range errs {
+		if err != nil {
+			t.Fatalf("expected tx add to succeed: %v", err)
+		}
+	}
+
+	// Wait for the first tx's conversion to complete, then check that all
+	// transactions added after Osaka can be accounted for in the pool.
+	<-addDone
+	pending := pool.Pending(txpool.PendingFilter{BlobTxs: true, BlobVersion: types.BlobSidecarVersion1})
+	for _, tx := range postOsakaTxs {
+		from, _ := pool.signer.Sender(tx)
+		if len(pending[from]) != 1 || pending[from][0].Hash != tx.Hash() {
+			t.Fatalf("expected post-Osaka txs to be pending")
+		}
+	}
+
+	// Now update the pool with the next block. This should cause the pool to
+	// clear out the post-Osaka txs since they were included in block 2. Since the
+	// test blockchain doesn't manage nonces, we'll just do that manually before
+	// the reset is called. Don't forget about the pre-Osaka transaction we also
+	// added to block 2!
+	for i := range postOsakaTxs {
+		statedb.SetNonce(addrs[len(preOsakaTxs)+i], 1, tracing.NonceChangeEoACall)
+	}
+	statedb.SetNonce(addrs[len(preOsakaTxs)-1], 1, tracing.NonceChangeEoACall)
+	pool.Reset(header1, block2.Header())
+
+	// Now verify no post-Osaka transactions are tracked by the pool.
+	for i, tx := range postOsakaTxs {
+		if pool.Get(tx.Hash()) != nil {
+			t.Fatalf("expected txs added post-osaka to have been placed in limbo due to inclusion in a block: index %d, hash %s", i, tx.Hash())
+		}
+	}
+
+	// Wait for the pool migration to complete.
+	<-pool.cQueue.anyBillyConversionDone
+
+	// Verify all transactions in the pool were converted and verify the
+	// resulting cell proofs.
+	count, _ := pool.Stats()
+	if count != len(preOsakaTxs)-1 {
+		t.Errorf("expected pending count to match initial tx count: pending=%d, expected=%d", count, len(preOsakaTxs)-1)
+	}
+	for addr, acc := range pool.index {
+		for _, m := range acc {
+			if m.version != types.BlobSidecarVersion1 {
+				t.Errorf("expected sidecar to have been converted: from %s, hash %s", addr, m.hash)
+			}
+			tx := pool.Get(m.hash)
+			if tx == nil {
+				t.Errorf("failed to get tx by hash: %s", m.hash)
+			}
+			sc := tx.BlobTxSidecar()
+			if err := kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs); err != nil {
+				t.Errorf("failed to verify cell proofs for tx %s after conversion: %s", m.hash, err)
+			}
+		}
+	}
+
+	verifyPoolInternals(t, pool)
+
+	// Launch conversion a second time.
+	// This is just a sanity check to ensure we can handle it.
+	pool.Reset(header0, header1)
+
+	pool.Close()
+}
+
 // fakeBilly is a billy.Database implementation which just drops data on the floor.
 type fakeBilly struct {
 	billy.Database
@@ -2180,3 +2359,5 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
 		}
 	}
 }
+
+func newUint64(val uint64) *uint64 { return &val }
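
The final verification loop in TestSidecarConversion can be read as a single predicate: a transaction counts as converted only if its sidecar is version 1 and its cell proofs verify. A minimal sketch of that check follows; checkConverted is a hypothetical helper, though every call it makes appears verbatim in the test above:

	// checkConverted mirrors the test's final loop: the sidecar must be
	// version 1 and its cell proofs must verify against the blobs and
	// commitments.
	func checkConverted(tx *types.Transaction) error {
		sc := tx.BlobTxSidecar()
		if sc == nil {
			return fmt.Errorf("tx %s has no sidecar", tx.Hash())
		}
		if sc.Version != types.BlobSidecarVersion1 {
			return fmt.Errorf("tx %s still carries a legacy sidecar", tx.Hash())
		}
		return kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs)
	}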
