diff --git a/blocks/block.go b/blocks/block.go index d4b2760..b82874f 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core/types" "go.uber.org/zap" ) @@ -114,3 +115,27 @@ func (b *Block) CopyAncestorsFrom(c *Block) error { a := c.ancestry.Load() return b.setAncestors(a.parent, a.lastSettled) } + +// A Source returns a [Block] that matches both a hash and number, and a boolean +// indicating if such a block was found. +type Source func(hash common.Hash, number uint64) (*Block, bool) + +// EthBlock returns the [types.Block] with the given hash and number, or nil if +// not found. +func (s Source) EthBlock(h common.Hash, n uint64) *types.Block { + b, ok := s(h, n) + if !ok { + return nil + } + return b.EthBlock() +} + +// Header returns the [types.Header] with the given hash and number, or nil if +// not found. +func (s Source) Header(h common.Hash, n uint64) *types.Header { + b, ok := s(h, n) + if !ok { + return nil + } + return b.Header() +} diff --git a/blocks/blockstest/chain.go b/blocks/blockstest/chain.go index 9a5a569..7622d4f 100644 --- a/blocks/blockstest/chain.go +++ b/blocks/blockstest/chain.go @@ -99,6 +99,8 @@ func (cb *ChainBuilder) AllExceptGenesis() []*blocks.Block { return slices.Clone(cb.chain[1:]) } +var _ blocks.Source = (*ChainBuilder)(nil).GetBlock + // GetBlock returns the block with specified hash and height, and a flag // indicating if it was found. If either argument does not match, it returns // `nil, false`. diff --git a/cmputils/types.go b/cmputils/types.go index d82ae07..1f3cc8d 100644 --- a/cmputils/types.go +++ b/cmputils/types.go @@ -26,6 +26,14 @@ func BlocksByHash() cmp.Option { }) } +// TransactionsByHash returns a [cmp.Comparer] for [types.Transaction] pointers, +// equating them by hash alone. +func TransactionsByHash() cmp.Option { + return ComparerWithNilCheck(func(t, u *types.Transaction) bool { + return t.Hash() == u.Hash() + }) +} + // ReceiptsByTxHash returns a [cmp.Comparer] for [types.Receipt] pointers, // equating them by transaction hash alone. func ReceiptsByTxHash() cmp.Option { diff --git a/go.mod b/go.mod index 9f0bca3..ec80283 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/saexec/context.go b/saexec/context.go index eca9dd5..b8192f2 100644 --- a/saexec/context.go +++ b/saexec/context.go @@ -13,23 +13,15 @@ import ( "github.com/ava-labs/strevm/blocks" ) -// A BlockSource returns a block that matches both a hash and number, or nil -// if not found. -type BlockSource func(hash common.Hash, number uint64) *blocks.Block - var _ core.ChainContext = (*chainContext)(nil) type chainContext struct { - blocks BlockSource + blocks blocks.Source log logging.Logger } func (c *chainContext) GetHeader(h common.Hash, n uint64) *types.Header { - b := c.blocks(h, n) - if b == nil { - return nil - } - return b.Header() + return c.blocks.Header(h, n) } func (c *chainContext) Engine() consensus.Engine { diff --git a/saexec/execution.go b/saexec/execution.go index 68df2b7..1fba350 100644 --- a/saexec/execution.go +++ b/saexec/execution.go @@ -32,6 +32,8 @@ func (e *Executor) Enqueue(ctx context.Context, block *blocks.Block) error { for { select { case e.queue <- block: + e.lastEnqueued.Store(block) + e.enqueueEvents.Send(block.EthBlock()) return nil case <-ctx.Done(): diff --git a/saexec/saexec.go b/saexec/saexec.go index c3c069d..7aea840 100644 --- a/saexec/saexec.go +++ b/saexec/saexec.go @@ -31,12 +31,13 @@ type Executor struct { log logging.Logger hooks hook.Points - queue chan *blocks.Block - lastExecuted atomic.Pointer[blocks.Block] + queue chan *blocks.Block + lastEnqueued, lastExecuted atomic.Pointer[blocks.Block] - headEvents event.FeedOf[core.ChainHeadEvent] - chainEvents event.FeedOf[core.ChainEvent] - logEvents event.FeedOf[[]*types.Log] + enqueueEvents event.FeedOf[*types.Block] + headEvents event.FeedOf[core.ChainHeadEvent] + chainEvents event.FeedOf[core.ChainEvent] + logEvents event.FeedOf[[]*types.Log] chainContext core.ChainContext chainConfig *params.ChainConfig @@ -55,7 +56,7 @@ type Executor struct { // executed block after shutdown and recovery. func New( lastExecuted *blocks.Block, - blockSrc BlockSource, + blockSrc blocks.Source, chainConfig *params.ChainConfig, db ethdb.Database, triedbConfig *triedb.Config, @@ -84,6 +85,7 @@ func New( stateCache: cache, snaps: snaps, } + e.lastEnqueued.Store(lastExecuted) e.lastExecuted.Store(lastExecuted) go e.processQueue() @@ -124,3 +126,8 @@ func (e *Executor) StateCache() state.Database { func (e *Executor) LastExecuted() *blocks.Block { return e.lastExecuted.Load() } + +// LastEnqueued returns the last-enqueued block in a threadsafe manner. +func (e *Executor) LastEnqueued() *blocks.Block { + return e.lastEnqueued.Load() +} diff --git a/saexec/saexec_test.go b/saexec/saexec_test.go index e97c9ba..7e6c94b 100644 --- a/saexec/saexec_test.go +++ b/saexec/saexec_test.go @@ -34,7 +34,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/goleak" - "github.com/ava-labs/strevm/blocks" "github.com/ava-labs/strevm/blocks/blockstest" "github.com/ava-labs/strevm/cmputils" "github.com/ava-labs/strevm/gastime" @@ -87,15 +86,8 @@ func newSUT(tb testing.TB, hooks hook.Points) (context.Context, SUT) { chain.SetDefaultOptions(blockstest.WithBlockOptions( blockstest.WithLogger(logger), )) - src := BlockSource(func(h common.Hash, n uint64) *blocks.Block { - b, ok := chain.GetBlock(h, n) - if !ok { - return nil - } - return b - }) - e, err := New(genesis, src, config, db, tdbConfig, hooks, logger) + e, err := New(genesis, chain.GetBlock, config, db, tdbConfig, hooks, logger) require.NoError(tb, err, "New()") tb.Cleanup(func() { require.NoErrorf(tb, e.Close(), "%T.Close()", e) diff --git a/saexec/subscription.go b/saexec/subscription.go index 24e6052..bf2fd70 100644 --- a/saexec/subscription.go +++ b/saexec/subscription.go @@ -24,6 +24,12 @@ func (e *Executor) sendPostExecutionEvents(b *types.Block, receipts types.Receip e.logEvents.Send(logs) } +// SubscribeBlockEnqueueEvent returns a new subscription for each block queued +// [Executor.Enqueue]. +func (e *Executor) SubscribeBlockEnqueueEvent(ch chan<- *types.Block) event.Subscription { + return e.enqueueEvents.Subscribe(ch) +} + // SubscribeChainHeadEvent returns a new subscription for each // [core.ChainHeadEvent] emitted after execution of a [blocks.Block]. func (e *Executor) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { diff --git a/txgossip/blockchain.go b/txgossip/blockchain.go new file mode 100644 index 0000000..2a5a408 --- /dev/null +++ b/txgossip/blockchain.go @@ -0,0 +1,108 @@ +// Copyright (C) 2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package txgossip + +import ( + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/state" + "github.com/ava-labs/libevm/core/txpool" + "github.com/ava-labs/libevm/core/txpool/legacypool" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/event" + "github.com/ava-labs/libevm/params" + + "github.com/ava-labs/strevm/blocks" + "github.com/ava-labs/strevm/saexec" +) + +// A BlockChain is the union of [txpool.BlockChain] and [legacypool.BlockChain]. +type BlockChain interface { + txpool.BlockChain + legacypool.BlockChain +} + +// NewBlockChain wraps an [saexec.Executor] to be compatible with a +// non-blob-transaction mempool. +func NewBlockChain(exec *saexec.Executor, blocks blocks.Source) BlockChain { + return &blockchain{ + exec: exec, + blocks: blocks, + } +} + +type blockchain struct { + exec *saexec.Executor + blocks blocks.Source +} + +func (bc *blockchain) Config() *params.ChainConfig { + return bc.exec.ChainConfig() +} + +func (bc *blockchain) CurrentBlock() *types.Header { + return bc.exec.LastEnqueued().Header() +} + +func (bc *blockchain) GetBlock(hash common.Hash, number uint64) *types.Block { + return bc.blocks.EthBlock(hash, number) +} + +func (bc *blockchain) StateAt(root common.Hash) (*state.StateDB, error) { + return state.New(root, bc.exec.StateCache(), nil) +} + +// SubscribeChainHeadEvent subscribes to block enqueueing, NOT to regular head +// events as these only occur after execution. Enqueuing is equivalent to block +// acceptance, which is when a transaction SHOULD be removed from the mempool. +func (bc *blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + bCh := make(chan *types.Block) + sub := bc.exec.SubscribeBlockEnqueueEvent(bCh) + + p := &pipe{ + in: bCh, + out: ch, + sub: sub, + quit: make(chan struct{}), + done: make(chan struct{}), + } + go p.loop() + return p +} + +// A pipe is an [event.Subscription] that converts from a [types.Block] channel +// to a [core.ChainHeadEvent] one. +type pipe struct { + in <-chan *types.Block + out chan<- core.ChainHeadEvent + sub event.Subscription + quit, done chan struct{} +} + +func (p *pipe) loop() { + defer close(p.done) + for { + select { + case b := <-p.in: + select { + case p.out <- core.ChainHeadEvent{Block: b}: + + case <-p.quit: + return + } + case <-p.quit: + return + } + } +} + +func (p *pipe) Err() <-chan error { + return p.sub.Err() +} + +func (p *pipe) Unsubscribe() { + p.sub.Unsubscribe() + close(p.quit) + <-p.done +} diff --git a/txgossip/priority.go b/txgossip/priority.go new file mode 100644 index 0000000..3a246ed --- /dev/null +++ b/txgossip/priority.go @@ -0,0 +1,54 @@ +// Copyright (C) 2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package txgossip + +import ( + "slices" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/txpool" +) + +// A LazyTransaction couples a [txpool.LazyTransaction] with its sender. +type LazyTransaction struct { + *txpool.LazyTransaction + Sender common.Address +} + +// TransactionsByPriority calls [txpool.TxPool.Pending] with the given filter, +// collapses the results into a slice, and sorts said slice by decreasing gas +// tip then chronologically. Transactions from the same sender are merely sorted +// by increasing nonce. +func (s *Set) TransactionsByPriority(filter txpool.PendingFilter) []*LazyTransaction { + pending := s.Pool.Pending(filter) + var n int + for _, txs := range pending { + n += len(txs) + } + + all := make([]*LazyTransaction, n) + var i int + for from, txs := range pending { + for _, tx := range txs { + all[i] = &LazyTransaction{ + LazyTransaction: tx, + Sender: from, + } + i++ + } + } + + slices.SortStableFunc(all, func(a, b *LazyTransaction) int { + if a.Sender == b.Sender { + // [txpool.TxPool.Pending] already returns each slice in nonce order + // and we're performing a stable sort. + return 0 + } + if tip := a.GasTipCap.Cmp(b.GasTipCap); tip != 0 { + return -tip // Higher tips first + } + return a.Time.Compare(b.Time) + }) + return all +} diff --git a/txgossip/txgossip.go b/txgossip/txgossip.go new file mode 100644 index 0000000..5d4fb71 --- /dev/null +++ b/txgossip/txgossip.go @@ -0,0 +1,163 @@ +// Copyright (C) 2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +// Package txgossip provides a mempool for [Streaming Asynchronous Execution], +// which is also compatible with AvalancheGo's [gossip] mechanism. +// +// [Streaming Asynchronous Execution]: https://github.com/avalanche-foundation/ACPs/tree/main/ACPs/194-streaming-asynchronous-execution +package txgossip + +import ( + "errors" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/txpool" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/event" + "github.com/ava-labs/libevm/rlp" + "go.uber.org/zap" +) + +var ( + _ gossip.Gossipable = Transaction{} + _ gossip.Marshaller[Transaction] = Marshaller{} + _ gossip.Set[Transaction] = (*Set)(nil) +) + +// A Transaction is a [gossip.Gossipable] wrapper for a [types.Transaction]. +type Transaction struct { + *types.Transaction +} + +// GossipID returns the transaction hash. +func (tx Transaction) GossipID() ids.ID { + return ids.ID(tx.Hash()) +} + +// A Marshaller implements [gossip.Marshaller] for [Transaction], based on RLP +// encoding. +type Marshaller struct{} + +// MarshalGossip returns the [rlp] encoding of the underlying +// [types.Transaction]. +func (Marshaller) MarshalGossip(tx Transaction) ([]byte, error) { + return rlp.EncodeToBytes(tx.Transaction) +} + +// UnmarshalGossip [rlp] decodes the buffer into a [types.Transaction]. +func (Marshaller) UnmarshalGossip(buf []byte) (Transaction, error) { + tx := Transaction{new(types.Transaction)} + if err := rlp.DecodeBytes(buf, tx.Transaction); err != nil { + return Transaction{}, err + } + return tx, nil +} + +// A Set is a [gossip.Set] wrapping a [txpool.TxPool]. Transactions MAY be added +// to the pool directly, or via [Set.Add]. +type Set struct { + Pool *txpool.TxPool + bloom *gossip.BloomFilter + + txSub event.Subscription + bloomDone chan error +} + +// NewSet returns a new [gossip.Set]. [Set.Close] MUST be called to release +// resources. +func NewSet(logger logging.Logger, pool *txpool.TxPool, bloom *gossip.BloomFilter) *Set { + fillBloomFilter(pool, bloom) + + txs := make(chan core.NewTxsEvent) + s := &Set{ + Pool: pool, + bloom: bloom, + txSub: pool.SubscribeTransactions(txs, false), + bloomDone: make(chan error, 1), + } + go s.maintainBloomFilter(logger, txs) + + return s +} + +func fillBloomFilter(pool *txpool.TxPool, bloom *gossip.BloomFilter) { + pending, queued := pool.Content() + for _, txSet := range []map[common.Address][]*types.Transaction{pending, queued} { + for _, txs := range txSet { + for _, tx := range txs { + bloom.Add(Transaction{tx}) + } + } + } +} + +func (s *Set) maintainBloomFilter(logger logging.Logger, txs <-chan core.NewTxsEvent) { + for { + select { + case ev := <-txs: + pending, queued := s.Pool.Stats() + if _, err := gossip.ResetBloomFilterIfNeeded(s.bloom, 2*(pending+queued)); err != nil { + logger.Error("Resetting mempool bloom filter", zap.Error(err)) + } + for _, tx := range ev.Txs { + s.bloom.Add(Transaction{tx}) + } + + case err, ok := <-s.txSub.Err(): + // [event.Subscription] documents semantics of its error channel, + // stating that at most one error will ever be sent, and that it + // will be closed when unsubscribing. + if ok { + logger.Error("TxPool subscription", zap.Error(err)) + s.bloomDone <- err + } else { + close(s.bloomDone) + } + return + } + } +} + +// Close stops background work being performed by the [Set], and returns the +// last error encountered by said processes. +func (s *Set) Close() error { + s.txSub.Unsubscribe() + return <-s.bloomDone +} + +// Add is a wrapper around [txpool.TxPool.Add], exposed to accept transactions +// over [gossip]. It MAY be bypassed, and the pool's method accessed directly. +func (s *Set) Add(tx Transaction) error { + errs := s.Pool.Add([]*types.Transaction{tx.Transaction}, false, false) + return errors.Join(errs...) +} + +// Has returns [txpool.TxPool.Has]. +func (s *Set) Has(id ids.ID) bool { + return s.Pool.Has(common.Hash(id)) +} + +// Iterate calls `fn` for every pending and queued transaction returned by +// [txpool.TxPool.Content] +func (s *Set) Iterate(fn func(Transaction) bool) { + pending, queued := s.Pool.Content() + for _, group := range []map[common.Address][]*types.Transaction{pending, queued} { + for _, txs := range group { + for _, tx := range txs { + if !fn(Transaction{tx}) { + return + } + } + } + } +} + +// GetFilter returns [gossip.BloomFilter.Marshal] for a Bloom filter of the +// transactions in the pool. +func (s *Set) GetFilter() ([]byte, []byte) { + return s.bloom.Marshal() +} diff --git a/txgossip/txgossip_test.go b/txgossip/txgossip_test.go new file mode 100644 index 0000000..f637bde --- /dev/null +++ b/txgossip/txgossip_test.go @@ -0,0 +1,345 @@ +// Copyright (C) 2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package txgossip + +import ( + "context" + "errors" + "math/big" + "math/rand/v2" + "path/filepath" + "slices" + "testing" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/txpool" + "github.com/ava-labs/libevm/core/txpool/legacypool" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/google/go-cmp/cmp" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/ava-labs/strevm/blocks/blockstest" + "github.com/ava-labs/strevm/cmputils" + "github.com/ava-labs/strevm/hook/hookstest" + "github.com/ava-labs/strevm/saetest" + "github.com/ava-labs/strevm/saexec" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain( + m, + goleak.IgnoreCurrent(), + goleak.IgnoreTopFunction("github.com/ava-labs/libevm/core/state/snapshot.(*diskLayer).generate"), + // Even with a call to [txpool.TxPool.Close], this leaks. We don't + // expect to open and close multiple pools, so it's ok to ignore. + goleak.IgnoreTopFunction("github.com/ava-labs/libevm/core/txpool.(*TxPool).loop.func2"), + ) +} + +// SUT is the system under test, primarily the [Set]. +type SUT struct { + *Set + chain *blockstest.ChainBuilder + wallet *saetest.Wallet + exec *saexec.Executor + bloom *gossip.BloomFilter +} + +func chainConfig() *params.ChainConfig { + return params.AllDevChainProtocolChanges +} + +func newWallet(tb testing.TB, numAccounts uint) *saetest.Wallet { + tb.Helper() + signer := types.LatestSigner(chainConfig()) + return saetest.NewUNSAFEWallet(tb, numAccounts, signer) +} + +func newSUT(t *testing.T, numAccounts uint, existingTxs ...*types.Transaction) SUT { + t.Helper() + logger := saetest.NewTBLogger(t, logging.Warn) + + wallet := newWallet(t, numAccounts) + config := chainConfig() + + db := rawdb.NewMemoryDatabase() + genesis := blockstest.NewGenesis(t, db, config, saetest.MaxAllocFor(wallet.Addresses()...)) + chain := blockstest.NewChainBuilder(genesis) + + exec, err := saexec.New(genesis, chain.GetBlock, config, db, nil, &hookstest.Stub{Target: 1e6}, logger) + require.NoError(t, err, "saexec.New()") + t.Cleanup(func() { + require.NoErrorf(t, exec.Close(), "%T.Close()", exec) + }) + + bloom, err := gossip.NewBloomFilter(prometheus.NewRegistry(), "", 1, 1e-9, 1e-9) + require.NoError(t, err, "gossip.NewBloomFilter([1 in a billion FP])") + + bc := NewBlockChain(exec, chain.GetBlock) + pool := newTxPool(t, bc) + require.NoErrorf(t, errors.Join(pool.Add(existingTxs, false, true)...), "%T.Add([existing txs in setup], local=false, sync=true)", pool) + set := NewSet(logger, pool, bloom) + t.Cleanup(func() { + assert.NoErrorf(t, set.Close(), "%T.Close()", set) + assert.NoErrorf(t, pool.Close(), "%T.Close()", pool) + }) + + return SUT{ + Set: set, + chain: chain, + wallet: wallet, + exec: exec, + bloom: bloom, + } +} + +func newTxPool(t *testing.T, bc BlockChain) *txpool.TxPool { + t.Helper() + + config := legacypool.DefaultConfig // copies + config.Journal = filepath.Join(t.TempDir(), "transactions.rlp") + subs := []txpool.SubPool{legacypool.New(config, bc)} + + p, err := txpool.New(1, bc, subs) + require.NoError(t, err, "txpool.New()") + return p +} + +func TestExecutorIntegration(t *testing.T) { + ctx := t.Context() + + const numAccounts = 3 + s := newSUT(t, numAccounts) + + rng := rand.New(rand.NewPCG(0, 0)) //nolint:gosec // Reproducibility is useful in tests + + const txPerAccount = 5 + const numTxs = numAccounts * txPerAccount + for range txPerAccount { + for i := range numAccounts { + tx := s.wallet.SetNonceAndSign(t, i, &types.DynamicFeeTx{ + To: &common.Address{}, + Gas: params.TxGas, + GasFeeCap: big.NewInt(100), + GasTipCap: big.NewInt(1 + rng.Int64N(3)), + }) + require.NoErrorf(t, s.Add(Transaction{tx}), "%T.Add()", s.Set) + } + } + + t.Run("Iterate_after_Add", func(t *testing.T) { + // Note that calls to [txpool.TxPool] are only necessary in tests, and MUST + // NOT be replicated in production. + require.NoErrorf(t, s.Pool.Sync(), "%T.Sync()", s.Pool) + require.Lenf(t, slices.Collect(s.Iterate), numTxs, "slices.Collect(%T.Iterate)", s.Set) + }) + if t.Failed() { + t.FailNow() + } + + var ( + txs types.Transactions + last *LazyTransaction + ) + for _, tx := range s.TransactionsByPriority(txpool.PendingFilter{}) { + txs = append(txs, tx.Resolve()) + + t.Run("priority_ordering", func(t *testing.T) { + defer func() { last = tx }() + + switch { + case last == nil: + case tx.Sender == last.Sender: + require.Equal(t, last.Tx.Nonce()+1, tx.Tx.Nonce(), "incrementing nonce for same sender") + case tx.GasTipCap.Eq(last.GasTipCap): + require.True(t, last.Time.Before(tx.Time), "equal gas tips ordered by first seen") + default: + require.Greater(t, last.GasTipCap.Uint64(), tx.GasTipCap.Uint64(), "larger gas tips first") + } + }) + } + require.Lenf(t, txs, numTxs, "%T.TransactionsByPriority()", s.Set) + + b := s.chain.NewBlock(t, txs) + require.NoErrorf(t, s.exec.Enqueue(ctx, b), "%T.Enqueue([txs from %T.TransactionsByPriority()])", s.exec, s.Set) + + assert.EventuallyWithTf( + t, func(c *assert.CollectT) { + require.NoErrorf(c, s.Pool.Sync(), "%T.Sync()", s.Pool) + assert.Emptyf(c, slices.Collect(s.Iterate), "slices.Collect(%T.Iterate)", s.Set) + for _, tx := range txs { + assert.Falsef(c, s.Has(ids.ID(tx.Hash())), "%T.Has(%#x)", s.Set, tx.Hash()) + } + }, + 2*time.Second, 10*time.Millisecond, + "empty %T after transactions included in a block", s.Set, + ) + + t.Run("block_execution", func(t *testing.T) { + // The above test for nonce ordering only runs if the same sender has 2 + // consecutive transactions. Successful execution demonstrates correct + // nonce ordering across the board. + require.NoErrorf(t, b.WaitUntilExecuted(ctx), "%T.WaitUntilExecuted()", b) + for i, r := range b.Receipts() { + assert.Equalf(t, types.ReceiptStatusSuccessful, r.Status, "%T[%d].Status", r, i) + } + }) +} + +func TestP2PIntegration(t *testing.T) { + ctx := t.Context() + + reg := prometheus.NewRegistry() + metrics, err := gossip.NewMetrics(reg, "") + require.NoError(t, err, "gossip.NewMetrics()") + + tests := []struct { + name string + gossiper func(_ logging.Logger, sendID ids.NodeID, send *Set, recvID ids.NodeID, recv *Set) (gossip.Gossiper, error) + }{ + { + name: "push", + gossiper: func(l logging.Logger, sendID ids.NodeID, send *Set, recvID ids.NodeID, recv *Set) (gossip.Gossiper, error) { + c := p2ptest.NewClient( + t, ctx, + sendID, p2p.NoOpHandler{}, + recvID, gossip.NewHandler(l, Marshaller{}, recv, metrics, 0), + ) + branch := gossip.BranchingFactor{Peers: 1} + return gossip.NewPushGossiper( + Marshaller{}, + send, + &stubPeers{[]ids.NodeID{recvID}}, + c, + metrics, + branch, branch, + 0, 1<<20, time.Millisecond, + ) + }, + }, + { + name: "pull", + gossiper: func(l logging.Logger, sendID ids.NodeID, send *Set, recvID ids.NodeID, recv *Set) (gossip.Gossiper, error) { + c := p2ptest.NewClient( + t, ctx, + sendID, gossip.NewHandler(l, Marshaller{}, send, metrics, 0), + recvID, gossip.NewHandler(l, Marshaller{}, recv, metrics, 0), + ) + return gossip.NewPullGossiper(l, Marshaller{}, recv, c, metrics, 1), nil + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := saetest.NewTBLogger(t, logging.Debug) + ctx = logger.CancelOnError(ctx) + + sendID := ids.GenerateTestNodeID() + recvID := ids.GenerateTestNodeID() + send := newSUT(t, 1) + // Although the receiving mempool doesn't need to sign transactions, create + // the same (deterministic) account so it has non-zero balance otherwise the + // mempool will reject it. + recv := newSUT(t, 1) + + tx := Transaction{ + Transaction: send.wallet.SetNonceAndSign(t, 0, &types.LegacyTx{ + To: &common.Address{}, + Gas: params.TxGas, + GasPrice: big.NewInt(1), + }), + } + require.NoErrorf(t, send.Add(tx), "%T.Add()", send.Set) + require.NoErrorf(t, send.Pool.Sync(), "sender %T.Sync()", send.Pool) + + gossiper, err := tt.gossiper(logger, sendID, send.Set, recvID, recv.Set) + require.NoError(t, err, "Bad test setup: gossiper creation failed") + if push, ok := gossiper.(*gossip.PushGossiper[Transaction]); ok { + push.Add(tx) + } + require.NoErrorf(t, gossiper.Gossip(ctx), "%T.Gossip()", gossiper) + + // The Bloom filter is the deepest in the stack of recipients, going + // [Set.Add] -> [txpool.TxPool] -> [legacypool.LegacyPool] -> + // [core.NewTxsEvent] -> [gossip.BloomFilter]. If it has it then + // everything upstream does too. + require.Eventuallyf( + t, func() bool { + return recv.bloom.Has(tx) + }, + 3*time.Second, 50*time.Millisecond, + "Receiving %T.bloom.Has(tx)", recv.Set, + ) + + assert.True(t, recv.Has(tx.GossipID()), "receiving %T.Has([tx])", recv.Set) + + want := slices.Collect(send.Iterate) + got := slices.Collect(recv.Iterate) + if diff := cmp.Diff(want, got, cmputils.TransactionsByHash()); diff != "" { + t.Errorf("slices.Collect(%T.Iterate) diff (-sender +receiver):\n%s", send.Set, diff) + } + }) + } +} + +type stubPeers struct { + ids []ids.NodeID +} + +var _ interface { + p2p.NodeSampler + p2p.ValidatorSubset +} = (*stubPeers)(nil) + +func (p *stubPeers) Sample(context.Context, int) []ids.NodeID { + return p.ids +} + +func (p *stubPeers) Top(context.Context, float64) []ids.NodeID { + return p.ids +} + +func TestTxPoolPopulatesBloomFilter(t *testing.T) { + // Note that the wallet addresses are deterministic, so this single address + // will have sufficient funds when the SUT is created below. + wallet := newWallet(t, 1) + + var txs types.Transactions + const n = 10 + for range n { + txs = append(txs, wallet.SetNonceAndSign(t, 0, &types.LegacyTx{ + To: &common.Address{}, + Gas: params.TxGas, + GasPrice: big.NewInt(1), + })) + } + + existing := txs[:n/2] + deferred := txs[n/2:] + + sut := newSUT(t, 1, existing...) + for i, tx := range existing { + assert.True(t, sut.bloom.Has(Transaction{tx}), "%T.Has(%#x [tx[%d] already in %T when constructing %T])", sut.bloom, tx.Hash(), i, sut.Pool, sut.Set) + } + + require.NoError(t, errors.Join(sut.Pool.Add(deferred, false, true)...)) + require.EventuallyWithT(t, func(c *assert.CollectT) { + for i, tx := range deferred { + idx := i + n/2 + require.Truef(c, sut.bloom.Has(Transaction{tx}), "%T.Has(%#x [tx[%d] added to %T after constructing %T])", sut.bloom, tx.Hash(), idx, sut.Pool, sut.Set) + } + }, 2*time.Second, 20*time.Millisecond) +}