Skip to content

Commit ab94167

Browse files
authored
Merge pull request #1063 from ethereum/piper/foundation-for-request-and-response-linking
Implement stronger linkage between requests for headers and their responses
2 parents 87819a1 + 2908e04 commit ab94167

File tree

12 files changed

+440
-137
lines changed

12 files changed

+440
-137
lines changed

p2p/chain.py

Lines changed: 93 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
BLANK_ROOT_HASH, EMPTY_UNCLE_HASH, GENESIS_BLOCK_NUMBER, GENESIS_PARENT_HASH)
3030
from eth.chains import AsyncChain
3131
from eth.db.trie import make_trie_root_and_nodes
32-
from eth.exceptions import HeaderNotFound, ValidationError
32+
from eth.exceptions import (
33+
HeaderNotFound,
34+
ValidationError as EthValidationError,
35+
)
3336
from eth.rlp.headers import BlockHeader
3437
from eth.rlp.receipts import Receipt
3538
from eth.rlp.transactions import BaseTransaction, BaseTransactionFields
@@ -42,11 +45,10 @@
4245
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
4346
from p2p.exceptions import NoEligiblePeers, OperationCancelled
4447
from p2p.p2p_proto import DisconnectReason
45-
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
48+
from p2p.peer import BasePeer, ETHPeer, LESPeer, HeaderRequest, PeerPool, PeerSubscriber
4649
from p2p.rlp import BlockBody
4750
from p2p.service import BaseService
4851
from p2p.utils import (
49-
get_block_numbers_for_request,
5052
get_asyncio_executor,
5153
Timer,
5254
)
@@ -205,6 +207,13 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
205207
self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer)
206208
await peer.disconnect(DisconnectReason.timeout)
207209
break
210+
except ValueError as err:
211+
self.logger.warn(
212+
"Invalid header response sent by peer %s disconnecting: %s",
213+
peer, err,
214+
)
215+
await peer.disconnect(DisconnectReason.useless_peer)
216+
break
208217

