43
43
from p2p import les
44
44
from p2p .cancellable import CancellableMixin
45
45
from p2p .constants import MAX_REORG_DEPTH , SEAL_CHECK_RANDOM_SAMPLE_RATE
46
- from p2p .exceptions import NoEligiblePeers
46
+ from p2p .exceptions import NoEligiblePeers , ValidationError
47
47
from p2p .p2p_proto import DisconnectReason
48
- from p2p .peer import BasePeer , ETHPeer , LESPeer , HeaderRequest , PeerPool , PeerSubscriber
48
+ from p2p .peer import BasePeer , ETHPeer , LESPeer , PeerPool , PeerSubscriber
49
49
from p2p .rlp import BlockBody
50
50
from p2p .service import BaseService
51
51
from p2p .utils import (
@@ -91,7 +91,6 @@ def __init__(self,
91
91
self ._syncing = False
92
92
self ._sync_complete = asyncio .Event ()
93
93
self ._sync_requests : asyncio .Queue [HeaderRequestingPeer ] = asyncio .Queue ()
94
- self ._new_headers : asyncio .Queue [Tuple [BlockHeader , ...]] = asyncio .Queue ()
95
94
self ._executor = get_asyncio_executor ()
96
95
97
96
@property
@@ -207,7 +206,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
207
206
self .logger .warn ("Timeout waiting for header batch from %s, aborting sync" , peer )
208
207
await peer .disconnect (DisconnectReason .timeout )
209
208
break
210
- except ValueError as err :
209
+ except ValidationError as err :
211
210
self .logger .warn (
212
211
"Invalid header response sent by peer %s disconnecting: %s" ,
213
212
peer , err ,
@@ -253,47 +252,37 @@ async def _fetch_missing_headers(
253
252
self , peer : HeaderRequestingPeer , start_at : int ) -> Tuple [BlockHeader , ...]:
254
253
"""Fetch a batch of headers starting at start_at and return the ones we're missing."""
255
254
self .logger .debug ("Fetching chain segment starting at #%d" , start_at )
256
- request = peer .request_block_headers (
255
+
256
+ headers = await peer .get_block_headers (
257
257
start_at ,
258
258
peer .max_headers_fetch ,
259
259
skip = 0 ,
260
260
reverse = False ,
261
261
)
262
262
263
- # Pass the peer's token to self.wait() because we want to abort if either we
264
- # or the peer terminates.
265
- headers = tuple (await self .wait (
266
- self ._new_headers .get (),
267
- token = peer .cancel_token ,
268
- timeout = self ._reply_timeout ))
269
-
270
- # check that the response headers are a valid match for our
271
- # requested headers.
272
- request .validate_headers (headers )
273
-
274
- # the inner list comprehension is required to get python to evaluate
275
- # the asynchronous comprehension
276
- missing_headers = tuple ([
277
- header
278
- for header
279
- in headers
280
- if not (await self .wait (self .db .coro_header_exists (header .hash )))
281
- ])
282
- if len (missing_headers ) != len (headers ):
283
- self .logger .debug (
284
- "Discarding %d / %d headers that we already have" ,
285
- len (headers ) - len (missing_headers ),
286
- len (headers ),
287
- )
288
- return headers
289
-
290
- def _handle_block_headers (self , headers : Tuple [BlockHeader , ...]) -> None :
291
- if not headers :
292
- self .logger .warn ("Got an empty BlockHeaders msg" )
293
- return
294
- self .logger .debug (
295
- "Got BlockHeaders from %d to %d" , headers [0 ].block_number , headers [- 1 ].block_number )
296
- self ._new_headers .put_nowait (headers )
263
+ # We only want headers that are missing, so we iterate over the list
264
+ # until we find the first missing header, after which we return all of
265
+ # the remaining headers.
266
+ async def get_missing_tail (self : 'BaseHeaderChainSyncer' ,
267
+ headers : Tuple [BlockHeader , ...]
268
+ ) -> AsyncGenerator [BlockHeader , None ]:
269
+ iter_headers = iter (headers )
270
+ for header in iter_headers :
271
+ is_missing = not await self .wait (self .db .coro_header_exists (header .hash ))
272
+ if is_missing :
273
+ yield header
274
+ break
275
+ else :
276
+ self .logger .debug ("Discarding header that we already have: %s" , header )
277
+
278
+ for header in iter_headers :
279
+ yield header
280
+
281
+ # The inner list comprehension is needed because async_generators
282
+ # cannot be cast to a tuple.
283
+ tail_headers = tuple ([header async for header in get_missing_tail (self , headers )])
284
+
285
+ return tail_headers
297
286
298
287
@abstractmethod
299
288
async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
@@ -313,9 +302,6 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
313
302
msg : protocol ._DecodedMsgType ) -> None :
314
303
if isinstance (cmd , les .Announce ):
315
304
self ._sync_requests .put_nowait (peer )
316
- elif isinstance (cmd , les .BlockHeaders ):
317
- msg = cast (Dict [str , Any ], msg )
318
- self ._handle_block_headers (tuple (cast (Tuple [BlockHeader , ...], msg ['headers' ])))
319
305
elif isinstance (cmd , les .GetBlockHeaders ):
320
306
msg = cast (Dict [str , Any ], msg )
321
307
await self ._handle_get_block_headers (cast (LESPeer , peer ), msg )
@@ -324,15 +310,16 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
324
310
325
311
async def _handle_get_block_headers (self , peer : LESPeer , msg : Dict [str , Any ]) -> None :
326
312
self .logger .debug ("Peer %s made header request: %s" , peer , msg )
327
- request = HeaderRequest (
313
+ request = les . HeaderRequest (
328
314
msg ['query' ].block_number_or_hash ,
329
315
msg ['query' ].max_headers ,
330
316
msg ['query' ].skip ,
331
317
msg ['query' ].reverse ,
318
+ msg ['request_id' ],
332
319
)
333
320
headers = await self ._handler .lookup_headers (request )
334
321
self .logger .trace ("Replying to %s with %d headers" , peer , len (headers ))
335
- peer .sub_proto .send_block_headers (headers , buffer_value = 0 , request_id = msg [ ' request_id' ] )
322
+ peer .sub_proto .send_block_headers (headers , buffer_value = 0 , request_id = request . request_id )
336
323
337
324
async def _process_headers (
338
325
self , peer : HeaderRequestingPeer , headers : Tuple [BlockHeader , ...]) -> int :
@@ -538,9 +525,7 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int:
538
525
async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
539
526
msg : protocol ._DecodedMsgType ) -> None :
540
527
peer = cast (ETHPeer , peer )
541
- if isinstance (cmd , eth .BlockHeaders ):
542
- self ._handle_block_headers (tuple (cast (Tuple [BlockHeader , ...], msg )))
543
- elif isinstance (cmd , eth .BlockBodies ):
528
+ if isinstance (cmd , eth .BlockBodies ):
544
529
await self ._handle_block_bodies (peer , list (cast (Tuple [BlockBody ], msg )))
545
530
elif isinstance (cmd , eth .Receipts ):
546
531
await self ._handle_block_receipts (peer , cast (List [List [Receipt ]], msg ))
@@ -613,7 +598,7 @@ async def _handle_get_block_headers(
613
598
peer : ETHPeer ,
614
599
query : Dict [str , Any ]) -> None :
615
600
self .logger .debug ("Peer %s made header request: %s" , peer , query )
616
- request = HeaderRequest (
601
+ request = eth . HeaderRequest (
617
602
query ['block_number_or_hash' ],
618
603
query ['max_headers' ],
619
604
query ['skip' ],
@@ -732,7 +717,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
732
717
peer .sub_proto .send_node_data (nodes )
733
718
734
719
async def lookup_headers (self ,
735
- request : HeaderRequest ) -> Tuple [BlockHeader , ...]:
720
+ request : protocol . BaseHeaderRequest ) -> Tuple [BlockHeader , ...]:
736
721
"""
737
722
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
738
723
between each, in reverse order if :reverse: is True.
@@ -753,7 +738,8 @@ async def lookup_headers(self,
753
738
return headers
754
739
755
740
async def _get_block_numbers_for_request (self ,
756
- request : HeaderRequest ) -> Tuple [BlockNumber , ...]:
741
+ request : protocol .BaseHeaderRequest
742
+ ) -> Tuple [BlockNumber , ...]:
757
743
"""
758
744
Generate the block numbers for a given `HeaderRequest`.
759
745
"""
0 commit comments