Skip to content

Commit 4e05590

Browse files
authored
Merge pull request #987 from gsalgado/state-downloader-serve-data
StateDownloader now handles data requests from peers
2 parents 997953c + 42560e8 commit 4e05590

File tree

6 files changed

+218
-167
lines changed

6 files changed

+218
-167
lines changed

p2p/cancel_token.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
cast,
66
List,
77
Sequence,
8+
TypeVar,
89
)
910

1011
from p2p.exceptions import (
@@ -88,6 +89,40 @@ def __str__(self) -> str:
8889
return self.name
8990

9091

92+
class CancellableMixin:
93+
cancel_token: CancelToken = None
94+
95+
_TReturn = TypeVar('_TReturn')
96+
97+
async def wait(self,
98+
awaitable: Awaitable[_TReturn],
99+
token: CancelToken = None,
100+
timeout: float = None) -> _TReturn:
101+
"""See wait_first()"""
102+
return await self.wait_first(awaitable, token=token, timeout=timeout)
103+
104+
async def wait_first(self,
105+
*awaitables: Awaitable[_TReturn],
106+
token: CancelToken = None,
107+
timeout: float = None) -> _TReturn:
108+
"""Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
109+
110+
The given token is chained with this service's token, so triggering either will cancel
111+
this.
112+
113+
Returns the result of the first one to complete.
114+
115+
Raises TimeoutError if we timeout or OperationCancelled if the token chain is triggered.
116+
117+
All pending futures are cancelled before returning.
118+
"""
119+
if token is None:
120+
token_chain = self.cancel_token
121+
else:
122+
token_chain = token.chain(self.cancel_token)
123+
return await wait_with_token(*awaitables, token=token_chain, timeout=timeout)
124+
125+
91126
async def wait_with_token(*awaitables: Awaitable[Any],
92127
token: CancelToken,
93128
timeout: float = None) -> Any:

p2p/chain.py

Lines changed: 137 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@
3232
from evm.rlp.headers import BlockHeader
3333
from evm.rlp.receipts import Receipt
3434
from evm.rlp.transactions import BaseTransaction, BaseTransactionFields
35+
from evm.utils.logging import TraceLogger
3536

3637
from p2p import protocol
3738
from p2p import eth
3839
from p2p import les
39-
from p2p.cancel_token import CancelToken
40+
from p2p.cancel_token import CancellableMixin, CancelToken
4041
from p2p.constants import MAX_REORG_DEPTH
4142
from p2p.exceptions import NoEligiblePeers, OperationCancelled
4243
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
@@ -79,6 +80,7 @@ def __init__(self,
7980
self.chain = chain
8081
self.db = db
8182
self.peer_pool = peer_pool
83+
self._handler = PeerRequestHandler(self.db, self.logger, self.cancel_token)
8284
self._syncing = False
8385
self._sync_complete = asyncio.Event()
8486
self._sync_requests: asyncio.Queue[HeaderRequestingPeer] = asyncio.Queue()
@@ -255,56 +257,6 @@ def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
255257
"Got BlockHeaders from %d to %d", headers[0].block_number, headers[-1].block_number)
256258
self._new_headers.put_nowait(headers)
257259

258-
async def _get_block_numbers_for_request(
259-
self, block_number_or_hash: Union[int, bytes], max_headers: int, skip: int,
260-
reverse: bool) -> List[BlockNumber]:
261-
"""
262-
Generates the block numbers requested, subject to local availability.
263-
"""
264-
block_number_or_hash = block_number_or_hash
265-
if isinstance(block_number_or_hash, bytes):
266-
header = await self.wait(
267-
self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)),
268-
)
269-
block_number = header.block_number
270-
elif isinstance(block_number_or_hash, int):
271-
block_number = block_number_or_hash
272-
else:
273-
raise TypeError(
274-
"Unexpected type for 'block_number_or_hash': %s",
275-
type(block_number_or_hash),
276-
)
277-
278-
limit = max(max_headers, eth.MAX_HEADERS_FETCH)
279-
step = skip + 1
280-
if reverse:
281-
low = max(0, block_number - limit)
282-
high = block_number + 1
283-
block_numbers = reversed(range(low, high, step))
284-
else:
285-
low = block_number
286-
high = block_number + limit
287-
block_numbers = iter(range(low, high, step)) # mypy thinks range isn't iterable
288-
return list(block_numbers)
289-
290-
async def _generate_available_headers(
291-
self,
292-
block_numbers: List[BlockNumber]) -> AsyncGenerator[BlockHeader, None]:
293-
"""
294-
Generates the headers requested, halting on the first header that is not locally available.
295-
"""
296-
for block_num in block_numbers:
297-
try:
298-
yield await self.wait(
299-
self.db.coro_get_canonical_block_header_by_number(block_num)
300-
)
301-
except HeaderNotFound:
302-
self.logger.debug(
303-
"Peer requested header number %s that is unavailable, stopping search.",
304-
block_num,
305-
)
306-
break
307-
308260
@abstractmethod
309261
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
310262
msg: protocol._DecodedMsgType) -> None:
@@ -335,19 +287,8 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
335287
async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
336288
self.logger.debug("Peer %s made header request: %s", peer, msg)
337289
query = msg['query']
338-
try:
339-
block_numbers = await self._get_block_numbers_for_request(
340-
query.block_number_or_hash, query.max_headers,
341-
query.skip, query.reverse)
342-
except HeaderNotFound:
343-
self.logger.debug(
344-
"Peer %r requested starting header %r that is unavailable, returning nothing",
345-
peer,
346-
query.block_number_or_hash,
347-
)
348-
block_numbers = []
349-
350-
headers = [header async for header in self._generate_available_headers(block_numbers)]
290+
headers = await self._handler.lookup_headers(
291+
query.block_number_or_hash, query.max_headers, query.skip, query.reverse)
351292
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
352293

