Skip to content

Commit b240983

Browse files
committed
eth, eth/downloader: do async block fetches, add dl tests
1 parent 30a9939 commit b240983

File tree

3 files changed

+47
-5
lines changed

3 files changed

+47
-5
lines changed

eth/downloader/downloader_test.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,14 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
122122

123123
// newPeer registers a new block download source into the downloader.
124124
func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
125-
err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id))
125+
return dl.newSlowPeer(id, hashes, blocks, 0)
126+
}
127+
128+
// newSlowPeer registers a new block download source into the downloader, with a
129+
// specific delay time on processing the network packets sent to it, simulating
130+
// potentially slow network IO.
131+
func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error {
132+
err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay))
126133
if err == nil {
127134
// Assign the owned hashes and blocks to the peer (deep copy)
128135
dl.peerHashes[id] = make([]common.Hash, len(hashes))
@@ -147,8 +154,10 @@ func (dl *downloadTester) dropPeer(id string) {
147154
// peerGetBlocksFn constructs a getHashes function associated with a particular
148155
// peer in the download tester. The returned function can be used to retrieve
149156
// batches of hashes from the particularly requested peer.
150-
func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) error {
157+
func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error {
151158
return func(head common.Hash) error {
159+
time.Sleep(delay)
160+
152161
limit := MaxHashFetch
153162
if dl.maxHashFetch > 0 {
154163
limit = dl.maxHashFetch
@@ -178,8 +187,10 @@ func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) erro
178187
// peerGetBlocksFn constructs a getBlocks function associated with a particular
179188
// peer in the download tester. The returned function can be used to retrieve
180189
// batches of blocks from the particularly requested peer.
181-
func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error {
190+
func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error {
182191
return func(hashes []common.Hash) error {
192+
time.Sleep(delay)
193+
183194
blocks := dl.peerBlocks[id]
184195
result := make([]*types.Block, 0, len(hashes))
185196
for _, hash := range hashes {
@@ -340,6 +351,37 @@ func TestMultiSynchronisation(t *testing.T) {
340351
}
341352
}
342353

354+
// Tests that synchronising with a peer who's very slow at network IO does not
355+
// stall the other peers in the system.
356+
func TestSlowSynchronisation(t *testing.T) {
357+
tester := newTester()
358+
359+
// Create a batch of blocks, with a slow and a full speed peer
360+
targetCycles := 2
361+
targetBlocks := targetCycles*blockCacheLimit - 15
362+
targetIODelay := 500 * time.Millisecond
363+
364+
hashes := createHashes(targetBlocks, knownHash)
365+
blocks := createBlocksFromHashes(hashes)
366+
367+
tester.newSlowPeer("fast", hashes, blocks, 0)
368+
tester.newSlowPeer("slow", hashes, blocks, targetIODelay)
369+
370+
// Try to sync with the peers (pull hashes from fast)
371+
start := time.Now()
372+
if err := tester.sync("fast"); err != nil {
373+
t.Fatalf("failed to synchronise blocks: %v", err)
374+
}
375+
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
376+
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
377+
}
378+
// Check that the slow peer got hit at most once per block-cache-size import
379+
limit := time.Duration(targetCycles+1) * targetIODelay
380+
if delay := time.Since(start); delay >= limit {
381+
t.Fatalf("synchronisation exceeded delay limit: have %v, want %v", delay, limit)
382+
}
383+
}
384+
343385
// Tests that if a peer returns an invalid chain with a block pointing to a non-
344386
// existing parent, it is correctly detected and handled.
345387
func TestNonExistingParentAttack(t *testing.T) {

eth/downloader/peer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (p *peer) Fetch(request *fetchRequest) error {
7474
for hash, _ := range request.Hashes {
7575
hashes = append(hashes, hash)
7676
}
77-
p.getBlocks(hashes)
77+
go p.getBlocks(hashes)
7878

7979
return nil
8080
}

eth/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (pm *ProtocolManager) fetcher() {
171171
// Send out all block requests
172172
for peer, hashes := range request {
173173
glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
174-
peer.requestBlocks(hashes)
174+
go peer.requestBlocks(hashes)
175175
}
176176
request = make(map[*peer][]common.Hash)
177177

0 commit comments

Comments
 (0)