Skip to content

Commit b7b62d4

Browse files
committed
eth/downloader: also drain stateCh, receiptCh in eth/61 mode
State and receipt deliveries from a previous eth/62+ sync can hang if the downloader has moved on to syncing with eth/61. Fix this by also draining the eth/63 channels while waiting for eth/61 data. A nicer solution would be to take care of the channels in a central place, but that would involve a major rewrite.
1 parent db52a6a commit b7b62d4

File tree

1 file changed

+77
-71
lines changed

1 file changed

+77
-71
lines changed

eth/downloader/downloader.go

Lines changed: 77 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -492,15 +492,6 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
492492
case <-d.cancelCh:
493493
return 0, errCancelBlockFetch
494494

495-
case <-d.headerCh:
496-
// Out of bounds eth/62 block headers received, ignore them
497-
498-
case <-d.bodyCh:
499-
// Out of bounds eth/62 block bodies received, ignore them
500-
501-
case <-d.hashCh:
502-
// Out of bounds hashes received, ignore them
503-
504495
case packet := <-d.blockCh:
505496
// Discard anything not from the origin peer
506497
if packet.PeerId() != p.id {
@@ -518,6 +509,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
518509
case <-timeout:
519510
glog.V(logger.Debug).Infof("%v: head block timeout", p)
520511
return 0, errTimeout
512+
513+
case <-d.hashCh:
514+
// Out of bounds hashes received, ignore them
515+
516+
case <-d.headerCh:
517+
case <-d.bodyCh:
518+
case <-d.stateCh:
519+
case <-d.receiptCh:
520+
// Ignore eth/{62,63} packets because this is eth/61.
521+
// These can arrive as a late delivery from a previous sync.
521522
}
522523
}
523524
}
@@ -568,18 +569,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
568569
}
569570
}
570571

572+
case <-timeout:
573+
glog.V(logger.Debug).Infof("%v: head hash timeout", p)
574+
return 0, errTimeout
575+
571576
case <-d.blockCh:
572577
// Out of bounds blocks received, ignore them
573578

574579
case <-d.headerCh:
575-
// Out of bounds eth/62 block headers received, ignore them
576-
577580
case <-d.bodyCh:
578-
// Out of bounds eth/62 block bodies received, ignore them
579-
580-
case <-timeout:
581-
glog.V(logger.Debug).Infof("%v: head hash timeout", p)
582-
return 0, errTimeout
581+
case <-d.stateCh:
582+
case <-d.receiptCh:
583+
// Ignore eth/{62,63} packets because this is eth/61.
584+
// These can arrive as a late delivery from a previous sync.
583585
}
584586
}
585587
// If the head fetch already found an ancestor, return
@@ -628,18 +630,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
628630
}
629631
start = check
630632

633+
case <-timeout:
634+
glog.V(logger.Debug).Infof("%v: search hash timeout", p)
635+
return 0, errTimeout
636+
631637
case <-d.blockCh:
632638
// Out of bounds blocks received, ignore them
633639

634640
case <-d.headerCh:
635-
// Out of bounds eth/62 block headers received, ignore them
636-
637641
case <-d.bodyCh:
638-
// Out of bounds eth/62 block bodies received, ignore them
639-
640-
case <-timeout:
641-
glog.V(logger.Debug).Infof("%v: search hash timeout", p)
642-
return 0, errTimeout
642+
case <-d.stateCh:
643+
case <-d.receiptCh:
644+
// Ignore eth/{62,63} packets because this is eth/61.
645+
// These can arrive as a late delivery from a previous sync.
643646
}
644647
}
645648
}
@@ -673,12 +676,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
673676
case <-d.cancelCh:
674677
return errCancelHashFetch
675678

676-
case <-d.headerCh:
677-
// Out of bounds eth/62 block headers received, ignore them
678-
679-
case <-d.bodyCh:
680-
// Out of bounds eth/62 block bodies received, ignore them
681-
682679
case packet := <-d.hashCh:
683680
// Make sure the active peer is giving us the hashes
684681
if packet.PeerId() != p.id {
@@ -747,6 +744,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
747744
glog.V(logger.Debug).Infof("%v: hash request timed out", p)
748745
hashTimeoutMeter.Mark(1)
749746
return errTimeout
747+
748+
case <-d.headerCh:
749+
case <-d.bodyCh:
750+
case <-d.stateCh:
751+
case <-d.receiptCh:
752+
// Ignore eth/{62,63} packets because this is eth/61.
753+
// These can arrive as a late delivery from a previous sync.
750754
}
751755
}
752756
}
@@ -771,12 +775,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
771775
case <-d.cancelCh:
772776
return errCancelBlockFetch
773777

774-
case <-d.headerCh:
775-
// Out of bounds eth/62 block headers received, ignore them
776-
777-
case <-d.bodyCh:
778-
// Out of bounds eth/62 block bodies received, ignore them
779-
780778
case packet := <-d.blockCh:
781779
// If the peer was previously banned and failed to deliver it's pack
782780
// in a reasonable time frame, ignore it's message.
@@ -904,6 +902,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
904902
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
905903
return errPeersUnavailable
906904
}
905+
906+
case <-d.headerCh:
907+
case <-d.bodyCh:
908+
case <-d.stateCh:
909+
case <-d.receiptCh:
910+
// Ignore eth/{62,63} packets because this is eth/61.
911+
// These can arrive as a late delivery from a previous sync.
907912
}
908913
}
909914
}
@@ -936,18 +941,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
936941
}
937942
return headers[0].Number.Uint64(), nil
938943

