Skip to content

Commit dee977e

Browse files
committed
BaseService now terminates itself if a daemon dies unexpectedly
If a daemon finishes while its parent is still running, the parent terminates itself. Closes: #679
1 parent cde40db commit dee977e

File tree

4 files changed

+80
-28
lines changed

4 files changed

+80
-28
lines changed

p2p/discovery.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import collections
1010
import logging
1111
import random
12+
import socket
1213
import time
1314
from typing import (
1415
Any,
@@ -327,19 +328,29 @@ class DiscoveryService(BaseService):
327328
_last_lookup: float = 0
328329
_lookup_interval: int = 30
329330

330-
def __init__(
331-
self, proto: DiscoveryProtocol, peer_pool: PeerPool, token: CancelToken = None) -> None:
331+
def __init__(self, proto: DiscoveryProtocol, peer_pool: PeerPool,
332+
port: int, token: CancelToken = None) -> None:
332333
super().__init__(token)
333334
self.proto = proto
334335
self.peer_pool = peer_pool
336+
self.port = port
335337

336338
async def _run(self) -> None:
339+
await self._start_udp_listener()
337340
connect_loop_sleep = 2
338341
self.run_task(self.proto.bootstrap())
339342
while not self.cancel_token.triggered:
340343
await self.maybe_connect_to_more_peers()
341344
await self.sleep(connect_loop_sleep)
342345

346+
async def _start_udp_listener(self) -> None:
347+
loop = asyncio.get_event_loop()
348+
# TODO: Support IPv6 addresses as well.
349+
await loop.create_datagram_endpoint(
350+
lambda: self.proto,
351+
local_addr=('0.0.0.0', self.port),
352+
family=socket.AF_INET)
353+
343354
async def maybe_connect_to_more_peers(self) -> None:
344355
"""Connect to more peers if we're not yet maxed out to max_peers"""
345356
if self.peer_pool.is_full:

p2p/service.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
Callable,
99
List,
1010
Optional,
11+
Set,
1112
cast,
1213
)
1314
from weakref import WeakSet
@@ -31,7 +32,7 @@ def __init__(self) -> None:
3132

3233
class BaseService(ABC, CancellableMixin):
3334
logger: TraceLogger = None
34-
_child_services: List['BaseService']
35+
_child_services: Set['BaseService']
3536
# Use a WeakSet so that we don't have to bother updating it when tasks finish.
3637
_tasks: 'WeakSet[asyncio.Future[Any]]'
3738
_finished_callbacks: List[Callable[['BaseService'], None]]
@@ -48,7 +49,7 @@ def __init__(self,
4849
loop: asyncio.AbstractEventLoop = None) -> None:
4950
self.events = ServiceEvents()
5051
self._run_lock = asyncio.Lock()
51-
self._child_services = []
52+
self._child_services = set()
5253
self._tasks = WeakSet()
5354
self._finished_callbacks = []
5455

