Skip to content

Commit 2f8f89a

Browse files
authored
Merge pull request #1089 from ethereum/piper/full-request-to-response-await-API
Further flesh out round trip request/response API
2 parents 7d0273e + c36d0f1 commit 2f8f89a

File tree

10 files changed

+509
-169
lines changed

10 files changed

+509
-169
lines changed

p2p/chain.py

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@
4343
from p2p import les
4444
from p2p.cancellable import CancellableMixin
4545
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
46-
from p2p.exceptions import NoEligiblePeers
46+
from p2p.exceptions import NoEligiblePeers, ValidationError
4747
from p2p.p2p_proto import DisconnectReason
48-
from p2p.peer import BasePeer, ETHPeer, LESPeer, HeaderRequest, PeerPool, PeerSubscriber
48+
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
4949
from p2p.rlp import BlockBody
5050
from p2p.service import BaseService
5151
from p2p.utils import (
@@ -91,7 +91,6 @@ def __init__(self,
9191
self._syncing = False
9292
self._sync_complete = asyncio.Event()
9393
self._sync_requests: asyncio.Queue[HeaderRequestingPeer] = asyncio.Queue()
94-
self._new_headers: asyncio.Queue[Tuple[BlockHeader, ...]] = asyncio.Queue()
9594
self._executor = get_asyncio_executor()
9695

9796
@property
@@ -207,7 +206,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
207206
self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer)
208207
await peer.disconnect(DisconnectReason.timeout)
209208
break
210-
except ValueError as err:
209+
except ValidationError as err:
211210
self.logger.warn(
212211
"Invalid header response sent by peer %s disconnecting: %s",
213212
peer, err,
@@ -253,47 +252,37 @@ async def _fetch_missing_headers(
253252
self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
254253
"""Fetch a batch of headers starting at start_at and return the ones we're missing."""
255254
self.logger.debug("Fetching chain segment starting at #%d", start_at)
256-
request = peer.request_block_headers(
255+
256+
headers = await peer.get_block_headers(
257257
start_at,
258258
peer.max_headers_fetch,
259259
skip=0,
260260
reverse=False,
261261
)
262262

263-
# Pass the peer's token to self.wait() because we want to abort if either we
264-
# or the peer terminates.
265-
headers = tuple(await self.wait(
266-
self._new_headers.get(),
267-
token=peer.cancel_token,
268-
timeout=self._reply_timeout))
269-
270-
# check that the response headers are a valid match for our
271-
# requested headers.
272-
request.validate_headers(headers)
273-
274-
# the inner list comprehension is required to get python to evaluate
275-
# the asynchronous comprehension
276-
missing_headers = tuple([
277-
header
278-
for header
279-
in headers
280-
if not (await self.wait(self.db.coro_header_exists(header.hash)))
281-
])
282-
if len(missing_headers) != len(headers):
283-
self.logger.debug(
284-
"Discarding %d / %d headers that we already have",
285-
len(headers) - len(missing_headers),
286-
len(headers),
287-
)
288-
return headers
289-
290-
def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
291-
if not headers:
292-
self.logger.warn("Got an empty BlockHeaders msg")
293-
return
294-
self.logger.debug(
295-
"Got BlockHeaders from %d to %d", headers[0].block_number, headers[-1].block_number)
296-
self._new_headers.put_nowait(headers)
263+
# We only want headers that are missing, so we iterate over the list
264+
# until we find the first missing header, after which we return all of
265+
# the remaining headers.
266+
async def get_missing_tail(self: 'BaseHeaderChainSyncer',
267+
headers: Tuple[BlockHeader, ...]
268+
) -> AsyncGenerator[BlockHeader, None]:
269+
iter_headers = iter(headers)
270+
for header in iter_headers:
271+
is_missing = not await self.wait(self.db.coro_header_exists(header.hash))
272+
if is_missing:
273+
yield header
274+
break
275+
else:
276+
self.logger.debug("Discarding header that we already have: %s", header)
277+
278+
for header in iter_headers:
279+
yield header
280+
281+
# The inner list comprehension is needed because async_generators
282+
# cannot be cast to a tuple.
283+
tail_headers = tuple([header async for header in get_missing_tail(self, headers)])
284+
285+
return tail_headers
297286

298287
@abstractmethod
299288
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
@@ -313,26 +302,27 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
313302
msg: protocol._DecodedMsgType) -> None:
314303
if isinstance(cmd, les.Announce):
315304
self._sync_requests.put_nowait(peer)
316-
elif isinstance(cmd, les.BlockHeaders):
317-
msg = cast(Dict[str, Any], msg)
318-
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg['headers'])))
319305
elif isinstance(cmd, les.GetBlockHeaders):
320306
msg = cast(Dict[str, Any], msg)
321307
await self._handle_get_block_headers(cast(LESPeer, peer), msg)
308+
elif isinstance(cmd, les.BlockHeaders):
309+
# `BlockHeaders` messages are handled at the peer level.
310+
pass
322311
else:
323312
self.logger.debug("Ignoring %s message from %s", cmd, peer)
324313

