Skip to content

Commit 1cc2f08

Browse files
committed
Merge pull request #1784 from karalabe/standard-sync-stats
eth, rpc: standardize the chain sync progress counters
2 parents e9a8051 + d4d3fc6 commit 1cc2f08

File tree

8 files changed

+1174
-251
lines changed

8 files changed

+1174
-251
lines changed

eth/downloader/downloader.go

Lines changed: 140 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,9 @@ type Downloader struct {
130130
interrupt int32 // Atomic boolean to signal termination
131131

132132
// Statistics
133-
importStart time.Time // Instance when the last blocks were taken from the cache
134-
importQueue []*Block // Previously taken blocks to check import progress
135-
importDone int // Number of taken blocks already imported from the last batch
136-
importLock sync.Mutex
133+
syncStatsOrigin uint64 // Origin block number where syncing started at
134+
syncStatsHeight uint64 // Highest block number known when syncing started
135+
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
137136

138137
// Callbacks
139138
hasBlock hashCheckFn // Checks if a block is present in the chain
@@ -161,6 +160,7 @@ type Downloader struct {
161160
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
162161

163162
// Testing hooks
163+
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
164164
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
165165
chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
166166
}
@@ -192,27 +192,14 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
192192
}
193193
}
194194

195-
// Stats retrieves the current status of the downloader.
196-
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
197-
// Fetch the download status
198-
pending, cached = d.queue.Size()
195+
// Boundaries retrieves the synchronisation boundaries, specifically the origin
196+
// block where synchronisation started at (may have failed/suspended) and the
197+
// latest known block which the synchronisation targets.
198+
func (d *Downloader) Boundaries() (uint64, uint64) {
199+
d.syncStatsLock.RLock()
200+
defer d.syncStatsLock.RUnlock()
199201

200-
// Figure out the import progress
201-
d.importLock.Lock()
202-
defer d.importLock.Unlock()
203-
204-
for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
205-
d.importQueue = d.importQueue[1:]
206-
d.importDone++
207-
}
208-
importing = len(d.importQueue)
209-
210-
// Make an estimate on the total sync
211-
estimate = 0
212-
if d.importDone > 0 {
213-
estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
214-
}
215-
return
202+
return d.syncStatsOrigin, d.syncStatsHeight
216203
}
217204

218205
// Synchronising returns whether the downloader is currently retrieving blocks.
@@ -333,14 +320,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
333320

334321
switch {
335322
case p.version == eth61:
336-
// Old eth/61, use forward, concurrent hash and block retrieval algorithm
337-
number, err := d.findAncestor61(p)
323+
// Look up the sync boundaries: the common ancestor and the target block
324+
latest, err := d.fetchHeight61(p)
325+
if err != nil {
326+
return err
327+
}
328+
origin, err := d.findAncestor61(p)
338329
if err != nil {
339330
return err
340331
}
332+
d.syncStatsLock.Lock()
333+
if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
334+
d.syncStatsOrigin = origin
335+
}
336+
d.syncStatsHeight = latest
337+
d.syncStatsLock.Unlock()
338+
339+
// Initiate the sync using a concurrent hash and block retrieval algorithm
340+
if d.syncInitHook != nil {
341+
d.syncInitHook(origin, latest)
342+
}
341343
errc := make(chan error, 2)
342-
go func() { errc <- d.fetchHashes61(p, td, number+1) }()
343-
go func() { errc <- d.fetchBlocks61(number + 1) }()
344+
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
345+
go func() { errc <- d.fetchBlocks61(origin + 1) }()
344346

345347
// If any fetcher fails, cancel the other
346348
if err := <-errc; err != nil {
@@ -351,14 +353,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
351353
return <-errc
352354

353355
case p.version >= eth62:
354-
// New eth/62, use forward, concurrent header and block body retrieval algorithm
355-
number, err := d.findAncestor(p)
356+
// Look up the sync boundaries: the common ancestor and the target block
357+
latest, err := d.fetchHeight(p)
356358
if err != nil {
357359
return err
358360
}
361+
origin, err := d.findAncestor(p)
362+
if err != nil {
363+
return err
364+
}
365+
d.syncStatsLock.Lock()
366+
if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
367+
d.syncStatsOrigin = origin
368+
}
369+
d.syncStatsHeight = latest
370+
d.syncStatsLock.Unlock()
371+
372+
// Initiate the sync using a concurrent hash and block retrieval algorithm
373+
if d.syncInitHook != nil {
374+
d.syncInitHook(origin, latest)
375+
}
359376
errc := make(chan error, 2)
360-
go func() { errc <- d.fetchHeaders(p, td, number+1) }()
361-
go func() { errc <- d.fetchBodies(number + 1) }()
377+
go func() { errc <- d.fetchHeaders(p, td, origin+1) }()
378+
go func() { errc <- d.fetchBodies(origin + 1) }()
362379

