Skip to content

Commit 274431d

Browse files
authored
Merge pull request #515 from minrk/test-windows
Add Windows test coverage
2 parents 88e431d + 0385479 commit 274431d

File tree

6 files changed

+158
-75
lines changed

6 files changed

+158
-75
lines changed

.github/workflows/test.yml

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ env:
1010

1111
jobs:
1212
test:
13-
runs-on: ubuntu-20.04
14-
timeout-minutes: 10
13+
runs-on: ${{ matrix.runs_on || 'ubuntu-20.04' }}
14+
timeout-minutes: 15
1515

1616
strategy:
1717
# Keep running even if one variation of the job fails
@@ -22,12 +22,14 @@ jobs:
2222
ssh: ssh
2323
- python: "3.6"
2424
tornado: "5.1.1"
25-
- python: "3.6"
2625
- python: "3.7"
2726
controller_ip: "*"
27+
- python: "3.8"
28+
runs_on: windows-2019
2829
- python: "3.8"
2930
mpi: mpi
3031
- python: "3.9"
32+
runs_on: macos-10.15
3133

3234
steps:
3335
- uses: actions/checkout@v2
@@ -74,9 +76,11 @@ jobs:
7476
pip install --upgrade pip
7577
pip install --pre --upgrade .[test] distributed joblib codecov
7678
pip install --only-binary :all: matplotlib || echo "no matplotlib"
77-
if [ "${{ matrix.tornado }}" != "" ]; then
78-
pip install tornado==${{ matrix.tornado }}
79-
fi
79+
80+
- name: Install pinned tornado
81+
if: matrix.tornado
82+
run: |
83+
pip install tornado==${{ matrix.tornado }}
8084
8185
- name: Show environment
8286
run: pip freeze

ipyparallel/cluster/launcher.py

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
import sys
1616
import threading
1717
import time
18+
from concurrent.futures import ThreadPoolExecutor
1819
from functools import partial
19-
from signal import SIGINT
2020
from signal import SIGTERM
2121
from subprocess import check_output
2222
from subprocess import PIPE
@@ -85,6 +85,12 @@ class UnknownStatus(LauncherError):
8585
class BaseLauncher(LoggingConfigurable):
8686
"""An abstraction for starting, stopping and signaling a process."""
8787

