
Commit 5eb2a11

Move request-handling funcs into new PeerRequestHandler
1 parent ee315b7 commit 5eb2a11

2 files changed: 149 additions & 161 deletions (p2p/chain.py, p2p/state.py)

2 files changed

+149
-161
lines changed
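The commit replaces the module-level request-serving coroutines in p2p/chain.py (lookup_headers, handle_get_block_bodies, handle_get_receipts, handle_get_node_data) with methods on a new PeerRequestHandler class, so the db, logger and cancel token are bound once at construction instead of being threaded through every call. The sketch below is a hypothetical illustration of the resulting wiring; ExampleSyncer and its constructor arguments are invented for this sketch, while ChainSyncer and StateDownloader in the diff below do the equivalent in their own __init__ and _handle_msg methods.

    from typing import Any, Dict, List, cast

    from p2p import eth
    from p2p.chain import PeerRequestHandler


    class ExampleSyncer:
        """Hypothetical syncer showing how peer requests are delegated after this commit."""

        def __init__(self, db, logger, cancel_token):
            # The handler owns the shared dependencies once, instead of each
            # request function taking db/logger/token as arguments.
            self._handler = PeerRequestHandler(db, logger, cancel_token)

        async def _handle_msg(self, peer, cmd, msg) -> None:
            # Dispatch sites now only forward the peer and the decoded payload.
            if isinstance(cmd, eth.GetBlockBodies):
                await self._handler.handle_get_block_bodies(peer, cast(List, msg))
            elif isinstance(cmd, eth.GetReceipts):
                await self._handler.handle_get_receipts(peer, cast(List, msg))
            elif isinstance(cmd, eth.GetNodeData):
                await self._handler.handle_get_node_data(peer, cast(List, msg))
            elif isinstance(cmd, eth.GetBlockHeaders):
                request = cast(Dict[str, Any], msg)
                headers = await self._handler.lookup_headers(
                    request['block_number_or_hash'], request['max_headers'],
                    request['skip'], request['reverse'])
                peer.sub_proto.send_block_headers(headers)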

p2p/chain.py

Lines changed: 142 additions & 150 deletions
@@ -32,6 +32,7 @@
 from evm.rlp.headers import BlockHeader
 from evm.rlp.receipts import Receipt
 from evm.rlp.transactions import BaseTransaction, BaseTransactionFields
+from evm.utils.logging import TraceLogger
 
 from p2p import protocol
 from p2p import eth
@@ -79,6 +80,7 @@ def __init__(self,
         self.chain = chain
         self.db = db
         self.peer_pool = peer_pool
+        self._handler = PeerRequestHandler(self.db, self.logger, self.cancel_token)
         self._syncing = False
         self._sync_complete = asyncio.Event()
         self._sync_requests: asyncio.Queue[HeaderRequestingPeer] = asyncio.Queue()
@@ -285,9 +287,8 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
     async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
         self.logger.debug("Peer %s made header request: %s", peer, msg)
         query = msg['query']
-        headers = await lookup_headers(
-            self.db, query.block_number_or_hash, query.max_headers,
-            query.skip, query.reverse, self.logger, self.cancel_token)
+        headers = await self._handler.lookup_headers(
+            query.block_number_or_hash, query.max_headers, query.skip, query.reverse)
         peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
 
     async def _process_headers(
@@ -538,9 +539,9 @@ async def _handle_get_block_headers(
             header_request: Dict[str, Any]) -> None:
         self.logger.debug("Peer %s made header request: %s", peer, header_request)
 
-        headers = await lookup_headers(
-            self.db, header_request['block_number_or_hash'], header_request['max_headers'],
-            header_request['skip'], header_request['reverse'], self.logger, self.cancel_token)
+        headers = await self._handler.lookup_headers(
+            header_request['block_number_or_hash'], header_request['max_headers'],
+            header_request['skip'], header_request['reverse'])
         peer.sub_proto.send_block_headers(headers)
 
 
@@ -564,14 +565,11 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
         elif isinstance(cmd, eth.GetBlockHeaders):
             await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
         elif isinstance(cmd, eth.GetBlockBodies):
-            await handle_get_block_bodies(
-                self.db, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
+            await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
         elif isinstance(cmd, eth.GetReceipts):
-            await handle_get_receipts(
-                self.db, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
+            await self._handler.handle_get_receipts(peer, cast(List[Hash32], msg))
         elif isinstance(cmd, eth.GetNodeData):
-            await handle_get_node_data(
-                self.db, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
+            await self._handler.handle_get_node_data(peer, cast(List[Hash32], msg))
         else:
             self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
 
@@ -612,6 +610,138 @@ async def _process_headers(
         return head.block_number
 
 
+class PeerRequestHandler:
+
+    def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken) -> None:
+        self.db = db
+        self.logger = logger
+        self.cancel_token = token
+
+    async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
+        chaindb = cast('AsyncChainDB', self.db)
+        bodies = []
+        # Only serve up to eth.MAX_BODIES_FETCH items in every request.
+        for block_hash in block_hashes[:eth.MAX_BODIES_FETCH]:
+            try:
+                header = await wait_with_token(
+                    chaindb.coro_get_block_header_by_hash(block_hash),
+                    token=self.cancel_token)
+            except HeaderNotFound:
+                self.logger.debug("%s asked for block we don't have: %s", peer, block_hash)
+                continue
+            transactions = await wait_with_token(
+                chaindb.coro_get_block_transactions(header, BaseTransactionFields),
+                token=self.cancel_token)
+            uncles = await wait_with_token(
+                chaindb.coro_get_block_uncles(header.uncles_hash),
+                token=self.cancel_token)
+            bodies.append(BlockBody(transactions, uncles))
+        peer.sub_proto.send_block_bodies(bodies)
+
+    async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
+        chaindb = cast('AsyncChainDB', self.db)
+        receipts = []
+        # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
+        for block_hash in block_hashes[:eth.MAX_RECEIPTS_FETCH]:
+            try:
+                header = await wait_with_token(
+                    chaindb.coro_get_block_header_by_hash(block_hash),
+                    token=self.cancel_token)
+            except HeaderNotFound:
+                self.logger.debug(
+                    "%s asked receipts for block we don't have: %s", peer, block_hash)
+                continue
+            block_receipts = await wait_with_token(
+                chaindb.coro_get_receipts(header, Receipt),
+                token=self.cancel_token)
+            receipts.append(block_receipts)
+        peer.sub_proto.send_receipts(receipts)
+
+    async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None:
+        chaindb = cast('AsyncChainDB', self.db)
+        nodes = []
+        # Only serve up to eth.MAX_STATE_FETCH items in every request.
+        for node_hash in node_hashes[:eth.MAX_STATE_FETCH]:
+            try:
+                node = await wait_with_token(
+                    chaindb.coro_get(node_hash),
+                    token=self.cancel_token)
+            except KeyError:
+                self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
+                continue
+            nodes.append(node)
+        peer.sub_proto.send_node_data(nodes)
+
+    async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int,
+                             skip: int, reverse: bool) -> List[BlockHeader]:
+        """
+        Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
+        between each, in reverse order if :reverse: is True.
+        """
+        try:
+            block_numbers = await self._get_block_numbers_for_request(
+                block_number_or_hash, max_headers, skip, reverse)
+        except HeaderNotFound:
+            self.logger.debug(
+                "Peer requested starting header %r that is unavailable, returning nothing",
+                block_number_or_hash)
+            block_numbers = []
+
+        headers = [header async for header in self._generate_available_headers(block_numbers)]
+        return headers
+
+    async def _get_block_numbers_for_request(
+            self, block_number_or_hash: Union[int, bytes], max_headers: int,
+            skip: int, reverse: bool) -> List[BlockNumber]:
+        """
+        Generates the block numbers requested, subject to local availability.
+        """
+        if isinstance(block_number_or_hash, bytes):
+            header = await wait_with_token(
+                self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)),
+                token=self.cancel_token,
+            )
+            block_number = header.block_number
+        elif isinstance(block_number_or_hash, int):
+            block_number = block_number_or_hash
+        else:
+            raise TypeError(
+                "Unexpected type for 'block_number_or_hash': %s",
+                type(block_number_or_hash),
+            )
+
+        limit = max(max_headers, eth.MAX_HEADERS_FETCH)
+        step = skip + 1
+        if reverse:
+            low = max(0, block_number - limit)
+            high = block_number + 1
+            block_numbers = reversed(range(low, high, step))
+        else:
+            low = block_number
+            high = block_number + limit
+            block_numbers = iter(range(low, high, step))  # mypy thinks range isn't iterable
+        return list(block_numbers)
+
+    async def _generate_available_headers(
+            self, block_numbers: List[BlockNumber]) -> AsyncGenerator[BlockHeader, None]:
+        """
+        Generates the headers requested, halting on the first header that is not locally available.
+        """
+        for block_num in block_numbers:
+            try:
+                yield await wait_with_token(
+                    self.db.coro_get_canonical_block_header_by_number(block_num),
+                    token=self.cancel_token,
+                )
+            except HeaderNotFound:
+                self.logger.debug(
+                    "Peer requested header number %s that is unavailable, stopping search.",
+                    block_num,
+                )
+                break
+
+
 class DownloadedBlockPart(NamedTuple):
     part: Union[eth.BlockBody, List[Receipt]]
     unique_key: Union[bytes, Tuple[bytes, bytes]]
@@ -641,144 +771,6 @@ def _is_receipts_empty(header: BlockHeader) -> bool:
     return header.receipt_root == BLANK_ROOT_HASH
 
 
-async def _get_block_numbers_for_request(
-        headerdb: 'AsyncHeaderDB', block_number_or_hash: Union[int, bytes], max_headers: int,
-        skip: int, reverse: bool, token: CancelToken) -> List[BlockNumber]:
-    """
-    Generates the block numbers requested, subject to local availability.
-    """
-    if isinstance(block_number_or_hash, bytes):
-        header = await wait_with_token(
-            headerdb.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)),
-            token=token,
-        )
-        block_number = header.block_number
-    elif isinstance(block_number_or_hash, int):
-        block_number = block_number_or_hash
-    else:
-        raise TypeError(
-            "Unexpected type for 'block_number_or_hash': %s",
-            type(block_number_or_hash),
-        )
-
-    limit = max(max_headers, eth.MAX_HEADERS_FETCH)
-    step = skip + 1
-    if reverse:
-        low = max(0, block_number - limit)
-        high = block_number + 1
-        block_numbers = reversed(range(low, high, step))
-    else:
-        low = block_number
-        high = block_number + limit
-        block_numbers = iter(range(low, high, step))  # mypy thinks range isn't iterable
-    return list(block_numbers)
-
-
-async def _generate_available_headers(
-        headerdb: 'AsyncHeaderDB',
-        block_numbers: List[BlockNumber],
-        logger: logging.Logger,
-        token: CancelToken) -> AsyncGenerator[BlockHeader, None]:
-    """
-    Generates the headers requested, halting on the first header that is not locally available.
-    """
-    for block_num in block_numbers:
-        try:
-            yield await wait_with_token(
-                headerdb.coro_get_canonical_block_header_by_number(block_num),
-                token=token
-            )
-        except HeaderNotFound:
-            logger.debug(
-                "Peer requested header number %s that is unavailable, stopping search.",
-                block_num,
-            )
-            break
-
-
-async def lookup_headers(
-        headerdb: 'AsyncHeaderDB', block_number_or_hash: Union[int, bytes], max_headers: int,
-        skip: int, reverse: bool, logger: logging.Logger, token: CancelToken) -> List[BlockHeader]:
-    """
-    Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items between
-    each, in reverse order if :reverse: is True.
-    """
-    try:
-        block_numbers = await _get_block_numbers_for_request(
-            headerdb, block_number_or_hash, max_headers, skip, reverse, token)
-    except HeaderNotFound:
-        logger.debug(
-            "Peer requested starting header %r that is unavailable, returning nothing",
-            block_number_or_hash)
-        block_numbers = []
-
-    headers = [header async for header in _generate_available_headers(
-        headerdb, block_numbers, logger, token)]
-    return headers
-
-
-async def handle_get_block_bodies(
-        chaindb: 'AsyncChainDB', peer: ETHPeer, block_hashes: List[Hash32],
-        logger: logging.Logger, token: CancelToken) -> None:
-    bodies = []
-    # Only serve up to eth.MAX_BODIES_FETCH items in every request.
-    for block_hash in block_hashes[:eth.MAX_BODIES_FETCH]:
-        try:
-            header = await wait_with_token(
-                chaindb.coro_get_block_header_by_hash(block_hash),
-                token=token)
-        except HeaderNotFound:
-            logger.debug("%s asked for block we don't have: %s", peer, block_hash)
-            continue
-        transactions = await wait_with_token(
-            chaindb.coro_get_block_transactions(header, BaseTransactionFields),
-            token=token)
-        uncles = await wait_with_token(
-            chaindb.coro_get_block_uncles(header.uncles_hash),
-            token=token)
-        bodies.append(BlockBody(transactions, uncles))
-    peer.sub_proto.send_block_bodies(bodies)
-
-
-async def handle_get_receipts(
-        chaindb: 'AsyncChainDB', peer: ETHPeer, block_hashes: List[Hash32],
-        logger: logging.Logger, token: CancelToken) -> None:
-    receipts = []
-    # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
-    for block_hash in block_hashes[:eth.MAX_RECEIPTS_FETCH]:
-        try:
-            header = await wait_with_token(
-                chaindb.coro_get_block_header_by_hash(block_hash),
-                token=token)
-        except HeaderNotFound:
-            logger.debug(
-                "%s asked receipts for block we don't have: %s", peer, block_hash)
-            continue
-        block_receipts = await wait_with_token(
-            chaindb.coro_get_receipts(header, Receipt),
-            token=token)
-        receipts.append(block_receipts)
-    peer.sub_proto.send_receipts(receipts)
-
-
-async def handle_get_node_data(
-        chaindb: 'AsyncChainDB', peer: ETHPeer, node_hashes: List[Hash32],
-        logger: logging.Logger, token: CancelToken) -> None:
-    nodes = []
-    # Only serve up to eth.MAX_STATE_FETCH items in every request.
-    for node_hash in node_hashes[:eth.MAX_STATE_FETCH]:
-        try:
-            node = await wait_with_token(
-                chaindb.coro_get(node_hash),
-                token=token)
-        except KeyError:
-            logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
-            continue
-        nodes.append(node)
-    peer.sub_proto.send_node_data(nodes)
-
-
 def _test() -> None:
     import argparse
     import signal

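One detail of the moved lookup_headers logic that is easy to misread is how the skip and reverse parameters of a header request translate into block numbers: _get_block_numbers_for_request turns skip into a range step of skip + 1, and reverse walks the range downward from the requested block. The snippet below reproduces just that range arithmetic in isolation so a request can be checked by hand; MAX_HEADERS_FETCH = 192 is an assumed value used only for this illustration, and in the real code _generate_available_headers then stops at the first header that is not available locally.

    from typing import List

    MAX_HEADERS_FETCH = 192  # assumed here for illustration; the real limit lives in p2p.eth


    def candidate_block_numbers(block_number: int, max_headers: int,
                                skip: int, reverse: bool) -> List[int]:
        # Mirrors the range arithmetic in PeerRequestHandler._get_block_numbers_for_request.
        limit = max(max_headers, MAX_HEADERS_FETCH)
        step = skip + 1
        if reverse:
            low = max(0, block_number - limit)
            return list(reversed(range(low, block_number + 1, step)))
        return list(range(block_number, block_number + limit, step))


    # A forward request anchored at block 100 with skip=2 selects every third block:
    print(candidate_block_numbers(100, 5, skip=2, reverse=False)[:5])
    # [100, 103, 106, 109, 112]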
p2p/state.py

Lines changed: 7 additions & 11 deletions
@@ -39,8 +39,7 @@
 
 from p2p import eth
 from p2p import protocol
-from p2p.chain import (
-    handle_get_block_bodies, handle_get_node_data, handle_get_receipts, lookup_headers)
+from p2p.chain import PeerRequestHandler
 from p2p.cancel_token import CancelToken
 from p2p.exceptions import OperationCancelled
 from p2p.peer import ETHPeer, PeerPool, PeerPoolSubscriber
@@ -71,6 +70,7 @@ def __init__(self,
         self.peer_pool = peer_pool
         self.root_hash = root_hash
         self.scheduler = StateSync(root_hash, account_db)
+        self._handler = PeerRequestHandler(self.chaindb, self.logger, self.cancel_token)
         self._peers_with_pending_requests: Dict[ETHPeer, float] = {}
         self._executor = get_process_pool_executor()
 
@@ -129,21 +129,17 @@ async def _handle_msg(
         elif isinstance(cmd, eth.GetBlockHeaders):
             await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
         elif isinstance(cmd, eth.GetBlockBodies):
-            await handle_get_block_bodies(
-                self.chaindb, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
+            await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
         elif isinstance(cmd, eth.GetReceipts):
-            await handle_get_receipts(
-                self.chaindb, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
+            await self._handler.handle_get_receipts(peer, cast(List[Hash32], msg))
        elif isinstance(cmd, eth.GetNodeData):
-            await handle_get_node_data(
-                self.chaindb, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
+            await self._handler.handle_get_node_data(peer, cast(List[Hash32], msg))
         else:
             self.logger.warn("%s not handled during StateSync, must be implemented", cmd)
 
     async def _handle_get_block_headers(self, peer: ETHPeer, msg: Dict[str, Any]) -> None:
-        headers = await lookup_headers(
-            self.chaindb, msg['block_number_or_hash'], msg['max_headers'],
-            msg['skip'], msg['reverse'], self.logger, self.cancel_token)
+        headers = await self._handler.lookup_headers(
+            msg['block_number_or_hash'], msg['max_headers'], msg['skip'], msg['reverse'])
         peer.sub_proto.send_block_headers(headers)
 
     async def _cleanup(self) -> None:
