@@ -30,12 +30,14 @@ import (
3030 "github.com/ethereum/go-ethereum/core/rawdb"
3131 "github.com/ethereum/go-ethereum/core/types"
3232 "github.com/ethereum/go-ethereum/core/vm"
33+ "github.com/ethereum/go-ethereum/crypto/kzg4844"
3334 "github.com/ethereum/go-ethereum/eth/downloader"
3435 "github.com/ethereum/go-ethereum/eth/protocols/eth"
3536 "github.com/ethereum/go-ethereum/event"
3637 "github.com/ethereum/go-ethereum/p2p"
3738 "github.com/ethereum/go-ethereum/p2p/enode"
3839 "github.com/ethereum/go-ethereum/params"
40+ "github.com/holiman/uint256"
3941)
4042
4143// testEthHandler is a mock event handler to listen for inbound network requests
@@ -71,7 +73,11 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
7173 return nil
7274
7375 case * eth.PooledTransactionsPacket :
74- h .txBroadcasts .Send (([]* types.Transaction )(* packet ))
76+ var txs []* types.Transaction
77+ for _ , tx := range * packet {
78+ txs = append (txs , tx .Transaction )
79+ }
80+ h .txBroadcasts .Send (txs )
7581 return nil
7682
7783 default :
@@ -454,6 +460,87 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
454460 }
455461}
456462
463+ func TestBlobTransactionPropagation68 (t * testing.T ) { testBlobTransactionPropagation (t , eth .ETH68 ) }
464+
465+ func testBlobTransactionPropagation (t * testing.T , protocol uint ) {
466+ t .Parallel ()
467+
468+ // Create a source handler to send transactions from and a number of sinks
469+ // to receive them. We need multiple sinks since a one-to-one peering would
470+ // broadcast all transactions without announcement.
471+ source := newTestHandler ()
472+ source .handler .snapSync .Store (false ) // Avoid requiring snap, otherwise some will be dropped below
473+ defer source .close ()
474+
475+ sinks := make ([]* testHandler , 10 )
476+ for i := 0 ; i < len (sinks ); i ++ {
477+ sinks [i ] = newTestHandler ()
478+ defer sinks [i ].close ()
479+
480+ sinks [i ].handler .acceptTxs .Store (true ) // mark synced to accept transactions
481+ }
482+ // Interconnect all the sink handlers with the source handler
483+ for i , sink := range sinks {
484+ sink := sink // Closure for gorotuine below
485+
486+ sourcePipe , sinkPipe := p2p .MsgPipe ()
487+ defer sourcePipe .Close ()
488+ defer sinkPipe .Close ()
489+
490+ sourcePeer := eth .NewPeer (protocol , p2p .NewPeerPipe (enode.ID {byte (i + 1 )}, "" , nil , sourcePipe ), sourcePipe , source .txpool )
491+ sinkPeer := eth .NewPeer (protocol , p2p .NewPeerPipe (enode.ID {0 }, "" , nil , sinkPipe ), sinkPipe , sink .txpool )
492+ defer sourcePeer .Close ()
493+ defer sinkPeer .Close ()
494+
495+ go source .handler .runEthPeer (sourcePeer , func (peer * eth.Peer ) error {
496+ return eth .Handle ((* ethHandler )(source .handler ), peer )
497+ })
498+ go sink .handler .runEthPeer (sinkPeer , func (peer * eth.Peer ) error {
499+ return eth .Handle ((* ethHandler )(sink .handler ), peer )
500+ })
501+ }
502+ // Subscribe to all the transaction pools
503+ txChs := make ([]chan core.NewTxsEvent , len (sinks ))
504+ for i := 0 ; i < len (sinks ); i ++ {
505+ txChs [i ] = make (chan core.NewTxsEvent , 1024 )
506+
507+ sub := sinks [i ].txpool .SubscribeNewTxsEvent (txChs [i ])
508+ defer sub .Unsubscribe ()
509+ }
510+ // Fill the source pool with transactions and wait for them at the sinks
511+ txs := make ([]* txpool.Transaction , 1024 )
512+ for nonce := range txs {
513+ tx := types .NewTx (& types.BlobTx {
514+ Nonce : uint64 (nonce ),
515+ To : common.Address {},
516+ GasTipCap : uint256 .NewInt (0 ),
517+ GasFeeCap : uint256 .NewInt (0 ),
518+ Gas : 100000 ,
519+ Value : uint256 .NewInt (0 ),
520+ BlobHashes : make ([]common.Hash , 1 ),
521+ ChainID : uint256 .NewInt (0 ),
522+ Data : nil ,
523+ })
524+ tx , _ = types .SignTx (tx , types .NewCancunSigner (common .Big0 ), testKey )
525+
526+ txs [nonce ] = & txpool.Transaction {Tx : tx , BlobTxBlobs : make ([]kzg4844.Blob , 1 ), BlobTxCommits : make ([]kzg4844.Commitment , 1 ), BlobTxProofs : make ([]kzg4844.Proof , 1 )}
527+ }
528+ source .txpool .Add (txs , false , false )
529+
530+ // Iterate through all the sinks and ensure they all got the transactions
531+ for i := range sinks {
532+ for arrived , timeout := 0 , false ; arrived < len (txs ) && ! timeout ; {
533+ select {
534+ case event := <- txChs [i ]:
535+ arrived += len (event .Txs )
536+ case <- time .After (2 * time .Second ):
537+ t .Errorf ("sink %d: transaction propagation timed out: have %d, want %d" , i , arrived , len (txs ))
538+ timeout = true
539+ }
540+ }
541+ }
542+ }
543+
457544// Tests that blocks are broadcast to a sqrt number of peers only.
458545func TestBroadcastBlock1Peer (t * testing.T ) { testBroadcastBlock (t , 1 , 1 ) }
459546func TestBroadcastBlock2Peers (t * testing.T ) { testBroadcastBlock (t , 2 , 1 ) }
0 commit comments