Skip to content

Commit 9f8bee7

Browse files
committed
Add a sleep() method to CancellableMixin
1 parent e95fd33 commit 9f8bee7

File tree

2 files changed

+12
-6
lines changed

2 files changed

+12
-6
lines changed

p2p/service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ async def threadsafe_cancel(self) -> None:
133133
asyncio.run_coroutine_threadsafe(self.cancel(), loop=self.loop)
134134
await asyncio.wait_for(self.cleaned_up.wait(), timeout=self._wait_until_finished_timeout)
135135

136+
async def sleep(self, delay: float) -> None:
137+
"""Coroutine that completes after a given time (in seconds)."""
138+
await self.wait(asyncio.sleep(delay))
139+
136140
@abstractmethod
137141
async def _run(self) -> None:
138142
"""Run the service's loop.

p2p/state.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ async def _process_nodes(self, nodes: Iterable[Tuple[Hash32, bytes]]) -> None:
129129
# XXX: This is a quick workaround for
130130
# https://github.com/ethereum/py-evm/issues/1074, which will be replaced soon
131131
# with a proper fix.
132-
await self.wait(asyncio.sleep(0))
132+
await self.sleep(0)
133133

134134
async def _handle_msg(
135135
self, peer: ETHPeer, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
@@ -192,6 +192,8 @@ async def _handle_get_block_headers(self, peer: ETHPeer, request: HeaderRequest)
192192
async def _cleanup(self) -> None:
193193
# We don't need to cancel() anything, but we yield control just so that the coroutines we
194194
# run in the background notice the cancel token has been triggered and return.
195+
# Also, don't use self.sleep() here as the cancel token will be triggered and that will
196+
# raise OperationCancelled.
195197
await asyncio.sleep(0)
196198

197199
async def request_nodes(self, node_keys: Iterable[Hash32]) -> None:
@@ -202,7 +204,7 @@ async def request_nodes(self, node_keys: Iterable[Hash32]) -> None:
202204
except NoEligiblePeers:
203205
self.logger.debug(
204206
"No idle peers have any of the trie nodes we want, sleeping a bit")
205-
await self.wait(asyncio.sleep(0.2))
207+
await self.sleep(0.2)
206208
continue
207209

208210
candidates = list(not_yet_requested.difference(self._peer_missing_nodes[peer]))
@@ -238,7 +240,7 @@ async def _periodically_retry_timedout(self) -> None:
238240
now = time.time()
239241
sleep_duration = (oldest_request_time + self._reply_timeout) - now
240242
try:
241-
await self.wait(asyncio.sleep(sleep_duration))
243+
await self.sleep(sleep_duration)
242244
except OperationCancelled:
243245
break
244246

@@ -257,7 +259,7 @@ async def _run(self) -> None:
257259
# This ensures we yield control and give _handle_msg() a chance to process any nodes
258260
# we may have received already, also ensuring we exit when our cancel token is
259261
# triggered.
260-
await self.wait(asyncio.sleep(0))
262+
await self.sleep(0)
261263

262264
requests = self.scheduler.next_batch(eth.MAX_STATE_FETCH)
263265
if not requests:
@@ -266,7 +268,7 @@ async def _run(self) -> None:
266268
# pending nodes take a while to arrive thus causing the scheduler to run out
267269
# of new requests for a while.
268270
self.logger.debug("Scheduler queue is empty, sleeping a bit")
269-
await self.wait(asyncio.sleep(0.5))
271+
await self.sleep(0.5)
270272
continue
271273

272274
await self.request_nodes([request.node_key for request in requests])
@@ -287,7 +289,7 @@ async def _periodically_report_progress(self) -> None:
287289
"Nodes scheduled but not requested yet: %d", len(self.scheduler.requests))
288290
self.logger.info("Total nodes timed out: %d", self._total_timeouts)
289291
try:
290-
await self.wait(asyncio.sleep(self._report_interval))
292+
await self.sleep(self._report_interval)
291293
except OperationCancelled:
292294
break
293295

0 commit comments

Comments
 (0)