Skip to content

Commit cac1b21

Browse files
authored
cmd/devp2p/internal/ethtest: add more tx propagation tests (#22630)
This adds a test for large tx announcement messages, as well as a test to check that announced tx hashes are requested by the node.
1 parent 49281ab commit cac1b21

File tree

5 files changed

+278
-58
lines changed

5 files changed

+278
-58
lines changed

cmd/devp2p/internal/ethtest/eth66_suite.go

Lines changed: 81 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package ethtest
1919
import (
2020
"time"
2121

22+
"github.com/ethereum/go-ethereum/common"
2223
"github.com/ethereum/go-ethereum/core/types"
2324
"github.com/ethereum/go-ethereum/crypto"
2425
"github.com/ethereum/go-ethereum/eth/protocols/eth"
@@ -125,22 +126,7 @@ func (s *Suite) TestSimultaneousRequests_66(t *utesting.T) {
125126
// TestBroadcast_66 tests whether a block announcement is correctly
126127
// propagated to the given node's peer(s) on the eth66 protocol.
127128
func (s *Suite) TestBroadcast_66(t *utesting.T) {
128-
sendConn, receiveConn := s.setupConnection66(t), s.setupConnection66(t)
129-
defer sendConn.Close()
130-
defer receiveConn.Close()
131-
132-
nextBlock := len(s.chain.blocks)
133-
blockAnnouncement := &NewBlock{
134-
Block: s.fullChain.blocks[nextBlock],
135-
TD: s.fullChain.TD(nextBlock + 1),
136-
}
137-
s.testAnnounce66(t, sendConn, receiveConn, blockAnnouncement)
138-
// update test suite chain
139-
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
140-
// wait for client to update its chain
141-
if err := receiveConn.waitForBlock66(s.chain.Head()); err != nil {
142-
t.Fatal(err)
143-
}
129+
s.sendNextBlock66(t)
144130
}
145131

146132
// TestGetBlockBodies_66 tests whether the given node can respond to
@@ -426,3 +412,82 @@ func (s *Suite) TestSameRequestID_66(t *utesting.T) {
426412
// check response from first request
427413
headersMatch(t, s.chain, s.getBlockHeaders66(t, conn, req1, reqID))
428414
}
415+
416+
// TestLargeTxRequest_66 tests whether a node can fulfill a large GetPooledTransactions
417+
// request.
418+
func (s *Suite) TestLargeTxRequest_66(t *utesting.T) {
419+
// send the next block to ensure the node is no longer syncing and is able to accept
420+
// txs
421+
s.sendNextBlock66(t)
422+
// send 2000 transactions to the node
423+
hashMap, txs := generateTxs(t, s, 2000)
424+
sendConn := s.setupConnection66(t)
425+
defer sendConn.Close()
426+
427+
sendMultipleSuccessfulTxs(t, s, sendConn, txs)
428+
// set up connection to receive to ensure node is peered with the receiving connection
429+
// before tx request is sent
430+
recvConn := s.setupConnection66(t)
431+
defer recvConn.Close()
432+
// create and send pooled tx request
433+
hashes := make([]common.Hash, 0)
434+
for _, hash := range hashMap {
435+
hashes = append(hashes, hash)
436+
}
437+
getTxReq := &eth.GetPooledTransactionsPacket66{
438+
RequestId: 1234,
439+
GetPooledTransactionsPacket: hashes,
440+
}
441+
if err := recvConn.write66(getTxReq, GetPooledTransactions{}.Code()); err != nil {
442+
t.Fatalf("could not write to conn: %v", err)
443+
}
444+
// check that all received transactions match those that were sent to node
445+
switch msg := recvConn.waitForResponse(s.chain, timeout, getTxReq.RequestId).(type) {
446+
case PooledTransactions:
447+
for _, gotTx := range msg {
448+
if _, exists := hashMap[gotTx.Hash()]; !exists {
449+
t.Fatalf("unexpected tx received: %v", gotTx.Hash())
450+
}
451+
}
452+
default:
453+
t.Fatalf("unexpected %s", pretty.Sdump(msg))
454+
}
455+
}
456+
457+
// TestNewPooledTxs_66 tests whether a node will do a GetPooledTransactions
458+
// request upon receiving a NewPooledTransactionHashes announcement.
459+
func (s *Suite) TestNewPooledTxs_66(t *utesting.T) {
460+
// send the next block to ensure the node is no longer syncing and is able to accept
461+
// txs
462+
s.sendNextBlock66(t)
463+
// generate 50 txs
464+
hashMap, _ := generateTxs(t, s, 50)
465+
// create new pooled tx hashes announcement
466+
hashes := make([]common.Hash, 0)
467+
for _, hash := range hashMap {
468+
hashes = append(hashes, hash)
469+
}
470+
announce := NewPooledTransactionHashes(hashes)
471+
// send announcement
472+
conn := s.setupConnection66(t)
473+
defer conn.Close()
474+
if err := conn.Write(announce); err != nil {
475+
t.Fatalf("could not write to connection: %v", err)
476+
}
477+
// wait for GetPooledTxs request
478+
for {
479+
_, msg := conn.readAndServe66(s.chain, timeout)
480+
switch msg := msg.(type) {
481+
case GetPooledTransactions:
482+
if len(msg) != len(hashes) {
483+
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg))
484+
}
485+
return
486+
case *NewPooledTransactionHashes:
487+
// ignore propagated txs from old tests
488+
continue
489+
default:
490+
t.Fatalf("unexpected %s", pretty.Sdump(msg))
491+
}
492+
}
493+
}

cmd/devp2p/internal/ethtest/eth66_suiteHelpers.go

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ func (c *Conn) read66() (uint64, Message) {
111111
msg = new(Transactions)
112112
case (NewPooledTransactionHashes{}).Code():
113113
msg = new(NewPooledTransactionHashes)
114+
case (GetPooledTransactions{}.Code()):
115+
ethMsg := new(eth.GetPooledTransactionsPacket66)
116+
if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
117+
return 0, errorf("could not rlp decode message: %v", err)
118+
}
119+
return ethMsg.RequestId, GetPooledTransactions(ethMsg.GetPooledTransactionsPacket)
120+
case (PooledTransactions{}.Code()):
121+
ethMsg := new(eth.PooledTransactionsPacket66)
122+
if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
123+
return 0, errorf("could not rlp decode message: %v", err)
124+
}
125+
return ethMsg.RequestId, PooledTransactions(ethMsg.PooledTransactionsPacket)
114126
default:
115127
msg = errorf("invalid message code: %d", code)
116128
}
@@ -124,6 +136,15 @@ func (c *Conn) read66() (uint64, Message) {
124136
return 0, errorf("invalid message: %s", string(rawData))
125137
}
126138