209218
if not headers:
210219
self.logger.info("Got no new headers from %s, aborting sync", peer)
@@ -220,7 +229,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
220229
self.logger.debug("Got new header chain starting at #%d", first.block_number)
221230
try:
222231
await self.chain.coro_validate_chain(headers, self._seal_check_random_sample_rate)
223-
except ValidationError as e:
232+
except EthValidationError as e:
224233
self.logger.warn("Received invalid headers from %s, aborting sync: %s", peer, e)
225234
break
226235
try:
@@ -244,22 +253,39 @@ async def _fetch_missing_headers(
244253
self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
245254
"""Fetch a batch of headers starting at start_at and return the ones we're missing."""
246255
self.logger.debug("Fetching chain segment starting at #%d", start_at)
247-
peer.request_block_headers(start_at, peer.max_headers_fetch, reverse=False)
256+
request = peer.request_block_headers(
257+
start_at,
258+
peer.max_headers_fetch,
259+
skip=0,
260+
reverse=False,
261+
)
262+
248263
# Pass the peer's token to self.wait() because we want to abort if either we
249264
# or the peer terminates.
250-
headers = list(await self.wait(
265+
headers = tuple(await self.wait(
251266
self._new_headers.get(),
252267
token=peer.cancel_token,
253268
timeout=self._reply_timeout))
254-
for header in headers.copy():
255-
try:
256-
await self.wait(self.db.coro_get_block_header_by_hash(header.hash))
257-
except HeaderNotFound:
258-
break
259-
else:
260-
self.logger.debug("Discarding %s as we already have it", header)
261-
headers.remove(header)
262-
return tuple(headers)
269+
270+
# check that the response headers are a valid match for our
271+
# requested headers.
272+
request.validate_headers(headers)
273+
274+
# the inner list comprehension is required to get python to evaluate
275+
# the asynchronous comprehension
276+
missing_headers = tuple([
277+
header
278+
for header
279+
in headers
280+
if not (await self.wait(self.db.coro_header_exists(header.hash)))
281+
])
282+
if len(missing_headers) != len(headers):
283+
self.logger.debug(
284+
"Discarding %d / %d headers that we already have",
285+
len(headers) - len(missing_headers),
286+
len(headers),
287+
)
288+
return headers
263289

264290
def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
265291
if not headers:
@@ -298,9 +324,13 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
298324

299325
async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
300326
self.logger.debug("Peer %s made header request: %s", peer, msg)
301-
query = msg['query']
302-
headers = await self._handler.lookup_headers(
303-
query.block_number_or_hash, query.max_headers, query.skip, query.reverse)
327+
request = HeaderRequest(
328+
msg['query'].block_number_or_hash,
329+
msg['query'].max_headers,
330+
msg['query'].skip,
331+
msg['query'].reverse,
332+
)
333+
headers = await self._handler.lookup_headers(request)
304334
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
305335
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
306336

@@ -581,12 +611,16 @@ async def _handle_block_bodies(self,
581611
async def _handle_get_block_headers(
582612
self,
583613
peer: ETHPeer,
584-
header_request: Dict[str, Any]) -> None:
585-
self.logger.debug("Peer %s made header request: %s", peer, header_request)
586-
587-
headers = await self._handler.lookup_headers(
588-
header_request['block_number_or_hash'], header_request['max_headers'],
589-
header_request['skip'], header_request['reverse'])
614+
query: Dict[str, Any]) -> None:
615+
self.logger.debug("Peer %s made header request: %s", peer, query)
616+
request = HeaderRequest(
617+
query['block_number_or_hash'],
618+
query['max_headers'],
619+
query['skip'],
620+
query['reverse'],
621+
)
622+
623+
headers = await self._handler.lookup_headers(request)
590624
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
591625
peer.sub_proto.send_block_headers(headers)
592626

@@ -697,34 +731,49 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
697731
self.logger.trace("Replying to %s with %d trie nodes", peer, len(nodes))
698732
peer.sub_proto.send_node_data(nodes)
699733

700-
async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int,
701-
skip: int, reverse: bool) -> List[BlockHeader]:
734+
async def lookup_headers(self,
735+
request: HeaderRequest) -> Tuple[BlockHeader, ...]:
702736
"""
703737
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
704738
between each, in reverse order if :reverse: is True.
705739
"""
706-
if isinstance(block_number_or_hash, bytes):
707-
try:
708-
header = await self.wait(
709-
self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)))
710-
except HeaderNotFound:
711-
self.logger.debug(
712-
"Peer requested starting header %r that is unavailable, returning nothing",
713-
block_number_or_hash)
714-
return []
715-
block_number = header.block_number
716-
elif isinstance(block_number_or_hash, int):
717-
block_number = block_number_or_hash
740+
try:
741+
block_numbers = await self._get_block_numbers_for_request(request)
742+
except HeaderNotFound:
743+
self.logger.debug(
744+
"Peer requested starting header %r that is unavailable, returning nothing",
745+
request.block_number_or_hash)
746+
block_numbers = tuple() # type: ignore
747+
748+
headers: Tuple[BlockHeader, ...] = tuple([
749+
header
750+
async for header
751+
in self._generate_available_headers(block_numbers)
752+
])
753+
return headers
754+
755+
async def _get_block_numbers_for_request(self,
756+
request: HeaderRequest) -> Tuple[BlockNumber, ...]:
757+
"""
758+
Generate the block numbers for a given `HeaderRequest`.
759+
"""
760+
if isinstance(request.block_number_or_hash, bytes):
761+
header = await self.wait(
762+
self.db.coro_get_block_header_by_hash(cast(Hash32, request.block_number_or_hash)))
763+
return request.generate_block_numbers(header.block_number)
764+
elif isinstance(request.block_number_or_hash, int):
765+
# We don't need to pass in the block number to
766+
# `generate_block_numbers` since the request is based on a numbered
767+
# block identifier.
768+
return request.generate_block_numbers()
718769
else:
719770
raise TypeError(
720-
"Unexpected type for 'block_number_or_hash': %s", type(block_number_or_hash))
721-
722-
block_numbers = get_block_numbers_for_request(block_number, max_headers, skip, reverse)
723-
headers = [header async for header in self._generate_available_headers(block_numbers)]
724-
return headers
771+
"Invariant: unexpected type for 'block_number_or_hash': %s",
772+
type(request.block_number_or_hash),
773+
)
725774