88+
stop_timeout = Integer(
89+
60,
90+
config=True,
91+
help="The number of seconds to wait for a process to exit before raising a TimeoutError in stop",
92+
)
93+
8894
# In all of the launchers, the work_dir is where child processes will be
8995
# run. This will usually be the profile_dir, but may not be. any work_dir
9096
# passed into the __init__ method will override the config value.
@@ -249,6 +255,10 @@ def signal(self, sig):
249255
"""
250256
raise NotImplementedError('signal must be implemented in a subclass')
251257

258+
def join(self, timeout=None):
259+
"""Wait for the process to finish"""
260+
raise NotImplementedError('join must be implemented in a subclass')
261+
252262
output_limit = Integer(
253263
100,
254264
config=True,
@@ -376,6 +386,12 @@ def _default_output_file(self):
376386
os.makedirs(log_dir, exist_ok=True)
377387
return os.path.join(log_dir, f'{self.identifier}.log')
378388

389+
stop_seconds_until_kill = Integer(
390+
5,
391+
config=True,
392+
help="""The number of seconds to wait for a process to exit after sending SIGTERM before sending SIGKILL""",
393+
)
394+
379395
stdout = None
380396
stderr = None
381397
process = None
@@ -446,6 +462,18 @@ def start(self):
446462
if self.log.level <= logging.DEBUG:
447463
self._start_streaming()
448464

465+
async def join(self, timeout=None):
466+
"""Wait for the process to exit"""
467+
with ThreadPoolExecutor(1) as pool:
468+
try:
469+
await asyncio.wrap_future(
470+
pool.submit(partial(self.process.wait, timeout))
471+
)
472+
except psutil.TimeoutExpired:
473+
raise TimeoutError(
474+
f"Process {self.pid} did not complete in {timeout} seconds."
475+
)
476+
449477
def _stream_file(self, path):
450478
"""Stream one file"""
451479
with open(path, 'r') as f:
@@ -460,7 +488,7 @@ def _stream_file(self, path):
460488
time.sleep(0.1)
461489

462490
def _start_streaming(self):
463-
t = threading.Thread(
491+
self._stream_thread = t = threading.Thread(
464492
target=partial(self._stream_file, self.output_file),
465493
name=f"Stream Output {self.identifier}",
466494
daemon=True,
@@ -483,35 +511,46 @@ def get_output(self, remove=False):
483511

484512
if remove and os.path.isfile(self.output_file):
485513
self.log.debug(f"Removing {self.output_file}")
486-
os.remove(self.output_file)
514+
try:
515+
os.remove(self.output_file)
516+
except Exception as e:
517+
# don't crash on failure to remove a file,
518+
# e.g. due to another process having it open
519+
self.log.error(f"Failed to remove {self.output_file}: {e}")
487520

488521
return self._output
489522

490-
def stop(self):
491-
return self.interrupt_then_kill()
523+
async def stop(self):
524+
try:
525+
self.signal(SIGTERM)
526+
except Exception as e:
527+
self.log.debug(f"TERM failed: {e!r}")
528+
529+
try:
530+
await self.join(timeout=self.stop_seconds_until_kill)
531+
except TimeoutError:
532+
self.log.warning(
533+
f"Process {self.pid} did not exit in {self.stop_seconds_until_kill} seconds after TERM"
534+
)
535+
else:
536+
return
537+
538+
try:
539+
self.signal(SIGKILL)
540+
except Exception as e:
541+
self.log.debug(f"KILL failed: {e!r}")
542+
543+
await self.join(timeout=self.stop_timeout)
492544

493545
def signal(self, sig):
494546
if self.state == 'running':
495-
if WINDOWS and sig != SIGINT:
547+
if WINDOWS and sig == SIGKILL:
496548
# use Windows tree-kill for better child cleanup
497-
cmd = ['taskkill', '-pid', str(self.process.pid), '-t']
498-
if sig == SIGKILL:
499-
cmd.append("-f")
549+
cmd = ['taskkill', '/pid', str(self.process.pid), '/t', '/F']
500550
check_output(cmd)
501551
else:
502552
self.process.send_signal(sig)
503553

504-
def interrupt_then_kill(self, delay=2.0):
505-
"""Send TERM, wait a delay and then send KILL."""
506-
try:
507-
self.signal(SIGTERM)
508-
except Exception as e:
509-
self.log.debug(f"interrupt failed: {e!r}")
510-
pass
511-
self.killer = asyncio.get_event_loop().call_later(
512-
delay, lambda: self.signal(SIGKILL)
513-
)
514-
515554
# callbacks, etc:
516555

517556
def handle_stdout(self, fd, events):
@@ -637,21 +676,18 @@ def find_args(self):
637676
return ['engine set']
638677

639678
def signal(self, sig):
640-
dlist = []
641-
for el in itervalues(self.launchers):
642-
d = el.signal(sig)
643-
dlist.append(d)
644-
return dlist
679+
for el in list(self.launchers.values()):
680+
el.signal(sig)
645681

646-
def interrupt_then_kill(self, delay=1.0):
647-
dlist = []
648-
for el in itervalues(self.launchers):
649-
d = el.interrupt_then_kill(delay)
650-
dlist.append(d)
651-
return dlist
682+
async def stop(self):
683+
futures = []
684+
for el in list(self.launchers.values()):
685+
f = el.stop()
686+
if inspect.isawaitable(f):
687+
futures.append(asyncio.ensure_future(f))
652688

653-
def stop(self):
654-
return self.interrupt_then_kill()
689+
if futures:
690+
await asyncio.gather(*futures)
655691

656692
def _notice_engine_stopped(self, data):
657693
identifier = data['identifier']
@@ -1146,6 +1182,12 @@ def wait_one(self, timeout):
11461182
raise TimeoutError("still running")
11471183
return int(values.get("exit_code", -1))
11481184

1185+
async def join(self, timeout=None):
1186+
with ThreadPoolExecutor(1) as pool:
1187+
await asyncio.wrap_future(
1188+
pool.submit(partial(self.wait_one, timeout=timeout))
1189+
)
1190+
11491191
def signal(self, sig):
11501192
if self.state == 'running':
11511193
check_output(
@@ -1306,7 +1348,7 @@ def start(self, n):
13061348
return dlist
13071349

13081350

1309-
class SSHProxyEngineSetLauncher(SSHLauncher):
1351+
class SSHProxyEngineSetLauncher(SSHLauncher, EngineLauncher):
13101352
"""Launcher for calling
13111353
`ipcluster engines` on a remote machine.
13121354

ipyparallel/tests/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
"""toplevel setup/teardown for parallel tests."""
22
from __future__ import print_function
33

4+
import asyncio
45
import os
5-
import tempfile
66
import time
7-
from subprocess import PIPE
87
from subprocess import Popen
9-
from subprocess import STDOUT
108

119
from IPython.paths import get_ipython_dir
1210

