Skip to content

Commit 04f3dd4

Browse files
committed
Implement HeaderRequest object for GetHeaders p2p messages
1 parent 87819a1 commit 04f3dd4

File tree

12 files changed

+434
-134
lines changed

12 files changed

+434
-134
lines changed

p2p/chain.py

Lines changed: 88 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,10 @@
4242
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
4343
from p2p.exceptions import NoEligiblePeers, OperationCancelled
4444
from p2p.p2p_proto import DisconnectReason
45-
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
45+
from p2p.peer import BasePeer, ETHPeer, LESPeer, HeaderRequest, PeerPool, PeerSubscriber
4646
from p2p.rlp import BlockBody
4747
from p2p.service import BaseService
4848
from p2p.utils import (
49-
get_block_numbers_for_request,
5049
get_asyncio_executor,
5150
Timer,
5251
)
@@ -205,6 +204,13 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
205204
self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer)
206205
await peer.disconnect(DisconnectReason.timeout)
207206
break
207+
except ValidationError as err:
208+
self.logger.warn(
209+
"Invalid header response sent by peer %s disconnecting: %s",
210+
peer, err,
211+
)
212+
await peer.disconnect(DisconnectReason.useless_peer)
213+
break
208214

209215
if not headers:
210216
self.logger.info("Got no new headers from %s, aborting sync", peer)
@@ -244,22 +250,39 @@ async def _fetch_missing_headers(
244250
self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
245251
"""Fetch a batch of headers starting at start_at and return the ones we're missing."""
246252
self.logger.debug("Fetching chain segment starting at #%d", start_at)
247-
peer.request_block_headers(start_at, peer.max_headers_fetch, reverse=False)
253+
request = peer.request_block_headers(
254+
start_at,
255+
peer.max_headers_fetch,
256+
skip=0,
257+
reverse=False,
258+
)
259+
248260
# Pass the peer's token to self.wait() because we want to abort if either we
249261
# or the peer terminates.
250-
headers = list(await self.wait(
262+
headers = tuple(await self.wait(
251263
self._new_headers.get(),
252264
token=peer.cancel_token,
253265
timeout=self._reply_timeout))
254-
for header in headers.copy():
255-
try:
256-
await self.wait(self.db.coro_get_block_header_by_hash(header.hash))
257-
except HeaderNotFound:
258-
break
259-
else:
260-
self.logger.debug("Discarding %s as we already have it", header)
261-
headers.remove(header)
262-
return tuple(headers)
266+
267+
# check that the response headers are a valid match for our
268+
# requested headers.
269+
request.validate_headers(headers)
270+
271+
# the inner list comprehension is required to get python to evaluate
272+
# the asynchronous comprehension
273+
missing_headers = tuple([
274+
header
275+
for header
276+
in headers
277+
if not (await self.wait(self.db.coro_header_exists(header.hash)))
278+
])
279+
if len(missing_headers) != len(headers):
280+
self.logger.debug(
281+
"Discarding %d / %d headers that we already have",
282+
len(headers) - len(missing_headers),
283+
len(headers),
284+
)
285+
return headers
263286

264287
def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
265288
if not headers:
@@ -298,9 +321,13 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
298321

299322
async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
300323
self.logger.debug("Peer %s made header request: %s", peer, msg)
301-
query = msg['query']
302-
headers = await self._handler.lookup_headers(
303-
query.block_number_or_hash, query.max_headers, query.skip, query.reverse)
324+
request = HeaderRequest(
325+
msg['query'].block_number_or_hash,
326+
msg['query'].max_headers,
327+
msg['query'].skip,
328+
msg['query'].reverse,
329+
)
330+
headers = await self._handler.lookup_headers(request)
304331
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
305332
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
306333

@@ -581,12 +608,16 @@ async def _handle_block_bodies(self,
581608
async def _handle_get_block_headers(
582609
self,
583610
peer: ETHPeer,
584-
header_request: Dict[str, Any]) -> None:
585-
self.logger.debug("Peer %s made header request: %s", peer, header_request)
586-
587-
headers = await self._handler.lookup_headers(
588-
header_request['block_number_or_hash'], header_request['max_headers'],
589-
header_request['skip'], header_request['reverse'])
611+
query: Dict[str, Any]) -> None:
612+
self.logger.debug("Peer %s made header request: %s", peer, query)
613+
request = HeaderRequest(
614+
query['block_number_or_hash'],
615+
query['max_headers'],
616+
query['skip'],
617+
query['reverse'],
618+
)
619+
620+
headers = await self._handler.lookup_headers(request)
590621
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
591622
peer.sub_proto.send_block_headers(headers)
592623

@@ -697,34 +728,49 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
697728
self.logger.trace("Replying to %s with %d trie nodes", peer, len(nodes))
698729
peer.sub_proto.send_node_data(nodes)
699730

700-
async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int,
701-
skip: int, reverse: bool) -> List[BlockHeader]:
731+
async def lookup_headers(self,
732+
request: HeaderRequest) -> Tuple[BlockHeader, ...]:
702733
"""
703734
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
704735
between each, in reverse order if :reverse: is True.
705736
"""
706-
if isinstance(block_number_or_hash, bytes):
707-
try:
708-
header = await self.wait(
709-
self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)))
710-
except HeaderNotFound:
711-
self.logger.debug(
712-
"Peer requested starting header %r that is unavailable, returning nothing",
713-
block_number_or_hash)
714-
return []
715-
block_number = header.block_number
716-
elif isinstance(block_number_or_hash, int):
717-
block_number = block_number_or_hash
737+
try:
738+
block_numbers = await self._get_block_numbers_for_request(request)
739+
except HeaderNotFound:
740+
self.logger.debug(
741+
"Peer requested starting header %r that is unavailable, returning nothing",
742+
request.block_number_or_hash)
743+
block_numbers = tuple() # type: ignore
744+
745+
headers: Tuple[BlockHeader, ...] = tuple([
746+
header
747+
async for header
748+
in self._generate_available_headers(block_numbers)
749+
])
750+
return headers
751+
752+
async def _get_block_numbers_for_request(self,
753+
request: HeaderRequest) -> Tuple[BlockNumber, ...]:
754+
"""
755+
Generate the block numbers for a given `HeaderRequest`.
756+
"""
757+
if isinstance(request.block_number_or_hash, bytes):
758+
header = await self.wait(
759+
self.db.coro_get_block_header_by_hash(cast(Hash32, request.block_number_or_hash)))
760+
return request.generate_block_numbers(header.block_number)
761+
elif isinstance(request.block_number_or_hash, int):
762+
# We don't need to pass in the block number to
763+
# `generate_block_numbers` since the request is based on a numbered
764+
# block identifier.
765+
return request.generate_block_numbers()
718766
else:
719767
raise TypeError(
720-
"Unexpected type for 'block_number_or_hash': %s", type(block_number_or_hash))
721-
722-
block_numbers = get_block_numbers_for_request(block_number, max_headers, skip, reverse)
723-
headers = [header async for header in self._generate_available_headers(block_numbers)]
724-
return headers
768+
"Invariant: unexpected type for 'block_number_or_hash': %s",
769+
type(request.block_number_or_hash),
770+
)
725771

