Skip to content

Commit fcf73d0

Browse files
authored
Merge pull request ethereum#1214 from carver/service-is-operational
Add BaseService.is_operational
2 parents 14aec04 + 75068c0 commit fcf73d0

File tree

14 files changed

+36
-25
lines changed

14 files changed

+36
-25
lines changed

p2p/DEVELOPMENT.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ library.
1818
## BaseService
1919

2020
- If your service needs to run coroutines in the background, you should use the `BaseService.run_task()` method and
21-
ensure they exit when `is_running` is False or when the cancel token is triggered.
21+
ensure they exit when `is_operational` is False or when the cancel token is triggered.
2222
- If your service runs other services in the background, you should pass your CancelToken down to
2323
those services and run those using `BaseService.run_child_service()`, or
2424
`BaseService.run_daemon()` if you want the parent to be terminated when the child dies

p2p/discovery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ async def _run(self) -> None:
337337
await self._start_udp_listener()
338338
connect_loop_sleep = 2
339339
self.run_task(self.proto.bootstrap())
340-
while not self.cancel_token.triggered:
340+
while self.is_operational:
341341
await self.maybe_connect_to_more_peers()
342342
await self.sleep(connect_loop_sleep)
343343

p2p/nat.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async def _run(self) -> None:
7272
On every iteration we configure the port mapping with a lifetime of 30 minutes and then
7373
sleep for that long as well.
7474
"""
75-
while not self.cancel_token.triggered:
75+
while self.is_operational:
7676
try:
7777
# Wait for the port mapping lifetime, and then try registering it again
7878
await self.wait(asyncio.sleep(self._nat_portmap_lifetime))

p2p/peer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ async def _cleanup(self) -> None:
342342
self.close()
343343

344344
async def _run(self) -> None:
345-
while not self.cancel_token.triggered:
345+
while self.is_operational:
346346
try:
347347
cmd, msg = await self.read_msg()
348348
except (PeerConnectionLost, TimeoutError) as err:
@@ -530,7 +530,7 @@ async def disconnect(self, reason: DisconnectReason) -> None:
530530
self.logger.debug("Disconnecting from remote peer; reason: %s", reason.name)
531531
self.base_protocol.send_disconnect(reason.value)
532532
self.close()
533-
if self.is_running:
533+
if self.is_operational:
534534
await self.cancel()
535535

536536
def select_sub_protocol(self, remote_capabilities: List[Tuple[bytes, int]]
@@ -912,7 +912,7 @@ def get_peers(self, min_td: int) -> List[BasePeer]:
912912
return [peer for peer in peers if peer.head_td >= min_td]
913913

914914
async def _periodically_report_stats(self) -> None:
915-
while self.is_running:
915+
while self.is_operational:
916916
inbound_peers = len(
917917
[peer for peer in self.connected_nodes.values() if peer.inbound])
918918
self.logger.info("Connected peers: %d inbound, %d outbound",

p2p/service.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ async def run(
8989
"""
9090
if self.is_running:
9191
raise RuntimeError("Cannot start the service while it's already running")
92-
elif self.cancel_token.triggered:
92+
elif self.is_cancelled:
9393
raise RuntimeError("Cannot restart a service that has already been cancelled")
9494

9595
if finished_callback:
@@ -135,6 +135,7 @@ async def _run_task_wrapper() -> None:
135135
pass
136136
except Exception as e:
137137
self.logger.warning("Task %s finished unexpectedly: %s", awaitable, e)
138+
self.logger.warning("Task failure traceback", exc_info=True)
138139
else:
139140
self.logger.debug("Task %s finished with no errors", awaitable)
140141
self._tasks.add(asyncio.ensure_future(_run_task_wrapper()))
@@ -158,7 +159,7 @@ async def _run_daemon_wrapper() -> None:
158159
try:
159160
await service.run()
160161
finally:
161-
if not self.cancel_token.triggered:
162+
if not self.is_cancelled:
162163
self.logger.debug(
163164
"%s finished while we're still running, terminating as well", service)
164165
self.cancel_token.trigger()
@@ -188,7 +189,7 @@ async def cleanup(self) -> None:
188189