139+
func (c *Conn) waitForResponse(chain *Chain, timeout time.Duration, requestID uint64) Message {
140+
for {
141+
id, msg := c.readAndServe66(chain, timeout)
142+
if id == requestID {
143+
return msg
144+
}
145+
}
146+
}
147+
127148
// ReadAndServe serves GetBlockHeaders requests while waiting
128149
// on another message from the node.
129150
func (c *Conn) readAndServe66(chain *Chain, timeout time.Duration) (uint64, Message) {
@@ -173,27 +194,33 @@ func (s *Suite) testAnnounce66(t *utesting.T, sendConn, receiveConn *Conn, block
173194
}
174195

175196
func (s *Suite) waitAnnounce66(t *utesting.T, conn *Conn, blockAnnouncement *NewBlock) {
176-
timeout := 20 * time.Second
177-
_, msg := conn.readAndServe66(s.chain, timeout)
178-
switch msg := msg.(type) {
179-
case *NewBlock:
180-
t.Logf("received NewBlock message: %s", pretty.Sdump(msg.Block))
181-
assert.Equal(t,
182-
blockAnnouncement.Block.Header(), msg.Block.Header(),
183-
"wrong block header in announcement",
184-
)
185-
assert.Equal(t,
186-
blockAnnouncement.TD, msg.TD,
187-
"wrong TD in announcement",
188-
)
189-
case *NewBlockHashes:
190-
blockHashes := *msg
191-
t.Logf("received NewBlockHashes message: %s", pretty.Sdump(blockHashes))
192-
assert.Equal(t, blockAnnouncement.Block.Hash(), blockHashes[0].Hash,
193-
"wrong block hash in announcement",
194-
)
195-
default:
196-
t.Fatalf("unexpected: %s", pretty.Sdump(msg))
197+
for {
198+
_, msg := conn.readAndServe66(s.chain, timeout)
199+
switch msg := msg.(type) {
200+
case *NewBlock:
201+
t.Logf("received NewBlock message: %s", pretty.Sdump(msg.Block))
202+
assert.Equal(t,
203+
blockAnnouncement.Block.Header(), msg.Block.Header(),
204+
"wrong block header in announcement",
205+
)
206+
assert.Equal(t,
207+
blockAnnouncement.TD, msg.TD,
208+
"wrong TD in announcement",
209+
)
210+
return
211+
case *NewBlockHashes:
212+
blockHashes := *msg
213+
t.Logf("received NewBlockHashes message: %s", pretty.Sdump(blockHashes))
214+
assert.Equal(t, blockAnnouncement.Block.Hash(), blockHashes[0].Hash,
215+
"wrong block hash in announcement",
216+
)
217+
return
218+
case *NewPooledTransactionHashes:
219+
// ignore old txs being propagated
220+
continue
221+
default:
222+
t.Fatalf("unexpected: %s", pretty.Sdump(msg))
223+
}
197224
}
198225
}
199226

@@ -268,3 +295,24 @@ func headersMatch(t *utesting.T, chain *Chain, headers BlockHeaders) {
268295
assert.Equal(t, chain.blocks[int(num)].Header(), header)
269296
}
270297
}
298+
299+
func (s *Suite) sendNextBlock66(t *utesting.T) {
300+
sendConn, receiveConn := s.setupConnection66(t), s.setupConnection66(t)
301+
defer sendConn.Close()
302+
defer receiveConn.Close()
303+
304+
// create new block announcement
305+
nextBlock := len(s.chain.blocks)
306+
blockAnnouncement := &NewBlock{
307+
Block: s.fullChain.blocks[nextBlock],
308+
TD: s.fullChain.TD(nextBlock + 1),
309+
}
310+
// send announcement and wait for node to request the header
311+
s.testAnnounce66(t, sendConn, receiveConn, blockAnnouncement)
312+
// update test suite chain
313+
s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
314+
// wait for client to update its chain
315+
if err := receiveConn.waitForBlock66(s.chain.Head()); err != nil {
316+
t.Fatal(err)
317+
}
318+
}

cmd/devp2p/internal/ethtest/suite.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ func (s *Suite) AllEthTests() []utesting.Test {
9797
{Name: "TestTransaction_66", Fn: s.TestTransaction_66},
9898
{Name: "TestMaliciousTx", Fn: s.TestMaliciousTx},
9999
{Name: "TestMaliciousTx_66", Fn: s.TestMaliciousTx_66},
100+
{Name: "TestLargeTxRequest_66", Fn: s.TestLargeTxRequest_66},
101+
{Name: "TestNewPooledTxs_66", Fn: s.TestNewPooledTxs_66},
100102
}
101103
}
102104

@@ -129,6 +131,8 @@ func (s *Suite) Eth66Tests() []utesting.Test {
129131
{Name: "TestMaliciousStatus_66", Fn: s.TestMaliciousStatus_66},
130132
{Name: "TestTransaction_66", Fn: s.TestTransaction_66},
131133
{Name: "TestMaliciousTx_66", Fn: s.TestMaliciousTx_66},
134+
{Name: "TestLargeTxRequest_66", Fn: s.TestLargeTxRequest_66},
135+
{Name: "TestNewPooledTxs_66", Fn: s.TestNewPooledTxs_66},
132136
}
133137
}
134138

@@ -455,7 +459,6 @@ func (s *Suite) testAnnounce(t *utesting.T, sendConn, receiveConn *Conn, blockAn
455459
}
456460

457461
func (s *Suite) waitAnnounce(t *utesting.T, conn *Conn, blockAnnouncement *NewBlock) {
458-
timeout := 20 * time.Second
459462
switch msg := conn.ReadAndServe(s.chain, timeout).(type) {
460463
case *NewBlock:
461464
t.Logf("received NewBlock message: %s", pretty.Sdump(msg.Block))

0 commit comments

Comments
 (0)