363380
// If any fetcher fails, cancel the other
364381
if err := <-errc; err != nil {
@@ -401,6 +418,50 @@ func (d *Downloader) Terminate() {
401418
d.cancel()
402419
}
403420

421+
// fetchHeight61 retrieves the head block of the remote peer to aid in estimating
422+
// the total time a pending synchronisation would take.
423+
func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
424+
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
425+
426+
// Request the advertised remote head block and wait for the response
427+
go p.getBlocks([]common.Hash{p.head})
428+
429+
timeout := time.After(blockSoftTTL)
430+
for {
431+
select {
432+
case <-d.cancelCh:
433+
return 0, errCancelBlockFetch
434+
435+
case <-d.headerCh:
436+
// Out of bounds eth/62 block headers received, ignore them
437+
438+
case <-d.bodyCh:
439+
// Out of bounds eth/62 block bodies received, ignore them
440+
441+
case <-d.hashCh:
442+
// Out of bounds hashes received, ignore them
443+
444+
case blockPack := <-d.blockCh:
445+
// Discard anything not from the origin peer
446+
if blockPack.peerId != p.id {
447+
glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId)
448+
break
449+
}
450+
// Make sure the peer actually gave something valid
451+
blocks := blockPack.blocks
452+
if len(blocks) != 1 {
453+
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
454+
return 0, errBadPeer
455+
}
456+
return blocks[0].NumberU64(), nil
457+
458+
case <-timeout:
459+
glog.V(logger.Debug).Infof("%v: head block timeout", p)
460+
return 0, errTimeout
461+
}
462+
}
463+
}
464+
404465
// findAncestor61 tries to locate the common ancestor block of the local chain and
405466
// a remote peers blockchain. In the general case when our node was in sync and
406467
// on the correct chain, checking the top N blocks should already get us a match.
@@ -776,6 +837,50 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
776837
}
777838
}
778839

840+
// fetchHeight retrieves the head header of the remote peer to aid in estimating
841+
// the total time a pending synchronisation would take.
842+
func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
843+
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
844+
845+
// Request the advertised remote head block and wait for the response
846+
go p.getRelHeaders(p.head, 1, 0, false)
847+
848+
timeout := time.After(headerTTL)
849+
for {
850+
select {
851+
case <-d.cancelCh:
852+
return 0, errCancelBlockFetch
853+
854+
case headerPack := <-d.headerCh:
855+
// Discard anything not from the origin peer
856+
if headerPack.peerId != p.id {
857+
glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
858+
break
859+
}
860+
// Make sure the peer actually gave something valid
861+
headers := headerPack.headers
862+
if len(headers) != 1 {
863+
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
864+
return 0, errBadPeer
865+
}
866+
return headers[0].Number.Uint64(), nil
867+
868+
case <-d.bodyCh:
869+
// Out of bounds block bodies received, ignore them
870+
871+
case <-d.hashCh:
872+
// Out of bounds eth/61 hashes received, ignore them
873+
874+
case <-d.blockCh:
875+
// Out of bounds eth/61 blocks received, ignore them
876+
877+
case <-timeout:
878+
glog.V(logger.Debug).Infof("%v: head header timeout", p)
879+
return 0, errTimeout
880+
}
881+
}
882+
}
883+
779884
// findAncestor tries to locate the common ancestor block of the local chain and
780885
// a remote peers blockchain. In the general case when our node was in sync and
781886
// on the correct chain, checking the top N blocks should already get us a match.
@@ -973,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
9731078
// Otherwise insert all the new headers, aborting in case of junk
9741079
glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)
9751080

976-
inserts := d.queue.Insert(headerPack.headers)
1081+
inserts := d.queue.Insert(headerPack.headers, from)
9771082
if len(inserts) != len(headerPack.headers) {
9781083
glog.V(logger.Debug).Infof("%v: stale headers", p)
9791084
return errBadPeer
@@ -1203,16 +1308,10 @@ func (d *Downloader) process() {
12031308
d.process()
12041309
}
12051310
}()
1206-
// Release the lock upon exit (note, before checking for reentry!), and set
1311+
// Release the lock upon exit (note, before checking for reentry!)
12071312
// the import statistics to zero.
1208-
defer func() {
1209-
d.importLock.Lock()
1210-
d.importQueue = nil
1211-
d.importDone = 0
1212-
d.importLock.Unlock()
1313+
defer atomic.StoreInt32(&d.processing, 0)
12131314

1214-
atomic.StoreInt32(&d.processing, 0)
1215-
}()
12161315
// Repeat the processing as long as there are blocks to import
12171316
for {
12181317
// Fetch the next batch of blocks
@@ -1223,13 +1322,6 @@ func (d *Downloader) process() {
12231322
if d.chainInsertHook != nil {
12241323
d.chainInsertHook(blocks)
12251324
}
1226-
// Reset the import statistics
1227-
d.importLock.Lock()
1228-
d.importStart = time.Now()
1229-
d.importQueue = blocks
1230-
d.importDone = 0
1231-
d.importLock.Unlock()
1232-
12331325
// Actually import the blocks
12341326
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
12351327
for len(blocks) != 0 {

0 commit comments

Comments
 (0)