 import functools
 import logging
 import operator
-import random
 import struct
 from abc import (
     ABC,
@@ -24,7 +23,6 @@
     Set,
     Tuple,
     Type,
-    TYPE_CHECKING,
 )

 import sha3
@@ -39,34 +37,23 @@

 from eth_utils import (
     to_tuple,
-    ValidationError,
 )

-from eth_typing import BlockNumber, Hash32

 from eth_keys import datatypes

 from cancel_token import CancelToken, OperationCancelled

-from lahja import (
-    Endpoint,
-)
-
-from eth.constants import GENESIS_BLOCK_NUMBER
-from eth.rlp.headers import BlockHeader
-from eth.vm.base import BaseVM
-from eth.vm.forks import HomesteadVM
+from lahja import Endpoint

 from p2p import auth
 from p2p import protocol
 from p2p.kademlia import Node
 from p2p.exceptions import (
     BadAckMessage,
-    DAOForkCheckFailure,
     DecryptionError,
     HandshakeFailure,
     MalformedMessage,
-    NoConnectedPeers,
     NoMatchingPeerCapabilities,
     PeerConnectionLost,
     RemoteDisconnected,
@@ -91,7 +78,6 @@
 )

 from .constants import (
-    CHAIN_SPLIT_CHECK_TIMEOUT,
     CONN_IDLE_TIMEOUT,
     DEFAULT_MAX_PEERS,
     DEFAULT_PEER_BOOT_TIMEOUT,
@@ -104,9 +90,6 @@
     PeerCountResponse,
 )

-if TYPE_CHECKING:
-    from trinity.db.header import BaseAsyncHeaderDB  # noqa: F401
-

 async def handshake(remote: Node, factory: 'BasePeerFactory') -> 'BasePeer':
     """Perform the auth and P2P handshakes with the given remote.
@@ -149,13 +132,6 @@ async def handshake(remote: Node, factory: 'BasePeerFactory') -> 'BasePeer':
     return peer


-class ChainInfo(NamedTuple):
-    block_number: BlockNumber
-    block_hash: Hash32
-    total_difficulty: int
-    genesis_hash: Hash32
-
-
 class PeerConnection(NamedTuple):
     reader: asyncio.StreamReader
     writer: asyncio.StreamWriter
@@ -165,80 +141,21 @@ class PeerConnection(NamedTuple):
     ingress_mac: sha3.keccak_256


-class PeerBootManager(BaseService):
+class BasePeerBootManager(BaseService):
+    """
+    The default boot manager does nothing, simply serving as a hook for other
+    protocols which need to perform more complex boot check.
+    """
     def __init__(self, peer: 'BasePeer') -> None:
         super().__init__(peer.cancel_token)
         self.peer = peer

     async def _run(self) -> None:
-        try:
-            await self.ensure_same_side_on_dao_fork()
-        except DAOForkCheckFailure as err:
-            self.logger.debug("DAO fork check with %s failed: %s", self.peer, err)
-            # If we `await` the `peer.disconnect` call, we end up with an
-            # OperationCancelled exception bubbling. This doesn't actually
-            # cause anything *bad* to happen, but it does cause the service to
-            # exit via exception rather than cleanly shutting down. By using
-            # `run_task`, this service finishes exiting prior to the
-            # cancellation.
-            self.run_daemon_task(self.peer.disconnect(DisconnectReason.useless_peer))
-
-    async def ensure_same_side_on_dao_fork(self) -> None:
-        """Ensure we're on the same side of the DAO fork as the given peer.
-
-        In order to do that we have to request the DAO fork block and its parent, but while we
-        wait for that we may receive other messages from the peer, which are returned so that they
-        can be re-added to our subscribers' queues when the peer is finally added to the pool.
-        """
-        for start_block, vm_class in self.peer.context.vm_configuration:
-            if not issubclass(vm_class, HomesteadVM):
-                continue
-            elif not vm_class.support_dao_fork:
-                break
-            elif start_block > vm_class.dao_fork_block_number:
-                # VM comes after the fork, so stop checking
-                break
-
-            start_block = vm_class.dao_fork_block_number - 1
-
-            try:
-                headers = await self.peer.requests.get_block_headers(  # type: ignore
-                    start_block,
-                    max_headers=2,
-                    reverse=False,
-                    timeout=CHAIN_SPLIT_CHECK_TIMEOUT,
-                )
-
-            except (TimeoutError, PeerConnectionLost) as err:
-                raise DAOForkCheckFailure(
-                    f"Timed out waiting for DAO fork header from {self.peer}: {err}"
-                ) from err
-            except ValidationError as err:
-                raise DAOForkCheckFailure(
-                    f"Invalid header response during DAO fork check: {err}"
-                ) from err
-
-            if len(headers) != 2:
-                raise DAOForkCheckFailure(
-                    f"{self.peer} failed to return DAO fork check headers"
-                )
-            else:
-                parent, header = headers
-
-            try:
-                vm_class.validate_header(header, parent, check_seal=True)
-            except ValidationError as err:
-                raise DAOForkCheckFailure(f"{self.peer} failed DAO fork check validation: {err}")
+        pass


 class BasePeerContext:
-    def __init__(self,
-                 headerdb: 'BaseAsyncHeaderDB',
-                 network_id: int,
-                 vm_configuration: Tuple[Tuple[int, Type[BaseVM]], ...]) -> None:
-        self.headerdb = headerdb
-        self.network_id = network_id
-        self.vm_configuration = vm_configuration
+    pass


 class BasePeer(BaseService):
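With the DAO-fork check removed here, `BasePeerBootManager` and `BasePeerContext` become empty hooks that protocol-specific code is expected to subclass. A rough sketch of how a sub-protocol might use these hooks; the `Eth*` names and `EXPECTED_NETWORK_ID` constant are illustrative assumptions, not part of this change:

# Illustrative only: the names below are hypothetical, not introduced by this diff.
class EthPeerContext(BasePeerContext):
    def __init__(self, network_id: int) -> None:
        # Protocol-specific state that the base context no longer carries.
        self.network_id = network_id


class EthPeerBootManager(BasePeerBootManager):
    async def _run(self) -> None:
        # Run a protocol-specific post-handshake check; on failure, schedule the
        # disconnect rather than awaiting it, mirroring the pattern removed above.
        if self.peer.context.network_id != EXPECTED_NETWORK_ID:  # hypothetical constant
            self.run_daemon_task(self.peer.disconnect(DisconnectReason.useless_peer))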
@@ -250,8 +167,6 @@ class BasePeer(BaseService):
     listen_port = 30303
     # Will be set upon the successful completion of a P2P handshake.
     sub_proto: protocol.Protocol = None
-    head_td: int = None
-    head_hash: Hash32 = None

     def __init__(self,
                  remote: Node,
@@ -266,10 +181,6 @@ def __init__(self,
         # Any contextual information the peer may need.
         self.context = context

-        self.headerdb = context.headerdb
-        self.network_id = context.network_id
-        self.vm_configuration = context.vm_configuration
-
         # The `Node` that this peer is connected to
         self.remote = remote

@@ -317,10 +228,10 @@ def get_extra_stats(self) -> List[str]:
         return []

     @property
-    def boot_manager_class(self) -> Type[PeerBootManager]:
-        return PeerBootManager
+    def boot_manager_class(self) -> Type[BasePeerBootManager]:
+        return BasePeerBootManager

-    def get_boot_manager(self) -> PeerBootManager:
+    def get_boot_manager(self) -> BasePeerBootManager:
         return self.boot_manager_class(self)

     @abstractmethod
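A concrete peer class would then wire its custom manager in through the `boot_manager_class` property renamed in this hunk. A minimal hypothetical override, continuing the assumed `Eth*` names from the sketch above:

# Illustrative only: EthPeer/EthPeerBootManager are assumed names, not part of this diff.
class EthPeer(BasePeer):
    # (other abstract methods omitted for brevity)
    @property
    def boot_manager_class(self) -> Type[BasePeerBootManager]:
        # get_boot_manager() instantiates this class with the peer itself.
        return EthPeerBootManager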
@@ -402,24 +313,6 @@ async def do_p2p_handshake(self) -> None:
             )
         await self.process_p2p_handshake(cmd, msg)

-    @property
-    async def genesis(self) -> BlockHeader:
-        genesis_hash = await self.wait(
-            self.headerdb.coro_get_canonical_block_hash(BlockNumber(GENESIS_BLOCK_NUMBER)))
-        return await self.wait(self.headerdb.coro_get_block_header_by_hash(genesis_hash))
-
-    @property
-    async def _local_chain_info(self) -> ChainInfo:
-        genesis = await self.genesis
-        head = await self.wait(self.headerdb.coro_get_canonical_head())
-        total_difficulty = await self.headerdb.coro_get_score(head.hash)
-        return ChainInfo(
-            block_number=head.block_number,
-            block_hash=head.hash,
-            total_difficulty=total_difficulty,
-            genesis_hash=genesis.hash,
-        )
-
     @property
     def capabilities(self) -> List[Tuple[str, int]]:
         return [(klass.name, klass.version) for klass in self._supported_sub_protocols]
@@ -647,23 +540,34 @@ def send(self, header: bytes, body: bytes) -> None:
             return
         self.writer.write(self.encrypt(header, body))

-    async def disconnect(self, reason: DisconnectReason) -> None:
-        """Send a disconnect msg to the remote node and stop this Peer.
-
-        Also awaits for self.cancel() to ensure any pending tasks are cleaned up.
-
-        :param reason: An item from the DisconnectReason enum.
-        """
+    def _disconnect(self, reason: DisconnectReason) -> None:
         if not isinstance(reason, DisconnectReason):
             raise ValueError(
                 f"Reason must be an item of DisconnectReason, got {reason}"
             )
         self.logger.debug("Disconnecting from remote peer; reason: %s", reason.name)
         self.base_protocol.send_disconnect(reason.value)
         self.close()
+
+    async def disconnect(self, reason: DisconnectReason) -> None:
+        """Send a disconnect msg to the remote node and stop this Peer.
+
+        Also awaits for self.cancel() to ensure any pending tasks are cleaned up.
+
+        :param reason: An item from the DisconnectReason enum.
+        """
+        self._disconnect(reason)
         if self.is_operational:
             await self.cancel()

+    def disconnect_nowait(self, reason: DisconnectReason) -> None:
+        """
+        Non-coroutine version of `disconnect`
+        """
+        self._disconnect(reason)
+        if self.is_operational:
+            self.cancel_nowait()
+
     def select_sub_protocol(self, remote_capabilities: List[Tuple[bytes, int]]
                             ) -> protocol.Protocol:
         """Select the sub-protocol to use when talking to the remote.
@@ -1031,22 +935,6 @@ def _peer_finished(self, peer: BaseService) -> None:
     def __aiter__(self) -> AsyncIterator[BasePeer]:
         return ConnectedPeersIterator(tuple(self.connected_nodes.values()))

-    @property
-    def highest_td_peer(self) -> BasePeer:
-        peers = tuple(self.connected_nodes.values())
-        if not peers:
-            raise NoConnectedPeers()
-        peers_by_td = groupby(operator.attrgetter('head_td'), peers)
-        max_td = max(peers_by_td.keys())
-        return random.choice(peers_by_td[max_td])
-
-    def get_peers(self, min_td: int) -> List[BasePeer]:
-        # TODO: Consider turning this into a method that returns an AsyncIterator, to make it
-        # harder for callsites to get a list of peers while making blocking calls, as those peers
-        # might disconnect in the meantime.
-        peers = tuple(self.connected_nodes.values())
-        return [peer for peer in peers if peer.head_td >= min_td]
-
     async def _periodically_report_stats(self) -> None:
         while self.is_operational:
             inbound_peers = len(