Skip to content

Commit 9d188f7

Browse files
committed
eth, eth/downloader: make synchronize thread safe
1 parent 43901c9 commit 9d188f7

File tree

5 files changed

+22
-82
lines changed

5 files changed

+22
-82
lines changed

eth/downloader/downloader.go

Lines changed: 15 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ type Downloader struct {
6868
getBlock getBlockFn
6969

7070
// Status
71-
fetchingHashes int32
72-
downloadingBlocks int32
71+
synchronizing int32
7372

7473
// Channels
7574
newPeerCh chan *peer
@@ -120,43 +119,26 @@ func (d *Downloader) UnregisterPeer(id string) {
120119
delete(d.peers, id)
121120
}
122121

123-
// SynchroniseWithPeer will select the peer and use it for synchronizing. If an empty string is given
122+
// Synchronize will select the peer and use it for synchronizing. If an empty string is given
124123
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
125124
// checks fail an error will be returned. This method is synchronous
126-
func (d *Downloader) Synchronise(id string, hash common.Hash) error {
127-
// Make sure it's doing neither. Once done we can restart the
128-
// downloading process if the TD is higher. For now just get on
129-
// with whatever is going on. This prevents unnecessary switching.
130-
if d.isBusy() {
131-
return errBusy
125+
func (d *Downloader) Synchronize(id string, hash common.Hash) error {
126+
// Make sure only one goroutine is ever allowed past this point at once
127+
if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
128+
return nil
132129
}
130+
defer atomic.StoreInt32(&d.synchronizing, 0)
133131

134-
// When a synchronization attempt is made while the queue still
135-
// contains items we abort the sync attempt
136-
if done, pend := d.queue.Size(); done+pend > 0 {
132+
// Abort if the queue still contains some leftover data
133+
if _, cached := d.queue.Size(); cached > 0 {
137134
return errPendingQueue
138135
}
139-
140-
// Fetch the peer using the id or throw an error if the peer couldn't be found
136+
// Retrieve the origin peer and initiate the downloading process
141137
p := d.peers[id]
142138
if p == nil {
143139
return errUnknownPeer
144140
}
145-
146-
// Get the hash from the peer and initiate the downloading progress.
147-
err := d.getFromPeer(p, hash, false)
148-
if err != nil {
149-
return err
150-
}
151-
152-
return nil
153-
}
154-
155-
// Done lets the downloader know that whatever previous hashes were taken
156-
// are processed. If the block count reaches zero and done is called
157-
// we reset the queue for the next batch of incoming hashes and blocks.
158-
func (d *Downloader) Done() {
159-
d.queue.Done()
141+
return d.getFromPeer(p, hash, false)
160142
}
161143

162144
// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
@@ -176,6 +158,7 @@ func (d *Downloader) Has(hash common.Hash) bool {
176158
}
177159

178160
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
161+
179162
d.activePeer = p.id
180163
defer func() {
181164
// reset on error
@@ -184,7 +167,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
184167
}
185168
}()
186169

187-
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
170+
glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
188171
// Start the fetcher. This will block the update entirely
189172
// interupts need to be send to the appropriate channels
190173
// respectively.
@@ -200,20 +183,13 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
200183
return err
201184
}
202185

203-
glog.V(logger.Detail).Infoln("Sync completed")
186+
glog.V(logger.Debug).Infoln("Synchronization completed")
204187

205188
return nil
206189
}
207190

208191
// XXX Make synchronous
209192
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
210-
atomic.StoreInt32(&d.fetchingHashes, 1)
211-
defer atomic.StoreInt32(&d.fetchingHashes, 0)
212-
213-
if d.queue.Has(h) { // TODO: Is this possible? Shouldn't queue be empty for startFetchingHashes to be even called?
214-
return errAlreadyInPool
215-
}
216-
217193
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
218194

219195
start := time.Now()
@@ -312,10 +288,8 @@ out:
312288
}
313289