325314
async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
326315
self.logger.debug("Peer %s made header request: %s", peer, msg)
327-
request = HeaderRequest(
316+
request = les.HeaderRequest(
328317
msg['query'].block_number_or_hash,
329318
msg['query'].max_headers,
330319
msg['query'].skip,
331320
msg['query'].reverse,
321+
msg['request_id'],
332322
)
333323
headers = await self._handler.lookup_headers(request)
334324
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
335-
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
325+
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=request.request_id)
336326

337327
async def _process_headers(
338328
self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
@@ -538,16 +528,17 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int:
538528
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
539529
msg: protocol._DecodedMsgType) -> None:
540530
peer = cast(ETHPeer, peer)
541-
if isinstance(cmd, eth.BlockHeaders):
542-
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg)))
543-
elif isinstance(cmd, eth.BlockBodies):
531+
if isinstance(cmd, eth.BlockBodies):
544532
await self._handle_block_bodies(peer, list(cast(Tuple[BlockBody], msg)))
545533
elif isinstance(cmd, eth.Receipts):
546534
await self._handle_block_receipts(peer, cast(List[List[Receipt]], msg))
547535
elif isinstance(cmd, eth.NewBlock):
548536
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
549537
elif isinstance(cmd, eth.GetBlockHeaders):
550538
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
539+
elif isinstance(cmd, eth.BlockHeaders):
540+
# `BlockHeaders` messages are handled at the peer level.
541+
pass
551542
elif isinstance(cmd, eth.GetBlockBodies):
552543
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
553544
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
@@ -613,7 +604,7 @@ async def _handle_get_block_headers(
613604
peer: ETHPeer,
614605
query: Dict[str, Any]) -> None:
615606
self.logger.debug("Peer %s made header request: %s", peer, query)
616-
request = HeaderRequest(
607+
request = eth.HeaderRequest(
617608
query['block_number_or_hash'],
618609
query['max_headers'],
619610
query['skip'],
@@ -732,7 +723,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
732723
peer.sub_proto.send_node_data(nodes)
733724

734725
async def lookup_headers(self,
735-
request: HeaderRequest) -> Tuple[BlockHeader, ...]:
726+
request: protocol.BaseHeaderRequest) -> Tuple[BlockHeader, ...]:
736727
"""
737728
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
738729
between each, in reverse order if :reverse: is True.
@@ -753,7 +744,8 @@ async def lookup_headers(self,
753744
return headers
754745

755746
async def _get_block_numbers_for_request(self,
756-
request: HeaderRequest) -> Tuple[BlockNumber, ...]:
747+
request: protocol.BaseHeaderRequest
748+
) -> Tuple[BlockNumber, ...]:
757749
"""
758750
Generate the block numbers for a given `HeaderRequest`.
759751
"""

p2p/eth.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
from typing import (
3+
Any,
34
cast,
45
List,
56
Tuple,
@@ -16,8 +17,10 @@
1617
from eth.rlp.receipts import Receipt
1718
from eth.rlp.transactions import BaseTransactionFields
1819

20+
from p2p.exceptions import ValidationError
1921
from p2p.protocol import (
2022
BaseBlockHeaders,
23+
BaseHeaderRequest,
2124
Command,
2225
Protocol,
2326
_DecodedMsgType,
@@ -70,6 +73,31 @@ class GetBlockHeaders(Command):
7073
]
7174

7275

76+
class HeaderRequest(BaseHeaderRequest):
    """
    A request for a batch of block headers over the `eth` protocol.
    """
    max_size = MAX_HEADERS_FETCH

    def __init__(self,
                 block_number_or_hash: BlockIdentifier,
                 max_headers: int,
                 skip: int,
                 reverse: bool) -> None:
        self.block_number_or_hash = block_number_or_hash
        self.max_headers = max_headers
        self.skip = skip
        self.reverse = reverse

    def validate_response(self, response: Any) -> None:
        """
        Core `Request` API used for validation.
        """
        # The `eth` protocol returns the headers as a bare tuple.
        if not isinstance(response, tuple):
            raise ValidationError("Response to `HeaderRequest` must be a tuple")
        for item in response:
            if not isinstance(item, BlockHeader):
                raise ValidationError("Response must be a tuple of `BlockHeader` objects")
        return self.validate_headers(cast(Tuple[BlockHeader, ...], response))
99+
100+
73101
class BlockHeaders(BaseBlockHeaders):
74102
_cmd_id = 4
75103
structure = sedes.CountableList(BlockHeader)

p2p/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,3 +159,10 @@ class NoInternalAddressMatchesDevice(BaseP2PError):
159159
def __init__(self, *args: Any, device_hostname: str=None, **kwargs: Any) -> None:
160160
super().__init__(*args, **kwargs)
161161
self.device_hostname = device_hostname
162+
163+
164+
class ValidationError(BaseP2PError):
    """
    Raised when something does not pass a validation check.
    """
    # NOTE: the redundant `pass` after the docstring was removed; the
    # docstring alone is a sufficient class body.

p2p/les.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
from eth.rlp.headers import BlockHeader
1717
from eth.rlp.receipts import Receipt
1818

19+
from p2p.exceptions import (
20+
ValidationError,
21+
)
1922
from p2p.protocol import (
2023
BaseBlockHeaders,
24+
BaseHeaderRequest,
2125
Command,
2226
Protocol,
2327
_DecodedMsgType,
@@ -163,6 +167,44 @@ class GetBlockHeadersQuery(rlp.Serializable):
163167
]
164168

165169

170+
class HeaderRequest(BaseHeaderRequest):
    """
    A request for a batch of block headers over the `les` protocol.

    Unlike the `eth` variant, LES responses carry a `request_id` which must
    match the id that was sent with the request.
    """
    request_id: int

    max_size = MAX_HEADERS_FETCH

    def __init__(self,
                 block_number_or_hash: BlockIdentifier,
                 max_headers: int,
                 skip: int,
                 reverse: bool,
                 request_id: int) -> None:
        self.block_number_or_hash = block_number_or_hash
        self.max_headers = max_headers
        self.skip = skip
        self.reverse = reverse
        self.request_id = request_id

    def validate_response(self, response: Any) -> None:
        """
        Core `Request` API used for validation.

        :raises ValidationError: if the response is not a dict, if its
            `request_id` does not match ours, or if the `headers` entries
            are not `BlockHeader` objects.
        """
        if not isinstance(response, dict):
            raise ValidationError("Response to `HeaderRequest` must be a dict")

        request_id = response['request_id']
        if request_id != self.request_id:
            # BUG FIX: the original message used printf-style `%s`
            # placeholders with `str.format()`, which leaves them
            # unsubstituted; `{}` placeholders actually interpolate.
            raise ValidationError(
                "Response `request_id` does not match. expected: {} | got: {}".format(
                    self.request_id,
                    request_id,
                )
            )
        elif not all(isinstance(item, BlockHeader) for item in response['headers']):
            raise ValidationError("Response must be a tuple of `BlockHeader` objects")

        return self.validate_headers(cast(Tuple[BlockHeader, ...], response['headers']))
206+
207+
166208
class GetBlockHeaders(Command):
167209
_cmd_id = 2
168210
structure = [

0 commit comments

Comments
 (0)