Skip to content

Commit 17f65cd

Browse files
committed
eth: update metrics collection to handle eth/62 algos
1 parent 47a7fe5 commit 17f65cd

File tree

7 files changed

+160
-36
lines changed

7 files changed

+160
-36
lines changed

cmd/geth/monitorcmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func updateChart(metric string, data []float64, base *int, chart *termui.LineCha
289289
}
290290
}
291291
unit, scale := 0, 1.0
292-
for high >= 1000 {
292+
for high >= 1000 && unit+1 < len(dataUnits) {
293293
high, unit, scale = high/1000, unit+1, scale*1000
294294
}
295295
// If the unit changes, re-create the chart (hack to set max height...)

eth/downloader/downloader.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
526526
glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
527527

528528
// Create a timeout timer, and the associated hash fetcher
529+
request := time.Now() // time of the last fetch request
529530
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
530531
<-timeout.C // timeout channel should be initially empty
531532
defer timeout.Stop()
@@ -534,6 +535,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
534535
glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
535536

536537
go p.getAbsHashes(from, MaxHashFetch)
538+
request = time.Now()
537539
timeout.Reset(hashTTL)
538540
}
539541
// Start pulling hashes, until all are exhausted
@@ -557,6 +559,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
557559
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
558560
break
559561
}
562+
hashReqTimer.UpdateSince(request)
560563
timeout.Stop()
561564

562565
// If no more hashes are inbound, notify the block fetcher and return
@@ -609,6 +612,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
609612

610613
case <-timeout.C:
611614
glog.V(logger.Debug).Infof("%v: hash request timed out", p)
615+
hashTimeoutMeter.Mark(1)
612616
return errTimeout
613617
}
614618
}
@@ -896,6 +900,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
896900
defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
897901

898902
// Create a timeout timer, and the associated hash fetcher
903+
request := time.Now() // time of the last fetch request
899904
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
900905
<-timeout.C // timeout channel should be initially empty
901906
defer timeout.Stop()
@@ -904,6 +909,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
904909
glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)
905910

906911
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
912+
request = time.Now()
907913
timeout.Reset(headerTTL)
908914
}
909915
// Start pulling headers, until all are exhausted
@@ -927,6 +933,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
927933
glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId)
928934
break
929935
}
936+
headerReqTimer.UpdateSince(request)
930937
timeout.Stop()
931938

932939
// If no more headers are inbound, notify the body fetcher and return
@@ -980,6 +987,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
980987
case <-timeout.C:
981988
// Header retrieval timed out, consider the peer bad and drop
982989
glog.V(logger.Debug).Infof("%v: header request timed out", p)
990+
headerTimeoutMeter.Mark(1)
983991
d.dropPeer(p.id)
984992