314290
func (d *Downloader) startFetchingBlocks(p *peer) error {
315-
glog.V(logger.Detail).Infoln("Downloading", d.queue.Pending(), "block(s)")
291+
glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
316292

317-
atomic.StoreInt32(&d.downloadingBlocks, 1)
318-
defer atomic.StoreInt32(&d.downloadingBlocks, 0)
319293
// Defer the peer reset. This will empty the peer requested set
320294
// and makes sure there are no lingering peers with an incorrect
321295
// state
@@ -439,19 +413,3 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
439413

440414
return nil
441415
}
442-
443-
func (d *Downloader) isFetchingHashes() bool {
444-
return atomic.LoadInt32(&d.fetchingHashes) == 1
445-
}
446-
447-
func (d *Downloader) isDownloadingBlocks() bool {
448-
return atomic.LoadInt32(&d.downloadingBlocks) == 1
449-
}
450-
451-
func (d *Downloader) isBusy() bool {
452-
return d.isFetchingHashes() || d.isDownloadingBlocks()
453-
}
454-
455-
func (d *Downloader) IsBusy() bool {
456-
return d.isBusy()
457-
}

eth/downloader/downloader_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
6161

6262
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
6363
dl.activePeerId = peerId
64-
return dl.downloader.Synchronise(peerId, hash)
64+
return dl.downloader.Synchronize(peerId, hash)
6565
}
6666

6767
func (dl *downloadTester) hasBlock(hash common.Hash) bool {

eth/downloader/queue.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,6 @@ func (q *queue) Reset() {
6363
q.blockCache = nil
6464
}
6565

66-
// Done checks if all the downloads have been retrieved, wiping the queue.
67-
func (q *queue) Done() {
68-
q.lock.Lock()
69-
defer q.lock.Unlock()
70-
71-
if len(q.blockCache) == 0 {
72-
q.Reset()
73-
}
74-
}
75-
7666
// Size retrieves the number of hashes in the queue, returning separately for
7767
// pending and already downloaded.
7868
func (q *queue) Size() (int, int) {

eth/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
307307

308308
// Attempt to insert the newly received by checking if the parent exists.
309309
// if the parent exists we process the block and propagate to our peers
310-
// otherwise synchronise with the peer
310+
// otherwise synchronize with the peer
311311
if self.chainman.HasBlock(request.Block.ParentHash()) {
312312
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
313313
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
@@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
324324
}
325325
self.BroadcastBlock(hash, request.Block)
326326
} else {
327-
go self.synchronise(p)
327+
go self.synchronize(p)
328328
}
329329
default:
330330
return errResp(ErrInvalidMsgCode, "%v", msg.Code)

eth/sync.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ func (pm *ProtocolManager) update() {
3232
}
3333

3434
itimer.Stop()
35-
go pm.synchronise(peer)
35+
go pm.synchronize(peer)
3636
case <-itimer.C:
3737
// The timer will make sure that the downloader keeps an active state
3838
// in which it attempts to always check the network for highest td peers
3939
// Either select the peer or restart the timer if no peers could
4040
// be selected.
4141
if peer := getBestPeer(pm.peers); peer != nil {
42-
go pm.synchronise(peer)
42+
go pm.synchronize(peer)
4343
} else {
4444
itimer.Reset(5 * time.Second)
4545
}
@@ -63,7 +63,6 @@ func (pm *ProtocolManager) processBlocks() error {
6363
if len(blocks) == 0 {
6464
return nil
6565
}
66-
defer pm.downloader.Done()
6766

6867
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
6968

@@ -78,26 +77,19 @@ func (pm *ProtocolManager) processBlocks() error {
7877
return nil
7978
}
8079

81-
func (pm *ProtocolManager) synchronise(peer *peer) {
80+
func (pm *ProtocolManager) synchronize(peer *peer) {
8281
// Make sure the peer's TD is higher than our own. If not drop.
8382
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
8483
return
8584
}
86-
// Check downloader if it's busy so it doesn't show the sync message
87-
// for every attempty
88-
if pm.downloader.IsBusy() {
89-
return
90-
}
91-
9285
// FIXME if we have the hash in our chain and the TD of the peer is
9386
// much higher than ours, something is wrong with us or the peer.
9487
// Check if the hash is on our own chain
9588
if pm.chainman.HasBlock(peer.recentHash) {
9689
return
9790
}
98-
9991
// Get the hashes from the peer (synchronously)
100-
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
92+
err := pm.downloader.Synchronize(peer.id, peer.recentHash)
10193
if err != nil && err == downloader.ErrBadPeer {
10294
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
10395
pm.removePeer(peer)

0 commit comments

Comments
 (0)