@@ -125,7 +123,9 @@ def teardown():
125123
p = launchers.pop()
126124
if p.poll() is None:
127125
try:
128-
p.stop()
126+
f = p.stop()
127+
if f:
128+
asyncio.run(f)
129129
except Exception as e:
130130
print(e)
131131
pass

ipyparallel/tests/conftest.py

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import os
55
import sys
6+
from contextlib import contextmanager
67
from subprocess import check_call
78
from subprocess import check_output
89
from tempfile import TemporaryDirectory
@@ -21,16 +22,33 @@
2122
from . import teardown
2223

2324

24-
@pytest.fixture(autouse=True, scope="session")
25-
def ipython_dir():
26-
with TemporaryDirectory(suffix="dotipython") as td:
27-
with mock.patch.dict(os.environ, {"IPYTHONDIR": td}):
28-
assert IPython.paths.get_ipython_dir() == td
29-
pd = ProfileDir.create_profile_dir_by_name(td, name="default")
30-
# configure fast heartbeats for quicker tests with small numbers of local engines
31-
with open(os.path.join(pd.location, "ipcontroller_config.py"), "w") as f:
32-
f.write("c.HeartMonitor.period = 200")
25+
@contextmanager
26+
def temporary_ipython_dir():
27+
# FIXME: cleanup has issues on Windows
28+
# this is *probably* a real bug of holding open files,
29+
# but it is preventing feedback about test failures
30+
td_obj = TemporaryDirectory(suffix=".ipython")
31+
td = td_obj.name
32+
33+
with mock.patch.dict(os.environ, {"IPYTHONDIR": td}):
34+
assert IPython.paths.get_ipython_dir() == td
35+
pd = ProfileDir.create_profile_dir_by_name(td, name="default")
36+
# configure fast heartbeats for quicker tests with small numbers of local engines
37+
with open(os.path.join(pd.location, "ipcontroller_config.py"), "w") as f:
38+
f.write("c.HeartMonitor.period = 200")
39+
try:
3340
yield td
41+
finally:
42+
try:
43+
td_obj.cleanup()
44+
except Exception as e:
45+
print(f"Failed to cleanup {td}: {e}", file=sys.stderr)
46+
47+
48+
@pytest.fixture(autouse=True, scope="module")
49+
def ipython_dir(request):
50+
with temporary_ipython_dir() as ipython_dir:
51+
yield ipython_dir
3452

3553

3654
def pytest_collection_modifyitems(items):
@@ -46,19 +64,23 @@ def pytest_collection_modifyitems(items):
4664
assert not inspect.isasyncgenfunction(item.obj)
4765

4866

49-
@pytest.fixture(scope="session")
50-
def cluster(request):
67+
@pytest.fixture(scope="module")
68+
def cluster(request, ipython_dir):
5169
"""Setup IPython parallel cluster"""
5270
setup()
53-
request.addfinalizer(teardown)
71+
try:
72+
yield
73+
finally:
74+
teardown()
5475

5576

56-
@pytest.fixture(scope='session')
57-
def ipython():
77+
@pytest.fixture(scope='module')
78+
def ipython(ipython_dir):
5879
config = default_config()
5980
config.TerminalInteractiveShell.simple_prompt = True
6081
shell = TerminalInteractiveShell.instance(config=config)
61-
return shell
82+
yield shell
83+
TerminalInteractiveShell.clear_instance()
6284

6385

6486
@pytest.fixture()
@@ -81,7 +103,7 @@ def Context():
81103

82104

83105
@pytest.fixture
84-
def Cluster(request, io_loop):
106+
def Cluster(request, ipython_dir, io_loop):
85107
"""Fixture for instantiating Clusters"""
86108

87109
def ClusterConstructor(**kwargs):

ipyparallel/tests/test_client.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,9 +659,18 @@ def test_wait_for_engines(self):
659659
self.add_engines(1)
660660
assert f.result() is None
661661

662+
@pytest.mark.skipif(
663+
sys.platform.startswith("win"), reason="Signal tests don't pass on Windows yet"
664+
)
662665
def test_signal_engines(self):
666+
663667
view = self.client[:]
664-
for sig in (signal.SIGINT, 'SIGINT'):
668+
if sys.platform.startswith("win"):
669+
signame = 'CTRL_C_EVENT'
670+
else:
671+
signame = 'SIGINT'
672+
signum = getattr(signal, signame)
673+
for sig in (signum, signame):
665674
ar = view.apply_async(time.sleep, 10)
666675
# FIXME: use status:busy to wait for tasks to start
667676
time.sleep(1)

0 commit comments

Comments
 (0)