985993
// Finish the sync gracefully instead of dumping the gathered data though
@@ -1244,7 +1252,14 @@ func (d *Downloader) process() {
12441252
// DeliverHashes61 injects a new batch of hashes received from a remote node into
12451253
// the download schedule. This is usually invoked through the BlockHashesMsg by
12461254
// the protocol handler.
1247-
func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) error {
1255+
func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) {
1256+
// Update the delivery metrics for both good and failed deliveries
1257+
hashInMeter.Mark(int64(len(hashes)))
1258+
defer func() {
1259+
if err != nil {
1260+
hashDropMeter.Mark(int64(len(hashes)))
1261+
}
1262+
}()
12481263
// Make sure the downloader is active
12491264
if atomic.LoadInt32(&d.synchronising) == 0 {
12501265
return errNoSyncActive
@@ -1265,7 +1280,14 @@ func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) error {
12651280

12661281
// DeliverBlocks61 injects a new batch of blocks received from a remote node.
12671282
// This is usually invoked through the BlocksMsg by the protocol handler.
1268-
func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) error {
1283+
func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) {
1284+
// Update the delivery metrics for both good and failed deliveries
1285+
blockInMeter.Mark(int64(len(blocks)))
1286+
defer func() {
1287+
if err != nil {
1288+
blockDropMeter.Mark(int64(len(blocks)))
1289+
}
1290+
}()
12691291
// Make sure the downloader is active
12701292
if atomic.LoadInt32(&d.synchronising) == 0 {
12711293
return errNoSyncActive
@@ -1286,7 +1308,14 @@ func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) error {
12861308

12871309
// DeliverHeaders injects a new batch of blck headers received from a remote
12881310
// node into the download schedule.
1289-
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error {
1311+
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
1312+
// Update the delivery metrics for both good and failed deliveries
1313+
headerInMeter.Mark(int64(len(headers)))
1314+
defer func() {
1315+
if err != nil {
1316+
headerDropMeter.Mark(int64(len(headers)))
1317+
}
1318+
}()
12901319
// Make sure the downloader is active
12911320
if atomic.LoadInt32(&d.synchronising) == 0 {
12921321
return errNoSyncActive
@@ -1306,7 +1335,14 @@ func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error {
13061335
}
13071336

13081337
// DeliverBodies injects a new batch of block bodies received from a remote node.
1309-
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
1338+
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
1339+
// Update the delivery metrics for both good and failed deliveries
1340+
bodyInMeter.Mark(int64(len(transactions)))
1341+
defer func() {
1342+
if err != nil {
1343+
bodyDropMeter.Mark(int64(len(transactions)))
1344+
}
1345+
}()
13101346
// Make sure the downloader is active
13111347
if atomic.LoadInt32(&d.synchronising) == 0 {
13121348
return errNoSyncActive

eth/downloader/metrics.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2015 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
// Contains the metrics collected by the downloader.
18+
19+
package downloader
20+
21+
import (
22+
"github.com/ethereum/go-ethereum/metrics"
23+
)
24+
25+
var (
26+
hashInMeter = metrics.NewMeter("eth/downloader/hashes/in")
27+
hashReqTimer = metrics.NewTimer("eth/downloader/hashes/req")
28+
hashDropMeter = metrics.NewMeter("eth/downloader/hashes/drop")
29+
hashTimeoutMeter = metrics.NewMeter("eth/downloader/hashes/timeout")
30+
31+
blockInMeter = metrics.NewMeter("eth/downloader/blocks/in")
32+
blockReqTimer = metrics.NewTimer("eth/downloader/blocks/req")
33+
blockDropMeter = metrics.NewMeter("eth/downloader/blocks/drop")
34+
blockTimeoutMeter = metrics.NewMeter("eth/downloader/blocks/timeout")
35+
36+
headerInMeter = metrics.NewMeter("eth/downloader/headers/in")
37+
headerReqTimer = metrics.NewTimer("eth/downloader/headers/req")
38+
headerDropMeter = metrics.NewMeter("eth/downloader/headers/drop")
39+
headerTimeoutMeter = metrics.NewMeter("eth/downloader/headers/timeout")
40+
41+
bodyInMeter = metrics.NewMeter("eth/downloader/bodies/in")
42+
bodyReqTimer = metrics.NewTimer("eth/downloader/bodies/req")
43+
bodyDropMeter = metrics.NewMeter("eth/downloader/bodies/drop")
44+
bodyTimeoutMeter = metrics.NewMeter("eth/downloader/bodies/timeout")
45+
)

eth/downloader/queue.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,19 @@ func (q *queue) Expire(timeout time.Duration) []string {
397397
peers := []string{}
398398
for id, request := range q.pendPool {
399399
if time.Since(request.Time) > timeout {
400+
// Update the metrics with the timeout
401+
if len(request.Hashes) > 0 {
402+
blockTimeoutMeter.Mark(1)
403+
} else {
404+
bodyTimeoutMeter.Mark(1)
405+
}
406+
// Return any non satisfied requests to the pool
400407
for hash, index := range request.Hashes {
401408
q.hashQueue.Push(hash, float32(index))
402409
}
410+
for _, header := range request.Headers {
411+
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
412+
}
403413
peers = append(peers, id)
404414
}
405415
}
@@ -420,6 +430,7 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
420430
if request == nil {
421431
return errNoFetchesPending
422432
}
433+
blockReqTimer.UpdateSince(request.Time)
423434
delete(q.pendPool, id)
424435

425436
// If no blocks were retrieved, mark them as unavailable for the origin peer
@@ -468,6 +479,7 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists []
468479
if request == nil {
469480
return errNoFetchesPending
470481
}
482+
bodyReqTimer.UpdateSince(request.Time)
471483
delete(q.pendPool, id)
472484

473485
// If no block bodies were retrieved, mark them as unavailable for the origin peer

eth/fetcher/fetcher.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -347,18 +347,19 @@ func (f *Fetcher) loop() {
347347

348348
case notification := <-f.notify:
349349
// A block was announced, make sure the peer isn't DOSing us
350-
announceMeter.Mark(1)
350+
propAnnounceInMeter.Mark(1)
351351

352352
count := f.announces[notification.origin] + 1
353353
if count > hashLimit {
354354
glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
355+
propAnnounceDOSMeter.Mark(1)
355356
break
356357
}
357358
// If we have a valid block number, check that it's potentially useful
358359
if notification.number > 0 {
359360
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
360361
glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
361-
discardMeter.Mark(1)
362+
propAnnounceDropMeter.Mark(1)
362363
break
363364
}
364365
}
@@ -377,7 +378,7 @@ func (f *Fetcher) loop() {
377378

378379
case op := <-f.inject:
379380
// A direct block insertion was requested, try and fill any pending gaps
380-
broadcastMeter.Mark(1)
381+
propBroadcastInMeter.Mark(1)
381382
f.enqueue(op.origin, op.block)
382383

383384
case hash := <-f.done:
@@ -425,10 +426,12 @@ func (f *Fetcher) loop() {
425426
}
426427
if fetchBlocks != nil {
427428
// Use old eth/61 protocol to retrieve whole blocks
429+
blockFetchMeter.Mark(int64(len(hashes)))
428430
fetchBlocks(hashes)
429431
} else {
430432
// Use new eth/62 protocol to retrieve headers first
431433
for _, hash := range hashes {
434+
headerFetchMeter.Mark(1)
432435
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
433436
}
434437
}
@@ -467,6 +470,7 @@ func (f *Fetcher) loop() {
467470
if f.completingHook != nil {
468471
f.completingHook(hashes)
469472
}
473+
bodyFetchMeter.Mark(int64(len(hashes)))
470474
go f.completing[hashes[0]].fetchBodies(hashes)
471475
}
472476
// Schedule the next fetch if blocks are still pending
@@ -480,6 +484,7 @@ func (f *Fetcher) loop() {
480484
case <-f.quit:
481485
return
482486
}
487+
blockFilterInMeter.Mark(int64(len(blocks)))
483488

484489
explicit, download := []*types.Block{}, []*types.Block{}
485490
for _, block := range blocks {
@@ -498,6 +503,7 @@ func (f *Fetcher) loop() {
498503
}
499504
}
500505

506+
blockFilterOutMeter.Mark(int64(len(download)))
501507
select {
502508
case filter <- download:
503509
case <-f.quit:
@@ -520,6 +526,8 @@ func (f *Fetcher) loop() {
520526
case <-f.quit:
521527
return
522528
}
529+
headerFilterInMeter.Mark(int64(len(task.headers)))
530+
523531
// Split the batch of headers into unknown ones (to return to the caller),
524532
// known incomplete ones (requiring body retrievals) and completed blocks.
525533
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
@@ -544,7 +552,10 @@ func (f *Fetcher) loop() {
544552
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
545553
glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
546554

547-
complete = append(complete, types.NewBlockWithHeader(header))
555+
block := types.NewBlockWithHeader(header)
556+
block.ReceivedAt = task.time
557+
558+
complete = append(complete, block)
548559
f.completing[hash] = announce
549560
continue
550561
}
@@ -559,6 +570,7 @@ func (f *Fetcher) loop() {
559570
unknown = append(unknown, header)
560571
}
561572
}
573+
headerFilterOutMeter.Mark(int64(len(unknown)))
562574
select {
563575
case filter <- &headerFilterTask{headers: unknown, time: task.time}:
564576
case <-f.quit:
@@ -590,6 +602,7 @@ func (f *Fetcher) loop() {
590602
case <-f.quit:
591603
return
592604
}
605+
bodyFilterInMeter.Mark(int64(len(task.transactions)))
593606

594607
blocks := []*types.Block{}
595608
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
@@ -606,7 +619,10 @@ func (f *Fetcher) loop() {
606619
matched = true
607620

608621
if f.getBlock(hash) == nil {
609-
blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]))
622+
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
623+
block.ReceivedAt = task.time
624+
625+
blocks = append(blocks, block)
610626
} else {
611627
f.forgetHash(hash)
612628
}
@@ -621,6 +637,7 @@ func (f *Fetcher) loop() {
621637
}
622638
}
623639

640+
bodyFilterOutMeter.Mark(int64(len(task.transactions)))
624641
select {
625642
case filter <- task:
626643
case <-f.quit:
@@ -677,13 +694,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
677694
count := f.queues[peer] + 1
678695
if count > blockLimit {
679696
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
697+
propBroadcastDOSMeter.Mark(1)
680698
f.forgetHash(hash)
681699
return
682700
}
683701
// Discard any past or too distant blocks
684702
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
685703
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
686-
discardMeter.Mark(1)
704+
propBroadcastDropMeter.Mark(1)
687705
f.forgetHash(hash)
688706
return
689707
}
@@ -724,11 +742,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
724742
switch err := f.validateBlock(block, parent); err {
725743
case nil:
726744
// All ok, quickly propagate to our peers
727-
broadcastTimer.UpdateSince(block.ReceivedAt)
745+
propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
728746
go f.broadcastBlock(block, true)
729747

730748
case core.BlockFutureErr:
731-
futureMeter.Mark(1)
732749
// Weird future block, don't fail, but neither propagate
733750

734751
default:
@@ -743,7 +760,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
743760
return
744761
}
745762
// If import succeeded, broadcast the block
746-
announceTimer.UpdateSince(block.ReceivedAt)
763+
propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
747764
go f.broadcastBlock(block, false)
748765

749766
// Invoke the testing hook if needed

0 commit comments

Comments
 (0)