Skip to content

Commit ded41f3

Browse files
committed
FastChainSyncer now handles data requests from peers
It used to handle only GetBlockHeader requests, and now handles the rest as well
1 parent 4e05590 commit ded41f3

File tree

3 files changed

+46
-34
lines changed

3 files changed

+46
-34
lines changed

p2p/chain.py

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) ->
289289
query = msg['query']
290290
headers = await self._handler.lookup_headers(
291291
query.block_number_or_hash, query.max_headers, query.skip, query.reverse)
292+
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
292293
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
293294

294295
async def _process_headers(
@@ -490,8 +491,27 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
490491
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
491492
elif isinstance(cmd, eth.GetBlockHeaders):
492493
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
494+
elif isinstance(cmd, eth.GetBlockBodies):
495+
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
496+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
497+
await self._handler.handle_get_block_bodies(peer, block_hashes)
498+
elif isinstance(cmd, eth.GetReceipts):
499+
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
500+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
501+
await self._handler.handle_get_receipts(peer, block_hashes)
502+
elif isinstance(cmd, eth.GetNodeData):
503+
# Only serve up to eth.MAX_STATE_FETCH items in every request.
504+
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
505+
await self._handler.handle_get_node_data(peer, node_hashes)
506+
elif isinstance(cmd, eth.Transactions):
507+
# Transactions msgs are handled by our TxPool service.
508+
pass
509+
elif isinstance(cmd, eth.NodeData):
510+
# When doing a chain sync we never send GetNodeData requests, so peers should not send
511+
# us NodeData msgs.
512+
self.logger.warn("Unexpected NodeData msg from %s", peer)
493513
else:
494-
self.logger.debug("Ignoring %s message from %s", cmd, peer)
514+
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
495515

496516
async def _handle_new_block(self, peer: ETHPeer, msg: Dict[str, Any]) -> None:
497517
self._sync_requests.put_nowait(peer)
@@ -515,7 +535,7 @@ async def _handle_block_receipts(self,
515535

516536
async def _handle_block_bodies(self,
517537
peer: ETHPeer,
518-
bodies: List[eth.BlockBody]) -> None:
538+
bodies: List[BlockBody]) -> None:
519539
self.logger.debug("Got Bodies for %d blocks from %s", len(bodies), peer)
520540
loop = asyncio.get_event_loop()
521541
iterator = map(make_trie_root_and_nodes, [body.transactions for body in bodies])
@@ -542,6 +562,7 @@ async def _handle_get_block_headers(
542562
headers = await self._handler.lookup_headers(
543563
header_request['block_number_or_hash'], header_request['max_headers'],
544564
header_request['skip'], header_request['reverse'])
565+
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
545566
peer.sub_proto.send_block_headers(headers)
546567

547568

@@ -553,31 +574,10 @@ class RegularChainSyncer(FastChainSyncer):
553574
"""
554575
_exit_on_sync_complete = False
555576

556-
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
557-
msg: protocol._DecodedMsgType) -> None:
558-
peer = cast(ETHPeer, peer)
559-
if isinstance(cmd, eth.BlockHeaders):
560-
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg)))
561-
elif isinstance(cmd, eth.BlockBodies):
562-
await self._handle_block_bodies(peer, list(cast(Tuple[eth.BlockBody], msg)))
563-
elif isinstance(cmd, eth.NewBlock):
564-
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
565-
elif isinstance(cmd, eth.GetBlockHeaders):
566-
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
567-
elif isinstance(cmd, eth.GetBlockBodies):
568-
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
569-
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
570-
await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
571-
elif isinstance(cmd, eth.GetReceipts):
572-
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573-
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
574-
await self._handler.handle_get_receipts(peer, block_hashes)
575-
elif isinstance(cmd, eth.GetNodeData):
576-
# Only serve up to eth.MAX_STATE_FETCH items in every request.
577-
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
578-
await self._handler.handle_get_node_data(peer, node_hashes)
579-
else:
580-
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
577+
async def _handle_block_receipts(
578+
self, peer: ETHPeer, receipts_by_block: List[List[eth.Receipt]]) -> None:
579+
# When doing a regular sync we never request receipts.
580+
self.logger.warn("Unexpected BlockReceipts msg from %s", peer)
581581

582582
async def _process_headers(
583583
self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
@@ -599,7 +599,7 @@ async def _process_headers(
599599
transactions: List[BaseTransaction] = []
600600
uncles: List[BlockHeader] = []
601601
else:
602-
body = cast(eth.BlockBody, downloaded_parts[_body_key(header)])
602+
body = cast(BlockBody, downloaded_parts[_body_key(header)])
603603
tx_class = block_class.get_transaction_class()
604604
transactions = [tx_class.from_base_transaction(tx)
605605
for tx in body.transactions]
@@ -624,6 +624,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
624624
self.cancel_token = token
625625

626626
async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
627+
self.logger.trace("%s requested bodies for %d blocks", peer, len(block_hashes))
627628
chaindb = cast('AsyncChainDB', self.db)
628629
bodies = []
629630
for block_hash in block_hashes:
@@ -636,9 +637,11 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32
636637
chaindb.coro_get_block_transactions(header, BaseTransactionFields))
637638
uncles = await self.wait(chaindb.coro_get_block_uncles(header.uncles_hash))
638639
bodies.append(BlockBody(transactions, uncles))
640+
self.logger.trace("Replying to %s with %d block bodies", peer, len(bodies))
639641
peer.sub_proto.send_block_bodies(bodies)
640642

641643
async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
644+
self.logger.trace("%s requested receipts for %d blocks", peer, len(block_hashes))
642645
chaindb = cast('AsyncChainDB', self.db)
643646
receipts = []
644647
for block_hash in block_hashes:
@@ -650,9 +653,11 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -
650653
continue
651654
block_receipts = await self.wait(chaindb.coro_get_receipts(header, Receipt))
652655
receipts.append(block_receipts)
656+
self.logger.trace("Replying to %s with receipts for %d blocks", peer, len(receipts))
653657
peer.sub_proto.send_receipts(receipts)
654658

655659
async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None:
660+
self.logger.trace("%s requested %d trie nodes", peer, len(node_hashes))
656661
chaindb = cast('AsyncChainDB', self.db)
657662
nodes = []
658663
for node_hash in node_hashes:
@@ -662,6 +667,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
662667
self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
663668
continue
664669
nodes.append(node)
670+
self.logger.trace("Replying to %s with %d trie nodes", peer, len(nodes))
665671
peer.sub_proto.send_node_data(nodes)
666672

667673
async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int,
@@ -731,7 +737,7 @@ async def _generate_available_headers(
731737

732738

733739
class DownloadedBlockPart(NamedTuple):
734-
part: Union[eth.BlockBody, List[Receipt]]
740+
part: Union[BlockBody, List[Receipt]]
735741
unique_key: Union[bytes, Tuple[bytes, bytes]]
736742

737743

p2p/state.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,17 @@ async def _handle_msg(
129129
elif isinstance(cmd, eth.GetBlockHeaders):
130130
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
131131
elif isinstance(cmd, eth.GetBlockBodies):
132-
await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
132+
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
133+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
134+
await self._handler.handle_get_block_bodies(peer, block_hashes)
133135
elif isinstance(cmd, eth.GetReceipts):
134-
await self._handler.handle_get_receipts(peer, cast(List[Hash32], msg))
136+
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
137+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
138+
await self._handler.handle_get_receipts(peer, block_hashes)
135139
elif isinstance(cmd, eth.GetNodeData):
136-
await self._handler.handle_get_node_data(peer, cast(List[Hash32], msg))
140+
# Only serve up to eth.MAX_STATE_FETCH items in every request.
141+
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
142+
await self._handler.handle_get_node_data(peer, node_hashes)
137143
else:
138144
self.logger.warn("%s not handled during StateSync, must be implemented", cmd)
139145

trinity/plugins/builtin/tx_pool/pool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async def _run(self) -> None:
7373

7474
async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> None:
7575

76-
self.logger.debug('Received transactions from %r: %r', peer, txs)
76+
self.logger.trace('Received transactions from %r: %r', peer, txs)
7777

7878
self._add_txs_to_bloom(peer, txs)
7979

@@ -87,7 +87,7 @@ async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> N
8787
if len(filtered_tx) == 0:
8888
continue
8989

90-
self.logger.debug(
90+
self.logger.trace(
9191
'Sending transactions to %r: %r',
9292
receiving_peer,
9393
filtered_tx

0 commit comments

Comments
 (0)