Skip to content

Commit 42560e8

Browse files
committed
More PR feedback changes
1 parent 5eb2a11 commit 42560e8

File tree

3 files changed

+61
-68
lines changed

3 files changed

+61
-68
lines changed

p2p/cancel_token.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
cast,
66
List,
77
Sequence,
8+
TypeVar,
89
)
910

1011
from p2p.exceptions import (
@@ -88,6 +89,40 @@ def __str__(self) -> str:
8889
return self.name
8990

9091

92+
class CancellableMixin:
93+
cancel_token: CancelToken = None
94+
95+
_TReturn = TypeVar('_TReturn')
96+
97+
async def wait(self,
98+
awaitable: Awaitable[_TReturn],
99+
token: CancelToken = None,
100+
timeout: float = None) -> _TReturn:
101+
"""See wait_first()"""
102+
return await self.wait_first(awaitable, token=token, timeout=timeout)
103+
104+
async def wait_first(self,
105+
*awaitables: Awaitable[_TReturn],
106+
token: CancelToken = None,
107+
timeout: float = None) -> _TReturn:
108+
"""Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
109+
110+
The given token is chained with this service's token, so triggering either will cancel
111+
this.
112+
113+
Returns the result of the first one to complete.
114+
115+
Raises TimeoutError if we timeout or OperationCancelled if the token chain is triggered.
116+
117+
All pending futures are cancelled before returning.
118+
"""
119+
if token is None:
120+
token_chain = self.cancel_token
121+
else:
122+
token_chain = token.chain(self.cancel_token)
123+
return await wait_with_token(*awaitables, token=token_chain, timeout=timeout)
124+
125+
91126
async def wait_with_token(*awaitables: Awaitable[Any],
92127
token: CancelToken,
93128
timeout: float = None) -> Any:

p2p/chain.py

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from p2p import protocol
3838
from p2p import eth
3939
from p2p import les
40-
from p2p.cancel_token import CancelToken, wait_with_token
40+
from p2p.cancel_token import CancellableMixin, CancelToken
4141
from p2p.constants import MAX_REORG_DEPTH
4242
from p2p.exceptions import NoEligiblePeers, OperationCancelled
4343
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber
@@ -565,11 +565,17 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
565565
elif isinstance(cmd, eth.GetBlockHeaders):
566566
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
567567
elif isinstance(cmd, eth.GetBlockBodies):
568+
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
569+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
568570
await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg))
569571
elif isinstance(cmd, eth.GetReceipts):
570-
await self._handler.handle_get_receipts(peer, cast(List[Hash32], msg))
572+
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573+
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
574+
await self._handler.handle_get_receipts(peer, block_hashes)
571575
elif isinstance(cmd, eth.GetNodeData):
572-
await self._handler.handle_get_node_data(peer, cast(List[Hash32], msg))
576+
# Only serve up to eth.MAX_STATE_FETCH items in every request.
577+
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
578+
await self._handler.handle_get_node_data(peer, node_hashes)
573579
else:
574580
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
575581

@@ -610,7 +616,7 @@ async def _process_headers(
610616
return head.block_number
611617

612618

613-
class PeerRequestHandler:
619+
class PeerRequestHandler(CancellableMixin):
614620

615621
def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken) -> None:
616622
self.db = db
@@ -620,52 +626,38 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
620626
async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
621627
chaindb = cast('AsyncChainDB', self.db)
622628
bodies = []
623-
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
624-
for block_hash in block_hashes[:eth.MAX_BODIES_FETCH]:
629+
for block_hash in block_hashes:
625630
try:
626-
header = await wait_with_token(
627-
chaindb.coro_get_block_header_by_hash(block_hash),
628-
token=self.cancel_token)
631+
header = await self.wait(chaindb.coro_get_block_header_by_hash(block_hash))
629632
except HeaderNotFound:
630633
self.logger.debug("%s asked for block we don't have: %s", peer, block_hash)
631634
continue
632-
transactions = await wait_with_token(
633-
chaindb.coro_get_block_transactions(header, BaseTransactionFields),
634-
token=self.cancel_token)
635-
uncles = await wait_with_token(
636-
chaindb.coro_get_block_uncles(header.uncles_hash),
637-
token=self.cancel_token)
635+
transactions = await self.wait(
636+
chaindb.coro_get_block_transactions(header, BaseTransactionFields))
637+
uncles = await self.wait(chaindb.coro_get_block_uncles(header.uncles_hash))
638638
bodies.append(BlockBody(transactions, uncles))
639639
peer.sub_proto.send_block_bodies(bodies)
640640

