@@ -191,6 +191,7 @@ def __init__(self,
191
191
self ._subscribers : List [PeerSubscriber ] = []
192
192
self .start_time = datetime .datetime .now ()
193
193
self .received_msgs : Dict [protocol .Command , int ] = collections .defaultdict (int )
194
+ self .booted = asyncio .Event ()
194
195
195
196
self .egress_mac = egress_mac
196
197
self .ingress_mac = ingress_mac
@@ -344,7 +345,84 @@ def is_closing(self) -> bool:
344
345
async def _cleanup (self ) -> None :
345
346
self .close ()
346
347
348
+ async def boot (self ) -> None :
349
+ if not self .events .started .is_set ():
350
+ raise RuntimeError ("Cannot boot a Peer which has not been started." )
351
+
352
+ try :
353
+ await self ._boot ()
354
+ except OperationCancelled :
355
+ # If a cancellation happens during boot we suppress it here and
356
+ # simply exit without setting the `booted` event.
357
+ return
358
+ else :
359
+ self .booted .set ()
360
+
361
+ async def _boot (self ) -> None :
362
+ try :
363
+ await self .ensure_same_side_on_dao_fork ()
364
+ except DAOForkCheckFailure as err :
365
+ self .logger .debug ("DAO fork check with %s failed: %s" , self , err )
366
+ await self .disconnect (DisconnectReason .useless_peer )
367
+ raise
368
+ except Exception as err :
369
+ self .logger .exception ('ERROR BOOTING' )
370
+ raise
371
+
372
+ vm_configuration : Tuple [Tuple [int , Type [BaseVM ]], ...] = None
373
+
374
+ async def ensure_same_side_on_dao_fork (self ) -> None :
375
+ """Ensure we're on the same side of the DAO fork
376
+
377
+ In order to do that we have to request the DAO fork block and its parent, but while we
378
+ wait for that we may receive other messages from the peer, which are returned so that they
379
+ can be re-added to our subscribers' queues when the peer is finally added to the pool.
380
+ """
381
+ for start_block , vm_class in self .vm_configuration :
382
+ if not issubclass (vm_class , HomesteadVM ):
383
+ continue
384
+ elif not vm_class .support_dao_fork :
385
+ break
386
+ elif start_block > vm_class .dao_fork_block_number :
387
+ # VM comes after the fork, so stop checking
388
+ break
389
+
390
+ start_block = vm_class .dao_fork_block_number - 1
391
+
392
+ try :
393
+ headers = await self .requests .get_block_headers ( # type: ignore
394
+ start_block ,
395
+ max_headers = 2 ,
396
+ reverse = False ,
397
+ timeout = CHAIN_SPLIT_CHECK_TIMEOUT ,
398
+ )
399
+
400
+ except (TimeoutError , PeerConnectionLost ) as err :
401
+ raise DAOForkCheckFailure (
402
+ f"Timed out waiting for DAO fork header from { self } : { err } "
403
+ ) from err
404
+ except ValidationError as err :
405
+ raise DAOForkCheckFailure (
406
+ f"Invalid header response during DAO fork check: { err } "
407
+ ) from err
408
+
409
+ if len (headers ) != 2 :
410
+ raise DAOForkCheckFailure (
411
+ f"Peer { self } failed to return DAO fork check headers"
412
+ )
413
+ else :
414
+ parent , header = headers
415
+
416
+ try :
417
+ vm_class .validate_header (header , parent , check_seal = True )
418
+ except ValidationError as err :
419
+ raise DAOForkCheckFailure (f"Peer failed DAO fork check validation: { err } " )
420
+
347
421
async def _run (self ) -> None :
422
+ # The `boot` process is run in the background to allow the `run` loop
423
+ # to continue so that all of the Peer APIs can be used within the
424
+ # `boot` task.
425
+ self .run_task (self .boot ())
348
426
while self .is_operational :
349
427
try :
350
428
cmd , msg = await self .read_msg ()
@@ -742,25 +820,26 @@ def unsubscribe(self, subscriber: PeerSubscriber) -> None:
742
820
peer .remove_subscriber (subscriber )
743
821
744
822
async def start_peer (self , peer : BasePeer ) -> None :
823
+ # TODO: temporary hack until all of this EVM stuff can be fully
824
+ # removed from the BasePeer and PeerPool classes.
825
+ peer .vm_configuration = self .vm_configuration
826
+
745
827
self .run_child_service (peer )
746
828
await self .wait (peer .events .started .wait (), timeout = 1 )
747
829
try :
748
- # Although connect() may seem like a more appropriate place to perform the DAO fork
749
- # check, we do it here because we want to perform it for incoming peer connections as
750
- # well.
751
830
with peer .collect_sub_proto_messages () as buffer :
752
- await self .ensure_same_side_on_dao_fork (peer )
753
- except DAOForkCheckFailure as err :
754
- self .logger .debug ("DAO fork check with %s failed: %s" , peer , err )
755
- await peer .disconnect (DisconnectReason .useless_peer )
831
+ # TODO: update to use a more generic timeout
832
+ await self .wait (peer .booted .wait (), timeout = CHAIN_SPLIT_CHECK_TIMEOUT )
833
+ except TimeoutError as err :
834
+ self .logger .debug ('Timout waiting for peer to boot: %s' , err )
835
+ await peer .disconnect (DisconnectReason .timeout )
756
836
return
757
837
else :
758
- msgs = tuple ((cmd , msg ) for _ , cmd , msg in buffer .get_messages ())
759
- self ._add_peer (peer , msgs )
838
+ self ._add_peer (peer , buffer .get_messages ())
760
839
761
840
def _add_peer (self ,
762
841
peer : BasePeer ,
763
- msgs : Tuple [Tuple [ protocol . Command , protocol . PayloadType ] , ...]) -> None :
842
+ msgs : Tuple [PeerMessage , ...]) -> None :
764
843
"""Add the given peer to the pool.
765
844
766
845
Appart from adding it to our list of connected nodes and adding each of our subscriber's
@@ -772,8 +851,8 @@ def _add_peer(self,
772
851
for subscriber in self ._subscribers :
773
852
subscriber .register_peer (peer )
774
853
peer .add_subscriber (subscriber )
775
- for cmd , msg in msgs :
776
- subscriber .add_msg (PeerMessage ( peer , cmd , msg ) )
854
+ for msg in msgs :
855
+ subscriber .add_msg (msg )
777
856
778
857
async def _run (self ) -> None :
779
858
# FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
@@ -848,54 +927,6 @@ async def connect_to_nodes(self, nodes: Iterator[Node]) -> None:
848
927
if peer is not None :
849
928
await self .start_peer (peer )
850
929
851
- async def ensure_same_side_on_dao_fork (
852
- self , peer : BasePeer ) -> None :
853
- """Ensure we're on the same side of the DAO fork as the given peer.
854
-
855
- In order to do that we have to request the DAO fork block and its parent, but while we
856
- wait for that we may receive other messages from the peer, which are returned so that they
857
- can be re-added to our subscribers' queues when the peer is finally added to the pool.
858
- """
859
- for start_block , vm_class in self .vm_configuration :
860
- if not issubclass (vm_class , HomesteadVM ):
861
- continue
862
- elif not vm_class .support_dao_fork :
863
- break
864
- elif start_block > vm_class .dao_fork_block_number :
865
- # VM comes after the fork, so stop checking
866
- break
867
-
868
- start_block = vm_class .dao_fork_block_number - 1
869
-
870
- try :
871
- headers = await peer .requests .get_block_headers ( # type: ignore
872
- start_block ,
873
- max_headers = 2 ,
874
- reverse = False ,
875
- timeout = CHAIN_SPLIT_CHECK_TIMEOUT ,
876
- )
877
-
878
- except (TimeoutError , PeerConnectionLost ) as err :
879
- raise DAOForkCheckFailure (
880
- f"Timed out waiting for DAO fork header from { peer } : { err } "
881
- ) from err
882
- except ValidationError as err :
883
- raise DAOForkCheckFailure (
884
- f"Invalid header response during DAO fork check: { err } "
885
- ) from err
886
-
887
- if len (headers ) != 2 :
888
- raise DAOForkCheckFailure (
889
- f"Peer { peer } failed to return DAO fork check headers"
890
- )
891
- else :
892
- parent , header = headers
893
-
894
- try :
895
- vm_class .validate_header (header , parent , check_seal = True )
896
- except ValidationError as err :
897
- raise DAOForkCheckFailure (f"Peer failed DAO fork check validation: { err } " )
898
-
899
930
def _peer_finished (self , peer : BaseService ) -> None :
900
931
"""Remove the given peer from our list of connected nodes.
901
932
This is passed as a callback to be called when a peer finishes.
0 commit comments