@@ -355,10 +355,18 @@ async def read_msg(self) -> Tuple[protocol.Command, protocol._DecodedMsgType]:
355
355
# too much time is being spent on this again, we need to consider running this in a
356
356
# ProcessPoolExecutor(). Need to make sure we don't use all CPUs in the machine for that,
357
357
# though, otherwise asyncio's event loop can't run and we can't keep up with other peers.
358
- decoded_msg = cast (Dict [str , Any ], cmd .decode (msg ))
359
- self .logger .trace ("Successfully decoded %s msg: %s" , cmd , decoded_msg )
360
- self .received_msgs [cmd ] += 1
361
- return cmd , decoded_msg
358
+ try :
359
+ decoded_msg = cast (Dict [str , Any ], cmd .decode (msg ))
360
+ except MalformedMessage as err :
361
+ self .logger .debug (
362
+ "Malformed message from peer %s: CMD:%s Error: %r" ,
363
+ self , type (cmd ).__name__ , err ,
364
+ )
365
+ raise
366
+ else :
367
+ self .logger .trace ("Successfully decoded %s msg: %s" , cmd , decoded_msg )
368
+ self .received_msgs [cmd ] += 1
369
+ return cmd , decoded_msg
362
370
363
371
def handle_p2p_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
364
372
"""Handle the base protocol (P2P) messages."""
@@ -837,10 +845,14 @@ async def start_peer(self, peer: BasePeer) -> None:
837
845
# check, we do it here because we want to perform it for incoming peer connections as
838
846
# well.
839
847
msgs = await self .ensure_same_side_on_dao_fork (peer )
840
- except DAOForkCheckFailure as e :
841
- self .logger .debug ("DAO fork check with %s failed: %s" , peer , e )
848
+ except DAOForkCheckFailure as err :
849
+ self .logger .debug ("DAO fork check with %s failed: %s" , peer , err )
842
850
await peer .disconnect (DisconnectReason .useless_peer )
843
851
return
852
+ except MalformedMessage as err :
853
+ self .logger .debug ("DAO fork check with %s failed: %s" , peer , err )
854
+ await peer .disconnect (DisconnectReason .bad_protocol )
855
+ return
844
856
asyncio .ensure_future (peer .run (finished_callback = self ._peer_finished ))
845
857
self ._add_peer (peer , msgs )
846
858
@@ -962,9 +974,9 @@ async def ensure_same_side_on_dao_fork(
962
974
else :
963
975
msgs .append ((cmd , msg ))
964
976
continue
965
- except (TimeoutError , PeerConnectionLost ) as e :
977
+ except (TimeoutError , PeerConnectionLost ) as err :
966
978
raise DAOForkCheckFailure (
967
- "Timed out waiting for DAO fork header from {}: {}" .format (peer , e ))
979
+ "Timed out waiting for DAO fork header from {}: {}" .format (peer , err ))
968
980
969
981
try :
970
982
request .validate_headers (headers )
0 commit comments