39
39
from eth .utils .logging import TraceLogger
40
40
41
41
from p2p import protocol
42
- from p2p import eth
43
- from p2p import les
44
42
from p2p .cancellable import CancellableMixin
45
43
from p2p .constants import MAX_REORG_DEPTH , SEAL_CHECK_RANDOM_SAMPLE_RATE
46
44
from p2p .exceptions import NoEligiblePeers , ValidationError
47
45
from p2p .p2p_proto import DisconnectReason
48
- from p2p .peer import BasePeer , ETHPeer , LESPeer , PeerPool , PeerSubscriber
46
+ from p2p .peer import BasePeer , PeerPool , PeerSubscriber
49
47
from p2p .rlp import BlockBody
50
48
from p2p .service import BaseService
51
49
from p2p .utils import (
57
55
if TYPE_CHECKING :
58
56
from trinity .db .chain import AsyncChainDB # noqa: F401
59
57
from trinity .db .header import AsyncHeaderDB # noqa: F401
58
+ from trinity .protocol .eth .peer import ETHPeer # noqa: F401
59
+ from trinity .protocol .les .peer import LESPeer # noqa: F401
60
+ from trinity .protocol .base_request import BaseHeaderRequest # noqa: F401
60
61
61
62
62
- HeaderRequestingPeer = Union [LESPeer , ETHPeer ]
63
+ HeaderRequestingPeer = Union [' LESPeer' , ' ETHPeer' ]
63
64
64
65
65
66
class BaseHeaderChainSyncer (BaseService , PeerSubscriber ):
@@ -300,20 +301,23 @@ class LightChainSyncer(BaseHeaderChainSyncer):
300
301
301
302
async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
302
303
msg : protocol ._DecodedMsgType ) -> None :
303
- if isinstance (cmd , les .Announce ):
304
+ from trinity .protocol .les import commands
305
+ from trinity .protocol .les .peer import LESPeer # noqa: F811
306
+ if isinstance (cmd , commands .Announce ):
304
307
self ._sync_requests .put_nowait (peer )
305
- elif isinstance (cmd , les .GetBlockHeaders ):
308
+ elif isinstance (cmd , commands .GetBlockHeaders ):
306
309
msg = cast (Dict [str , Any ], msg )
307
310
await self ._handle_get_block_headers (cast (LESPeer , peer ), msg )
308
- elif isinstance (cmd , les .BlockHeaders ):
311
+ elif isinstance (cmd , commands .BlockHeaders ):
309
312
# `BlockHeaders` messages are handled at the peer level.
310
313
pass
311
314
else :
312
315
self .logger .debug ("Ignoring %s message from %s" , cmd , peer )
313
316
314
- async def _handle_get_block_headers (self , peer : LESPeer , msg : Dict [str , Any ]) -> None :
317
+ async def _handle_get_block_headers (self , peer : 'LESPeer' , msg : Dict [str , Any ]) -> None :
318
+ from trinity .protocol .les .requests import HeaderRequest
315
319
self .logger .debug ("Peer %s made header request: %s" , peer , msg )
316
- request = les . HeaderRequest (
320
+ request = HeaderRequest (
317
321
msg ['query' ].block_number_or_hash ,
318
322
msg ['query' ].max_headers ,
319
323
msg ['query' ].skip ,
@@ -356,8 +360,8 @@ def __init__(self,
356
360
super ().__init__ (chain , db , peer_pool , token )
357
361
# Those are used by our msg handlers and _download_block_parts() in order to track missing
358
362
# bodies/receipts for a given chain segment.
359
- self ._downloaded_receipts : asyncio .Queue [Tuple [ETHPeer , List [DownloadedBlockPart ]]] = asyncio .Queue () # noqa: E501
360
- self ._downloaded_bodies : asyncio .Queue [Tuple [ETHPeer , List [DownloadedBlockPart ]]] = asyncio .Queue () # noqa: E501
363
+ self ._downloaded_receipts : asyncio .Queue [Tuple [' ETHPeer' , List [DownloadedBlockPart ]]] = asyncio .Queue () # noqa: E501
364
+ self ._downloaded_bodies : asyncio .Queue [Tuple [' ETHPeer' , List [DownloadedBlockPart ]]] = asyncio .Queue () # noqa: E501
361
365
362
366
async def _calculate_td (self , headers : Tuple [BlockHeader , ...]) -> int :
363
367
"""Return the score (total difficulty) of the last header in the given list.
@@ -483,7 +487,8 @@ def _request_block_parts(
483
487
self ,
484
488
target_td : int ,
485
489
headers : List [BlockHeader ],
486
- request_func : Callable [[ETHPeer , List [BlockHeader ]], None ]) -> int :
490
+ request_func : Callable [['ETHPeer' , List [BlockHeader ]], None ]) -> int :
491
+ from trinity .protocol .eth .peer import ETHPeer # noqa: F811
487
492
peers = self .peer_pool .get_peers (target_td )
488
493
if not peers :
489
494
raise NoEligiblePeers ()
@@ -493,13 +498,13 @@ def _request_block_parts(
493
498
request_func (cast (ETHPeer , peer ), batch )
494
499
return len (batches )
495
500
496
- def _send_get_block_bodies (self , peer : ETHPeer , headers : List [BlockHeader ]) -> None :
501
+ def _send_get_block_bodies (self , peer : ' ETHPeer' , headers : List [BlockHeader ]) -> None :
497
502
block_numbers = ", " .join (str (h .block_number ) for h in headers )
498
503
self .logger .debug (
499
504
"Requesting %d block bodies (%s) to %s" , len (headers ), block_numbers , peer )
500
505
peer .sub_proto .send_get_block_bodies ([header .hash for header in headers ])
501
506
502
- def _send_get_receipts (self , peer : ETHPeer , headers : List [BlockHeader ]) -> None :
507
+ def _send_get_receipts (self , peer : ' ETHPeer' , headers : List [BlockHeader ]) -> None :
503
508
block_numbers = ", " .join (str (h .block_number ) for h in headers )
504
509
self .logger .debug (
505
510
"Requesting %d block receipts (%s) to %s" , len (headers ), block_numbers , peer )
@@ -527,47 +532,54 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int:
527
532
528
533
async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
529
534
msg : protocol ._DecodedMsgType ) -> None :
535
+ from trinity .protocol .eth .peer import ETHPeer # noqa: F811
536
+ from trinity .protocol .eth import commands
537
+ from trinity .protocol .eth import (
538
+ constants as eth_constants ,
539
+ )
540
+
530
541
peer = cast (ETHPeer , peer )
531
- if isinstance (cmd , eth .BlockBodies ):
542
+
543
+ if isinstance (cmd , commands .BlockBodies ):
532
544
await self ._handle_block_bodies (peer , list (cast (Tuple [BlockBody ], msg )))
533
- elif isinstance (cmd , eth .Receipts ):
545
+ elif isinstance (cmd , commands .Receipts ):
534
546
await self ._handle_block_receipts (peer , cast (List [List [Receipt ]], msg ))
535
- elif isinstance (cmd , eth .NewBlock ):
547
+ elif isinstance (cmd , commands .NewBlock ):
536
548
await self ._handle_new_block (peer , cast (Dict [str , Any ], msg ))
537
- elif isinstance (cmd , eth .GetBlockHeaders ):
549
+ elif isinstance (cmd , commands .GetBlockHeaders ):
538
550
await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
539
- elif isinstance (cmd , eth .BlockHeaders ):
551
+ elif isinstance (cmd , commands .BlockHeaders ):
540
552
# `BlockHeaders` messages are handled at the peer level.
541
553
pass
542
- elif isinstance (cmd , eth .GetBlockBodies ):
543
- # Only serve up to eth. MAX_BODIES_FETCH items in every request.
544
- block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
554
+ elif isinstance (cmd , commands .GetBlockBodies ):
555
+ # Only serve up to MAX_BODIES_FETCH items in every request.
556
+ block_hashes = cast (List [Hash32 ], msg )[:eth_constants .MAX_BODIES_FETCH ]
545
557
await self ._handler .handle_get_block_bodies (peer , block_hashes )
546
- elif isinstance (cmd , eth .GetReceipts ):
547
- # Only serve up to eth. MAX_RECEIPTS_FETCH items in every request.
548
- block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
558
+ elif isinstance (cmd , commands .GetReceipts ):
559
+ # Only serve up to MAX_RECEIPTS_FETCH items in every request.
560
+ block_hashes = cast (List [Hash32 ], msg )[:eth_constants .MAX_RECEIPTS_FETCH ]
549
561
await self ._handler .handle_get_receipts (peer , block_hashes )
550
- elif isinstance (cmd , eth .GetNodeData ):
551
- # Only serve up to eth. MAX_STATE_FETCH items in every request.
552
- node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
562
+ elif isinstance (cmd , commands .GetNodeData ):
563
+ # Only serve up to MAX_STATE_FETCH items in every request.
564
+ node_hashes = cast (List [Hash32 ], msg )[:eth_constants .MAX_STATE_FETCH ]
553
565
await self ._handler .handle_get_node_data (peer , node_hashes )
554
- elif isinstance (cmd , eth .Transactions ):
566
+ elif isinstance (cmd , commands .Transactions ):
555
567
# Transactions msgs are handled by our TxPool service.
556
568
pass
557
- elif isinstance (cmd , eth .NodeData ):
569
+ elif isinstance (cmd , commands .NodeData ):
558
570
# When doing a chain sync we never send GetNodeData requests, so peers should not send
559
571
# us NodeData msgs.
560
572
self .logger .warn ("Unexpected NodeData msg from %s, disconnecting" , peer )
561
573
await peer .disconnect (DisconnectReason .bad_protocol )
562
574
else :
563
575
self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
564
576
565
- async def _handle_new_block (self , peer : ETHPeer , msg : Dict [str , Any ]) -> None :
577
+ async def _handle_new_block (self , peer : ' ETHPeer' , msg : Dict [str , Any ]) -> None :
566
578
self ._sync_requests .put_nowait (peer )
567
579
568
580
async def _handle_block_receipts (self ,
569
- peer : ETHPeer ,
570
- receipts_by_block : List [List [eth . Receipt ]]) -> None :
581
+ peer : ' ETHPeer' ,
582
+ receipts_by_block : List [List [Receipt ]]) -> None :
571
583
self .logger .debug ("Got Receipts for %d blocks from %s" , len (receipts_by_block ), peer )
572
584
iterator = map (make_trie_root_and_nodes , receipts_by_block )
573
585
# The map() call above is lazy (it returns an iterator! ;-), so it's only evaluated in
@@ -582,7 +594,7 @@ async def _handle_block_receipts(self,
582
594
self ._downloaded_receipts .put_nowait ((peer , downloaded ))
583
595
584
596
async def _handle_block_bodies (self ,
585
- peer : ETHPeer ,
597
+ peer : ' ETHPeer' ,
586
598
bodies : List [BlockBody ]) -> None :
587
599
self .logger .debug ("Got Bodies for %d blocks from %s" , len (bodies ), peer )
588
600
iterator = map (make_trie_root_and_nodes , [body .transactions for body in bodies ])
@@ -601,10 +613,12 @@ async def _handle_block_bodies(self,
601
613
602
614
async def _handle_get_block_headers (
603
615
self ,
604
- peer : ETHPeer ,
616
+ peer : ' ETHPeer' ,
605
617
query : Dict [str , Any ]) -> None :
618
+ from trinity .protocol .eth .requests import HeaderRequest # noqa: F811
619
+
606
620
self .logger .debug ("Peer %s made header request: %s" , peer , query )
607
- request = eth . HeaderRequest (
621
+ request = HeaderRequest (
608
622
query ['block_number_or_hash' ],
609
623
query ['max_headers' ],
610
624
query ['skip' ],
@@ -626,7 +640,7 @@ class RegularChainSyncer(FastChainSyncer):
626
640
_seal_check_random_sample_rate = 1
627
641
628
642
async def _handle_block_receipts (
629
- self , peer : ETHPeer , receipts_by_block : List [List [eth . Receipt ]]) -> None :
643
+ self , peer : ' ETHPeer' , receipts_by_block : List [List [Receipt ]]) -> None :
630
644
# When doing a regular sync we never request receipts.
631
645
self .logger .warn ("Unexpected BlockReceipts msg from %s, disconnecting" , peer )
632
646
await peer .disconnect (DisconnectReason .bad_protocol )
@@ -675,7 +689,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
675
689
self .logger = logger
676
690
self .cancel_token = token
677
691
678
- async def handle_get_block_bodies (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
692
+ async def handle_get_block_bodies (self , peer : ' ETHPeer' , block_hashes : List [Hash32 ]) -> None :
679
693
self .logger .trace ("%s requested bodies for %d blocks" , peer , len (block_hashes ))
680
694
chaindb = cast ('AsyncChainDB' , self .db )
681
695
bodies = []
@@ -692,7 +706,7 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32
692
706
self .logger .trace ("Replying to %s with %d block bodies" , peer , len (bodies ))
693
707
peer .sub_proto .send_block_bodies (bodies )
694
708
695
- async def handle_get_receipts (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
709
+ async def handle_get_receipts (self , peer : ' ETHPeer' , block_hashes : List [Hash32 ]) -> None :
696
710
self .logger .trace ("%s requested receipts for %d blocks" , peer , len (block_hashes ))
697
711
chaindb = cast ('AsyncChainDB' , self .db )
698
712
receipts = []
@@ -708,7 +722,7 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -
708
722
self .logger .trace ("Replying to %s with receipts for %d blocks" , peer , len (receipts ))
709
723
peer .sub_proto .send_receipts (receipts )
710
724
711
- async def handle_get_node_data (self , peer : ETHPeer , node_hashes : List [Hash32 ]) -> None :
725
+ async def handle_get_node_data (self , peer : ' ETHPeer' , node_hashes : List [Hash32 ]) -> None :
712
726
self .logger .trace ("%s requested %d trie nodes" , peer , len (node_hashes ))
713
727
chaindb = cast ('AsyncChainDB' , self .db )
714
728
nodes = []
@@ -723,7 +737,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
723
737
peer .sub_proto .send_node_data (nodes )
724
738
725
739
async def lookup_headers (self ,
726
- request : protocol . BaseHeaderRequest ) -> Tuple [BlockHeader , ...]:
740
+ request : ' BaseHeaderRequest' ) -> Tuple [BlockHeader , ...]:
727
741
"""
728
742
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
729
743
between each, in reverse order if :reverse: is True.
@@ -744,7 +758,7 @@ async def lookup_headers(self,
744
758
return headers
745
759
746
760
async def _get_block_numbers_for_request (self ,
747
- request : protocol . BaseHeaderRequest
761
+ request : ' BaseHeaderRequest'
748
762
) -> Tuple [BlockNumber , ...]:
749
763
"""
750
764
Generate the block numbers for a given `HeaderRequest`.
@@ -823,6 +837,8 @@ def _test() -> None:
823
837
from tests .p2p .integration_test_helpers import (
824
838
FakeAsyncChainDB , FakeAsyncMainnetChain , FakeAsyncRopstenChain , FakeAsyncHeaderDB ,
825
839
connect_to_peers_loop )
840
+ from trinity .protocol .eth .peer import ETHPeer # noqa: F811
841
+ from trinity .protocol .les .peer import LESPeer # noqa: F811
826
842
from trinity .utils .chains import load_nodekey
827
843
828
844
parser = argparse .ArgumentParser ()
0 commit comments