7
7
import operator
8
8
import random
9
9
import struct
10
- import time
11
10
from abc import (
12
11
ABC ,
13
12
abstractmethod
@@ -776,8 +775,6 @@ async def ensure_same_side_on_dao_fork(
776
775
wait for that we may receive other messages from the peer, which are returned so that they
777
776
can be re-added to our subscribers' queues when the peer is finally added to the pool.
778
777
"""
779
- from trinity .protocol .common .commands import BaseBlockHeaders
780
- msgs = []
781
778
for start_block , vm_class in self .vm_configuration :
782
779
if not issubclass (vm_class , HomesteadVM ):
783
780
continue
@@ -790,47 +787,40 @@ async def ensure_same_side_on_dao_fork(
790
787
start_block = vm_class .dao_fork_block_number - 1
791
788
# TODO: This can be either an `ETHPeer` or an `LESPeer`. Will be
792
789
# fixed once full awaitable request API is completed.
793
- request = peer .request_block_headers ( # type: ignore
794
- start_block ,
795
- max_headers = 2 ,
796
- reverse = False ,
797
- )
798
- start = time .time ()
799
790
try :
800
- while True :
801
- elapsed = int (time .time () - start )
802
- remaining_timeout = max (0 , CHAIN_SPLIT_CHECK_TIMEOUT - elapsed )
803
- cmd , msg = await self .wait (
804
- peer .read_msg (), timeout = remaining_timeout )
805
- if isinstance (cmd , BaseBlockHeaders ):
806
- headers = cmd .extract_headers (msg )
807
- break
808
- else :
809
- msgs .append ((cmd , msg ))
810
- continue
791
+ class MsgBuffer (PeerSubscriber ):
792
+ logger = logging .getLogger ('p2p.peer.MsgBuffer' )
793
+ msg_queue_maxsize = 200
794
+ subscription_msg_types = {protocol .Command }
795
+
796
+ msg_buffer = MsgBuffer ()
797
+
798
+ with msg_buffer .subscribe_peer (peer ):
799
+ headers = await peer .handler .get_block_headers ( # type: ignore
800
+ start_block ,
801
+ max_headers = 2 ,
802
+ reverse = False ,
803
+ timeout = CHAIN_SPLIT_CHECK_TIMEOUT ,
804
+ )
805
+
806
+ msgs = [msg_buffer .msg_queue .get_nowait ()[1 :] for _ in range (msg_buffer .queue_size )]
807
+
811
808
except (TimeoutError , PeerConnectionLost ) as err :
812
809
raise DAOForkCheckFailure (
813
- "Timed out waiting for DAO fork header from {}: {}" .format (peer , err ))
810
+ "Timed out waiting for DAO fork header from {}: {}" .format (peer , err )
811
+ ) from err
814
812
except MalformedMessage as err :
815
813
raise DAOForkCheckFailure (
816
814
"Malformed message while doing DAO fork check with {0}: {1}" .format (
817
815
peer , err ,
818
816
)
819
817
) from err
820
-
821
- try :
822
- request .validate_headers (headers )
823
818
except ValidationError as err :
824
819
raise DAOForkCheckFailure (
825
820
"Invalid header response during DAO fork check: {}" .format (err )
826
- )
821
+ ) from err
827
822
828
- if len (headers ) != 2 :
829
- raise DAOForkCheckFailure (
830
- "Peer failed to return all requested headers for DAO fork check"
831
- )
832
- else :
833
- parent , header = headers
823
+ parent , header = headers
834
824
835
825
try :
836
826
vm_class .validate_header (header , parent , check_seal = True )
0 commit comments