Skip to content

Commit 900da3d

Browse files
committed
eth/downloader: don't hang for spurious deliveries
Unexpected deliveries could block indefinitely if they arrived at the right time. The fix is to ensure that the cancellation channel is always closed when the sync ends, unblocking any deliveries.

Also remove the atomic check for whether a sync is currently running because it doesn't help and can be misleading. Cancelling always seems to break the tests though.

The downloader spawned d.process whenever new data arrived, making it somewhat hard to track when block processing was actually done. Fix this by running d.process in a dedicated goroutine that is tied to the lifecycle of the sync. d.process gets notified of new work by the queue instead of being invoked all the time. This removes a ton of weird workaround code, including a hairy use of atomic CAS.
1 parent 9422eec commit 900da3d

File tree

3 files changed

+230
-236
lines changed

3 files changed

+230
-236
lines changed

eth/downloader/downloader.go

Lines changed: 57 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ var (
7474
errBadPeer = errors.New("action from bad peer ignored")
7575
errStallingPeer = errors.New("peer is stalling")
7676
errNoPeers = errors.New("no peers to keep download active")
77-
errPendingQueue = errors.New("pending items in queue")
7877
errTimeout = errors.New("timeout")
7978
errEmptyHashSet = errors.New("empty hash set by peer")
8079
errEmptyHeaderSet = errors.New("empty header set by peer")
@@ -90,6 +89,7 @@ var (
9089
errCancelBodyFetch = errors.New("block body download canceled (requested)")
9190
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
9291
errCancelStateFetch = errors.New("state data download canceled (requested)")
92+
errCancelProcessing = errors.New("processing canceled (requested)")
9393
errNoSyncActive = errors.New("no sync active")
9494
)
9595

@@ -129,7 +129,6 @@ type Downloader struct {
129129
// Status
130130
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
131131
synchronising int32
132-
processing int32
133132
notified int32
134133

135134
// Channels
@@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
215214

216215
// Synchronising returns whether the downloader is currently retrieving blocks.
217216
func (d *Downloader) Synchronising() bool {
218-
return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
217+
return atomic.LoadInt32(&d.synchronising) > 0
219218
}
220219

221220
// RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
263262
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
264263
d.dropPeer(id)
265264

266-
case errPendingQueue:
267-
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
268-
269265
default:
270266
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
271267
}
@@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
290286
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
291287
glog.V(logger.Info).Infoln("Block synchronisation started")
292288
}
293-
// Abort if the queue still contains some leftover data
294-
if d.queue.GetHeadResult() != nil {
295-
return errPendingQueue
296-
}
297289
// Reset the queue, peer set and wake channels to clean any internal leftover state
298290
d.queue.Reset()
299291
d.peers.Reset()
@@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
335327
defer func() {
336328
// reset on error
337329
if err != nil {
338-
d.cancel()
339330
d.mux.Post(FailedEvent{err})
340331
} else {
341332
d.mux.Post(DoneEvent{})
@@ -365,23 +356,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
365356
d.syncStatsChainHeight = latest
366357
d.syncStatsLock.Unlock()
367358

368-
// Initiate the sync using a concurrent hash and block retrieval algorithm
359+
// Initiate the sync using a concurrent hash and block retrieval algorithm
360+
d.queue.Prepare(origin+1, d.mode, 0)
369361
if d.syncInitHook != nil {
370362
d.syncInitHook(origin, latest)
371363
}
372-
d.queue.Prepare(origin+1, d.mode, 0)
373-
374-
errc := make(chan error, 2)
375-
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
376-
go func() { errc <- d.fetchBlocks61(origin + 1) }()
377-
378-
// If any fetcher fails, cancel the other
379-
if err := <-errc; err != nil {
380-
d.cancel()
381-
<-errc
382-
return err
383-
}
384-
return <-errc
364+
return d.spawnSync(
365+
func() error { return d.fetchHashes61(p, td, origin+1) },
366+
func() error { return d.fetchBlocks61(origin + 1) },
367+
)
385368

386369
case p.version >= 62:
387370
// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
405388
switch d.mode {
406389
case LightSync:
407390
pivot = latest
408-
409391
case FastSync:
410392
// Calculate the new fast/slow sync pivot point
411393
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
426408
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
427409
}
428410
d.queue.Prepare(origin+1, d.mode, pivot)
429-
430411
if d.syncInitHook != nil {
431412
d.syncInitHook(origin, latest)
432413
}
433-
errc := make(chan error, 4)
434-
go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
435-
go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
436-
go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
437-
go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
438-
439-
// If any fetcher fails, cancel the others
440-
var fail error
441-
for i := 0; i < cap(errc); i++ {
442-
if err := <-errc; err != nil {
443-
if fail == nil {
444-
fail = err
445-
d.cancel()
446-
}
447-
}
448-
}
449-
return fail
414+
return d.spawnSync(
415+
func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
416+
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
417+
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
418+
func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
419+
)
450420

451421
default:
452422
// Something very wrong, stop right here
453423
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
454424
return errBadPeer
455425
}
456-
return nil
426+
}
427+
428+
// spawnSync runs d.process and all given fetcher functions to completion in
429+
// separate goroutines, returning the first error that appears.
430+
func (d *Downloader) spawnSync(fetchers ...func() error) error {
431+
var wg sync.WaitGroup
432+
errc := make(chan error, len(fetchers)+1)
433+
wg.Add(len(fetchers) + 1)
434+
go func() { defer wg.Done(); errc <- d.process() }()
435+
for _, fn := range fetchers {
436+
fn := fn
437+
go func() { defer wg.Done(); errc <- fn() }()
438+
}
439+
// Wait for the first error, then terminate the others.
440+
var err error
441+
for i := 0; i < len(fetchers)+1; i++ {
442+
if i == len(fetchers) {
443+
// Close the queue when all fetchers have exited.
444+
// This will cause the block processor to end when
445+
// it has processed the queue.
446+
d.queue.Close()
447+
}
448+
if err = <-errc; err != nil {
449+
break
450+
}
451+
}
452+
d.queue.Close()
453+
d.cancel()
454+
wg.Wait()
455+
return err
457456
}
458457