944+
case <-timeout:
945+
glog.V(logger.Debug).Infof("%v: head header timeout", p)
946+
return 0, errTimeout
947+
939948
case <-d.bodyCh:
940-
// Out of bounds block bodies received, ignore them
949+
case <-d.stateCh:
950+
case <-d.receiptCh:
951+
// Out of bounds delivery, ignore
941952

942953
case <-d.hashCh:
943-
// Out of bounds eth/61 hashes received, ignore them
944-
945954
case <-d.blockCh:
946-
// Out of bounds eth/61 blocks received, ignore them
947-
948-
case <-timeout:
949-
glog.V(logger.Debug).Infof("%v: head header timeout", p)
950-
return 0, errTimeout
955+
// Ignore eth/61 packets because this is eth/62+.
956+
// These can arrive as a late delivery from a previous sync.
951957
}
952958
}
953959
}
@@ -1003,18 +1009,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
10031009
}
10041010
}
10051011

1012+
case <-timeout:
1013+
glog.V(logger.Debug).Infof("%v: head header timeout", p)
1014+
return 0, errTimeout
1015+
10061016
case <-d.bodyCh:
1007-
// Out of bounds block bodies received, ignore them
1017+
case <-d.stateCh:
1018+
case <-d.receiptCh:
1019+
// Out of bounds delivery, ignore
10081020

10091021
case <-d.hashCh:
1010-
// Out of bounds eth/61 hashes received, ignore them
1011-
10121022
case <-d.blockCh:
1013-
// Out of bounds eth/61 blocks received, ignore them
1014-
1015-
case <-timeout:
1016-
glog.V(logger.Debug).Infof("%v: head header timeout", p)
1017-
return 0, errTimeout
1023+
// Ignore eth/61 packets because this is eth/62+.
1024+
// These can arrive as a late delivery from a previous sync.
10181025
}
10191026
}
10201027
// If the head fetch already found an ancestor, return
@@ -1063,18 +1070,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
10631070
}
10641071
start = check
10651072

1073+
case <-timeout:
1074+
glog.V(logger.Debug).Infof("%v: search header timeout", p)
1075+
return 0, errTimeout
1076+
10661077
case <-d.bodyCh:
1067-
// Out of bounds block bodies received, ignore them
1078+
case <-d.stateCh:
1079+
case <-d.receiptCh:
1080+
// Out of bounds delivery, ignore
10681081

10691082
case <-d.hashCh:
1070-
// Out of bounds eth/61 hashes received, ignore them
1071-
10721083
case <-d.blockCh:
1073-
// Out of bounds eth/61 blocks received, ignore them
1074-
1075-
case <-timeout:
1076-
glog.V(logger.Debug).Infof("%v: search header timeout", p)
1077-
return 0, errTimeout
1084+
// Ignore eth/61 packets because this is eth/62+.
1085+
// These can arrive as a late delivery from a previous sync.
10781086
}
10791087
}
10801088
}
@@ -1136,12 +1144,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
11361144
case <-d.cancelCh:
11371145
return errCancelHeaderFetch
11381146

1139-
case <-d.hashCh:
1140-
// Out of bounds eth/61 hashes received, ignore them
1141-
1142-
case <-d.blockCh:
1143-
// Out of bounds eth/61 blocks received, ignore them
1144-
11451147
case packet := <-d.headerCh:
11461148
// Make sure the active peer is giving us the headers
11471149
if packet.PeerId() != p.id {
@@ -1263,6 +1265,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
12631265
}
12641266
}
12651267
return nil
1268+
1269+
case <-d.hashCh:
1270+
case <-d.blockCh:
1271+
// Ignore eth/61 packets because this is eth/62+.
1272+
// These can arrive as a late delivery from a previous sync.
12661273
}
12671274
}
12681275
}
@@ -1383,12 +1390,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
13831390
case <-d.cancelCh:
13841391
return errCancel
13851392

1386-
case <-d.hashCh:
1387-
// Out of bounds eth/61 hashes received, ignore them
1388-
1389-
case <-d.blockCh:
1390-
// Out of bounds eth/61 blocks received, ignore them
1391-
13921393
case packet := <-deliveryCh:
13931394
// If the peer was previously banned and failed to deliver it's pack
13941395
// in a reasonable time frame, ignore it's message.
@@ -1529,6 +1530,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
15291530
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
15301531
return errPeersUnavailable
15311532
}
1533+
1534+
case <-d.hashCh:
1535+
case <-d.blockCh:
1536+
// Ignore eth/61 packets because this is eth/62+.
1537+
// These can arrive as a late delivery from a previous sync.
15321538
}
15331539
}
15341540
}

0 commit comments

Comments
 (0)