641641
async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
642642
chaindb = cast('AsyncChainDB', self.db)
643643
receipts = []
644-
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
645-
for block_hash in block_hashes[:eth.MAX_RECEIPTS_FETCH]:
644+
for block_hash in block_hashes:
646645
try:
647-
header = await wait_with_token(
648-
chaindb.coro_get_block_header_by_hash(block_hash),
649-
token=self.cancel_token)
646+
header = await self.wait(chaindb.coro_get_block_header_by_hash(block_hash))
650647
except HeaderNotFound:
651648
self.logger.debug(
652649
"%s asked receipts for block we don't have: %s", peer, block_hash)
653650
continue
654-
block_receipts = await wait_with_token(
655-
chaindb.coro_get_receipts(header, Receipt),
656-
token=self.cancel_token)
651+
block_receipts = await self.wait(chaindb.coro_get_receipts(header, Receipt))
657652
receipts.append(block_receipts)
658653
peer.sub_proto.send_receipts(receipts)
659654

660655
async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None:
661656
chaindb = cast('AsyncChainDB', self.db)
662657
nodes = []
663-
# Only serve up to eth.MAX_STATE_FETCH items in every request.
664-
for node_hash in node_hashes[:eth.MAX_STATE_FETCH]:
658+
for node_hash in node_hashes:
665659
try:
666-
node = await wait_with_token(
667-
chaindb.coro_get(node_hash),
668-
token=self.cancel_token)
660+
node = await self.wait(chaindb.coro_get(node_hash))
669661
except KeyError:
670662
self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash)
671663
continue
@@ -698,10 +690,8 @@ async def _get_block_numbers_for_request(
698690
"""
699691
block_number_or_hash = block_number_or_hash
700692
if isinstance(block_number_or_hash, bytes):
701-
header = await wait_with_token(
702-
self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)),
703-
token=self.cancel_token,
704-
)
693+
header = await self.wait(
694+
self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)))
705695
block_number = header.block_number
706696
elif isinstance(block_number_or_hash, int):
707697
block_number = block_number_or_hash
@@ -730,10 +720,8 @@ async def _generate_available_headers(
730720
"""
731721
for block_num in block_numbers:
732722
try:
733-
yield await wait_with_token(
734-
self.db.coro_get_canonical_block_header_by_number(block_num),
735-
token=self.cancel_token
736-
)
723+
yield await self.wait(
724+
self.db.coro_get_canonical_block_header_by_number(block_num))
737725
except HeaderNotFound:
738726
self.logger.debug(
739727
"Peer requested header number %s that is unavailable, stopping search.",

p2p/service.py

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313

1414
from evm.utils.logging import TraceLogger
1515

16-
from p2p.cancel_token import CancelToken, wait_with_token
16+
from p2p.cancel_token import CancellableMixin, CancelToken
1717
from p2p.exceptions import OperationCancelled
1818

1919

20-
class BaseService(ABC):
20+
class BaseService(ABC, CancellableMixin):
2121
logger: TraceLogger = None
2222
_child_services: List['BaseService'] = []
2323
# Number of seconds cancel() will wait for run() to finish.
@@ -37,36 +37,6 @@ def __init__(self, token: CancelToken=None) -> None:
3737
else:
3838
self.cancel_token = base_token.chain(token)
3939

40-
_TReturn = TypeVar('_TReturn')
41-
42-
async def wait(self,
43-
awaitable: Awaitable[_TReturn],
44-
token: CancelToken = None,
45-
timeout: float = None) -> _TReturn:
46-
"""See wait_first()"""
47-
return await self.wait_first(awaitable, token=token, timeout=timeout)
48-
49-
async def wait_first(self,
50-
*awaitables: Awaitable[_TReturn],
51-
token: CancelToken = None,
52-
timeout: float = None) -> _TReturn:
53-
"""Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
54-
55-
The given token is chained with this service's token, so triggering either will cancel
56-
this.
57-
58-
Returns the result of the first one to complete.
59-
60-
Raises TimeoutError if we timeout or OperationCancelled if the token chain is triggered.
61-
62-
All pending futures are cancelled before returning.
63-
"""
64-
if token is None:
65-
token_chain = self.cancel_token
66-
else:
67-
token_chain = token.chain(self.cancel_token)
68-
return await wait_with_token(*awaitables, token=token_chain, timeout=timeout)
69-
7040
async def run(
7141
self,
7242
finished_callback: Optional[Callable[['BaseService'], None]] = None) -> None:

0 commit comments

Comments
 (0)