459458
// cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +469,10 @@ func (d *Downloader) cancel() {
470469
}
471470
}
472471
d.cancelLock.Unlock()
473-
474-
// Reset the queue
475-
d.queue.Reset()
476472
}
477473

478474
// Terminate interrupts the downloader, canceling all pending operations.
475+
// The downloader cannot be reused after calling Terminate.
479476
func (d *Downloader) Terminate() {
480477
atomic.StoreInt32(&d.interrupt, 1)
481478
d.cancel()
@@ -800,7 +797,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
800797
peer.Promote()
801798
peer.SetBlocksIdle()
802799
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
803-
go d.process()
804800

805801
case errInvalidChain:
806802
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -826,7 +822,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
826822
peer.Demote()
827823
peer.SetBlocksIdle()
828824
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
829-
go d.process()
830825
}
831826
}
832827
// Blocks arrived, try to update the progress
@@ -1336,10 +1331,8 @@ func (d *Downloader) fetchNodeData() error {
13361331
d.cancel()
13371332
return
13381333
}
1339-
// Processing succeeded, notify state fetcher and processor of continuation
1340-
if d.queue.PendingNodeData() == 0 {
1341-
go d.process()
1342-
} else {
1334+
// Processing succeeded, notify state fetcher of continuation
1335+
if d.queue.PendingNodeData() > 0 {
13431336
select {
13441337
case d.stateWakeCh <- true:
13451338
default:
@@ -1348,7 +1341,6 @@ func (d *Downloader) fetchNodeData() error {
13481341
// Log a message to the user and return
13491342
d.syncStatsLock.Lock()
13501343
defer d.syncStatsLock.Unlock()
1351-
13521344
d.syncStatsStateDone += uint64(delivered)
13531345
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
13541346
})
@@ -1415,7 +1407,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
14151407
peer.Promote()
14161408
setIdle(peer)
14171409
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
1418-
go d.process()
14191410

14201411
case errInvalidChain:
14211412
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -1441,7 +1432,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
14411432
peer.Demote()
14421433
setIdle(peer)
14431434
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
1444-
go d.process()
14451435
}
14461436
}
14471437
// Blocks assembled, try to update the progress
@@ -1508,7 +1498,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
15081498
}
15091499
if progress {
15101500
progressed = true
1511-
go d.process()
15121501
}
15131502
if request == nil {
15141503
continue
@@ -1545,46 +1534,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
15451534
}
15461535

