Skip to content

Commit df56a87

Browse files
committed
terminate with SIGTERM
now that engines handle SIGINT
1 parent d51e7c7 commit df56a87

File tree

3 files changed

+49
-22
lines changed

3 files changed

+49
-22
lines changed

ipyparallel/cluster/cluster.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,13 @@ async def stop_engines(self, engine_set_id=None):
307307
"""Stop an engine set
308308
309309
If engine_set_id is not given,
310-
all engines are stopped"""
310+
all engines are stopped.
311+
"""
311312
if engine_set_id is None:
312313
for engine_set_id in list(self._engine_sets):
313314
await self.stop_engines(engine_set_id)
314315
return
315-
316+
self.log.info(f"Stopping engine(s): {engine_set_id}")
316317
engine_set = self._engine_sets[engine_set_id]
317318
r = engine_set.stop()
318319
if inspect.isawaitable(r):
@@ -352,12 +353,14 @@ async def signal_engine(self, engine_id, signum):
352353

353354
async def signal_engines(self, engine_set_id, signum):
354355
"""Signal all engines in a set"""
356+
self.log.info(f"Sending signal {signum} to engine(s) {engine_set_id}")
355357
engine_set = self._engine_sets[engine_set_id]
356358
engine_set.signal(signum)
357359

358360
async def stop_controller(self):
359361
"""Stop the controller"""
360362
if self._controller and self._controller.running:
363+
self.log.info("Stopping controller")
361364
r = self._controller.stop()
362365
if inspect.isawaitable(r):
363366
await r

ipyparallel/cluster/launcher.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
try:
1818
from signal import SIGKILL
1919
except ImportError:
20-
# Windows
21-
SIGKILL = SIGTERM
20+
# Windows, just need a singleton.
21+
# value is not relevant.
22+
SIGKILL = -1
2223

2324
try:
2425
# Windows >= 2.7, 3.2
@@ -317,14 +318,17 @@ def signal(self, sig):
317318
if self.state == 'running':
318319
if WINDOWS and sig != SIGINT:
319320
# use Windows tree-kill for better child cleanup
320-
check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
321+
cmd = ['taskkill', '-pid', str(self.process.pid), '-t']
322+
if sig == SIGKILL:
323+
cmd.append("-f")
324+
check_output(cmd)
321325
else:
322326
self.process.send_signal(sig)
323327

324328
def interrupt_then_kill(self, delay=2.0):
325-
"""Send INT, wait a delay and then send KILL."""
329+
"""Send TERM, wait a delay and then send KILL."""
326330
try:
327-
self.signal(SIGINT)
331+
self.signal(SIGTERM)
328332
except Exception:
329333
self.log.debug("interrupt failed")
330334
pass

ipyparallel/tests/test_cluster.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,39 @@
11
import asyncio
2+
import logging
23
import shutil
34
import signal
5+
import sys
46
import time
57

68
import pytest
9+
from traitlets.config import Config
710

811
from .clienttest import raises_remote
912
from ipyparallel import cluster
10-
from ipyparallel.cluster import launcher
13+
from ipyparallel.cluster.launcher import find_launcher_class
1114

1215

1316
@pytest.fixture
1417
def Cluster(request):
1518
"""Fixture for instantiating Clusters"""
1619

1720
def ClusterConstructor(**kwargs):
18-
kwargs.setdefault('log_level', 10)
19-
launcher_prefix = kwargs.pop("launcher_prefix")
20-
21-
if launcher_prefix == "MPI" and shutil.which("mpiexec") is None:
21+
log = logging.getLogger(__file__)
22+
log.setLevel(logging.DEBUG)
23+
log.handlers = [logging.StreamHandler(sys.stdout)]
24+
kwargs['log'] = log
25+
engine_launcher_class = kwargs.get("engine_launcher_class")
26+
27+
if (
28+
isinstance(engine_launcher_class, str)
29+
and "MPI" in engine_launcher_class
30+
and shutil.which("mpiexec") is None
31+
):
2232
pytest.skip("requires mpiexec")
23-
launcher_class = launcher.find_launcher_class(launcher_prefix, "EngineSet")
24-
kwargs['engine_launcher_class'] = launcher_class
33+
34+
cfg = kwargs.setdefault("config", Config())
35+
cfg.EngineMixin.engine_args = ['--log-level=10']
36+
cfg.ControllerMixin.controller_args = ['--log-level=10']
2537

2638
c = cluster.Cluster(**kwargs)
2739
request.addfinalizer(c.stop_cluster_sync)
@@ -59,14 +71,14 @@ async def test_start_stop_controller(Cluster):
5971
# TODO: test file cleanup
6072

6173

62-
@pytest.mark.parametrize("launcher_prefix", ["Local", "MPI"])
63-
async def test_start_stop_engines(Cluster, launcher_prefix):
64-
cluster = Cluster(launcher_prefix=launcher_prefix)
74+
@pytest.mark.parametrize("engine_launcher_class", ["Local", "MPI"])
75+
async def test_start_stop_engines(Cluster, engine_launcher_class):
76+
cluster = Cluster(engine_launcher_class=engine_launcher_class)
6577
await cluster.start_controller()
6678
engine_set_id = await cluster.start_engines(n=3)
6779
assert engine_set_id in cluster._engine_sets
6880
engine_set = cluster._engine_sets[engine_set_id]
69-
launcher_class = launcher_class = find_launcher_class(launcher_prefix, "EngineSet")
81+
launcher_class = find_launcher_class(engine_launcher_class, "EngineSet")
7082
assert isinstance(engine_set, launcher_class)
7183
await cluster.stop_engines(engine_set_id)
7284
assert cluster._engine_sets == {}
@@ -76,20 +88,28 @@ async def test_start_stop_engines(Cluster, launcher_prefix):
7688
await cluster.stop_controller()
7789

7890

79-
@pytest.mark.parametrize("launcher_prefix", ["Local", "MPI"])
80-
async def test_signal_engines(Cluster, launcher_prefix):
81-
cluster = Cluster(launcher_prefix=launcher_prefix)
91+
@pytest.mark.parametrize("engine_launcher_class", ["Local", "MPI"])
92+
async def test_signal_engines(Cluster, engine_launcher_class):
93+
cluster = Cluster(engine_launcher_class=engine_launcher_class)
8294
await cluster.start_controller()
8395
engine_set_id = await cluster.start_engines(n=3)
8496
rc = cluster.connect_client()
8597
while len(rc) < 3:
8698
await asyncio.sleep(0.1)
99+
# seems to be a problem if we start too soon...
100+
await asyncio.sleep(1)
101+
# ensure responsive
102+
rc[:].apply_async(lambda: None).get(timeout=10)
103+
# submit request to be interrupted
87104
ar = rc[:].apply_async(time.sleep, 3)
105+
# wait for it to be running
88106
await asyncio.sleep(0.5)
107+
# send signal
89108
await cluster.signal_engines(engine_set_id, signal.SIGINT)
90109

110+
# wait for result, which should raise KeyboardInterrupt
91111
with raises_remote(KeyboardInterrupt) as e:
92-
ar.get()
112+
ar.get(timeout=10)
93113

94114
await cluster.stop_engines()
95115
await cluster.stop_controller()

0 commit comments

Comments
 (0)