353294
async def _process_headers(
@@ -598,19 +539,9 @@ async def _handle_get_block_headers(
598539
header_request: Dict[str, Any]) -> None:
599540
self.logger.debug("Peer %s made header request: %s", peer, header_request)
600541

601-
try:
602-
block_numbers = await self._get_block_numbers_for_request(
603-
header_request['block_number_or_hash'], header_request['max_headers'],
604-
header_request['skip'], header_request['reverse'])
605-
except HeaderNotFound:
606-
self.logger.debug(
607-
"Peer %r requested starting header %r that is unavailable, returning nothing",
608-
peer,
609-
header_request['block_number_or_hash'],
610-
)
611-
block_numbers = []
612-
613-
headers = [header async for header in self._generate_available_headers(block_numbers)]
542+
headers = await self._handler.lookup_headers(
543+
header_request['block_number_or_hash'], header_request['max_headers'],
544+
header_request['skip'], header_request['reverse'])
614545
peer.sub_proto.send_block_headers(headers)
615546

616547

@@ -634,55 +565,20 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
634565
elif isinstance(cmd, eth.GetBlockHeaders):
635566
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
636567
elif isinstance(cmd, eth.GetBlockBodies):
637-
await self._handle_get_block_bodies(peer, cast(List[Hash32], msg))
568+
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
569+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
570+
await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
638571
elif isinstance(cmd, eth.GetReceipts):
639-
await self._handle_get_receipts(peer, cast(List[Hash32], msg))
572+
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
574+
await self._handler.handle_get_receipts(peer, block_hashes)
640575
elif isinstance(cmd, eth.GetNodeData):
641-
await self._handle_get_node_data(peer, cast(List[Hash32], msg))
576+
# Only serve up to eth.MAX_STATE_FETCH items in every request.
577+
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
578+
await self._handler.handle_get_node_data(peer, node_hashes)
642579
else:
643580
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
644581

645-
async def _handle_get_block_bodies(self, peer: ETHPeer, msg: List[Hash32]) -> None:
646-
bodies = []
647-
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
648-
hashes = msg[:eth.MAX_BODIES_FETCH]
649-
for block_hash in hashes:
650-
try:
651-
header = await self.wait(self.db.coro_get_block_header_by_hash(block_hash))
652-
except HeaderNotFound:
653-
self.logger.debug("%s asked for block we don't have: %s", peer, block_hash)
654-
continue
655-
transactions = await self.wait(
656-
self.db.coro_get_block_transactions(header, BaseTransactionFields))
657-
uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash))
658-
bodies.append(BlockBody(transactions, uncles))
659-
peer.sub_proto.send_block_bodies(bodies)
660-
661-
async def _handle_get_receipts(self, peer: ETHPeer, msg: List[Hash32]) -> None:
662-
receipts = []
663-
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
664-
hashes = msg[:eth.MAX_RECEIPTS_FETCH]
665-
for block_hash in hashes:
666-
try:
667-
header = await self.wait(self.db.coro_get_block_header_by_hash(block_hash))
668-
except HeaderNotFound:
669-
self.logger.debug(
670-
"%s asked receipts for block we don't have: %s", peer, block_hash)
671-
continue
672-
receipts.append(await self.wait(self.db.coro_get_receipts(header, Receipt)))
673-
peer.sub_proto.send_receipts(receipts)
674-
675-
async def _handle_get_node_data(self, peer: ETHPeer, msg: List[Hash32]) -> None:
676-
nodes = []
677-
for node_hash in msg:
678-
try:
679-
node = await self.db.coro_get(node_hash)
680-
except KeyError:
681-
self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
682-
continue
683-
nodes.append(node)
684-
peer.sub_proto.send_node_data(nodes)
685-
686582
async def _process_headers(
687583
self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
688584
target_td = await self._calculate_td(headers)
@@ -720,6 +616,120 @@ async def _process_headers(
720616
return head.block_number
721617

722618

619+
class PeerRequestHandler(CancellableMixin):
620+
621+
def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken) -> None:
622+
self.db = db
623+
self.logger = logger
624+
self.cancel_token = token
625+
626+
async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
627+
chaindb = cast('AsyncChainDB', self.db)
628+
bodies = []
629+
for block_hash in block_hashes:
630+
try:
631+
header = await self.wait(chaindb.coro_get_block_header_by_hash(block_hash))
632+
except HeaderNotFound:
633+
self.logger.debug("%s asked for block we don't have: %s", peer, block_hash)
634+
continue
635+
transactions = await self.wait(
636+
chaindb.coro_get_block_transactions(header, BaseTransactionFields))
637+
uncles = await self.wait(chaindb.coro_get_block_uncles(header.uncles_hash))
638+
bodies.append(BlockBody(transactions, uncles))
639+
peer.sub_proto.send_block_bodies(bodies)
640+
641+
async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
642+
chaindb = cast('AsyncChainDB', self.db)
643+
receipts = []
644+
for block_hash in block_hashes:
645+
try:
646+
header = await self.wait(chaindb.coro_get_block_header_by_hash(block_hash))
647+
except HeaderNotFound:
648+
self.logger.debug(
649+
"%s asked receipts for block we don't have: %s", peer, block_hash)
650+
continue
651+
block_receipts = await self.wait(chaindb.coro_get_receipts(header, Receipt))
652+
receipts.append(block_receipts)
653+
peer.sub_proto.send_receipts(receipts)
654+
655+
async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None:
656+
chaindb = cast('AsyncChainDB', self.db)
657+
nodes = []
658+
for node_hash in node_hashes:
659+
try:
660+
node = await self.wait(chaindb.coro_get(node_hash))
661+
except KeyError:
662+
self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
663+
continue
664+
nodes.append(node)
665+
peer.sub_proto.send_node_data(nodes)
666+
667+
async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int,
668+
skip: int, reverse: bool) -> List[BlockHeader]:
669+
"""
670+
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
671+
between each, in reverse order if :reverse: is True.
672+
"""
673+
try:
674+
block_numbers = await self._get_block_numbers_for_request(
675+
block_number_or_hash, max_headers, skip, reverse)
676+
except HeaderNotFound:
677+
self.logger.debug(
678+
"Peer requested starting header %r that is unavailable, returning nothing",
679+
block_number_or_hash)
680+
block_numbers = []
681+
682+
headers = [header async for header in self._generate_available_headers(block_numbers)]
683+
return headers
684+
685+
async def _get_block_numbers_for_request(
686+
self, block_number_or_hash: Union[int, bytes], max_headers: int,
687+
skip: int, reverse: bool) -> List[BlockNumber]:
688+
"""
689+
Generates the block numbers requested, subject to local availability.
690+
"""
691+
block_number_or_hash = block_number_or_hash
692+
if isinstance(block_number_or_hash, bytes):
693+
header = await self.wait(
694+
self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)))
695+
block_number = header.block_number
696+
elif isinstance(block_number_or_hash, int):
697+
block_number = block_number_or_hash
698+
else:
699+
raise TypeError(
700+
"Unexpected type for 'block_number_or_hash': %s",
701+
type(block_number_or_hash),
702+
)
703+
704+
limit = max(max_headers, eth.MAX_HEADERS_FETCH)
705+
step = skip + 1
706+
if reverse:
707+
low = max(0, block_number - limit)
708+
high = block_number + 1
709+
block_numbers = reversed(range(low, high, step))
710+
else:
711+
low = block_number
712+
high = block_number + limit
713+
block_numbers = iter(range(low, high, step)) # mypy thinks range isn't iterable
714+
return list(block_numbers)
715+
716+
async def _generate_available_headers(
717+
self, block_numbers: List[BlockNumber]) -> AsyncGenerator[BlockHeader, None]:
718+
"""
719+
Generates the headers requested, halting on the first header that is not locally available.
720+
"""
721+
for block_num in block_numbers:
722+
try:
723+
yield await self.wait(
724+
self.db.coro_get_canonical_block_header_by_number(block_num))
725+
except HeaderNotFound:
726+
self.logger.debug(
727+
"Peer requested header number %s that is unavailable, stopping search.",
728+
block_num,
729+
)
730+
break
731+
732+
723733
class DownloadedBlockPart(NamedTuple):
724734
part: Union[eth.BlockBody, List[Receipt]]
725735
unique_key: Union[bytes, Tuple[bytes, bytes]]
@@ -815,9 +825,14 @@ async def exit_on_sigint() -> None:
815825
await syncer.cancel()
816826
loop.stop()
817827

828+
async def run() -> None:
829+
await syncer.run()
830+
syncer.logger.info("run() finished, exiting")
831+
sigint_received.set()
832+
818833
# loop.set_debug(True)
819834
asyncio.ensure_future(exit_on_sigint())
820-
asyncio.ensure_future(syncer.run())
835+
asyncio.ensure_future(run())
821836
loop.run_forever()
822837
loop.close()
823838

0 commit comments

Comments
 (0)