|
522 | 522 | peer.Promote()
|
523 | 523 | peer.SetIdle()
|
524 | 524 | glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
|
525 |
| - if atomic.LoadInt32(&d.processing) == 0 { |
526 |
| - go d.process() |
527 |
| - } |
| 525 | + go d.process() |
528 | 526 |
|
529 | 527 | case errInvalidChain:
|
530 | 528 | // The hash chain is invalid (blocks are not ordered properly), abort
|
@@ -701,6 +699,19 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
|
701 | 699 | }
|
702 | 700 |
|
703 | 701 | // process takes blocks from the queue and tries to import them into the chain.
|
| 702 | +// |
| 703 | +// The algorithmic flow is as follows: |
| 704 | +// - The `processing` flag is swapped to 1 to ensure singleton access |
| 705 | +// - The current `cancel` channel is retrieved to detect sync abortions |
| 706 | +// - Blocks are iteratively taken from the cache and inserted into the chain |
| 707 | +// - When the cache becomes empty, insertion stops |
| 708 | +// - The `processing` flag is swapped back to 0 |
| 709 | +// - A post-exit check is made whether new blocks became available |
| 710 | +// - This step is important: it handles a potential race condition between |
| 711 | +// checking for no more work, and releasing the processing "mutex". In |
| 712 | +// between these state changes, a block may have arrived, but a processing |
| 713 | +// attempt denied, so we need to re-enter to ensure the block isn't left |
| 714 | +// to idle in the cache. |
704 | 715 | func (d *Downloader) process() (err error) {
|
705 | 716 | // Make sure only one goroutine is ever allowed to process blocks at once
|
706 | 717 | if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
|
@@ -763,7 +774,7 @@ func (d *Downloader) process() (err error) {
|
763 | 774 | // Try to inset the blocks, drop the originating peer if there's an error
|
764 | 775 | index, err := d.insertChain(raw)
|
765 | 776 | if err != nil {
|
766 |
| - glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err) |
| 777 | + glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) |
767 | 778 | d.dropPeer(blocks[index].OriginPeer)
|
768 | 779 | d.Cancel()
|
769 | 780 | return errCancelChainImport
|
|
0 commit comments