Skip to content

Commit 92f3405

Browse files
authored
eth, les: fix time sensitive unit tests (#20741)
1 parent b1efff6 commit 92f3405

File tree

8 files changed

+52
-39
lines changed

8 files changed

+52
-39
lines changed

eth/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
534534
}
535535
}
536536
// Wait until the test timeout passes to ensure proper cleanup
537-
time.Sleep(syncChallengeTimeout + 100*time.Millisecond)
537+
time.Sleep(syncChallengeTimeout + 300*time.Millisecond)
538538

539539
// Verify that the remote peer is maintained or dropped
540540
if drop {

les/client_handler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package les
1919
import (
2020
"math/big"
2121
"sync"
22+
"sync/atomic"
2223
"time"
2324

2425
"github.com/ethereum/go-ethereum/common"
@@ -132,6 +133,10 @@ func (h *clientHandler) handle(p *serverPeer) error {
132133
if p.poolEntry != nil {
133134
h.backend.serverPool.registered(p.poolEntry)
134135
}
136+
// Mark the peer starts to be served.
137+
atomic.StoreUint32(&p.serving, 1)
138+
defer atomic.StoreUint32(&p.serving, 0)
139+
135140
// Spawn a main loop to handle all incoming messages.
136141
for {
137142
if err := h.handleMsg(p); err != nil {

les/odr_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od
186186
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
187187
defer tearDown()
188188

189-
client.handler.synchronise(client.peer.speer)
190-
191189
// Ensure the client has synced all necessary data.
192190
clientHead := client.handler.backend.blockchain.CurrentHeader()
193191
if clientHead.Number.Uint64() != 4 {

les/peer.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type peerCommons struct {
131131
network uint64 // Network ID being on.
132132
frozen uint32 // Flag whether the peer is frozen.
133133
announceType uint64 // New block announcement type.
134+
serving uint32 // The status indicates the peer is served.
134135
headInfo blockInfo // Latest block information.
135136

136137
// Background task queue for caching peer tasks and executing in order.
@@ -636,13 +637,12 @@ type clientPeer struct {
636637

637638
// responseLock ensures that responses are queued in the same order as
638639
// RequestProcessed is called
639-
responseLock sync.Mutex
640-
server bool
641-
invalidCount uint32 // Counter the invalid request the client peer has made.
642-
responseCount uint64 // Counter to generate an unique id for request processing.
643-
errCh chan error
644-
fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
645-
balanceTracker *balanceTracker // set by clientPool.connect, used and removed by serverHandler
640+
responseLock sync.Mutex
641+
server bool
642+
invalidCount uint32 // Counter the invalid request the client peer has made.
643+
responseCount uint64 // Counter to generate an unique id for request processing.
644+
errCh chan error
645+
fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
646646
}
647647

648648
func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer {

les/request_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
8181
// Assemble the test environment
8282
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
8383
defer tearDown()
84-
client.handler.synchronise(client.peer.speer)
8584

8685
// Ensure the client has synced all necessary data.
8786
clientHead := client.handler.backend.blockchain.CurrentHeader()

les/server_handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ func (h *serverHandler) handle(p *clientPeer) error {
157157
clientConnectionGauge.Update(int64(h.server.peers.len()))
158158
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
159159
}()
160+
// Mark the peer starts to be served.
161+
atomic.StoreUint32(&p.serving, 1)
162+
defer atomic.StoreUint32(&p.serving, 0)
160163

161164
// Spawn a main loop to handle all incoming messages.
162165
for {

les/sync_test.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,12 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
109109
}
110110

111111
// Create connected peer pair.
112-
peer1, err1, peer2, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler)
112+
peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler)
113+
if err != nil {
114+
t.Fatalf("Failed to connect testing peers %v", err)
115+
}
113116
defer peer1.close()
114117
defer peer2.close()
115-
select {
116-
case <-time.After(time.Millisecond * 100):
117-
case err := <-err1:
118-
t.Fatalf("peer 1 handshake error: %v", err)
119-
case err := <-err2:
120-
t.Fatalf("peer 2 handshake error: %v", err)
121-
}
122118

123119
select {
124120
case err := <-done:
@@ -208,17 +204,10 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
208204
done <- fmt.Errorf("blockchain length mismatch, want %d, got %d", expected, header.Number)
209205
}
210206
}
211-
212207
// Create connected peer pair.
213-
_, err1, _, err2 := newTestPeerPair("peer", 2, server.handler, client.handler)
214-
select {
215-
case <-time.After(time.Millisecond * 100):
216-
case err := <-err1:
217-
t.Fatalf("peer 1 handshake error: %v", err)
218-
case err := <-err2:
219-
t.Fatalf("peer 2 handshake error: %v", err)
208+
if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil {
209+
t.Fatalf("Failed to connect testing peers %v", err)
220210
}
221-
222211
select {
223212
case err := <-done:
224213
if err != nil {

les/test_helper.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ package les
2222
import (
2323
"context"
2424
"crypto/rand"
25+
"fmt"
2526
"math/big"
27+
"sync/atomic"
2628
"testing"
2729
"time"
2830

@@ -347,7 +349,7 @@ func (p *testPeer) close() {
347349
p.app.Close()
348350
}
349351

350-
func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, <-chan error, *testPeer, <-chan error) {
352+
func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) {
351353
// Create a message pipe to communicate through
352354
app, net := p2p.MsgPipe()
353355

@@ -371,11 +373,25 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
371373
go func() {
372374
select {
373375
case <-client.closeCh:
374-
errc1 <- p2p.DiscQuitting
375-
case errc1 <- client.handle(peer2):
376+
errc2 <- p2p.DiscQuitting
377+
case errc2 <- client.handle(peer2):
376378
}
377379
}()
378-
return &testPeer{cpeer: peer1, net: net, app: app}, errc1, &testPeer{speer: peer2, net: app, app: net}, errc2
380+
// Ensure the connection is established or exits when any error occurs
381+
for {
382+
select {
383+
case err := <-errc1:
384+
return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
385+
case err := <-errc2:
386+
return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
387+
default:
388+
}
389+
if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 {
390+
break
391+
}
392+
time.Sleep(50 * time.Millisecond)
393+
}
394+
return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil
379395
}
380396

381397
// handshake simulates a trivial handshake that expects the same state from the
@@ -514,17 +530,20 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
514530
callback(scIndexer, sbIndexer, sbtIndexer)
515531
}
516532
var (
533+
err error
517534
speer, cpeer *testPeer
518-
err1, err2 <-chan error
519535
)
520536
if connect {
521-
cpeer, err1, speer, err2 = newTestPeerPair("peer", protocol, server, client)
537+
done := make(chan struct{})
538+
client.syncDone = func() { close(done) }
539+
cpeer, speer, err = newTestPeerPair("peer", protocol, server, client)
540+
if err != nil {
541+
t.Fatalf("Failed to connect testing peers %v", err)
542+
}
522543
select {
523-
case <-time.After(time.Millisecond * 300):
524-
case err := <-err1:
525-
t.Fatalf("peer 1 handshake error: %v", err)
526-
case err := <-err2:
527-
t.Fatalf("peer 2 handshake error: %v", err)
544+
case <-done:
545+
case <-time.After(3 * time.Second):
546+
t.Fatal("test peer did not connect and sync within 3s")
528547
}
529548
}
530549
s := &testServer{

0 commit comments

Comments
 (0)