726772
async def _generate_available_headers(
727-
self, block_numbers: Tuple[BlockNumber]) -> AsyncGenerator[BlockHeader, None]:
773+
self, block_numbers: Tuple[BlockNumber, ...]) -> AsyncGenerator[BlockHeader, None]:
728774
"""
729775
Generates the headers requested, halting on the first header that is not locally available.
730776
"""

p2p/eth.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
cast,
44
List,
55
Tuple,
6-
Union,
76
TYPE_CHECKING
87
)
98

109
from rlp import sedes
1110

11+
from eth_typing import (
12+
BlockIdentifier,
13+
)
14+
1215
from eth.rlp.headers import BlockHeader
1316
from eth.rlp.receipts import Receipt
1417
from eth.rlp.transactions import BaseTransactionFields
@@ -27,6 +30,7 @@
2730
ChainInfo
2831
)
2932

33+
3034
# Max number of items we can ask for in ETH requests. These are the values used in geth and if we
3135
# ask for more than this the peers will disconnect from us.
3236
MAX_STATE_FETCH = 384
@@ -145,8 +149,11 @@ def send_node_data(self, nodes: List[bytes]) -> None:
145149
header, body = cmd.encode(nodes)
146150
self.send(header, body)
147151

148-
def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
149-
max_headers: int, reverse: bool = True
152+
def send_get_block_headers(self,
153+
block_number_or_hash: BlockIdentifier,
154+
max_headers: int,
155+
skip: int,
156+
reverse: bool,
150157
) -> None:
151158
"""Send a GetBlockHeaders msg to the remote.
152159
@@ -159,8 +166,6 @@ def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
159166
"Cannot ask for more than {} block headers in a single request".format(
160167
MAX_HEADERS_FETCH))
161168
cmd = GetBlockHeaders(self.cmd_id_offset)
162-
# Number of block headers to skip between each item (i.e. step in python APIs).
163-
skip = 0
164169
data = {
165170
'block_number_or_hash': block_number_or_hash,
166171
'max_headers': max_headers,
@@ -169,7 +174,7 @@ def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
169174
header, body = cmd.encode(data)
170175
self.send(header, body)
171176

172-
def send_block_headers(self, headers: List[BlockHeader]) -> None:
177+
def send_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
173178
cmd = BlockHeaders(self.cmd_id_offset)
174179
header, body = cmd.encode(headers)
175180
self.send(header, body)

p2p/les.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010

1111
from eth_typing import (
12+
BlockIdentifier,
1213
Hash32
1314
)
1415

@@ -314,8 +315,12 @@ def send_get_block_bodies(self, block_hashes: List[bytes], request_id: int) -> N
314315
header, body = GetBlockBodies(self.cmd_id_offset).encode(data)
315316
self.send(header, body)
316317

317-
def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
318-
max_headers: int, request_id: int, reverse: bool = True
318+
def send_get_block_headers(self,
319+
block_number_or_hash: BlockIdentifier,
320+
max_headers: int,
321+
skip: int,
322+
reverse: bool,
323+
request_id: int,
319324
) -> None:
320325
"""Send a GetBlockHeaders msg to the remote.
321326
@@ -338,7 +343,7 @@ def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
338343
self.send(header, body)
339344

340345
def send_block_headers(
341-
self, headers: List[BlockHeader], buffer_value: int, request_id: int) -> None:
346+
self, headers: Tuple[BlockHeader, ...], buffer_value: int, request_id: int) -> None:
342347
data = {
343348
'request_id': request_id,
344349
'headers': headers,

p2p/lightchain.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ async def _get_block_header_by_hash(self, block_hash: Hash32, peer: LESPeer) ->
324324
self.logger.debug("Fetching header %s from %s", encode_hex(block_hash), peer)
325325
request_id = gen_request_id()
326326
max_headers = 1
327-
peer.sub_proto.send_get_block_headers(block_hash, max_headers, request_id)
327+
peer.sub_proto.send_get_block_headers(block_hash, max_headers, 0, False, request_id)
328328
reply = await self._wait_for_reply(request_id)
329329
if not reply['headers']:
330330
raise HeaderNotFound("Peer {} has no block with hash {}".format(peer, block_hash))

0 commit comments

Comments
 (0)