Skip to content

Commit 99b62f3

Browse files
committed
eth/downloader: header-chain order and ancestry check
1 parent 0a7d059 commit 99b62f3

File tree

3 files changed

+77
-7
lines changed

3 files changed

+77
-7
lines changed

eth/downloader/downloader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
10781078
// Otherwise insert all the new headers, aborting in case of junk
10791079
glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)
10801080

1081-
inserts := d.queue.Insert(headerPack.headers)
1081+
inserts := d.queue.Insert(headerPack.headers, from)
10821082
if len(inserts) != len(headerPack.headers) {
10831083
glog.V(logger.Debug).Infof("%v: stale headers", p)
10841084
return errBadPeer

eth/downloader/downloader_test.go

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,6 @@ func (dl *downloadTester) sync(id string, td *big.Int) error {
139139
if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 {
140140
break
141141
}
142-
// If there are queued blocks, but the head is missing, it's a stale leftover
143-
if hashes+blocks > 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 && dl.downloader.queue.GetHeadBlock() == nil {
144-
break
145-
}
146142
// Otherwise sleep a bit and retry
147143
time.Sleep(time.Millisecond)
148144
}
@@ -660,6 +656,67 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
660656
}
661657
}
662658

659+
// Tests that headers are enqueued continuously, preventing malicious nodes from
660+
// stalling the downloader by feeding gapped header chains.
661+
func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62) }
662+
func TestMissingHeaderAttack63(t *testing.T) { testMissingHeaderAttack(t, 63) }
663+
func TestMissingHeaderAttack64(t *testing.T) { testMissingHeaderAttack(t, 64) }
664+
665+
func testMissingHeaderAttack(t *testing.T, protocol int) {
666+
// Create a small enough block chain to download
667+
targetBlocks := blockCacheLimit - 15
668+
hashes, blocks := makeChain(targetBlocks, 0, genesis)
669+
670+
tester := newTester()
671+
672+
// Attempt a full sync with an attacker feeding gapped headers
673+
tester.newPeer("attack", protocol, hashes, blocks)
674+
missing := targetBlocks / 2
675+
delete(tester.peerBlocks["attack"], hashes[missing])
676+
677+
if err := tester.sync("attack", nil); err == nil {
678+
t.Fatalf("succeeded attacker synchronisation")
679+
}
680+
// Synchronise with the valid peer and make sure sync succeeds
681+
tester.newPeer("valid", protocol, hashes, blocks)
682+
if err := tester.sync("valid", nil); err != nil {
683+
t.Fatalf("failed to synchronise blocks: %v", err)
684+
}
685+
if imported := len(tester.ownBlocks); imported != len(hashes) {
686+
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
687+
}
688+
}
689+
690+
// Tests that if requested headers are shifted (i.e. first is missing), the queue
691+
// detects the invalid numbering.
692+
func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62) }
693+
func TestShiftedHeaderAttack63(t *testing.T) { testShiftedHeaderAttack(t, 63) }
694+
func TestShiftedHeaderAttack64(t *testing.T) { testShiftedHeaderAttack(t, 64) }
695+
696+
func testShiftedHeaderAttack(t *testing.T, protocol int) {
697+
// Create a small enough block chain to download
698+
targetBlocks := blockCacheLimit - 15
699+
hashes, blocks := makeChain(targetBlocks, 0, genesis)
700+
701+
tester := newTester()
702+
703+
// Attempt a full sync with an attacker feeding shifted headers
704+
tester.newPeer("attack", protocol, hashes, blocks)
705+
delete(tester.peerBlocks["attack"], hashes[len(hashes)-2])
706+
707+
if err := tester.sync("attack", nil); err == nil {
708+
t.Fatalf("succeeded attacker synchronisation")
709+
}
710+
// Synchronise with the valid peer and make sure sync succeeds
711+
tester.newPeer("valid", protocol, hashes, blocks)
712+
if err := tester.sync("valid", nil); err != nil {
713+
t.Fatalf("failed to synchronise blocks: %v", err)
714+
}
715+
if imported := len(tester.ownBlocks); imported != len(hashes) {
716+
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
717+
}
718+
}
719+
663720
// Tests that if a peer sends an invalid body for a requested block, it gets
664721
// dropped immediately by the downloader.
665722
func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) }

eth/downloader/queue.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type queue struct {
5757

5858
headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
5959
headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for
60+
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
6061

6162
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
6263

@@ -91,6 +92,7 @@ func (q *queue) Reset() {
9192

9293
q.headerPool = make(map[common.Hash]*types.Header)
9394
q.headerQueue.Reset()
95+
q.headerHead = common.Hash{}
9496

9597
q.pendPool = make(map[string]*fetchRequest)
9698

@@ -186,7 +188,7 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
186188

187189
// Insert adds a set of headers for the download queue for scheduling, returning
188190
// the new headers encountered.
189-
func (q *queue) Insert(headers []*types.Header) []*types.Header {
191+
func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
190192
q.lock.Lock()
191193
defer q.lock.Unlock()
192194

@@ -196,13 +198,24 @@ func (q *queue) Insert(headers []*types.Header) []*types.Header {
196198
// Make sure no duplicate requests are executed
197199
hash := header.Hash()
198200
if _, ok := q.headerPool[hash]; ok {
199-
glog.V(logger.Warn).Infof("Header %x already scheduled", hash)
201+
glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4])
200202
continue
201203
}
204+
// Make sure chain order is honored and preserved throughout
205+
if header.Number == nil || header.Number.Uint64() != from {
206+
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
207+
break
208+
}
209+
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
210+
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
211+
break
212+
}
202213
// Queue the header for body retrieval
203214
inserts = append(inserts, header)
204215
q.headerPool[hash] = header
205216
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
217+
q.headerHead = hash
218+
from++
206219
}
207220
return inserts
208221
}

0 commit comments

Comments
 (0)