Skip to content

Commit c5d7fae

Browse files
karalabeobscuren
authored andcommitted
eth, eth/downloader: don't report stall if fetcher filled the block
1 parent d7211de commit c5d7fae

File tree

3 files changed

+65
-43
lines changed

3 files changed

+65
-43
lines changed

eth/downloader/downloader.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"bytes"
2222
"errors"
2323
"math"
24+
"math/big"
2425
"math/rand"
2526
"sync"
2627
"sync/atomic"
@@ -232,10 +233,10 @@ func (d *Downloader) UnregisterPeer(id string) error {
232233

233234
// Synchronise tries to sync up our local block chain with a remote peer, both
234235
// adding various sanity checks as well as wrapping it with various log entries.
235-
func (d *Downloader) Synchronise(id string, head common.Hash) {
236-
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)
236+
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int) {
237+
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head 0x%x, TD %v", id, head[:4], td)
237238

238-
switch err := d.synchronise(id, head); err {
239+
switch err := d.synchronise(id, head, td); err {
239240
case nil:
240241
glog.V(logger.Detail).Infof("Synchronisation completed")
241242

@@ -257,7 +258,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash) {
257258
// synchronise will select the peer and use it for synchronising. If an empty string is given
258259
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
259260
// checks fail an error will be returned. This method is synchronous
260-
func (d *Downloader) synchronise(id string, hash common.Hash) error {
261+
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error {
261262
// Mock out the synchonisation if testing
262263
if d.synchroniseMock != nil {
263264
return d.synchroniseMock(id, hash)
@@ -295,7 +296,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
295296
if p == nil {
296297
return errUnknownPeer
297298
}
298-
return d.syncWithPeer(p, hash)
299+
return d.syncWithPeer(p, hash, td)
299300
}
300301

301302
// Has checks if the downloader knows about a particular hash, meaning that its
@@ -306,7 +307,7 @@ func (d *Downloader) Has(hash common.Hash) bool {
306307

307308
// syncWithPeer starts a block synchronization based on the hash chain from the
308309
// specified peer and head hash.
309-
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
310+
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
310311
d.mux.Post(StartEvent{})
311312
defer func() {
312313
// reset on error
@@ -335,7 +336,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
335336
return err
336337
}
337338
errc := make(chan error, 2)
338-
go func() { errc <- d.fetchHashes(p, number+1) }()
339+
go func() { errc <- d.fetchHashes(p, td, number+1) }()
339340
go func() { errc <- d.fetchBlocks(number + 1) }()
340341

341342
// If any fetcher fails, cancel the other
@@ -788,7 +789,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
788789

789790
// fetchHashes keeps retrieving hashes from the requested number, until no more
790791
// are returned, potentially throttling on the way.
791-
func (d *Downloader) fetchHashes(p *peer, from uint64) error {
792+
func (d *Downloader) fetchHashes(p *peer, td *big.Int, from uint64) error {
792793
glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
793794

794795
// Create a timeout timer, and the associated hash fetcher
@@ -827,8 +828,19 @@ func (d *Downloader) fetchHashes(p *peer, from uint64) error {
827828
case d.processCh <- false:
828829
case <-d.cancelCh:
829830
}
830-
// Error out if no hashes were retrieved at all
831-
if !gotHashes {
831+
// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
832+
// better chain compared to ours. The only exception is if it's promised blocks were
833+
// already imported by other means (e.g. fecher):
834+
//
835+
// R <remote peer>, L <local node>: Both at block 10
836+
// R: Mine block 11, and propagate it to L
837+
// L: Queue block 11 for import
838+
// L: Notice that R's head and TD increased compared to ours, start sync
839+
// L: Import of block 11 finishes
840+
// L: Sync begins, and finds common ancestor at 11
841+
// L: Request new hashes up from 11 (R's TD was higher, it must have something)
842+
// R: Nothing to give
843+
if !gotHashes && td.Cmp(d.headBlock().Td) > 0 {
832844
return errStallingPeer
833845
}
834846
return nil

eth/downloader/downloader_test.go

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,18 @@ func newTester() *downloadTester {
9797
}
9898

9999
// sync starts synchronizing with a remote peer, blocking until it completes.
100-
func (dl *downloadTester) sync(id string) error {
101-
err := dl.downloader.synchronise(id, dl.peerHashes[id][0])
100+
func (dl *downloadTester) sync(id string, td *big.Int) error {
101+
hash := dl.peerHashes[id][0]
102+
103+
// If no particular TD was requested, load from the peer's blockchain
104+
if td == nil {
105+
td = big.NewInt(1)
106+
if block, ok := dl.peerBlocks[id][hash]; ok {
107+
td = block.Td
108+
}
109+
}
110+
err := dl.downloader.synchronise(id, hash, td)
111+
102112
for {
103113
// If the queue is empty and processing stopped, break
104114
hashes, blocks := dl.downloader.queue.Size()
@@ -261,7 +271,7 @@ func TestSynchronisation60(t *testing.T) {
261271
tester.newPeer("peer", eth60, hashes, blocks)
262272

263273
// Synchronise with the peer and make sure all blocks were retrieved
264-
if err := tester.sync("peer"); err != nil {
274+
if err := tester.sync("peer", nil); err != nil {
265275
t.Fatalf("failed to synchronise blocks: %v", err)
266276
}
267277
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
@@ -281,7 +291,7 @@ func TestCanonicalSynchronisation61(t *testing.T) {
281291
tester.newPeer("peer", eth61, hashes, blocks)
282292

283293
// Synchronise with the peer and make sure all blocks were retrieved
284-
if err := tester.sync("peer"); err != nil {
294+
if err := tester.sync("peer", nil); err != nil {
285295
t.Fatalf("failed to synchronise blocks: %v", err)
286296
}
287297
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
@@ -312,7 +322,7 @@ func testThrottling(t *testing.T, protocol int) {
312322
// Start a synchronisation concurrently
313323
errc := make(chan error)
314324
go func() {
315-
errc <- tester.sync("peer")
325+
errc <- tester.sync("peer", nil)
316326
}()
317327
// Iteratively take some blocks, always checking the retrieval count
318328
for len(tester.ownBlocks) < targetBlocks+1 {
@@ -361,14 +371,14 @@ func TestForkedSynchronisation61(t *testing.T) {
361371
tester.newPeer("fork B", eth61, hashesB, blocksB)
362372

363373
// Synchronise with the peer and make sure all blocks were retrieved
364-
if err := tester.sync("fork A"); err != nil {
374+
if err := tester.sync("fork A", nil); err != nil {
365375
t.Fatalf("failed to synchronise blocks: %v", err)
366376
}
367377
if imported := len(tester.ownBlocks); imported != common+fork+1 {
368378
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1)
369379
}
370380
// Synchronise with the second peer and make sure that fork is pulled too
371-
if err := tester.sync("fork B"); err != nil {
381+
if err := tester.sync("fork B", nil); err != nil {
372382
t.Fatalf("failed to synchronise blocks: %v", err)
373383
}
374384
if imported := len(tester.ownBlocks); imported != common+2*fork+1 {
@@ -411,7 +421,7 @@ func testCancel(t *testing.T, protocol int) {
411421
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
412422
}
413423
// Synchronise with the peer, but cancel afterwards
414-
if err := tester.sync("peer"); err != nil {
424+
if err := tester.sync("peer", nil); err != nil {
415425
t.Fatalf("failed to synchronise blocks: %v", err)
416426
}
417427
tester.downloader.cancel()
@@ -438,14 +448,14 @@ func testMultiSynchronisation(t *testing.T, protocol int) {
438448
}
439449
// Synchronise with the middle peer and make sure half of the blocks were retrieved
440450
id := fmt.Sprintf("peer #%d", targetPeers/2)
441-
if err := tester.sync(id); err != nil {
451+
if err := tester.sync(id, nil); err != nil {
442452
t.Fatalf("failed to synchronise blocks: %v", err)
443453
}
444454
if imported := len(tester.ownBlocks); imported != len(tester.peerHashes[id]) {
445455
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(tester.peerHashes[id]))
446456
}
447457
// Synchronise with the best peer and make sure everything is retrieved
448-
if err := tester.sync("peer #0"); err != nil {
458+
if err := tester.sync("peer #0", nil); err != nil {
449459
t.Fatalf("failed to synchronise blocks: %v", err)
450460
}
451461
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
@@ -469,7 +479,7 @@ func TestSlowSynchronisation60(t *testing.T) {
469479

470480
// Try to sync with the peers (pull hashes from fast)
471481
start := time.Now()
472-
if err := tester.sync("fast"); err != nil {
482+
if err := tester.sync("fast", nil); err != nil {
473483
t.Fatalf("failed to synchronise blocks: %v", err)
474484
}
475485
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
@@ -497,14 +507,14 @@ func TestNonExistingParentAttack60(t *testing.T) {
497507
tester.newPeer("attack", eth60, hashes, blocks)
498508

499509
// Try and sync with the malicious node and check that it fails
500-
if err := tester.sync("attack"); err == nil {
510+
if err := tester.sync("attack", nil); err == nil {
501511
t.Fatalf("block synchronization succeeded")
502512
}
503513
if tester.hasBlock(hashes[0]) {
504514
t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]])
505515
}
506516
// Try to synchronize with the valid chain and make sure it succeeds
507-
if err := tester.sync("valid"); err != nil {
517+
if err := tester.sync("valid", nil); err != nil {
508518
t.Fatalf("failed to synchronise blocks: %v", err)
509519
}
510520
if !tester.hasBlock(tester.peerHashes["valid"][0]) {
@@ -525,7 +535,7 @@ func TestRepeatingHashAttack60(t *testing.T) { // TODO: Is this thing valid??
525535
// Try and sync with the malicious node
526536
errc := make(chan error)
527537
go func() {
528-
errc <- tester.sync("attack")
538+
errc <- tester.sync("attack", nil)
529539
}()
530540
// Make sure that syncing returns and does so with a failure
531541
select {
@@ -537,7 +547,7 @@ func TestRepeatingHashAttack60(t *testing.T) { // TODO: Is this thing valid??
537547
}
538548
}
539549
// Ensure that a valid chain can still pass sync
540-
if err := tester.sync("valid"); err != nil {
550+
if err := tester.sync("valid", nil); err != nil {
541551
t.Fatalf("failed to synchronise blocks: %v", err)
542552
}
543553
}
@@ -555,11 +565,11 @@ func TestNonExistingBlockAttack60(t *testing.T) {
555565
tester.newPeer("attack", eth60, hashes, blocks)
556566

557567
// Try and sync with the malicious node and check that it fails
558-
if err := tester.sync("attack"); err != errPeersUnavailable {
568+
if err := tester.sync("attack", nil); err != errPeersUnavailable {
559569
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
560570
}
561571
// Ensure that a valid chain can still pass sync
562-
if err := tester.sync("valid"); err != nil {
572+
if err := tester.sync("valid", nil); err != nil {
563573
t.Fatalf("failed to synchronise blocks: %v", err)
564574
}
565575
}
@@ -583,11 +593,11 @@ func TestInvalidHashOrderAttack60(t *testing.T) {
583593
tester.newPeer("attack", eth60, hashes, blocks)
584594

585595
// Try and sync with the malicious node and check that it fails
586-
if err := tester.sync("attack"); err != errInvalidChain {
596+
if err := tester.sync("attack", nil); err != errInvalidChain {
587597
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
588598
}
589599
// Ensure that a valid chain can still pass sync
590-
if err := tester.sync("valid"); err != nil {
600+
if err := tester.sync("valid", nil); err != nil {
591601
t.Fatalf("failed to synchronise blocks: %v", err)
592602
}
593603
}
@@ -611,11 +621,11 @@ func TestMadeupHashChainAttack60(t *testing.T) {
611621
tester.newPeer("attack", eth60, randomHashes, nil)
612622

613623
// Try and sync with the malicious node and check that it fails
614-
if err := tester.sync("attack"); err != errCrossCheckFailed {
624+
if err := tester.sync("attack", nil); err != errCrossCheckFailed {
615625
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
616626
}
617627
// Ensure that a valid chain can still pass sync
618-
if err := tester.sync("valid"); err != nil {
628+
if err := tester.sync("valid", nil); err != nil {
619629
t.Fatalf("failed to synchronise blocks: %v", err)
620630
}
621631
}
@@ -636,7 +646,7 @@ func TestMadeupHashChainDrippingAttack60(t *testing.T) {
636646
// Try and sync with the attacker, one hash at a time
637647
tester.maxHashFetch = 1
638648
tester.newPeer("attack", eth60, randomHashes, nil)
639-
if err := tester.sync("attack"); err != errStallingPeer {
649+
if err := tester.sync("attack", nil); err != errStallingPeer {
640650
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
641651
}
642652
}
@@ -659,15 +669,15 @@ func TestMadeupBlockChainAttack60(t *testing.T) {
659669
// Try and sync with the malicious node and check that it fails
660670
tester := newTester()
661671
tester.newPeer("attack", eth60, gapped, blocks)
662-
if err := tester.sync("attack"); err != errCrossCheckFailed {
672+
if err := tester.sync("attack", nil); err != errCrossCheckFailed {
663673
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
664674
}
665675
// Ensure that a valid chain can still pass sync
666676
blockSoftTTL = defaultBlockTTL
667677
crossCheckCycle = defaultCrossCheckCycle
668678

669679
tester.newPeer("valid", eth60, hashes, blocks)
670-
if err := tester.sync("valid"); err != nil {
680+
if err := tester.sync("valid", nil); err != nil {
671681
t.Fatalf("failed to synchronise blocks: %v", err)
672682
}
673683
}
@@ -690,7 +700,7 @@ func TestBannedChainStarvationAttack60(t *testing.T) {
690700
// the head of the invalid chain is blocked too.
691701
for banned := tester.downloader.banned.Size(); ; {
692702
// Try to sync with the attacker, check hash chain failure
693-
if err := tester.sync("attack"); err != errInvalidChain {
703+
if err := tester.sync("attack", nil); err != errInvalidChain {
694704
if tester.downloader.banned.Has(forkHashes[0]) && err == errBannedHead {
695705
break
696706
}
@@ -711,7 +721,7 @@ func TestBannedChainStarvationAttack60(t *testing.T) {
711721
t.Fatalf("banned attacker registered: %v", peer)
712722
}
713723
// Ensure that a valid chain can still pass sync
714-
if err := tester.sync("valid"); err != nil {
724+
if err := tester.sync("valid", nil); err != nil {
715725
t.Fatalf("failed to synchronise blocks: %v", err)
716726
}
717727
}
@@ -743,7 +753,7 @@ func TestBannedChainMemoryExhaustionAttack60(t *testing.T) {
743753
// the head of the invalid chain is blocked too.
744754
for {
745755
// Try to sync with the attacker, check hash chain failure
746-
if err := tester.sync("attack"); err != errInvalidChain {
756+
if err := tester.sync("attack", nil); err != errInvalidChain {
747757
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
748758
}
749759
// Short circuit if the entire chain was banned.
@@ -754,7 +764,7 @@ func TestBannedChainMemoryExhaustionAttack60(t *testing.T) {
754764
if bans := tester.downloader.banned.Size(); bans > maxBannedHashes {
755765
t.Fatalf("ban cap exceeded: have %v, want max %v", bans, maxBannedHashes)
756766
}
757-
for hash, _ := range core.BadHashes {
767+
for hash := range core.BadHashes {
758768
if !tester.downloader.banned.Has(hash) {
759769
t.Fatalf("hard coded ban evacuated: %x", hash)
760770
}
@@ -764,7 +774,7 @@ func TestBannedChainMemoryExhaustionAttack60(t *testing.T) {
764774
MaxBlockFetch = defaultMaxBlockFetch
765775
maxBannedHashes = defaultMaxBannedHashes
766776

767-
if err := tester.sync("valid"); err != nil {
777+
if err := tester.sync("valid", nil); err != nil {
768778
t.Fatalf("failed to synchronise blocks: %v", err)
769779
}
770780
}
@@ -790,7 +800,7 @@ func TestOverlappingDeliveryAttack60(t *testing.T) {
790800
return rawGetBlocks(append(request, hashes[0]))
791801
}
792802
// Test that synchronisation can complete, check for import success
793-
if err := tester.sync("attack"); err != nil {
803+
if err := tester.sync("attack", nil); err != nil {
794804
t.Fatalf("failed to synchronise blocks: %v", err)
795805
}
796806
start := time.Now()
@@ -807,7 +817,7 @@ func TestOverlappingDeliveryAttack60(t *testing.T) {
807817
func TestHighTDStarvationAttack61(t *testing.T) {
808818
tester := newTester()
809819
tester.newPeer("attack", eth61, []common.Hash{genesis.Hash()}, nil)
810-
if err := tester.sync("attack"); err != errStallingPeer {
820+
if err := tester.sync("attack", big.NewInt(1000000)); err != errStallingPeer {
811821
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
812822
}
813823
}
@@ -849,7 +859,7 @@ func TestHashAttackerDropping(t *testing.T) {
849859
// Simulate a synchronisation and check the required result
850860
tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
851861

852-
tester.downloader.Synchronise(id, genesis.Hash())
862+
tester.downloader.Synchronise(id, genesis.Hash(), big.NewInt(1000))
853863
if _, ok := tester.peerHashes[id]; !ok != tt.drop {
854864
t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
855865
}

eth/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,5 +164,5 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
164164
return
165165
}
166166
// Otherwise try to sync with the downloader
167-
pm.downloader.Synchronise(peer.id, peer.Head())
167+
pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td())
168168
}

0 commit comments

Comments
 (0)