Skip to content

Commit 7c4630d

Browse files
committed
Ensure isolated plugins are stopped concurrently
So, far isolated plugins were taken down sequentially. The more trinity moves towards a multi process architecture, the more this is likely to become a problem. Taking down these processes concurrently is a bit of a challenge because the code runs under the asumption that the event loop may or may not be closed already. Hence, this code tries to avoid introducing async for the sake of isolated process take down.
1 parent b0fa355 commit 7c4630d

File tree

5 files changed

+91
-44
lines changed

5 files changed

+91
-44
lines changed

docs/api/trinity/extensibility/api.extensibility.plugin.rst

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@ BasePlugin
1414
.. autoclass:: trinity.extensibility.plugin.BasePlugin
1515
:members:
1616

17-
BaseSyncStopPlugin
18-
------------------
19-
20-
.. autoclass:: trinity.extensibility.plugin.BaseSyncStopPlugin
21-
:members:
22-
2317
BaseAsyncStopPlugin
2418
-------------------
2519

trinity/extensibility/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
BaseMainProcessPlugin,
77
BaseIsolatedPlugin,
88
BasePlugin,
9-
BaseSyncStopPlugin,
109
DebugPlugin,
1110
PluginContext,
1211
TrinityBootInfo,

trinity/extensibility/plugin.py

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,6 @@
4444
EventBusNotReady,
4545
InvalidPluginStatus,
4646
)
47-
from trinity.utils.ipc import (
48-
kill_process_gracefully
49-
)
5047
from trinity.utils.mp import (
5148
ctx,
5249
)
@@ -229,26 +226,6 @@ def do_start(self) -> None:
229226
pass
230227

231228

232-
class BaseSyncStopPlugin(BasePlugin):
233-
"""
234-
A :class:`~trinity.extensibility.plugin.BaseSyncStopPlugin` unwinds synchronoulsy, hence blocks
235-
until the shutdown is done.
236-
"""
237-
def do_stop(self) -> None:
238-
"""
239-
Stop the plugin. Should be overwritten by subclasses.
240-
"""
241-
pass
242-
243-
def stop(self) -> None:
244-
"""
245-
Delegate to :meth:`~trinity.extensibility.plugin.BaseSyncStopPlugin.do_stop` causing the
246-
plugin to stop and setting ``running`` to ``False``.
247-
"""
248-
self.do_stop()
249-
self._status = PluginStatus.STOPPED
250-
251-
252229
class BaseAsyncStopPlugin(BasePlugin):
253230
"""
254231
A :class:`~trinity.extensibility.plugin.BaseAsyncStopPlugin` unwinds asynchronoulsy, hence
@@ -279,7 +256,7 @@ class BaseMainProcessPlugin(BasePlugin):
279256
pass
280257

281258

282-
class BaseIsolatedPlugin(BaseSyncStopPlugin):
259+
class BaseIsolatedPlugin(BasePlugin):
283260
"""
284261
A :class:`~trinity.extensibility.plugin.BaseIsolatedPlugin` runs in an isolated process and
285262
hence provides security and flexibility by not making assumptions about its internal
@@ -292,6 +269,13 @@ class BaseIsolatedPlugin(BaseSyncStopPlugin):
292269

293270
_process: Process = None
294271

