1717package core
1818
1919import (
20+ "context"
2021 "errors"
2122 "math"
2223 "math/big"
@@ -34,6 +35,7 @@ import (
3435 "github.com/ethereum/go-ethereum/log"
3536 "github.com/ethereum/go-ethereum/metrics"
3637 "github.com/ethereum/go-ethereum/params"
38+ "github.com/google/uuid"
3739 "golang.org/x/crypto/sha3"
3840)
3941
@@ -280,6 +282,8 @@ type TxPool struct {
280282 initDoneCh chan struct {} // is closed once the pool is initialized (for tests)
281283
282284 changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
285+
286+ bundleFetcher IFetcher
283287}
284288
285289type txpoolResetRequest struct {
@@ -344,6 +348,17 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
344348 return pool
345349}
346350
351+ type IFetcher interface {
352+ GetLatestUuidBundles (ctx context.Context , blockNum int64 ) ([]types.LatestUuidBundle , error )
353+ }
354+
355+ func (pool * TxPool ) RegisterBundleFetcher (fetcher IFetcher ) {
356+ pool .mu .Lock ()
357+ defer pool .mu .Unlock ()
358+
359+ pool .bundleFetcher = fetcher
360+ }
361+
347362// loop is the transaction pool's main event loop, waiting for and reacting to
348363// outside blockchain events as well as for various reporting and transaction
349364// eviction events.
@@ -581,21 +596,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
581596 return pending
582597}
583598
584- /// AllMevBundles returns all the MEV Bundles currently in the pool
585- func (pool * TxPool ) AllMevBundles () []types.MevBundle {
586- return pool .mevBundles
599+ type uuidBundleKey struct {
600+ Uuid uuid.UUID
601+ SigningAddress common.Address
602+ }
603+
604+ func (pool * TxPool ) fetchLatestCancellableBundles (ctx context.Context , blockNumber * big.Int ) (chan []types.LatestUuidBundle , chan error ) {
605+ if pool .bundleFetcher == nil {
606+ return nil , nil
607+ }
608+ errCh := make (chan error , 1 )
609+ lubCh := make (chan []types.LatestUuidBundle , 1 )
610+ go func (blockNum int64 ) {
611+ lub , err := pool .bundleFetcher .GetLatestUuidBundles (ctx , blockNum )
612+ errCh <- err
613+ lubCh <- lub
614+ }(blockNumber .Int64 ())
615+ return lubCh , errCh
616+ }
617+
618+ func resolveCancellableBundles (lubCh chan []types.LatestUuidBundle , errCh chan error , uuidBundles map [uuidBundleKey ][]types.MevBundle ) []types.MevBundle {
619+ if lubCh == nil || errCh == nil {
620+ return nil
621+ }
622+
623+ if len (uuidBundles ) == 0 {
624+ return nil
625+ }
626+
627+ err := <- errCh
628+ if err != nil {
629+ log .Error ("could not fetch latest bundles uuid map" , "err" , err )
630+ return nil
631+ }
632+
633+ currentCancellableBundles := []types.MevBundle {}
634+
635+ log .Trace ("Processing uuid bundles" , "uuidBundles" , uuidBundles )
636+
637+ lubs := <- lubCh
638+ for _ , lub := range lubs {
639+ ubk := uuidBundleKey {lub .Uuid , lub .SigningAddress }
640+ bundles , found := uuidBundles [ubk ]
641+ if ! found {
642+ log .Trace ("missing uuid bundle" , "ubk" , ubk )
643+ continue
644+ }
645+ for _ , bundle := range bundles {
646+ if bundle .Hash == lub .BundleHash {
647+ log .Trace ("adding uuid bundle" , "bundle hash" , bundle .Hash .String (), "lub" , lub )
648+ currentCancellableBundles = append (currentCancellableBundles , bundle )
649+ break
650+ }
651+ }
652+ }
653+ return currentCancellableBundles
587654}
588655
589656// MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp
590657// also prunes bundles that are outdated
591- func (pool * TxPool ) MevBundles (blockNumber * big.Int , blockTimestamp uint64 ) []types.MevBundle {
658+ // Returns regular bundles and a function resolving to current cancellable bundles
659+ func (pool * TxPool ) MevBundles (blockNumber * big.Int , blockTimestamp uint64 ) ([]types.MevBundle , chan []types.MevBundle ) {
592660 pool .mu .Lock ()
593661 defer pool .mu .Unlock ()
594662
663+ ctx , cancel := context .WithTimeout (context .Background (), 500 * time .Millisecond )
664+ lubCh , errCh := pool .fetchLatestCancellableBundles (ctx , blockNumber )
665+
595666 // returned values
596667 var ret []types.MevBundle
597668 // rolled over values
598669 var bundles []types.MevBundle
670+ // (uuid, signingAddress) -> list of bundles
671+ var uuidBundles = make (map [uuidBundleKey ][]types.MevBundle )
599672
600673 for _ , bundle := range pool .mevBundles {
601674 // Prune outdated bundles
@@ -609,14 +682,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty
609682 continue
610683 }
611684
612- // return the ones which are in time
613- ret = append (ret , bundle )
614685 // keep the bundles around internally until they need to be pruned
615686 bundles = append (bundles , bundle )
687+
688+ // TODO: omit duplicates
689+
690+ // do not append to the return quite yet, check the DB for the latest bundle for that uuid
691+ if bundle .Uuid != types .EmptyUUID {
692+ ubk := uuidBundleKey {bundle .Uuid , bundle .SigningAddress }
693+ uuidBundles [ubk ] = append (uuidBundles [ubk ], bundle )
694+ continue
695+ }
696+
697+ // return the ones which are in time
698+ ret = append (ret , bundle )
616699 }
617700
618701 pool .mevBundles = bundles
619- return ret
702+
703+ cancellableBundlesCh := make (chan []types.MevBundle , 1 )
704+ go func () {
705+ cancellableBundlesCh <- resolveCancellableBundles (lubCh , errCh , uuidBundles )
706+ cancel ()
707+ }()
708+
709+ return ret , cancellableBundlesCh
620710}
621711
622712// AddMevBundles adds a mev bundles to the pool
@@ -629,7 +719,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
629719}
630720
631721// AddMevBundle adds a mev bundle to the pool
632- func (pool * TxPool ) AddMevBundle (txs types.Transactions , blockNumber * big.Int , minTimestamp , maxTimestamp uint64 , revertingTxHashes []common.Hash ) error {
722+ func (pool * TxPool ) AddMevBundle (txs types.Transactions , blockNumber * big.Int , replacementUuid uuid. UUID , signingAddress common. Address , minTimestamp , maxTimestamp uint64 , revertingTxHashes []common.Hash ) error {
633723 bundleHasher := sha3 .NewLegacyKeccak256 ()
634724 for _ , tx := range txs {
635725 bundleHasher .Write (tx .Hash ().Bytes ())
@@ -642,6 +732,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
642732 pool .mevBundles = append (pool .mevBundles , types.MevBundle {
643733 Txs : txs ,
644734 BlockNumber : blockNumber ,
735+ Uuid : replacementUuid ,
736+ SigningAddress : signingAddress ,
645737 MinTimestamp : minTimestamp ,
646738 MaxTimestamp : maxTimestamp ,
647739 RevertingTxHashes : revertingTxHashes ,
0 commit comments