Skip to content

Commit ee315b7

Browse files
committed
StateDownloader now handles block body, receipt, and trie-node requests.
The ChainSyncer methods that served those requests were turned into standalone functions so they can be reused by StateDownloader.
1 parent f3b2d4c commit ee315b7

File tree

2 files changed

+80
-47
lines changed

2 files changed

+80
-47
lines changed

p2p/chain.py

Lines changed: 67 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -564,55 +564,17 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
564564
elif isinstance(cmd, eth.GetBlockHeaders):
565565
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
566566
elif isinstance(cmd, eth.GetBlockBodies):
567-
await self._handle_get_block_bodies(peer, cast(List[Hash32], msg))
567+
await handle_get_block_bodies(
568+
self.db, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
568569
elif isinstance(cmd, eth.GetReceipts):
569-
await self._handle_get_receipts(peer, cast(List[Hash32], msg))
570+
await handle_get_receipts(
571+
self.db, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
570572
elif isinstance(cmd, eth.GetNodeData):
571-
await self._handle_get_node_data(peer, cast(List[Hash32], msg))
573+
await handle_get_node_data(
574+
self.db, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
572575
else:
573576
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
574577

575-
async def _handle_get_block_bodies(self, peer: ETHPeer, msg: List[Hash32]) -> None:
    """Serve a GetBlockBodies request from *peer*.

    Looks up the header, transactions and uncles for each requested block
    hash and replies with ``send_block_bodies``. Hashes for blocks we do not
    have are logged at debug level and skipped, so the reply may contain
    fewer bodies than were requested.
    """
    bodies = []
    # Only serve up to eth.MAX_BODIES_FETCH items in every request.
    hashes = msg[:eth.MAX_BODIES_FETCH]
    for block_hash in hashes:
        try:
            # self.wait() presumably ties the DB coroutine to this service's
            # cancel token -- confirm against the base service class.
            header = await self.wait(self.db.coro_get_block_header_by_hash(block_hash))
        except HeaderNotFound:
            # Peers may legitimately ask for blocks we never imported; skip.
            self.logger.debug("%s asked for block we don't have: %s", peer, block_hash)
            continue
        transactions = await self.wait(
            self.db.coro_get_block_transactions(header, BaseTransactionFields))
        uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash))
        bodies.append(BlockBody(transactions, uncles))
    peer.sub_proto.send_block_bodies(bodies)
590-
591-
async def _handle_get_receipts(self, peer: ETHPeer, msg: List[Hash32]) -> None:
    """Serve a GetReceipts request from *peer*.

    For each requested block hash, fetches the header and then that block's
    receipts, replying with ``send_receipts``. Unknown blocks are logged at
    debug level and skipped.
    """
    receipts = []
    # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
    hashes = msg[:eth.MAX_RECEIPTS_FETCH]
    for block_hash in hashes:
        try:
            header = await self.wait(self.db.coro_get_block_header_by_hash(block_hash))
        except HeaderNotFound:
            # Peers may legitimately ask for blocks we never imported; skip.
            self.logger.debug(
                "%s asked receipts for block we don't have: %s", peer, block_hash)
            continue
        # Each entry is the list of receipts for one block, in request order.
        receipts.append(await self.wait(self.db.coro_get_receipts(header, Receipt)))
    peer.sub_proto.send_receipts(receipts)
604-
605-
async def _handle_get_node_data(self, peer: ETHPeer, msg: List[Hash32]) -> None:
    """Serve a GetNodeData request from *peer*.

    Fetches each requested trie node from the database by its hash and
    replies with ``send_node_data``. Nodes we do not have are logged at
    debug level and skipped.

    NOTE(review): unlike the body/receipt handlers, this does not cap the
    number of served items -- confirm whether a MAX_STATE_FETCH limit is
    intended here.
    """
    nodes = []
    for node_hash in msg:
        try:
            node = await self.db.coro_get(node_hash)
        except KeyError:
            node = await self.db.coro_get(node_hash)  # noqa -- see doc body
        except KeyError:
            self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
            continue
        nodes.append(node)
    peer.sub_proto.send_node_data(nodes)
615-
616578
async def _process_headers(
617579
self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
618580
target_td = await self._calculate_td(headers)
@@ -756,6 +718,67 @@ async def lookup_headers(
756718
return headers
757719

758720

721+
async def handle_get_block_bodies(
        chaindb: 'AsyncChainDB', peer: ETHPeer, block_hashes: List[Hash32],
        logger: logging.Logger, token: CancelToken) -> None:
    """Reply to *peer*'s GetBlockBodies request.

    For every requested hash whose header is in *chaindb*, assembles a
    BlockBody from the block's transactions and uncles; hashes for unknown
    blocks are logged and skipped. The request is truncated to
    eth.MAX_BODIES_FETCH entries. All DB coroutines are awaited via
    wait_with_token so the handler aborts when *token* is triggered.
    """
    # Never serve more than eth.MAX_BODIES_FETCH bodies per request.
    capped_hashes = block_hashes[:eth.MAX_BODIES_FETCH]
    response = []
    for requested_hash in capped_hashes:
        try:
            header = await wait_with_token(
                chaindb.coro_get_block_header_by_hash(requested_hash), token=token)
        except HeaderNotFound:
            logger.debug("%s asked for block we don't have: %s", peer, requested_hash)
            continue
        transactions = await wait_with_token(
            chaindb.coro_get_block_transactions(header, BaseTransactionFields), token=token)
        uncles = await wait_with_token(
            chaindb.coro_get_block_uncles(header.uncles_hash), token=token)
        response.append(BlockBody(transactions, uncles))
    peer.sub_proto.send_block_bodies(response)
742+
743+
744+
async def handle_get_receipts(
        chaindb: 'AsyncChainDB', peer: ETHPeer, block_hashes: List[Hash32],
        logger: logging.Logger, token: CancelToken) -> None:
    """Reply to *peer*'s GetReceipts request.

    Looks up each requested block's header and, when found, that block's
    receipts; unknown blocks are logged and skipped. The request is
    truncated to eth.MAX_RECEIPTS_FETCH entries. DB coroutines are awaited
    via wait_with_token so the handler aborts when *token* is triggered.
    """
    # Never serve more than eth.MAX_RECEIPTS_FETCH receipt lists per request.
    capped_hashes = block_hashes[:eth.MAX_RECEIPTS_FETCH]
    response = []
    for requested_hash in capped_hashes:
        try:
            header = await wait_with_token(
                chaindb.coro_get_block_header_by_hash(requested_hash), token=token)
        except HeaderNotFound:
            logger.debug(
                "%s asked receipts for block we don't have: %s", peer, requested_hash)
            continue
        response.append(await wait_with_token(
            chaindb.coro_get_receipts(header, Receipt), token=token))
    peer.sub_proto.send_receipts(response)
763+
764+
765+
async def handle_get_node_data(
        chaindb: 'AsyncChainDB', peer: ETHPeer, node_hashes: List[Hash32],
        logger: logging.Logger, token: CancelToken) -> None:
    """Reply to *peer*'s GetNodeData request.

    Fetches each requested trie node from *chaindb* by hash; nodes we do not
    have are logged and skipped. The request is truncated to
    eth.MAX_STATE_FETCH entries. DB coroutines are awaited via
    wait_with_token so the handler aborts when *token* is triggered.
    """
    # Never serve more than eth.MAX_STATE_FETCH nodes per request.
    found = []
    for requested_hash in node_hashes[:eth.MAX_STATE_FETCH]:
        try:
            trie_node = await wait_with_token(
                chaindb.coro_get(requested_hash), token=token)
        except KeyError:
            logger.debug("%s asked for a trie node we don't have: %s", peer, requested_hash)
            continue
        found.append(trie_node)
    peer.sub_proto.send_node_data(found)
780+
781+
759782
def _test() -> None:
760783
import argparse
761784
import signal

p2p/state.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939

4040
from p2p import eth
4141
from p2p import protocol
42-
from p2p.chain import lookup_headers
42+
from p2p.chain import (
43+
handle_get_block_bodies, handle_get_node_data, handle_get_receipts, lookup_headers)
4344
from p2p.cancel_token import CancelToken
4445
from p2p.exceptions import OperationCancelled
4546
from p2p.peer import ETHPeer, PeerPool, PeerPoolSubscriber
@@ -127,6 +128,15 @@ async def _handle_msg(
127128
self._pending_nodes.pop(node_key, None)
128129
elif isinstance(cmd, eth.GetBlockHeaders):
129130
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
131+
elif isinstance(cmd, eth.GetBlockBodies):
132+
await handle_get_block_bodies(
133+
self.chaindb, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
134+
elif isinstance(cmd, eth.GetReceipts):
135+
await handle_get_receipts(
136+
self.chaindb, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
137+
elif isinstance(cmd, eth.GetNodeData):
138+
await handle_get_node_data(
139+
self.chaindb, peer, cast(List[Hash32], msg), self.logger, self.cancel_token)
130140
else:
131141
self.logger.warn("%s not handled during StateSync, must be implemented", cmd)
132142

@@ -208,7 +218,7 @@ async def _run(self) -> None:
208218
# received nodes (scheduling new requests), there may be cases when the
209219
# pending nodes take a while to arrive thus causing the scheduler to run out
210220
# of new requests for a while.
211-
self.logger.info("Scheduler queue is empty, sleeping a bit")
221+
self.logger.debug("Scheduler queue is empty, sleeping a bit")
212222
await self.wait_first(asyncio.sleep(0.5))
213223
continue
214224

@@ -298,7 +308,7 @@ async def run() -> None:
298308
downloader.logger.info("run() finished, exiting")
299309
sigint_received.set()
300310

301-
loop.set_debug(True)
311+
# loop.set_debug(True)
302312
asyncio.ensure_future(exit_on_sigint())
303313
asyncio.ensure_future(run())
304314
loop.run_forever()

0 commit comments

Comments (0)