20
20
Dict ,
21
21
Iterator ,
22
22
List ,
23
+ NamedTuple ,
23
24
Set ,
24
25
TYPE_CHECKING ,
25
26
Tuple ,
@@ -209,7 +210,7 @@ async def send_sub_proto_handshake(self) -> None:
209
210
210
211
@abstractmethod
211
212
async def process_sub_proto_handshake (
212
- self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
213
+ self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
213
214
raise NotImplementedError ("Must be implemented by subclasses" )
214
215
215
216
@contextlib .contextmanager
@@ -365,7 +366,7 @@ async def _run(self) -> None:
365
366
self .logger .debug ("%s disconnected: %s" , self , e )
366
367
return
367
368
368
- async def read_msg (self ) -> Tuple [protocol .Command , protocol ._DecodedMsgType ]:
369
+ async def read_msg (self ) -> Tuple [protocol .Command , protocol .PayloadType ]:
369
370
header_data = await self .read (HEADER_LEN + MAC_LEN )
370
371
header = self .decrypt_header (header_data )
371
372
frame_size = self .get_frame_size (header )
@@ -392,7 +393,7 @@ async def read_msg(self) -> Tuple[protocol.Command, protocol._DecodedMsgType]:
392
393
self .received_msgs [cmd ] += 1
393
394
return cmd , decoded_msg
394
395
395
- def handle_p2p_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
396
+ def handle_p2p_msg (self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
396
397
"""Handle the base protocol (P2P) messages."""
397
398
if isinstance (cmd , Disconnect ):
398
399
msg = cast (Dict [str , Any ], msg )
@@ -406,12 +407,12 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -
406
407
else :
407
408
raise UnexpectedMessage ("Unexpected msg: {} ({})" .format (cmd , msg ))
408
409
409
- def handle_sub_proto_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
410
+ def handle_sub_proto_msg (self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
410
411
cmd_type = type (cmd )
411
412
412
413
if self ._subscribers :
413
414
was_added = tuple (
414
- subscriber .add_msg ((self , cmd , msg ))
415
+ subscriber .add_msg (PeerMessage (self , cmd , msg ))
415
416
for subscriber
416
417
in self ._subscribers
417
418
)
@@ -424,14 +425,14 @@ def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgT
424
425
else :
425
426
self .logger .warn ("Peer %s has no subscribers, discarding %s msg" , self , cmd )
426
427
427
- def process_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
428
+ def process_msg (self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
428
429
if cmd .is_base_protocol :
429
430
self .handle_p2p_msg (cmd , msg )
430
431
else :
431
432
self .handle_sub_proto_msg (cmd , msg )
432
433
433
434
async def process_p2p_handshake (
434
- self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
435
+ self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
435
436
msg = cast (Dict [str , Any ], msg )
436
437
if not isinstance (cmd , Hello ):
437
438
await self .disconnect (DisconnectReason .bad_protocol )
@@ -563,8 +564,14 @@ def __hash__(self) -> int:
563
564
return hash (self .remote )
564
565
565
566
567
+ class PeerMessage (NamedTuple ):
568
+ peer : BasePeer
569
+ command : protocol .Command
570
+ payload : protocol .PayloadType
571
+
572
+
566
573
class PeerSubscriber (ABC ):
567
- _msg_queue : 'asyncio.Queue[PEER_MSG_TYPE ]' = None
574
+ _msg_queue : 'asyncio.Queue[PeerMessage ]' = None
568
575
569
576
@property
570
577
@abstractmethod
@@ -609,7 +616,7 @@ def deregister_peer(self, peer: BasePeer) -> None:
609
616
pass
610
617
611
618
@property
612
- def msg_queue (self ) -> 'asyncio.Queue[PEER_MSG_TYPE ]' :
619
+ def msg_queue (self ) -> 'asyncio.Queue[PeerMessage ]' :
613
620
if self ._msg_queue is None :
614
621
self ._msg_queue = asyncio .Queue (maxsize = self .msg_queue_maxsize )
615
622
return self ._msg_queue
@@ -618,26 +625,29 @@ def msg_queue(self) -> 'asyncio.Queue[PEER_MSG_TYPE]':
618
625
def queue_size (self ) -> int :
619
626
return self .msg_queue .qsize ()
620
627
621
- def add_msg (self , msg : 'PEER_MSG_TYPE' ) -> bool :
628
+ def add_msg (self , msg : PeerMessage ) -> bool :
622
629
peer , cmd , _ = msg
623
630
624
631
if not self .is_subscription_command (type (cmd )):
625
- self .logger .trace ( # type: ignore
626
- "Discarding %s msg from %s; not subscribed to msg type; "
627
- "subscriptions: %s" ,
628
- cmd , peer , self .subscription_msg_types ,
629
- )
632
+ if hasattr (self , 'logger' ):
633
+ self .logger .trace ( # type: ignore
634
+ "Discarding %s msg from %s; not subscribed to msg type; "
635
+ "subscriptions: %s" ,
636
+ cmd , peer , self .subscription_msg_types ,
637
+ )
630
638
return False
631
639
632
640
try :
633
- self .logger .trace ( # type: ignore
634
- "Adding %s msg from %s to queue; queue_size=%d" , cmd , peer , self .queue_size )
641
+ if hasattr (self , 'logger' ):
642
+ self .logger .trace ( # type: ignore
643
+ "Adding %s msg from %s to queue; queue_size=%d" , cmd , peer , self .queue_size )
635
644
self .msg_queue .put_nowait (msg )
636
645
return True
637
646
except asyncio .queues .QueueFull :
638
- self .logger .warn ( # type: ignore
639
- "%s msg queue is full; discarding %s msg from %s" ,
640
- self .__class__ .__name__ , cmd , peer )
647
+ if hasattr (self , 'logger' ):
648
+ self .logger .warn ( # type: ignore
649
+ "%s msg queue is full; discarding %s msg from %s" ,
650
+ self .__class__ .__name__ , cmd , peer )
641
651
return False
642
652
643
653
@contextlib .contextmanager
@@ -663,7 +673,7 @@ class MsgBuffer(PeerSubscriber):
663
673
subscription_msg_types = {protocol .Command }
664
674
665
675
@to_tuple
666
- def get_messages (self ) -> Iterator ['PEER_MSG_TYPE' ]:
676
+ def get_messages (self ) -> Iterator [PeerMessage ]:
667
677
while not self .msg_queue .empty ():
668
678
yield self .msg_queue .get_nowait ()
669
679
@@ -740,7 +750,7 @@ async def start_peer(self, peer: BasePeer) -> None:
740
750
741
751
def _add_peer (self ,
742
752
peer : BasePeer ,
743
- msgs : Tuple [Tuple [protocol .Command , protocol ._DecodedMsgType ], ...]) -> None :
753
+ msgs : Tuple [Tuple [protocol .Command , protocol .PayloadType ], ...]) -> None :
744
754
"""Add the given peer to the pool.
745
755
746
756
Appart from adding it to our list of connected nodes and adding each of our subscriber's
@@ -753,7 +763,7 @@ def _add_peer(self,
753
763
subscriber .register_peer (peer )
754
764
peer .add_subscriber (subscriber )
755
765
for cmd , msg in msgs :
756
- subscriber .add_msg ((peer , cmd , msg ))
766
+ subscriber .add_msg (PeerMessage (peer , cmd , msg ))
757
767
758
768
async def _run (self ) -> None :
759
769
# FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
@@ -1006,9 +1016,6 @@ def __init__(self,
1006
1016
self .genesis_hash = genesis_hash
1007
1017
1008
1018
1009
- PEER_MSG_TYPE = Tuple [BasePeer , protocol .Command , protocol ._DecodedMsgType ]
1010
-
1011
-
1012
1019
def _test () -> None :
1013
1020
"""
1014
1021
Create a Peer instance connected to a local geth instance and log messages exchanged with it.
0 commit comments