15471536
// process takes fetch results from the queue and tries to import them into the
1548-
// chain. The type of import operation will depend on the result contents:
1549-
// -
1550-
//
1551-
// The algorithmic flow is as follows:
1552-
// - The `processing` flag is swapped to 1 to ensure singleton access
1553-
// - The current `cancel` channel is retrieved to detect sync abortions
1554-
// - Blocks are iteratively taken from the cache and inserted into the chain
1555-
// - When the cache becomes empty, insertion stops
1556-
// - The `processing` flag is swapped back to 0
1557-
// - A post-exit check is made whether new blocks became available
1558-
// - This step is important: it handles a potential race condition between
1559-
// checking for no more work, and releasing the processing "mutex". In
1560-
// between these state changes, a block may have arrived, but a processing
1561-
// attempt denied, so we need to re-enter to ensure the block isn't left
1562-
// to idle in the cache.
1563-
func (d *Downloader) process() {
1564-
// Make sure only one goroutine is ever allowed to process blocks at once
1565-
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
1566-
return
1567-
}
1568-
// If the processor just exited, but there are freshly pending items, try to
1569-
// reenter. This is needed because the goroutine spinned up for processing
1570-
// the fresh results might have been rejected entry to to this present thread
1571-
// not yet releasing the `processing` state.
1572-
defer func() {
1573-
if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
1574-
d.process()
1575-
}
1576-
}()
1577-
// Release the lock upon exit (note, before checking for reentry!)
1578-
// the import statistics to zero.
1579-
defer atomic.StoreInt32(&d.processing, 0)
1580-
1581-
// Repeat the processing as long as there are results to process
1537+
// chain. The type of import operation will depend on the result contents.
1538+
func (d *Downloader) process() error {
1539+
pivot := d.queue.FastSyncPivot()
15821540
for {
1583-
// Fetch the next batch of results
1584-
pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
1585-
results := d.queue.TakeResults()
1541+
results := d.queue.WaitResults()
15861542
if len(results) == 0 {
1587-
return
1543+
return nil // queue empty
15881544
}
15891545
if d.chainInsertHook != nil {
15901546
d.chainInsertHook(results)
@@ -1597,7 +1553,7 @@ func (d *Downloader) process() {
15971553
for len(results) != 0 {
15981554
// Check for any termination requests
15991555
if atomic.LoadInt32(&d.interrupt) == 1 {
1600-
return
1556+
return errCancelProcessing
16011557
}
16021558
// Retrieve the a batch of results to import
16031559
var (
@@ -1633,8 +1589,7 @@ func (d *Downloader) process() {
16331589
}
16341590
if err != nil {
16351591
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
1636-
d.cancel()
1637-
return
1592+
return err
16381593
}
16391594
// Shift the results to the next batch
16401595
results = results[items:]
@@ -1685,19 +1640,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
16851640
dropMeter.Mark(int64(packet.Items()))
16861641
}
16871642
}()
1688-
// Make sure the downloader is active
1689-
if atomic.LoadInt32(&d.synchronising) == 0 {
1690-
return errNoSyncActive
1691-
}
16921643
// Deliver or abort if the sync is canceled while queuing
16931644
d.cancelLock.RLock()
16941645
cancel := d.cancelCh
16951646
d.cancelLock.RUnlock()
1696-
1647+
if cancel == nil {
1648+
return errNoSyncActive
1649+
}
16971650
select {
16981651
case destCh <- packet:
16991652
return nil
1700-
17011653
case <-cancel:
17021654
return errNoSyncActive
17031655
}

0 commit comments

Comments (0)