Skip to content

Commit a29bdf5

Browse files
committed
[release/1.4.6] eth/downloader: make fast sync resilient to critical section fails
(cherry picked from commit 61ee9f2)
1 parent 44b912e commit a29bdf5

File tree

2 files changed

+109
-26
lines changed

2 files changed

+109
-26
lines changed

eth/downloader/downloader.go

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ var (
7373
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
7474
fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
7575
fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
76+
fsCriticalTrials = 10 // Number of times to retry in the critical section before bailing
7677
)
7778

7879
var (
@@ -103,13 +104,15 @@ var (
103104
)
104105

105106
type Downloader struct {
106-
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
107-
noFast bool // Flag to disable fast syncing in case of a security error
108-
mux *event.TypeMux // Event multiplexer to announce sync operation events
107+
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
108+
mux *event.TypeMux // Event multiplexer to announce sync operation events
109109

110110
queue *queue // Scheduler for selecting the hashes to download
111111
peers *peerSet // Set of active peers from which download can proceed
112112

113+
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
114+
fsPivotFails int // Number of fast sync failures in the critical section
115+
113116
interrupt int32 // Atomic boolean to signal termination
114117

115118
// Statistics
@@ -314,6 +317,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
314317
default:
315318
}
316319
}
320+
for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
321+
for empty := false; !empty; {
322+
select {
323+
case <-ch:
324+
default:
325+
empty = true
326+
}
327+
}
328+
}
317329
for empty := false; !empty; {
318330
select {
319331
case <-d.headerProcCh:
@@ -330,7 +342,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
330342

331343
// Set the requested sync mode, unless it's forbidden
332344
d.mode = mode
333-
if d.mode == FastSync && d.noFast {
345+
if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
334346
d.mode = FullSync
335347
}
336348
// Retrieve the origin peer and initiate the downloading process
@@ -413,12 +425,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
413425
pivot = height
414426
case FastSync:
415427
// Calculate the new fast/slow sync pivot point
416-
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
417-
if err != nil {
418-
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
419-
}
420-
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
421-
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
428+
if d.fsPivotLock == nil {
429+
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
430+
if err != nil {
431+
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
432+
}
433+
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
434+
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
435+
}
436+
} else {
437+
// Pivot point locked in, use this and do not pick a new one!
438+
pivot = d.fsPivotLock.Number.Uint64()
422439
}
423440
// If the point is below the origin, move origin back to ensure state download
424441
if pivot < origin {
@@ -1218,8 +1235,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
12181235
// If no more headers are inbound, notify the content fetchers and return
12191236
if packet.Items() == 0 {
12201237
glog.V(logger.Debug).Infof("%v: no available headers", p)
1221-
d.headerProcCh <- nil
1222-
return nil
1238+
select {
1239+
case d.headerProcCh <- nil:
1240+
return nil
1241+
case <-d.cancelCh:
1242+
return errCancelHeaderFetch
1243+
}
12231244
}
12241245
headers := packet.(*headerPack).headers
12251246

@@ -1611,9 +1632,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
16111632
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
16121633
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
16131634

1614-
// If we're already past the pivot point, this could be an attack, disable fast sync
1635+
// If we're already past the pivot point, this could be an attack, tread carefully
16151636
if rollback[len(rollback)-1].Number.Uint64() > pivot {
1616-
d.noFast = true
1637+
// If we didn't ever fail, lock in the pivot header (must! not! change!)
1638+
if d.fsPivotFails == 0 {
1639+
for _, header := range rollback {
1640+
if header.Number.Uint64() == pivot {
1641+
glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
1642+
d.fsPivotLock = header
1643+
}
1644+
}
1645+
}
1646+
d.fsPivotFails++
16171647
}
16181648
}
16191649
}()
@@ -1712,6 +1742,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
17121742
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
17131743
}
17141744
}
1745+
// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
1746+
if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
1747+
if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
1748+
glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
1749+
return errInvalidChain
1750+
}
1751+
}
17151752
// Unless we're doing light chains, schedule the headers for associated content retrieval
17161753
if d.mode == FullSync || d.mode == FastSync {
17171754
// If we've reached the allowed number of pending headers, stall a bit

eth/downloader/downloader_test.go

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -149,22 +149,25 @@ type downloadTester struct {
149149
peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
150150
peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
151151

152+
peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
153+
152154
lock sync.RWMutex
153155
}
154156

155157
// newTester creates a new downloader test mocker.
156158
func newTester() *downloadTester {
157159
tester := &downloadTester{
158-
ownHashes: []common.Hash{genesis.Hash()},
159-
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
160-
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
161-
ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
162-
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
163-
peerHashes: make(map[string][]common.Hash),
164-
peerHeaders: make(map[string]map[common.Hash]*types.Header),
165-
peerBlocks: make(map[string]map[common.Hash]*types.Block),
166-
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
167-
peerChainTds: make(map[string]map[common.Hash]*big.Int),
160+
ownHashes: []common.Hash{genesis.Hash()},
161+
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
162+
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
163+
ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
164+
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
165+
peerHashes: make(map[string][]common.Hash),
166+
peerHeaders: make(map[string]map[common.Hash]*types.Header),
167+
peerBlocks: make(map[string]map[common.Hash]*types.Block),
168+
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
169+
peerChainTds: make(map[string]map[common.Hash]*big.Int),
170+
peerMissingStates: make(map[string]map[common.Hash]bool),
168171
}
169172
tester.stateDb, _ = ethdb.NewMemDatabase()
170173
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
@@ -408,6 +411,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
408411
dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
409412
dl.peerReceipts[id] = make(map[common.Hash]types.Receipts)
410413
dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
414+
dl.peerMissingStates[id] = make(map[common.Hash]bool)
411415

412416
genesis := hashes[len(hashes)-1]
413417
if header := headers[genesis]; header != nil {
@@ -648,7 +652,9 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func
648652
results := make([][]byte, 0, len(hashes))
649653
for _, hash := range hashes {
650654
if data, err := testdb.Get(hash.Bytes()); err == nil {
651-
results = append(results, data)
655+
if !dl.peerMissingStates[id][hash] {
656+
results = append(results, data)
657+
}
652658
}
653659
}
654660
go dl.downloader.DeliverNodeData(id, results)
@@ -1288,7 +1294,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
12881294
tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts)
12891295
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
12901296

1291-
tester.downloader.noFast = false
1297+
tester.downloader.fsPivotFails = 0
12921298
tester.downloader.syncInitHook = func(uint64, uint64) {
12931299
for i := missing; i <= len(hashes); i++ {
12941300
delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i])
@@ -1307,6 +1313,8 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
13071313
t.Errorf("fast sync pivot block #%d not rolled back", head)
13081314
}
13091315
}
1316+
tester.downloader.fsPivotFails = fsCriticalTrials
1317+
13101318
// Synchronise with the valid peer and make sure sync succeeds. Since the last
13111319
// rollback should also disable fast syncing for this process, verify that we
13121320
// did a fresh full sync. Note, we can't assert anything about the receipts
@@ -1749,3 +1757,41 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
17491757
}
17501758
}
17511759
}
1760+
1761+
// Tests that if fast sync aborts in the critical section, it can restart a few
1762+
// times before giving up.
1763+
func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) }
1764+
func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) }
1765+
1766+
func testFastCriticalRestarts(t *testing.T, protocol int) {
1767+
t.Parallel()
1768+
1769+
// Create a large enough blockchain to actually fast sync on
1770+
targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
1771+
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
1772+
1773+
// Create a tester peer with the critical section state roots missing (force failures)
1774+
tester := newTester()
1775+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
1776+
1777+
for i := 0; i < fsPivotInterval; i++ {
1778+
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
1779+
}
1780+
// Synchronise with the peer a few times and make sure they fail until the retry limit
1781+
for i := 0; i < fsCriticalTrials; i++ {
1782+
// Attempt a sync and ensure it fails properly
1783+
if err := tester.sync("peer", nil, FastSync); err == nil {
1784+
t.Fatalf("failing fast sync succeeded: %v", err)
1785+
}
1786+
// If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
1787+
if i == 0 {
1788+
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
1789+
}
1790+
time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain
1791+
}
1792+
// Retry limit exhausted, downloader will switch to full sync, should succeed
1793+
if err := tester.sync("peer", nil, FastSync); err != nil {
1794+
t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
1795+
}
1796+
assertOwnChain(t, tester, targetBlocks+1)
1797+
}

0 commit comments

Comments
 (0)