189190
async def cancel(self) -> None:
190191
"""Trigger the CancelToken and wait for the cleaned_up event to be set."""
191-
if self.cancel_token.triggered:
192+
if self.is_cancelled:
192193
self.logger.warning("Tried to cancel %s, but it was already cancelled", self)
193194
return
194195
elif not self.is_running:
@@ -215,6 +216,14 @@ def _forcibly_cancel_all_tasks(self) -> None:
215216
for task in self._tasks:
216217
task.cancel()
217218

219+
@property
220+
def is_cancelled(self) -> bool:
221+
return self.cancel_token.triggered
222+
223+
@property
224+
def is_operational(self) -> bool:
225+
return self.events.started.is_set() and not self.cancel_token.triggered
226+
218227
@property
219228
def is_running(self) -> bool:
220229
return self._run_lock.locked()

tests/p2p/test_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ async def test_daemon_exit_causes_parent_cancellation():
2727
service = ParentService()
2828
asyncio.ensure_future(service.run())
2929
await asyncio.sleep(0.01)
30+
assert service.daemon.is_operational
3031
assert service.daemon.is_running
3132
await service.daemon.cancel()
3233
await asyncio.sleep(0.01)
34+
assert not service.is_operational
3335
assert not service.is_running
3436
await service.events.cleaned_up.wait()

tests/trinity/core/integration_test_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
async def connect_to_peers_loop(peer_pool, nodes):
1818
"""Loop forever trying to connect to one of the given nodes if the pool is not yet full."""
19-
while not peer_pool.cancel_token.triggered:
19+
while peer_pool.is_operational:
2020
try:
2121
if not peer_pool.is_full:
2222
await peer_pool.connect_to_nodes(nodes)

trinity/plugins/builtin/tx_pool/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async def _run(self) -> None:
7373
self.logger.info("Running Tx Pool")
7474

7575
with self.subscribe(self._peer_pool):
76-
while not self.cancel_token.triggered:
76+
while self.is_operational:
7777
peer, cmd, msg = await self.wait(
7878
self.msg_queue.get(), token=self.cancel_token)
7979
peer = cast(ETHPeer, peer)

trinity/protocol/common/exchanges.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def get_result(
6060
- the manager service is running
6161
- the payload validator is primed with the request payload
6262
"""
63-
if not self._manager.is_running:
63+
if not self._manager.is_operational:
6464
await self._manager.launch_service()
6565

6666
# bind the outbound request payload to the payload validator

trinity/protocol/common/managers.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ async def _run(self) -> None:
118118
self.logger.debug("Launching %s for peer %s", self.__class__.__name__, self._peer)
119119

120120
with self.subscribe_peer(self._peer):
121-
while not self.cancel_token.triggered:
121+
while self.is_operational:
122122
peer, cmd, msg = await self.wait(self.msg_queue.get())
123123
if peer != self._peer:
124124
self.logger.error("Unexpected peer: %s expected: %s", peer, self._peer)
@@ -202,8 +202,8 @@ async def launch_service(self) -> None:
202202
await self._response_stream.events.started.wait()
203203

204204
@property
205-
def is_running(self) -> bool:
206-
return self.service is not None and self.service.is_running
205+
def is_operational(self) -> bool:
206+
return self.service is not None and self.service.is_operational
207207

208208
async def get_result(
209209
self,
@@ -213,7 +213,7 @@ async def get_result(
213213
payload_validator: Callable[[TResponsePayload], None],
214214
timeout: int = None) -> TResult:
215215

216-
if not self.is_running:
216+
if not self.is_operational:
217217
raise ValidationError("You must call `launch_service` before initiating a peer request")
218218

219219
stream = self._response_stream

0 commit comments

Comments
 (0)