1
1
import asyncio
2
+ from functools import (
3
+ partial ,
4
+ )
2
5
from typing import (
3
6
Any ,
4
7
Callable ,
24
27
)
25
28
26
29
from trie import HexaryTrie
30
+ from trie .exceptions import BadTrieProof
27
31
28
32
from evm .exceptions import (
29
33
BlockNotFound ,
35
39
36
40
from p2p .exceptions import (
37
41
BadLESResponse ,
42
+ NoConnectedPeers ,
43
+ NoEligiblePeers ,
38
44
)
39
45
from p2p .cancel_token import CancelToken
40
46
from p2p import protocol
41
- from p2p .constants import REPLY_TIMEOUT
47
+ from p2p .constants import (
48
+ COMPLETION_TIMEOUT ,
49
+ MAX_REORG_DEPTH ,
50
+ MAX_REQUEST_ATTEMPTS ,
51
+ REPLY_TIMEOUT ,
52
+ )
53
+ from p2p .p2p_proto import (
54
+ DisconnectReason ,
55
+ )
42
56
from p2p .peer import (
43
57
LESPeer ,
44
58
PeerPool ,
47
61
from p2p .rlp import BlockBody
48
62
from p2p .service import (
49
63
BaseService ,
64
+ service_timeout ,
50
65
)
51
66
from p2p .utils import gen_request_id
52
67
@@ -103,11 +118,22 @@ def callback(r: protocol._DecodedMsgType) -> None:
103
118
return cast (Dict [str , Any ], reply )
104
119
105
120
@alru_cache (maxsize = 1024 , cache_exceptions = False )
121
+ @service_timeout (COMPLETION_TIMEOUT )
106
122
async def get_block_header_by_hash (self , block_hash : Hash32 ) -> BlockHeader :
107
- peer = cast (LESPeer , self .peer_pool .highest_td_peer )
108
- return await self ._get_block_header_by_hash (peer , block_hash )
123
+ """
124
+ :param block_hash: hash of the header to retrieve
125
+
126
+ :return: header returned by peer
127
+
128
+ :raise NoEligiblePeers: if no peers are available to fulfill the request
129
+ :raise TimeoutError: if an individual request or the overall process times out
130
+ """
131
+ return await self ._retry_on_bad_response (
132
+ partial (self ._get_block_header_by_hash , block_hash )
133
+ )
109
134
110
135
@alru_cache (maxsize = 1024 , cache_exceptions = False )
136
+ @service_timeout (COMPLETION_TIMEOUT )
111
137
async def get_block_body_by_hash (self , block_hash : Hash32 ) -> BlockBody :
112
138
peer = cast (LESPeer , self .peer_pool .highest_td_peer )
113
139
self .logger .debug ("Fetching block %s from %s" , encode_hex (block_hash ), peer )
@@ -121,6 +147,7 @@ async def get_block_body_by_hash(self, block_hash: Hash32) -> BlockBody:
121
147
# TODO add a get_receipts() method to BaseChain API, and dispatch to this, as needed
122
148
123
149
@alru_cache (maxsize = 1024 , cache_exceptions = False )
150
+ @service_timeout (COMPLETION_TIMEOUT )
124
151
async def get_receipts (self , block_hash : Hash32 ) -> List [Receipt ]:
125
152
peer = cast (LESPeer , self .peer_pool .highest_td_peer )
126
153
self .logger .debug ("Fetching %s receipts from %s" , encode_hex (block_hash ), peer )
@@ -135,25 +162,158 @@ async def get_receipts(self, block_hash: Hash32) -> List[Receipt]:
135
162
# request accounts and code (and storage?)
136
163
137
164
@alru_cache (maxsize = 1024 , cache_exceptions = False )
165
+ @service_timeout (COMPLETION_TIMEOUT )
138
166
async def get_account (self , block_hash : Hash32 , address : Address ) -> Account :
139
- peer = cast (LESPeer , self .peer_pool .highest_td_peer )
167
+ return await self ._retry_on_bad_response (
168
+ partial (self ._get_account_from_peer , block_hash , address )
169
+ )
170
+
171
+ async def _get_account_from_peer (
172
+ self ,
173
+ block_hash : Hash32 ,
174
+ address : Address ,
175
+ peer : LESPeer ) -> Account :
140
176
key = keccak (address )
141
177
proof = await self ._get_proof (peer , block_hash , account_key = b'' , key = key )
142
- header = await self ._get_block_header_by_hash (peer , block_hash )
143
- rlp_account = HexaryTrie .get_from_proof (header .state_root , key , proof )
178
+ header = await self ._get_block_header_by_hash (block_hash , peer )
179
+ try :
180
+ rlp_account = HexaryTrie .get_from_proof (header .state_root , key , proof )
181
+ except BadTrieProof as exc :
182
+ raise BadLESResponse ("Peer %s returned an invalid proof for account %s at block %s" % (
183
+ peer ,
184
+ encode_hex (address ),
185
+ encode_hex (block_hash ),
186
+ )) from exc
144
187
return rlp .decode (rlp_account , sedes = Account )
145
188
146
189
@alru_cache (maxsize = 1024 , cache_exceptions = False )
147
- async def get_contract_code (self , block_hash : Hash32 , key : bytes ) -> bytes :
148
- peer = cast (LESPeer , self .peer_pool .highest_td_peer )
190
+ @service_timeout (COMPLETION_TIMEOUT )
191
+ async def get_contract_code (self , block_hash : Hash32 , address : Address ) -> bytes :
192
+ """
193
+ :param block_hash: find code as of the block with block_hash
194
+ :param address: which contract to look up
195
+
196
+ :return: bytecode of the contract, ``b''`` if no code is set
197
+
198
+ :raise NoEligiblePeers: if no peers are available to fulfill the request
199
+ :raise TimeoutError: if an individual request or the overall process times out
200
+ """
201
+ # get account for later verification, and
202
+ # to confirm that our highest total difficulty peer has the info
203
+ try :
204
+ account = await self .get_account (block_hash , address )
205
+ except HeaderNotFound as exc :
206
+ raise NoEligiblePeers ("Our best peer does not have header %s" % block_hash ) from exc
207
+
208
+ code_hash = account .code_hash
209
+
210
+ return await self ._retry_on_bad_response (
211
+ partial (self ._get_contract_code_from_peer , block_hash , address , code_hash )
212
+ )
213
+
214
+ async def _get_contract_code_from_peer (
215
+ self ,
216
+ block_hash : Hash32 ,
217
+ address : Address ,
218
+ code_hash : Hash32 ,
219
+ peer : LESPeer ) -> bytes :
220
+ """
221
+ A single attempt to get the contract code from the given peer
222
+
223
+ :raise BadLESResponse: if the peer replies with contract code that does not match the
224
+ account's code hash
225
+ """
226
+ # request contract code
149
227
request_id = gen_request_id ()
150
- peer .sub_proto .send_get_contract_code (block_hash , key , request_id )
228
+ peer .sub_proto .send_get_contract_code (block_hash , keccak ( address ) , request_id )
151
229
reply = await self ._wait_for_reply (request_id )
230
+
152
231
if not reply ['codes' ]:
153
- return b''
154
- return reply ['codes' ][0 ]
232
+ bytecode = b''
233
+ else :
234
+ bytecode = reply ['codes' ][0 ]
235
+
236
+ # validate bytecode against a proven account
237
+ if code_hash == keccak (bytecode ):
238
+ return bytecode
239
+ elif bytecode == b'' :
240
+ await self ._raise_for_empty_code (block_hash , address , code_hash , peer )
241
+ # The following is added for mypy linting:
242
+ raise RuntimeError ("Unreachable, _raise_for_empty_code must raise its own exception" )
243
+ else :
244
+ # a bad-acting peer sent an invalid non-empty bytecode
245
+ raise BadLESResponse ("Peer %s sent code %s that did not match hash %s in account %s" % (
246
+ peer ,
247
+ encode_hex (bytecode ),
248
+ encode_hex (code_hash ),
249
+ encode_hex (address ),
250
+ ))
251
+
252
+ async def _raise_for_empty_code (
253
+ self ,
254
+ block_hash : Hash32 ,
255
+ address : Address ,
256
+ code_hash : Hash32 ,
257
+ peer : LESPeer ) -> None :
258
+ """
259
+ A peer might return b'' if it doesn't have the block at the requested header,
260
+ or it might maliciously return b'' when the code is non-empty. This method tries to tell the
261
+ difference.
262
+
263
+ This method MUST raise an exception, it's trying to determine the appropriate one.
264
+
265
+ :raise BadLESResponse: if peer seems to be maliciously responding with invalid empty code
266
+ :raise NoEligiblePeers: if peer might simply not have the code available
267
+ """
268
+ try :
269
+ header = await self ._get_block_header_by_hash (block_hash , peer )
270
+ except HeaderNotFound :
271
+ # We presume that the current peer is the best peer. Because
272
+ # our best peer doesn't have the header we want, there are no eligible peers.
273
+ raise NoEligiblePeers ("Our best peer does not have the header %s" % block_hash )
155
274
156
- async def _get_block_header_by_hash (self , peer : LESPeer , block_hash : Hash32 ) -> BlockHeader :
275
+ head_number = peer .head_info .block_number
276
+ if head_number - header .block_number > MAX_REORG_DEPTH :
277
+ # The peer claims to be far ahead of the header we requested
278
+ if self .headerdb .get_canonical_block_hash (header .block_number ) == block_hash :
279
+ # Our node believes that the header at the reference hash is canonical,
280
+ # so treat the peer as malicious
281
+ raise BadLESResponse (
282
+ "Peer %s sent empty code that did not match hash %s in account %s" % (
283
+ peer ,
284
+ encode_hex (code_hash ),
285
+ encode_hex (address ),
286
+ )
287
+ )
288
+ else :
289
+ # our header isn't canonical, so treat the empty response as missing data
290
+ raise NoEligiblePeers (
291
+ "Our best peer does not have the non-canonical header %s" % block_hash
292
+ )
293
+ elif head_number - header .block_number < 0 :
294
+ # The peer claims to be behind the header we requested, but somehow served it to us.
295
+ # Odd, it might be a race condition. Treat as if there are no eligible peers for now.
296
+ raise NoEligiblePeers ("Our best peer's head does include header %s" % block_hash )
297
+ else :
298
+ # The peer is ahead of the current block header, but only by a bit. It might be on
299
+ # an uncle, or we might be. So we can't tell the difference between missing and
300
+ # malicious. We don't want to aggressively drop this peer, so treat the code as missing.
301
+ raise NoEligiblePeers (
302
+ "Peer %s claims to be ahead of %s, but returned empty code with hash %s. "
303
+ "It is on number %d, maybe an uncle. Retry with an older block hash." % (
304
+ peer ,
305
+ header ,
306
+ code_hash ,
307
+ head_number ,
308
+ )
309
+ )
310
+
311
+ async def _get_block_header_by_hash (self , block_hash : Hash32 , peer : LESPeer ) -> BlockHeader :
312
+ """
313
+ A single attempt to get the block header from the given peer.
314
+
315
+ :raise BadLESResponse: if the peer replies with a header that has a different hash
316
+ """
157
317
self .logger .debug ("Fetching header %s from %s" , encode_hex (block_hash ), peer )
158
318
request_id = gen_request_id ()
159
319
max_headers = 1
@@ -178,3 +338,27 @@ async def _get_proof(self,
178
338
peer .sub_proto .send_get_proof (block_hash , account_key , key , from_level , request_id )
179
339
reply = await self ._wait_for_reply (request_id )
180
340
return reply ['proof' ]
341
+
342
+ async def _retry_on_bad_response (self , make_request_to_peer : Callable [[LESPeer ], Any ]) -> Any :
343
+ """
344
+ Make a call to a peer. If it behaves badly, drop it and retry with a different peer.
345
+
346
+ :param make_request_to_peer: an abstract call to a peer that may raise a BadLESResponse
347
+
348
+ :raise NoEligiblePeers: if no peers are available to fulfill the request
349
+ :raise TimeoutError: if an individual request or the overall process times out
350
+ """
351
+ for _ in range (MAX_REQUEST_ATTEMPTS ):
352
+ try :
353
+ peer = cast (LESPeer , self .peer_pool .highest_td_peer )
354
+ except NoConnectedPeers as exc :
355
+ raise NoEligiblePeers () from exc
356
+
357
+ try :
358
+ return await make_request_to_peer (peer )
359
+ except BadLESResponse as exc :
360
+ self .logger .warn ("Disconnecting from peer, because: %s" , exc )
361
+ await peer .disconnect (DisconnectReason .subprotocol_error )
362
+ # reattempt after removing this peer from our pool
363
+
364
+ raise TimeoutError ("Could not complete peer request in %d attempts" % MAX_REQUEST_ATTEMPTS )
0 commit comments