726775
async def _generate_available_headers(
727-
self, block_numbers: Tuple[BlockNumber]) -> AsyncGenerator[BlockHeader, None]:
776+
self, block_numbers: Tuple[BlockNumber, ...]) -> AsyncGenerator[BlockHeader, None]:
728777
"""
729778
Generates the headers requested, halting on the first header that is not locally available.
730779
"""

p2p/eth.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
cast,
44
List,
55
Tuple,
6-
Union,
76
TYPE_CHECKING
87
)
98

109
from rlp import sedes
1110

11+
from eth_typing import (
12+
BlockIdentifier,
13+
)
14+
1215
from eth.rlp.headers import BlockHeader
1316
from eth.rlp.receipts import Receipt
1417
from eth.rlp.transactions import BaseTransactionFields
@@ -27,6 +30,7 @@
2730
ChainInfo
2831
)
2932

33+
3034
# Max number of items we can ask for in ETH requests. These are the values used in geth and if we
3135
# ask for more than this the peers will disconnect from us.
3236
MAX_STATE_FETCH = 384
@@ -145,8 +149,11 @@ def send_node_data(self, nodes: List[bytes]) -> None:
145149
header, body = cmd.encode(nodes)
146150
self.send(header, body)
147151

148-
def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
149-
max_headers: int, reverse: bool = True
152+
def send_get_block_headers(self,
153+
block_number_or_hash: BlockIdentifier,
154+
max_headers: int,
155+
skip: int,
156+
reverse: bool,
150157
) -> None:
151158
"""Send a GetBlockHeaders msg to the remote.
152159
@@ -159,8 +166,6 @@ def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
159166
"Cannot ask for more than {} block headers in a single request".format(
160167
MAX_HEADERS_FETCH))
161168
cmd = GetBlockHeaders(self.cmd_id_offset)
162-
# Number of block headers to skip between each item (i.e. step in python APIs).
163-
skip = 0
164169
data = {
165170
'block_number_or_hash': block_number_or_hash,
166171
'max_headers': max_headers,
@@ -169,7 +174,7 @@ def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
169174
header, body = cmd.encode(data)
170175
self.send(header, body)
171176

172-
def send_block_headers(self, headers: List[BlockHeader]) -> None:
177+
def send_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
173178
cmd = BlockHeaders(self.cmd_id_offset)
174179
header, body = cmd.encode(headers)
175180
self.send(header, body)

p2p/les.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010

1111
from eth_typing import (
12+
BlockIdentifier,
1213
Hash32
1314
)
1415

@@ -314,8 +315,12 @@ def send_get_block_bodies(self, block_hashes: List[bytes], request_id: int) -> N
314315
header, body = GetBlockBodies(self.cmd_id_offset).encode(data)
315316
self.send(header, body)
316317

317-
def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
318-
max_headers: int, request_id: int, reverse: bool = True
318+
def send_get_block_headers(self,
319+
block_number_or_hash: BlockIdentifier,
320+
max_headers: int,
321+
skip: int,
322+
reverse: bool,
323+
request_id: int,
319324
) -> None:
320325
"""Send a GetBlockHeaders msg to the remote.
321326
@@ -338,7 +343,7 @@ def send_get_block_headers(self, block_number_or_hash: Union[int, bytes],
338343
self.send(header, body)
339344

340345
def send_block_headers(
341-
self, headers: List[BlockHeader], buffer_value: int, request_id: int) -> None:
346+
self, headers: Tuple[BlockHeader, ...], buffer_value: int, request_id: int) -> None:
342347
data = {
343348
'request_id': request_id,
344349
'headers': headers,

p2p/lightchain.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ async def _get_block_header_by_hash(self, block_hash: Hash32, peer: LESPeer) ->
324324
self.logger.debug("Fetching header %s from %s", encode_hex(block_hash), peer)
325325
request_id = gen_request_id()
326326
max_headers = 1
327-
peer.sub_proto.send_get_block_headers(block_hash, max_headers, request_id)
327+
peer.sub_proto.send_get_block_headers(block_hash, max_headers, 0, False, request_id)
328328
reply = await self._wait_for_reply(request_id)
329329
if not reply['headers']:
330330
raise HeaderNotFound("Peer {} has no block with hash {}".format(peer, block_hash))

0 commit comments

Comments
 (0)