Skip to content

Commit dd304cc

Browse files
authored
Merge pull request #1245 from carver/service-exceptions
Clearer service exceptions
2 parents fd5c5d4 + 943a511 commit dd304cc

File tree

5 files changed

+88
-11
lines changed

5 files changed

+88
-11
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ clean-pyc:
2323
find . -name '*~' -exec rm -f {} +
2424

2525
lint:
26-
tox -elint-py3{6,5}
26+
tox -epy3{6,5}-lint
2727

2828
test:
2929
py.test --tb native tests

p2p/peer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ def unsubscribe(self, subscriber: PeerSubscriber) -> None:
732732
peer.remove_subscriber(subscriber)
733733

734734
async def start_peer(self, peer: BasePeer) -> None:
735-
self.run_task(peer.run())
735+
self.run_child_service(peer)
736736
await self.wait(peer.events.started.wait(), timeout=1)
737737
try:
738738
# Although connect() may seem like a more appropriate place to perform the DAO fork

p2p/service.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@
88
Callable,
99
List,
1010
Optional,
11-
Set,
1211
cast,
1312
)
1413
from weakref import WeakSet
1514

16-
from eth.tools.logging import TraceLogger
17-
1815
from cancel_token import CancelToken, OperationCancelled
16+
from eth_utils import (
17+
ValidationError,
18+
)
19+
20+
from eth.tools.logging import TraceLogger
1921

2022
from p2p.cancellable import CancellableMixin
2123
from p2p.utils import get_asyncio_executor
@@ -32,8 +34,8 @@ def __init__(self) -> None:
3234

3335
class BaseService(ABC, CancellableMixin):
3436
logger: TraceLogger = None
35-
_child_services: Set['BaseService']
3637
# Use a WeakSet so that we don't have to bother updating it when tasks finish.
38+
_child_services: 'WeakSet[BaseService]'
3739
_tasks: 'WeakSet[asyncio.Future[Any]]'
3840
_finished_callbacks: List[Callable[['BaseService'], None]]
3941
# Number of seconds cancel() will wait for run() to finish.
@@ -49,7 +51,7 @@ def __init__(self,
4951
loop: asyncio.AbstractEventLoop = None) -> None:
5052
self.events = ServiceEvents()
5153
self._run_lock = asyncio.Lock()
52-
self._child_services = set()
54+
self._child_services = WeakSet()
5355
self._tasks = WeakSet()
5456
self._finished_callbacks = []
5557

@@ -88,9 +90,9 @@ async def run(
8890
finished_callback (if one was passed).
8991
"""
9092
if self.is_running:
91-
raise RuntimeError("Cannot start the service while it's already running")
93+
raise ValidationError("Cannot start the service while it's already running")
9294
elif self.is_cancelled:
93-
raise RuntimeError("Cannot restart a service that has already been cancelled")
95+
raise ValidationError("Cannot restart a service that has already been cancelled")
9496

9597
if finished_callback:
9698
self._finished_callbacks.append(finished_callback)
@@ -144,6 +146,15 @@ def run_child_service(self, child_service: 'BaseService') -> None:
144146
"""
145147
Run a child service and keep a reference to it to be considered during the cleanup.
146148
"""
149+
if child_service.is_running:
150+
raise ValidationError(
151+
f"Can't start service {child_service!r}, child of {self!r}: it's already running"
152+
)
153+
elif child_service.is_cancelled:
154+
raise ValidationError(
155+
f"Can't restart {child_service!r}, child of {self!r}: it's already completed"
156+
)
157+
147158
self._child_services.add(child_service)
148159
self.run_task(child_service.run())
149160

@@ -153,6 +164,15 @@ def run_daemon(self, service: 'BaseService') -> None:
153164
154165
If the service finishes while we're still running, we'll terminate as well.
155166
"""
167+
if service.is_running:
168+
raise ValidationError(
169+
f"Can't start daemon {service!r}, child of {self!r}: it's already running"
170+
)
171+
elif service.is_cancelled:
172+
raise ValidationError(
173+
f"Can't restart daemon {service!r}, child of {self!r}: it's already completed"
174+
)
175+
156176
self._child_services.add(service)
157177

158178
async def _run_daemon_wrapper() -> None:
@@ -193,7 +213,7 @@ async def cancel(self) -> None:
193213
self.logger.warning("Tried to cancel %s, but it was already cancelled", self)
194214
return
195215
elif not self.is_running:
196-
raise RuntimeError("Cannot cancel a service that has not been started")
216+
raise ValidationError("Cannot cancel a service that has not been started")
197217

198218
self.logger.debug("Cancelling %s", self)
199219
self.events.cancelled.set()

tests/p2p/test_service.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,60 @@ async def test_daemon_exit_causes_parent_cancellation():
3939
assert not service.is_running
4040

4141
await asyncio.wait_for(service.events.cleaned_up.wait(), timeout=1)
42+
43+
44+
@pytest.mark.asyncio
45+
async def test_service_tasks_do_not_leak_memory():
46+
service = WaitService()
47+
asyncio.ensure_future(service.run())
48+
49+
end = asyncio.Event()
50+
51+
async def run_until_end():
52+
await end.wait()
53+
54+
service.run_task(run_until_end())
55+
56+
# inspect internals to determine if memory is leaking
57+
58+
# confirm that task is tracked:
59+
assert len(service._tasks) == 1
60+
61+
end.set()
62+
# allow the coro to exit
63+
await asyncio.sleep(0)
64+
65+
# confirm that task is no longer tracked:
66+
assert len(service._tasks) == 0
67+
68+
# test cleanup
69+
await service.cancel()
70+
71+
72+
@pytest.mark.asyncio
73+
async def test_service_children_do_not_leak_memory():
74+
parent = WaitService()
75+
child = WaitService()
76+
asyncio.ensure_future(parent.run())
77+
78+
parent.run_child_service(child)
79+
80+
# inspect internals to determine if memory is leaking
81+
82+
# confirm that child service is tracked:
83+
assert len(parent._child_services) == 1
84+
85+
# give child a chance to start
86+
await asyncio.sleep(0)
87+
88+
# ... and then end it
89+
await child.cancel()
90+
91+
# remove the final reference to the child service
92+
del child
93+
94+
# confirm that child service is no longer tracked:
95+
assert len(parent._child_services) == 0
96+
97+
# test cleanup
98+
await parent.cancel()

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ envlist=
66
py{36}-rpc-state-{frontier,homestead,eip150,eip158,byzantium,quadratic}
77
py{35,36}-native-state-{frontier,homestead,eip150,eip158,byzantium,constantinople,metropolis}
88
py37-{core,trinity,trinity-integration}
9-
lint-py{35,36}
9+
py{35,36}-lint
1010

1111
[flake8]
1212
max-line-length= 100

0 commit comments

Comments
 (0)