Skip to content

Commit 88e8c06

Browse files
Ruteriavalonche
authored andcommitted
Bundle cancellations (flashbots#36)
Introduces bundle replacement and cancellation via replacementUuid. Since the replacement is tied to a specific sender, eth_sendBundle gets two additional optional fields: the replacement uuid and the signingAddress of the bundle submission. The DB requests are done in the background, and cancellations are resolved while non-cancelable bundles are already being simulated to avoid waiting for DB to reply. If anything goes wrong with the cancellations, the cancelable bundles are not considered. Note: every block is now sent to the relay, as we can no longer rely on the highest-profit rule!
1 parent 7f0dcda commit 88e8c06

File tree

19 files changed

+353
-53
lines changed

19 files changed

+353
-53
lines changed

builder/builder.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package builder
33
import (
44
"context"
55
"errors"
6-
"math/big"
76
_ "os"
87
"sync"
98
"time"
109

10+
"github.com/ethereum/go-ethereum/common"
1111
blockvalidation "github.com/ethereum/go-ethereum/eth/block-validation"
1212
"golang.org/x/time/rate"
1313

@@ -236,23 +236,22 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
236236
var (
237237
queueSignal = make(chan struct{}, 1)
238238

239-
queueMu sync.Mutex
240-
queueLastSubmittedProfit = new(big.Int)
241-
queueBestProfit = new(big.Int)
242-
queueBestEntry blockQueueEntry
239+
queueMu sync.Mutex
240+
queueLastSubmittedHash common.Hash
241+
queueBestEntry blockQueueEntry
243242
)
244243

245244
log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash)
246245

247246
submitBestBlock := func() {
248247
queueMu.Lock()
249-
if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 {
248+
if queueBestEntry.block.Hash() != queueLastSubmittedHash {
250249
err := b.onSealedBlock(queueBestEntry.block, queueBestEntry.ordersCloseTime, queueBestEntry.sealedAt, queueBestEntry.commitedBundles, queueBestEntry.allBundles, proposerPubkey, vd, attrs)
251250

252251
if err != nil {
253252
log.Error("could not run sealed block hook", "err", err)
254253
} else {
255-
queueLastSubmittedProfit.Set(queueBestProfit)
254+
queueLastSubmittedHash = queueBestEntry.block.Hash()
256255
}
257256
}
258257
queueMu.Unlock()
@@ -271,15 +270,14 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
271270

272271
queueMu.Lock()
273272
defer queueMu.Unlock()
274-
if block.Profit.Cmp(queueBestProfit) > 0 {
273+
if block.Hash() != queueLastSubmittedHash {
275274
queueBestEntry = blockQueueEntry{
276275
block: block,
277276
ordersCloseTime: ordersCloseTime,
278277
sealedAt: sealedAt,
279278
commitedBundles: commitedBundles,
280279
allBundles: allBundles,
281280
}
282-
queueBestProfit.Set(block.Profit)
283281

284282
select {
285283
case queueSignal <- struct{}{}:

builder/builder_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,20 @@ func TestOnPayloadAttributes(t *testing.T) {
126126

127127
require.Equal(t, uint64(25), testRelay.requestedSlot)
128128

129-
// Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the profit is the same
129+
// Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the hash is the same
130+
testBlock.Profit = big.NewInt(10)
130131
testRelay.submittedMsg = nil
131132
time.Sleep(2200 * time.Millisecond)
132133
require.Nil(t, testRelay.submittedMsg)
133134

134-
// Up the profit, expect to get the block
135-
testEthService.testBlock.Profit.SetInt64(11)
135+
// Change the hash, expect to get the block
136+
testExecutableData.ExtraData = hexutil.MustDecode("0x0042fafd")
137+
testExecutableData.BlockHash = common.HexToHash("0x0579b1aaca5c079c91e5774bac72c7f9bc2ddf2b126e9c632be68a1cb8f3fc71")
138+
testBlock, err = beacon.ExecutableDataToBlock(*testExecutableData)
139+
testBlock.Profit = big.NewInt(10)
140+
require.NoError(t, err)
141+
testEthService.testBlock = testBlock
142+
136143
time.Sleep(2200 * time.Millisecond)
137144
require.NotNil(t, testRelay.submittedMsg)
138145
}

builder/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
185185
mevBundleCh := make(chan []types.MevBundle)
186186
blockNumCh := make(chan int64)
187187
bundleFetcher := flashbotsextra.NewBundleFetcher(backend, ds, blockNumCh, mevBundleCh, true)
188+
backend.RegisterBundleFetcher(bundleFetcher)
188189
go bundleFetcher.Run()
189190
}
190191

core/txpool/txpool.go

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package txpool
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"math"
@@ -35,6 +36,7 @@ import (
3536
"github.com/ethereum/go-ethereum/log"
3637
"github.com/ethereum/go-ethereum/metrics"
3738
"github.com/ethereum/go-ethereum/params"
39+
"github.com/google/uuid"
3840
"golang.org/x/crypto/sha3"
3941
)
4042

@@ -289,6 +291,8 @@ type TxPool struct {
289291
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
290292

291293
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
294+
295+
bundleFetcher IFetcher
292296
}
293297

294298
type txpoolResetRequest struct {
@@ -352,6 +356,17 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
352356
return pool
353357
}
354358

359+
type IFetcher interface {
360+
GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error)
361+
}
362+
363+
func (pool *TxPool) RegisterBundleFetcher(fetcher IFetcher) {
364+
pool.mu.Lock()
365+
defer pool.mu.Unlock()
366+
367+
pool.bundleFetcher = fetcher
368+
}
369+
355370
// loop is the transaction pool's main event loop, waiting for and reacting to
356371
// outside blockchain events as well as for various reporting and transaction
357372
// eviction events.
@@ -589,21 +604,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
589604
return pending
590605
}
591606

592-
/// AllMevBundles returns all the MEV Bundles currently in the pool
593-
func (pool *TxPool) AllMevBundles() []types.MevBundle {
594-
return pool.mevBundles
607+
type uuidBundleKey struct {
608+
Uuid uuid.UUID
609+
SigningAddress common.Address
610+
}
611+
612+
func (pool *TxPool) fetchLatestCancellableBundles(ctx context.Context, blockNumber *big.Int) (chan []types.LatestUuidBundle, chan error) {
613+
if pool.bundleFetcher == nil {
614+
return nil, nil
615+
}
616+
errCh := make(chan error, 1)
617+
lubCh := make(chan []types.LatestUuidBundle, 1)
618+
go func(blockNum int64) {
619+
lub, err := pool.bundleFetcher.GetLatestUuidBundles(ctx, blockNum)
620+
errCh <- err
621+
lubCh <- lub
622+
}(blockNumber.Int64())
623+
return lubCh, errCh
624+
}
625+
626+
func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan error, uuidBundles map[uuidBundleKey][]types.MevBundle) []types.MevBundle {
627+
if lubCh == nil || errCh == nil {
628+
return nil
629+
}
630+
631+
if len(uuidBundles) == 0 {
632+
return nil
633+
}
634+
635+
err := <-errCh
636+
if err != nil {
637+
log.Error("could not fetch latest bundles uuid map", "err", err)
638+
return nil
639+
}
640+
641+
currentCancellableBundles := []types.MevBundle{}
642+
643+
log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles)
644+
645+
lubs := <-lubCh
646+
for _, lub := range lubs {
647+
ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress}
648+
bundles, found := uuidBundles[ubk]
649+
if !found {
650+
log.Trace("missing uuid bundle", "ubk", ubk)
651+
continue
652+
}
653+
for _, bundle := range bundles {
654+
if bundle.Hash == lub.BundleHash {
655+
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
656+
currentCancellableBundles = append(currentCancellableBundles, bundle)
657+
break
658+
}
659+
}
660+
}
661+
return currentCancellableBundles
595662
}
596663

597664
// MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp
598665
// also prunes bundles that are outdated
599-
func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []types.MevBundle {
666+
// Returns regular bundles and a function resolving to current cancellable bundles
667+
func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]types.MevBundle, chan []types.MevBundle) {
600668
pool.mu.Lock()
601669
defer pool.mu.Unlock()
602670

671+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
672+
lubCh, errCh := pool.fetchLatestCancellableBundles(ctx, blockNumber)
673+
603674
// returned values
604675
var ret []types.MevBundle
605676
// rolled over values
606677
var bundles []types.MevBundle
678+
// (uuid, signingAddress) -> list of bundles
679+
var uuidBundles = make(map[uuidBundleKey][]types.MevBundle)
607680

608681
for _, bundle := range pool.mevBundles {
609682
// Prune outdated bundles
@@ -617,14 +690,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty
617690
continue
618691
}
619692

620-
// return the ones which are in time
621-
ret = append(ret, bundle)
622693
// keep the bundles around internally until they need to be pruned
623694
bundles = append(bundles, bundle)
695+
696+
// TODO: omit duplicates
697+
698+
// do not append to the return quite yet, check the DB for the latest bundle for that uuid
699+
if bundle.Uuid != types.EmptyUUID {
700+
ubk := uuidBundleKey{bundle.Uuid, bundle.SigningAddress}
701+
uuidBundles[ubk] = append(uuidBundles[ubk], bundle)
702+
continue
703+
}
704+
705+
// return the ones which are in time
706+
ret = append(ret, bundle)
624707
}
625708

626709
pool.mevBundles = bundles
627-
return ret
710+
711+
cancellableBundlesCh := make(chan []types.MevBundle, 1)
712+
go func() {
713+
cancellableBundlesCh <- resolveCancellableBundles(lubCh, errCh, uuidBundles)
714+
cancel()
715+
}()
716+
717+
return ret, cancellableBundlesCh
628718
}
629719

630720
// AddMevBundles adds a mev bundles to the pool
@@ -637,7 +727,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
637727
}
638728

639729
// AddMevBundle adds a mev bundle to the pool
640-
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
730+
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, replacementUuid uuid.UUID, signingAddress common.Address, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
641731
bundleHasher := sha3.NewLegacyKeccak256()
642732
for _, tx := range txs {
643733
bundleHasher.Write(tx.Hash().Bytes())
@@ -650,6 +740,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
650740
pool.mevBundles = append(pool.mevBundles, types.MevBundle{
651741
Txs: txs,
652742
BlockNumber: blockNumber,
743+
Uuid: replacementUuid,
744+
SigningAddress: signingAddress,
653745
MinTimestamp: minTimestamp,
654746
MaxTimestamp: maxTimestamp,
655747
RevertingTxHashes: revertingTxHashes,

0 commit comments

Comments
 (0)