@@ -381,6 +381,24 @@ def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgT
381
381
else :
382
382
self .logger .warn ("Peer %s has no subscribers, discarding %s msg" , self , cmd )
383
383
384
+ cmd_type = type (cmd )
385
+
386
+ if cmd_type in self .pending_requests :
387
+ request , future = self .pending_requests [cmd_type ]
388
+ try :
389
+ request .validate_response (msg )
390
+ except ValidationError as err :
391
+ self .logger .debug (
392
+ "Response validation failure for pending %s request from peer %s: %s" ,
393
+ cmd_type .__name__ ,
394
+ self ,
395
+ err ,
396
+ )
397
+ pass
398
+ else :
399
+ future .set_result (msg )
400
+ self .pending_requests .pop (cmd_type )
401
+
384
402
def process_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
385
403
if cmd .is_base_protocol :
386
404
self .handle_p2p_msg (cmd , msg )
@@ -531,21 +549,11 @@ def max_headers_fetch(self) -> int:
531
549
return les .MAX_HEADERS_FETCH
532
550
533
551
def handle_sub_proto_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
534
- cmd_type = type (cmd )
535
-
536
552
if isinstance (cmd , les .Announce ):
537
553
self .head_info = cmd .as_head_info (msg )
538
554
self .head_td = self .head_info .total_difficulty
539
555
self .head_hash = self .head_info .block_hash
540
- elif cmd_type in self .pending_requests :
541
- request , future = self .pending_requests [cmd_type ]
542
- try :
543
- request .validate_response (msg )
544
- except ValidationError :
545
- pass
546
- else :
547
- future .set_result (msg )
548
- self .pending_requests .pop (cmd_type )
556
+
549
557
super ().handle_sub_proto_msg (cmd , msg )
550
558
551
559
async def send_sub_proto_handshake (self ) -> None :
@@ -603,14 +611,26 @@ def request_block_headers(self,
603
611
604
612
async def wait_for_block_headers (self , request : les .HeaderRequest ) -> Tuple [BlockHeader , ...]:
605
613
future : 'asyncio.Future[protocol._DecodedMsgType]' = asyncio .Future ()
614
+ if les .BlockHeaders in self .pending_requests :
615
+ # the `finally` block below should prevent this from happening, but
616
+ # were two requests to the same peer to be fired off at the same
617
+ # time, this will prevent us from overwriting the first one.
618
+ raise ValueError (
619
+ "There is already a pending `BlockHeaders` request for peer {0}" .format (self )
620
+ )
606
621
self .pending_requests [les .BlockHeaders ] = cast (
607
622
Tuple [protocol .BaseRequest , 'asyncio.Future[protocol._DecodedMsgType]' ],
608
623
(request , future ),
609
624
)
610
- response = cast (
611
- Dict [str , Any ],
612
- await self .wait (future , timeout = self ._response_timeout ),
613
- )
625
+ try :
626
+ response = cast (
627
+ Dict [str , Any ],
628
+ await self .wait (future , timeout = self ._response_timeout ),
629
+ )
630
+ finally :
631
+ # We always want to be sure that this method cleans up the
632
+ # `pending_requests` so that we don't end up in a situation.
633
+ self .pending_requests .pop (les .BlockHeaders , None )
614
634
return cast (Tuple [BlockHeader , ...], response ['headers' ])
615
635
616
636
async def get_block_headers (self ,
@@ -631,8 +651,6 @@ def max_headers_fetch(self) -> int:
631
651
return eth .MAX_HEADERS_FETCH
632
652
633
653
def handle_sub_proto_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
634
- cmd_type = type (cmd )
635
-
636
654
if isinstance (cmd , eth .NewBlock ):
637
655
msg = cast (Dict [str , Any ], msg )
638
656
header , _ , _ = msg ['block' ]
@@ -642,16 +660,6 @@ def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgT
642
660
self .head_hash = actual_head
643
661
self .head_td = actual_td
644
662
645
- if cmd_type in self .pending_requests :
646
- request , future = self .pending_requests [cmd_type ]
647
- try :
648
- request .validate_response (msg )
649
- except ValidationError :
650
- pass
651
- else :
652
- future .set_result (msg )
653
- self .pending_requests .pop (cmd_type )
654
-
655
663
super ().handle_sub_proto_msg (cmd , msg )
656
664
657
665
async def send_sub_proto_handshake (self ) -> None :
0 commit comments