|
36 | 36 | from p2p import protocol
|
37 | 37 | from p2p import eth
|
38 | 38 | from p2p import les
|
39 |
| -from p2p.cancel_token import CancelToken |
| 39 | +from p2p.cancel_token import CancelToken, wait_with_token |
40 | 40 | from p2p.constants import MAX_REORG_DEPTH
|
41 | 41 | from p2p.exceptions import NoEligiblePeers, OperationCancelled
|
42 | 42 | from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
|
@@ -255,56 +255,6 @@ def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
|
255 | 255 | "Got BlockHeaders from %d to %d", headers[0].block_number, headers[-1].block_number)
|
256 | 256 | self._new_headers.put_nowait(headers)
|
257 | 257 |
|
258 |
| - async def _get_block_numbers_for_request( |
259 |
| - self, block_number_or_hash: Union[int, bytes], max_headers: int, skip: int, |
260 |
| - reverse: bool) -> List[BlockNumber]: |
261 |
| - """ |
262 |
| - Generates the block numbers requested, subject to local availability. |
263 |
| - """ |
264 |
| - block_number_or_hash = block_number_or_hash |
265 |
| - if isinstance(block_number_or_hash, bytes): |
266 |
| - header = await self.wait( |
267 |
| - self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)), |
268 |
| - ) |
269 |
| - block_number = header.block_number |
270 |
| - elif isinstance(block_number_or_hash, int): |
271 |
| - block_number = block_number_or_hash |
272 |
| - else: |
273 |
| - raise TypeError( |
274 |
| - "Unexpected type for 'block_number_or_hash': %s", |
275 |
| - type(block_number_or_hash), |
276 |
| - ) |
277 |
| - |
278 |
| - limit = max(max_headers, eth.MAX_HEADERS_FETCH) |
279 |
| - step = skip + 1 |
280 |
| - if reverse: |
281 |
| - low = max(0, block_number - limit) |
282 |
| - high = block_number + 1 |
283 |
| - block_numbers = reversed(range(low, high, step)) |
284 |
| - else: |
285 |
| - low = block_number |
286 |
| - high = block_number + limit |
287 |
| - block_numbers = iter(range(low, high, step)) # mypy thinks range isn't iterable |
288 |
| - return list(block_numbers) |
289 |
| - |
290 |
| - async def _generate_available_headers( |
291 |
| - self, |
292 |
| - block_numbers: List[BlockNumber]) -> AsyncGenerator[BlockHeader, None]: |
293 |
| - """ |
294 |
| - Generates the headers requested, halting on the first header that is not locally available. |
295 |
| - """ |
296 |
| - for block_num in block_numbers: |
297 |
| - try: |
298 |
| - yield await self.wait( |
299 |
| - self.db.coro_get_canonical_block_header_by_number(block_num) |
300 |
| - ) |
301 |
| - except HeaderNotFound: |
302 |
| - self.logger.debug( |
303 |
| - "Peer requested header number %s that is unavailable, stopping search.", |
304 |
| - block_num, |
305 |
| - ) |
306 |
| - break |
307 |
| - |
308 | 258 | @abstractmethod
|
309 | 259 | async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
|
310 | 260 | msg: protocol._DecodedMsgType) -> None:
|
@@ -335,19 +285,9 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
|
335 | 285 | async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
|
336 | 286 | self.logger.debug("Peer %s made header request: %s", peer, msg)
|
337 | 287 | query = msg['query']
|
338 |
| - try: |
339 |
| - block_numbers = await self._get_block_numbers_for_request( |
340 |
| - query.block_number_or_hash, query.max_headers, |
341 |
| - query.skip, query.reverse) |
342 |
| - except HeaderNotFound: |
343 |
| - self.logger.debug( |
344 |
| - "Peer %r requested starting header %r that is unavailable, returning nothing", |
345 |
| - peer, |
346 |
| - query.block_number_or_hash, |
347 |
| - ) |
348 |
| - block_numbers = [] |
349 |
| - |
350 |
| - headers = [header async for header in self._generate_available_headers(block_numbers)] |
| 288 | + headers = await lookup_headers( |
| 289 | + self.db, query.block_number_or_hash, query.max_headers, |
| 290 | + query.skip, query.reverse, self.logger, self.cancel_token) |
351 | 291 | peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
|
352 | 292 |
|
353 | 293 | async def _process_headers(
|
@@ -598,19 +538,9 @@ async def _handle_get_block_headers(
|
598 | 538 | header_request: Dict[str, Any]) -> None:
|
599 | 539 | self.logger.debug("Peer %s made header request: %s", peer, header_request)
|
600 | 540 |
|
601 |
| - try: |
602 |
| - block_numbers = await self._get_block_numbers_for_request( |
603 |
| - header_request['block_number_or_hash'], header_request['max_headers'], |
604 |
| - header_request['skip'], header_request['reverse']) |
605 |
| - except HeaderNotFound: |
606 |
| - self.logger.debug( |
607 |
| - "Peer %r requested starting header %r that is unavailable, returning nothing", |
608 |
| - peer, |
609 |
| - header_request['block_number_or_hash'], |
610 |
| - ) |
611 |
| - block_numbers = [] |
612 |
| - |
613 |
| - headers = [header async for header in self._generate_available_headers(block_numbers)] |
| 541 | + headers = await lookup_headers( |
| 542 | + self.db, header_request['block_number_or_hash'], header_request['max_headers'], |
| 543 | + header_request['skip'], header_request['reverse'], self.logger, self.cancel_token) |
614 | 544 | peer.sub_proto.send_block_headers(headers)
|
615 | 545 |
|
616 | 546 |
|
@@ -749,6 +679,83 @@ def _is_receipts_empty(header: BlockHeader) -> bool:
|
749 | 679 | return header.receipt_root == BLANK_ROOT_HASH
|
750 | 680 |
|
751 | 681 |
|
| 682 | +async def _get_block_numbers_for_request( |
| 683 | + headerdb: 'AsyncHeaderDB', block_number_or_hash: Union[int, bytes], max_headers: int, |
| 684 | + skip: int, reverse: bool, token: CancelToken) -> List[BlockNumber]: |
| 685 | + """ |
| 686 | + Generates the block numbers requested, subject to local availability. |
| 687 | + """ |
| 688 | + block_number_or_hash = block_number_or_hash |
| 689 | + if isinstance(block_number_or_hash, bytes): |
| 690 | + header = await wait_with_token( |
| 691 | + headerdb.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)), |
| 692 | + token=token, |
| 693 | + ) |
| 694 | + block_number = header.block_number |
| 695 | + elif isinstance(block_number_or_hash, int): |
| 696 | + block_number = block_number_or_hash |
| 697 | + else: |
| 698 | + raise TypeError( |
| 699 | + "Unexpected type for 'block_number_or_hash': %s", |
| 700 | + type(block_number_or_hash), |
| 701 | + ) |
| 702 | + |
| 703 | + limit = max(max_headers, eth.MAX_HEADERS_FETCH) |
| 704 | + step = skip + 1 |
| 705 | + if reverse: |
| 706 | + low = max(0, block_number - limit) |
| 707 | + high = block_number + 1 |
| 708 | + block_numbers = reversed(range(low, high, step)) |
| 709 | + else: |
| 710 | + low = block_number |
| 711 | + high = block_number + limit |
| 712 | + block_numbers = iter(range(low, high, step)) # mypy thinks range isn't iterable |
| 713 | + return list(block_numbers) |
| 714 | + |
| 715 | + |
| 716 | +async def _generate_available_headers( |
| 717 | + headerdb: 'AsyncHeaderDB', |
| 718 | + block_numbers: List[BlockNumber], |
| 719 | + logger: logging.Logger, |
| 720 | + token: CancelToken) -> AsyncGenerator[BlockHeader, None]: |
| 721 | + """ |
| 722 | + Generates the headers requested, halting on the first header that is not locally available. |
| 723 | + """ |
| 724 | + for block_num in block_numbers: |
| 725 | + try: |
| 726 | + yield await wait_with_token( |
| 727 | + headerdb.coro_get_canonical_block_header_by_number(block_num), |
| 728 | + token=token |
| 729 | + ) |
| 730 | + except HeaderNotFound: |
| 731 | + logger.debug( |
| 732 | + "Peer requested header number %s that is unavailable, stopping search.", |
| 733 | + block_num, |
| 734 | + ) |
| 735 | + break |
| 736 | + |
| 737 | + |
| 738 | +async def lookup_headers( |
| 739 | + headerdb: 'AsyncHeaderDB', block_number_or_hash: Union[int, bytes], max_headers: int, |
| 740 | + skip: int, reverse: bool, logger: logging.Logger, token: CancelToken) -> List[BlockHeader]: |
| 741 | + """ |
| 742 | + Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items between |
| 743 | + each, in reverse order if :reverse: is True. |
| 744 | + """ |
| 745 | + try: |
| 746 | + block_numbers = await _get_block_numbers_for_request( |
| 747 | + headerdb, block_number_or_hash, max_headers, skip, reverse, token) |
| 748 | + except HeaderNotFound: |
| 749 | + logger.debug( |
| 750 | + "Peer requested starting header %r that is unavailable, returning nothing", |
| 751 | + block_number_or_hash) |
| 752 | + block_numbers = [] |
| 753 | + |
| 754 | + headers = [header async for header in _generate_available_headers( |
| 755 | + headerdb, block_numbers, logger, token)] |
| 756 | + return headers |
| 757 | + |
| 758 | + |
752 | 759 | def _test() -> None:
|
753 | 760 | import argparse
|
754 | 761 | import signal
|
|
0 commit comments