3
3
PriorityQueue ,
4
4
)
5
5
from concurrent .futures import CancelledError
6
+ import datetime
6
7
import enum
7
8
from functools import (
8
9
partial ,
11
12
from typing import (
12
13
Dict ,
13
14
List ,
15
+ NamedTuple ,
14
16
Set ,
15
17
Tuple ,
16
18
Type ,
62
64
OrderedTaskPreparation ,
63
65
TaskQueue ,
64
66
)
67
+ from trinity .utils .ema import EMA
68
+ from trinity .utils .humanize import humanize_elapsed
65
69
from trinity .utils .timer import Timer
66
70
67
71
# (ReceiptBundle, (Receipt, (root_hash, receipt_trie_data))
@@ -338,6 +342,71 @@ class BlockPersistPrereqs(enum.Enum):
338
342
StoreReceipts = enum .auto ()
339
343
340
344
345
+ class ChainSyncStats (NamedTuple ):
346
+ prev_head : BlockHeader
347
+ latest_head : BlockHeader
348
+
349
+ elapsed : float
350
+
351
+ num_blocks : int
352
+ blocks_per_second : float
353
+
354
+ num_transactions : int
355
+ transactions_per_second : float
356
+
357
+
358
+ class ChainSyncPerformanceTracker :
359
+ def __init__ (self , head : BlockHeader ) -> None :
360
+ # The `head` from the previous time we reported stats
361
+ self .prev_head = head
362
+ # The latest `head` we have synced
363
+ self .latest_head = head
364
+
365
+ # A `Timer` object to report elapsed time between reports
366
+ self .timer = Timer ()
367
+
368
+ # EMA of the blocks per second
369
+ self .blocks_per_second_ema = EMA (initial_value = 0 , smoothing_factor = 0.05 )
370
+
371
+ # EMA of the transactions per second
372
+ self .transactions_per_second_ema = EMA (initial_value = 0 , smoothing_factor = 0.05 )
373
+
374
+ # Number of transactions processed
375
+ self .num_transactions = 0
376
+
377
+ def record_transactions (self , count : int ) -> None :
378
+ self .num_transactions += count
379
+
380
+ def set_latest_head (self , head : BlockHeader ) -> None :
381
+ self .latest_head = head
382
+
383
+ def report (self ) -> ChainSyncStats :
384
+ elapsed = self .timer .pop_elapsed ()
385
+
386
+ num_blocks = self .latest_head .block_number - self .prev_head .block_number
387
+ blocks_per_second = num_blocks / elapsed
388
+ transactions_per_second = self .num_transactions / elapsed
389
+
390
+ self .blocks_per_second_ema .update (blocks_per_second )
391
+ self .transactions_per_second_ema .update (transactions_per_second )
392
+
393
+ stats = ChainSyncStats (
394
+ prev_head = self .prev_head ,
395
+ latest_head = self .latest_head ,
396
+ elapsed = elapsed ,
397
+ num_blocks = num_blocks ,
398
+ blocks_per_second = self .blocks_per_second_ema .value ,
399
+ num_transactions = self .num_transactions ,
400
+ transactions_per_second = self .transactions_per_second_ema .value ,
401
+ )
402
+
403
+ # reset the counters
404
+ self .num_transactions = 0
405
+ self .prev_head = self .latest_head
406
+
407
+ return stats
408
+
409
+
341
410
class FastChainSyncer (BaseBodyChainSyncer ):
342
411
"""
343
412
Sync with the Ethereum network by fetching block headers/bodies and storing them in our DB.
@@ -374,6 +443,8 @@ def __init__(self,
374
443
375
444
async def _run (self ) -> None :
376
445
head = await self .wait (self .db .coro_get_canonical_head ())
446
+ self .tracker = ChainSyncPerformanceTracker (head )
447
+
377
448
self ._block_persist_tracker .set_finished_dependency (head )
378
449
self .run_daemon_task (self ._launch_prerequisite_tasks ())
379
450
self .run_daemon_task (self ._assign_receipt_download_to_peers ())
@@ -445,9 +516,6 @@ async def _launch_prerequisite_tasks(self) -> None:
445
516
self .header_queue .complete (batch_id , headers )
446
517
447
518
async def _display_stats (self ) -> None :
448
- last_head = await self .wait (self .db .coro_get_canonical_head ())
449
- timer = Timer ()
450
-
451
519
while self .is_operational :
452
520
await self .sleep (5 )
453
521
self .logger .debug (
@@ -459,16 +527,29 @@ async def _display_stats(self) -> None:
459
527
)],
460
528
)
461
529
462
- head = await self .wait (self .db .coro_get_canonical_head ())
463
- if head == last_head :
464
- continue
465
- else :
466
- block_num_change = head .block_number - last_head .block_number
467
- last_head = head
468
-
469
- self .logger .info (
470
- "Advanced by %d blocks in %0.1f seconds, new head: %s" ,
471
- block_num_change , timer .pop_elapsed (), head )
530
+ stats = self .tracker .report ()
531
+ utcnow = int (datetime .datetime .utcnow ().timestamp ())
532
+ head_age = utcnow - stats .latest_head .timestamp
533
+ self .logger .info (
534
+ (
535
+ "blks=%-4d "
536
+ "txs=%-5d "
537
+ "bps=%-3d "
538
+ "tps=%-4d "
539
+ "elapsed=%0.1f "
540
+ "head=#%d (%s\u2026 %s) "
541
+ "age=%s"
542
+ ),
543
+ stats .num_blocks ,
544
+ stats .num_transactions ,
545
+ stats .blocks_per_second ,
546
+ stats .transactions_per_second ,
547
+ stats .elapsed ,
548
+ stats .latest_head .block_number ,
549
+ stats .latest_head .hex_hash [2 :6 ],
550
+ stats .latest_head .hex_hash [- 4 :],
551
+ humanize_elapsed (head_age ),
552
+ )
472
553
473
554
async def _persist_ready_blocks (self ) -> None :
474
555
"""
@@ -514,8 +595,12 @@ async def _persist_blocks(self, headers: Tuple[BlockHeader, ...]) -> None:
514
595
tx_class = block_class .get_transaction_class ()
515
596
transactions = [tx_class .from_base_transaction (tx ) for tx in body .transactions ]
516
597
598
+ # record progress in the tracker
599
+ self .tracker .record_transactions (len (transactions ))
600
+
517
601
block = block_class (header , transactions , uncles )
518
602
await self .wait (self .db .coro_persist_block (block ))
603
+ self .tracker .set_latest_head (header )
519
604
520
605
async def _assign_receipt_download_to_peers (self ) -> None :
521
606
"""
0 commit comments