Skip to content

Commit db9d25c

Browse files
authored
Merge pull request #993 from gsalgado/fast-sync-handle-data-requests
FastChainSyncer now handles data requests from peers
2 parents 4e05590 + 9183298 commit db9d25c

File tree

5 files changed

+68
-52
lines changed

5 files changed

+68
-52
lines changed

p2p/chain.py

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from p2p.cancel_token import CancellableMixin, CancelToken
4141
from p2p.constants import MAX_REORG_DEPTH
4242
from p2p.exceptions import NoEligiblePeers, OperationCancelled
43+
from p2p.p2p_proto import DisconnectReason
4344
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
4445
from p2p.rlp import BlockBody
4546
from p2p.service import BaseService
@@ -187,7 +188,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
187188
headers = await self._fetch_missing_headers(peer, start_at)
188189
except TimeoutError:
189190
self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer)
190-
await peer.cancel()
191+
await peer.disconnect(DisconnectReason.timeout)
191192
break
192193

193194
if not headers:
@@ -289,6 +290,7 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) ->
289290
query = msg['query']
290291
headers = await self._handler.lookup_headers(
291292
query.block_number_or_hash, query.max_headers, query.skip, query.reverse)
293+
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
292294
peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id'])
293295

294296
async def _process_headers(
@@ -490,8 +492,28 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
490492
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
491493
elif isinstance(cmd, eth.GetBlockHeaders):
492494
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
495+
elif isinstance(cmd, eth.GetBlockBodies):
496+
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
497+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
498+
await self._handler.handle_get_block_bodies(peer, block_hashes)
499+
elif isinstance(cmd, eth.GetReceipts):
500+
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
501+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
502+
await self._handler.handle_get_receipts(peer, block_hashes)
503+
elif isinstance(cmd, eth.GetNodeData):
504+
# Only serve up to eth.MAX_STATE_FETCH items in every request.
505+
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
506+
await self._handler.handle_get_node_data(peer, node_hashes)
507+
elif isinstance(cmd, eth.Transactions):
508+
# Transactions msgs are handled by our TxPool service.
509+
pass
510+
elif isinstance(cmd, eth.NodeData):
511+
# When doing a chain sync we never send GetNodeData requests, so peers should not send
512+
# us NodeData msgs.
513+
self.logger.warn("Unexpected NodeData msg from %s, disconnecting", peer)
514+
await peer.disconnect(DisconnectReason.bad_protocol)
493515
else:
494-
self.logger.debug("Ignoring %s message from %s", cmd, peer)
516+
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
495517

496518
async def _handle_new_block(self, peer: ETHPeer, msg: Dict[str, Any]) -> None:
497519
self._sync_requests.put_nowait(peer)
@@ -515,7 +537,7 @@ async def _handle_block_receipts(self,
515537

516538
async def _handle_block_bodies(self,
517539
peer: ETHPeer,
518-
bodies: List[eth.BlockBody]) -> None:
540+
bodies: List[BlockBody]) -> None:
519541
self.logger.debug("Got Bodies for %d blocks from %s", len(bodies), peer)
520542
loop = asyncio.get_event_loop()
521543
iterator = map(make_trie_root_and_nodes, [body.transactions for body in bodies])
@@ -542,6 +564,7 @@ async def _handle_get_block_headers(
542564
headers = await self._handler.lookup_headers(
543565
header_request['block_number_or_hash'], header_request['max_headers'],
544566
header_request['skip'], header_request['reverse'])
567+
self.logger.trace("Replying to %s with %d headers", peer, len(headers))
545568
peer.sub_proto.send_block_headers(headers)
546569

547570

@@ -553,31 +576,11 @@ class RegularChainSyncer(FastChainSyncer):
553576
"""
554577
_exit_on_sync_complete = False
555578

556-
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
557-
msg: protocol._DecodedMsgType) -> None:
558-
peer = cast(ETHPeer, peer)
559-
if isinstance(cmd, eth.BlockHeaders):
560-
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg)))
561-
elif isinstance(cmd, eth.BlockBodies):
562-
await self._handle_block_bodies(peer, list(cast(Tuple[eth.BlockBody], msg)))
563-
elif isinstance(cmd, eth.NewBlock):
564-
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
565-
elif isinstance(cmd, eth.GetBlockHeaders):
566-
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
567-
elif isinstance(cmd, eth.GetBlockBodies):
568-
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
569-
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
570-
await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
571-
elif isinstance(cmd, eth.GetReceipts):
572-
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573-
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
574-
await self._handler.handle_get_receipts(peer, block_hashes)
575-
elif isinstance(cmd, eth.GetNodeData):
576-
# Only serve up to eth.MAX_STATE_FETCH items in every request.
577-
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
578-
await self._handler.handle_get_node_data(peer, node_hashes)
579-
else:
580-
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
579+
async def _handle_block_receipts(
580+
self, peer: ETHPeer, receipts_by_block: List[List[eth.Receipt]]) -> None:
581+
# When doing a regular sync we never request receipts.
582+
self.logger.warn("Unexpected BlockReceipts msg from %s, disconnecting", peer)
583+
await peer.disconnect(DisconnectReason.bad_protocol)
581584

582585
async def _process_headers(
583586
self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
@@ -599,7 +602,7 @@ async def _process_headers(
599602
transactions: List[BaseTransaction] = []
600603
uncles: List[BlockHeader] = []
601604
else:
602-
body = cast(eth.BlockBody, downloaded_parts[_body_key(header)])
605+
body = cast(BlockBody, downloaded_parts[_body_key(header)])
603606
tx_class = block_class.get_transaction_class()
604607
transactions = [tx_class.from_base_transaction(tx)
605608
for tx in body.transactions]
@@ -624,6 +627,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
624627
self.cancel_token = token
625628

626629
async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
630+
self.logger.trace("%s requested bodies for %d blocks", peer, len(block_hashes))
627631
chaindb = cast('AsyncChainDB', self.db)
628632
bodies = []
629633
for block_hash in block_hashes:
@@ -636,9 +640,11 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32
636640
chaindb.coro_get_block_transactions(header, BaseTransactionFields))
637641
uncles = await self.wait(chaindb.coro_get_block_uncles(header.uncles_hash))
638642
bodies.append(BlockBody(transactions, uncles))
643+
self.logger.trace("Replying to %s with %d block bodies", peer, len(bodies))
639644
peer.sub_proto.send_block_bodies(bodies)
640645

641646
async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
647+
self.logger.trace("%s requested receipts for %d blocks", peer, len(block_hashes))
642648
chaindb = cast('AsyncChainDB', self.db)
643649
receipts = []
644650
for block_hash in block_hashes:
@@ -650,9 +656,11 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -
650656
continue
651657
block_receipts = await self.wait(chaindb.coro_get_receipts(header, Receipt))
652658
receipts.append(block_receipts)
659+
self.logger.trace("Replying to %s with receipts for %d blocks", peer, len(receipts))
653660
peer.sub_proto.send_receipts(receipts)
654661

655662
async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None:
663+
self.logger.trace("%s requested %d trie nodes", peer, len(node_hashes))
656664
chaindb = cast('AsyncChainDB', self.db)
657665
nodes = []
658666
for node_hash in node_hashes:
@@ -662,6 +670,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
662670
self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
663671
continue
664672
nodes.append(node)
673+
self.logger.trace("Replying to %s with %d trie nodes", peer, len(nodes))
665674
peer.sub_proto.send_node_data(nodes)
666675

667676
async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int,
@@ -731,7 +740,7 @@ async def _generate_available_headers(
731740

732741

733742
class DownloadedBlockPart(NamedTuple):
734-
part: Union[eth.BlockBody, List[Receipt]]
743+
part: Union[BlockBody, List[Receipt]]
735744
unique_key: Union[bytes, Tuple[bytes, bytes]]
736745

737746

p2p/peer.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ async def do_p2p_handshake(self) -> None:
276276
# Peers sometimes send a disconnect msg before they send the initial P2P handshake.
277277
raise HandshakeFailure("{} disconnected before completing handshake: {}".format(
278278
self, msg['reason_name']))
279-
self.process_p2p_handshake(cmd, msg)
279+
await self.process_p2p_handshake(cmd, msg)
280280

281281
@property
282282
async def genesis(self) -> BlockHeader:
@@ -393,16 +393,17 @@ def process_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> N
393393
else:
394394
self.handle_sub_proto_msg(cmd, msg)
395395

396-
def process_p2p_handshake(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
396+
async def process_p2p_handshake(
397+
self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
397398
msg = cast(Dict[str, Any], msg)
398399
if not isinstance(cmd, Hello):
399-
self.disconnect(DisconnectReason.bad_protocol)
400+
await self.disconnect(DisconnectReason.bad_protocol)
400401
raise HandshakeFailure("Expected a Hello msg, got {}, disconnecting".format(cmd))
401402
remote_capabilities = msg['capabilities']
402403
try:
403404
self.sub_proto = self.select_sub_protocol(remote_capabilities)
404405
except NoMatchingPeerCapabilities:
405-
self.disconnect(DisconnectReason.useless_peer)
406+
await self.disconnect(DisconnectReason.useless_peer)
406407
raise HandshakeFailure(
407408
"No matching capabilities between us ({}) and {} ({}), disconnecting".format(
408409
self.capabilities, self.remote, remote_capabilities))
@@ -474,9 +475,11 @@ def send(self, header: bytes, body: bytes) -> None:
474475
self.logger.trace("Sending msg with cmd_id: %s", cmd_id)
475476
self.writer.write(self.encrypt(header, body))
476477

477-
def disconnect(self, reason: DisconnectReason) -> None:
478+
async def disconnect(self, reason: DisconnectReason) -> None:
478479
"""Send a disconnect msg to the remote node and stop this Peer.
479480
481+
Also awaits for self.cancel() to ensure any pending tasks are cleaned up.
482+
480483
:param reason: An item from the DisconnectReason enum.
481484
"""
482485
if not isinstance(reason, DisconnectReason):
@@ -485,6 +488,8 @@ def disconnect(self, reason: DisconnectReason) -> None:
485488
self.logger.debug("Disconnecting from remote peer; reason: %s", reason.name)
486489
self.base_protocol.send_disconnect(reason.value)
487490
self.close()
491+
if self.is_running:
492+
await self.cancel()
488493

489494
def select_sub_protocol(self, remote_capabilities: List[Tuple[bytes, int]]
490495
) -> protocol.Protocol:
@@ -537,18 +542,18 @@ async def send_sub_proto_handshake(self) -> None:
537542
async def process_sub_proto_handshake(
538543
self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
539544
if not isinstance(cmd, (les.Status, les.StatusV2)):
540-
self.disconnect(DisconnectReason.subprotocol_error)
545+
await self.disconnect(DisconnectReason.subprotocol_error)
541546
raise HandshakeFailure(
542547
"Expected a LES Status msg, got {}, disconnecting".format(cmd))
543548
msg = cast(Dict[str, Any], msg)
544549
if msg['networkId'] != self.network_id:
545-
self.disconnect(DisconnectReason.useless_peer)
550+
await self.disconnect(DisconnectReason.useless_peer)
546551
raise HandshakeFailure(
547552
"{} network ({}) does not match ours ({}), disconnecting".format(
548553
self, msg['networkId'], self.network_id))
549554
genesis = await self.genesis
550555
if msg['genesisHash'] != genesis.hash:
551-
self.disconnect(DisconnectReason.useless_peer)
556+
await self.disconnect(DisconnectReason.useless_peer)
552557
raise HandshakeFailure(
553558
"{} genesis ({}) does not match ours ({}), disconnecting".format(
554559
self, encode_hex(msg['genesisHash']), genesis.hex_hash))
@@ -628,18 +633,18 @@ async def send_sub_proto_handshake(self) -> None:
628633
async def process_sub_proto_handshake(
629634
self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
630635
if not isinstance(cmd, eth.Status):
631-
self.disconnect(DisconnectReason.subprotocol_error)
636+
await self.disconnect(DisconnectReason.subprotocol_error)
632637
raise HandshakeFailure(
633638
"Expected a ETH Status msg, got {}, disconnecting".format(cmd))
634639
msg = cast(Dict[str, Any], msg)
635640
if msg['network_id'] != self.network_id:
636-
self.disconnect(DisconnectReason.useless_peer)
641+
await self.disconnect(DisconnectReason.useless_peer)
637642
raise HandshakeFailure(
638643
"{} network ({}) does not match ours ({}), disconnecting".format(
639644
self, msg['network_id'], self.network_id))
640645
genesis = await self.genesis
641646
if msg['genesis_hash'] != genesis.hash:
642-
self.disconnect(DisconnectReason.useless_peer)
647+
await self.disconnect(DisconnectReason.useless_peer)
643648
raise HandshakeFailure(
644649
"{} genesis ({}) does not match ours ({}), disconnecting".format(
645650
self, encode_hex(msg['genesis_hash']), genesis.hex_hash))
@@ -770,12 +775,8 @@ async def _run(self) -> None:
770775

771776
async def stop_all_peers(self) -> None:
772777
self.logger.info("Stopping all peers ...")
773-
774778
peers = self.connected_nodes.values()
775-
for peer in peers:
776-
peer.disconnect(DisconnectReason.client_quitting)
777-
778-
await asyncio.gather(*[peer.cancel() for peer in peers])
779+
await asyncio.gather(*[peer.disconnect(DisconnectReason.client_quitting) for peer in peers])
779780

780781
async def _cleanup(self) -> None:
781782
await self.stop_all_peers()

p2p/sharding_peer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async def process_sub_proto_handshake(self,
6767
cmd: Command,
6868
msg: protocol._DecodedMsgType) -> None:
6969
if not isinstance(cmd, Status):
70-
self.disconnect(DisconnectReason.subprotocol_error)
70+
await self.disconnect(DisconnectReason.subprotocol_error)
7171
raise HandshakeFailure("Expected status msg, got {}, disconnecting".format(cmd))
7272

7373
async def _get_headers_at_chain_split(

p2p/state.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,17 @@ async def _handle_msg(
129129
elif isinstance(cmd, eth.GetBlockHeaders):
130130
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
131131
elif isinstance(cmd, eth.GetBlockBodies):
132-
await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
132+
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
133+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
134+
await self._handler.handle_get_block_bodies(peer, block_hashes)
133135
elif isinstance(cmd, eth.GetReceipts):
134-
await self._handler.handle_get_receipts(peer, cast(List[Hash32], msg))
136+
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
137+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
138+
await self._handler.handle_get_receipts(peer, block_hashes)
135139
elif isinstance(cmd, eth.GetNodeData):
136-
await self._handler.handle_get_node_data(peer, cast(List[Hash32], msg))
140+
# Only serve up to eth.MAX_STATE_FETCH items in every request.
141+
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
142+
await self._handler.handle_get_node_data(peer, node_hashes)
137143
else:
138144
self.logger.warn("%s not handled during StateSync, must be implemented", cmd)
139145

trinity/plugins/builtin/tx_pool/pool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async def _run(self) -> None:
7373

7474
async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> None:
7575

76-
self.logger.debug('Received transactions from %r: %r', peer, txs)
76+
self.logger.trace('Received transactions from %r: %r', peer, txs)
7777

7878
self._add_txs_to_bloom(peer, txs)
7979

@@ -87,7 +87,7 @@ async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> N
8787
if len(filtered_tx) == 0:
8888
continue
8989

90-
self.logger.debug(
90+
self.logger.trace(
9191
'Sending transactions to %r: %r',
9292
receiving_peer,
9393
filtered_tx

0 commit comments

Comments
 (0)