272+
@property
273+
def process(self) -> Process:
274+
"""
275+
Return the ``Process`` created by the isolated plugin.
276+
"""
277+
return self._process
278+
295279
def start(self) -> None:
296280
"""
297281
Prepare the plugin to get started and eventually call ``do_start`` in a separate process.
@@ -314,8 +298,14 @@ def _prepare_start(self) -> None:
314298
)
315299
self.do_start()
316300

317-
def do_stop(self) -> None:
318-
kill_process_gracefully(self._process, self.logger)
301+
def stop(self) -> None:
302+
"""
303+
Set the ``status`` to `STOPPED`` but rely on the
304+
:class:`~trinity.extensibility.plugin_manager.PluginManager` to tear down the process. This
305+
allows isolated plugins to be taken down concurrently without depending on a running
306+
event loop.
307+
"""
308+
self._status = PluginStatus.STOPPED
319309

320310

321311
class DebugPlugin(BaseAsyncStopPlugin):

trinity/extensibility/plugin_manager.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
BaseIsolatedPlugin,
3636
BaseMainProcessPlugin,
3737
BasePlugin,
38-
BaseSyncStopPlugin,
3938
PluginContext,
4039
TrinityBootInfo,
4140
)
41+
from trinity.utils.ipc import (
42+
kill_processes_gracefully,
43+
)
4244

4345

4446
class BaseManagerProcessScope(ABC):
@@ -215,17 +217,19 @@ def shutdown_blocking(self) -> None:
215217

216218
self._logger.info("Shutting down PluginManager with scope %s", type(self._scope))
217219

218-
for plugin in self._plugin_store:
220+
plugins = [
221+
plugin for plugin in self._plugin_store
222+
if isinstance(plugin, BaseIsolatedPlugin) and plugin.running
223+
]
224+
processes = [plugin.process for plugin in plugins]
219225

220-
if not isinstance(plugin, BaseSyncStopPlugin) or not plugin.running:
221-
continue
226+
for plugin in plugins:
227+
self._logger.info("Stopping plugin: %s", plugin.name)
222228

223-
try:
224-
self._logger.info("Stopping plugin: %s", plugin.name)
225-
plugin.stop()
226-
self._logger.info("Successfully stopped plugin: %s", plugin.name)
227-
except Exception:
228-
self._logger.exception("Exception thrown while stopping plugin %s", plugin.name)
229+
kill_processes_gracefully(processes, self._logger)
230+
231+
for plugin in plugins:
232+
self._logger.info("Successfully stopped plugin: %s", plugin.name)
229233

230234
async def shutdown(self) -> None:
231235
"""

trinity/utils/ipc.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import signal
66
import subprocess
77
import time
8-
from typing import Callable
8+
from typing import Callable, Iterable
99

1010

1111
def wait_for_ipc(ipc_path: pathlib.Path, timeout: int=10) -> None:
@@ -35,6 +35,42 @@ def kill_process_gracefully(
3535
kill_process_id_gracefully(process.pid, process.join, logger, SIGINT_timeout, SIGTERM_timeout)
3636

3737

38+
def kill_processes_gracefully(
39+
processes: Iterable[Process],
40+
logger: Logger,
41+
SIGINT_timeout: int=DEFAULT_SIGINT_TIMEOUT,
42+
SIGTERM_timeout: int=DEFAULT_SIGTERM_TIMEOUT) -> None:
43+
44+
# Send SIGINT to each process without blocking
45+
for process in processes:
46+
sigint_process_id(process.pid, lambda _: None, logger, SIGINT_timeout)
47+
48+
# Now block on each process as long as we have time left in the budget
49+
sigint_at = time.time()
50+
for process in processes:
51+
waited_sec = time.time() - sigint_at
52+
if waited_sec >= SIGINT_timeout:
53+
logger.debug("Waited %d on SIGINT, moving on", waited_sec)
54+
break
55+
process.join(SIGINT_timeout)
56+
57+
# Send SIGTERM to each process without blocking
58+
for process in processes:
59+
sigterm_process_id(process.pid, lambda _: None, logger, SIGTERM_timeout)
60+
61+
# Now block on each process as long as we have time left in the budget
62+
sigterm_at = time.time()
63+
for process in processes:
64+
waited_sec = time.time() - sigterm_at
65+
if waited_sec >= SIGTERM_timeout:
66+
logger.debug("Waited %d on SIGINT, moving on", waited_sec)
67+
break
68+
process.join(SIGTERM_timeout)
69+
70+
for process in processes:
71+
sigkill_process_id(process.pid, logger)
72+
73+
3874
def kill_popen_gracefully(
3975
popen: subprocess.Popen,
4076
logger: Logger,
@@ -56,6 +92,18 @@ def kill_process_id_gracefully(
5692
logger: Logger,
5793
SIGINT_timeout: int=DEFAULT_SIGINT_TIMEOUT,
5894
SIGTERM_timeout: int=DEFAULT_SIGTERM_TIMEOUT) -> None:
95+
96+
sigint_process_id(process_id, wait_for_completion, logger, SIGINT_timeout)
97+
sigterm_process_id(process_id, wait_for_completion, logger, SIGTERM_timeout)
98+
sigkill_process_id(process_id, logger)
99+
100+
101+
def sigint_process_id(
102+
process_id: int,
103+
wait_for_completion: Callable[[int], None],
104+
logger: Logger,
105+
SIGINT_timeout: int=DEFAULT_SIGINT_TIMEOUT) -> None:
106+
59107
try:
60108
try:
61109
os.kill(process_id, signal.SIGINT)
@@ -72,6 +120,13 @@ def kill_process_id_gracefully(
72120
"with CTRL+C two more times."
73121
)
74122

123+
124+
def sigterm_process_id(
125+
process_id: int,
126+
wait_for_completion: Callable[[int], None],
127+
logger: Logger,
128+
SIGTERM_timeout: int=DEFAULT_SIGTERM_TIMEOUT) -> None:
129+
75130
try:
76131
try:
77132
os.kill(process_id, signal.SIGTERM)
@@ -88,6 +143,11 @@ def kill_process_id_gracefully(
88143
"with CTRL+C one more time."
89144
)
90145

146+
147+
def sigkill_process_id(
148+
process_id: int,
149+
logger: Logger) -> None:
150+
91151
try:
92152
os.kill(process_id, signal.SIGKILL)
93153
except ProcessLookupError:

0 commit comments

Comments
 (0)