@@ -126,20 +127,44 @@ def run_task(self, awaitable: Awaitable[Any]) -> None:
126127
127128
If it raises OperationCancelled, that is caught and ignored.
128129
"""
129-
async def f() -> None:
130+
async def _run_task_wrapper() -> None:
131+
self.logger.debug("Running task %s", awaitable)
130132
try:
131133
await awaitable
132134
except OperationCancelled:
133135
pass
134-
self._tasks.add(asyncio.ensure_future(f()))
136+
except Exception as e:
137+
self.logger.warning("Task %s finished unexpectedly: %s", awaitable, e)
138+
else:
139+
self.logger.debug("Task %s finished with no errors", awaitable)
140+
self._tasks.add(asyncio.ensure_future(_run_task_wrapper()))
135141

136142
def run_child_service(self, child_service: 'BaseService') -> None:
137143
"""
138144
Run a child service and keep a reference to it to be considered during the cleanup.
139145
"""
140-
self._child_services.append(child_service)
146+
self._child_services.add(child_service)
141147
self.run_task(child_service.run())
142148

149+
def run_daemon(self, service: 'BaseService') -> None:
150+
"""
151+
Run a service and keep a reference to it to be considered during the cleanup.
152+
153+
If the service finishes while we're still running, we'll terminate as well.
154+
"""
155+
self._child_services.add(service)
156+
157+
async def _run_daemon_wrapper() -> None:
158+
try:
159+
await service.run()
160+
finally:
161+
if not self.cancel_token.triggered:
162+
self.logger.debug(
163+
"%s finished while we're still running, terminating as well", service)
164+
self.cancel_token.trigger()
165+
166+
self.run_task(_run_daemon_wrapper())
167+
143168
async def _run_in_executor(self, callback: Callable[..., Any], *args: Any) -> Any:
144169
loop = self.get_event_loop()
145170
return await self.wait(loop.run_in_executor(self._executor, callback, *args))

tests/p2p/test_service.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from p2p.service import BaseService
6+
7+
8+
class ParentService(BaseService):
9+
"""A Service which just runs WaitService with run_daemon() and waits for its cancel token to
10+
be triggered.
11+
"""
12+
13+
async def _run(self):
14+
self.daemon = WaitService(token=self.cancel_token)
15+
self.run_daemon(self.daemon)
16+
await self.cancel_token.wait()
17+
18+
19+
class WaitService(BaseService):
20+
21+
async def _run(self):
22+
await self.cancel_token.wait()
23+
24+
25+
@pytest.mark.asyncio
26+
async def test_daemon_exit_causes_parent_cancellation():
27+
service = ParentService()
28+
asyncio.ensure_future(service.run())
29+
await asyncio.sleep(0.01)
30+
assert service.daemon.is_running
31+
await service.daemon.cancel()
32+
await asyncio.sleep(0.01)
33+
assert not service.is_running
34+
await service.events.cleaned_up.wait()

trinity/server.py

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import asyncio
22
import logging
33
import secrets
4-
import socket
54
from typing import (
6-
cast,
75
Sequence,
86
Tuple,
97
Type,
@@ -28,7 +26,6 @@
2826
REPLY_TIMEOUT,
2927
)
3028
from p2p.discovery import (
31-
DiscoveryProtocol,
3229
DiscoveryService,
3330
PreferredNodeDiscoveryProtocol,
3431
)
@@ -66,8 +63,6 @@
6663
class Server(BaseService):
6764
"""Server listening for incoming connections"""
6865
_tcp_listener = None
69-
_udp_listener = None
70-
_udp_transport = None
7166

7267
peer_pool: PeerPool = None
7368

@@ -118,18 +113,6 @@ async def _close_tcp_listener(self) -> None:
118113
self._tcp_listener.close()
119114
await self._tcp_listener.wait_closed()
120115

121-
async def _start_udp_listener(self, discovery: DiscoveryProtocol) -> None:
122-
loop = asyncio.get_event_loop()
123-
# TODO: Support IPv6 addresses as well.
124-
self._udp_transport, _ = await loop.create_datagram_endpoint(
125-
lambda: discovery,
126-
local_addr=('0.0.0.0', self.port),
127-
family=socket.AF_INET)
128-
129-
async def _close_udp_listener(self) -> None:
130-
if self._udp_transport:
131-
cast(asyncio.DatagramTransport, self._udp_transport).abort()
132-
133116
def _make_syncer(self, peer_pool: PeerPool) -> BaseService:
134117
# This method exists only so that ShardSyncer can provide a different implementation.
135118
return FullNodeSyncer(
@@ -166,8 +149,8 @@ async def _run(self) -> None:
166149
addr = Address(external_ip, self.port, self.port)
167150
discovery_proto = PreferredNodeDiscoveryProtocol(
168151
self.privkey, addr, self.bootstrap_nodes, self.preferred_nodes)
169-
await self._start_udp_listener(discovery_proto)
170-
self.discovery = DiscoveryService(discovery_proto, self.peer_pool, self.cancel_token)
152+
self.discovery = DiscoveryService(
153+
discovery_proto, self.peer_pool, self.port, self.cancel_token)
171154
self.run_child_service(self.peer_pool)
172155
self.run_child_service(self.discovery)
173156
self.run_child_service(self.upnp_service)
@@ -176,8 +159,7 @@ async def _run(self) -> None:
176159

177160
async def _cleanup(self) -> None:
178161
self.logger.info("Closing server...")
179-
await asyncio.gather(
180-
self._close_tcp_listener(), self._close_udp_listener())
162+
await self._close_tcp_listener()
181163

182164
async def receive_handshake(
183165
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:

0 commit comments

Comments
 (0)