Skip to content

Commit 7fb1e95

Browse files
authored
Merge pull request #842 from wtsi-hgi/fix-832
Kill child processes when cwltool exits
2 parents 6e0c1f1 + c5b6117 commit 7fb1e95

File tree

8 files changed

+64
-10
lines changed

8 files changed

+64
-10
lines changed

cwltool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
from cwltool import main
1212

1313
if __name__ == "__main__":
14-
sys.exit(main.main(sys.argv[1:]))
14+
main.run(sys.argv[1:])

cwltool/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44

55
from . import main
66

7-
sys.exit(main.main())
7+
main.run()

cwltool/job.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from .utils import bytes2str_in_dicts # pylint: disable=unused-import
3232
from .utils import ( # pylint: disable=unused-import
3333
DEFAULT_TMP_PREFIX, Directory, copytree_with_merge, json_dump, json_dumps,
34-
onWindows, subprocess)
34+
onWindows, subprocess, processes_to_kill)
3535
from .context import (RuntimeContext, # pylint: disable=unused-import
3636
getdefault)
3737
if TYPE_CHECKING:
@@ -521,6 +521,7 @@ def _job_popen(
521521
stderr=stderr,
522522
env=env,
523523
cwd=cwd)
524+
processes_to_kill.append(sproc)
524525

525526
if sproc.stdin:
526527
sproc.stdin.close()
@@ -534,6 +535,7 @@ def terminate():
534535
except OSError:
535536
pass
536537
tm = Timer(timelimit, terminate)
538+
tm.daemon = True
537539
tm.start()
538540

539541
rcode = sproc.wait()
@@ -588,6 +590,7 @@ def terminate():
588590
stderr=sys.stderr,
589591
stdin=subprocess.PIPE,
590592
)
593+
processes_to_kill.append(sproc)
591594
if sproc.stdin:
592595
sproc.stdin.close()
593596

cwltool/main.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io
1111
import logging
1212
import os
13+
import signal
1314
import sys
1415

1516
from typing import (IO, Any, Callable, Dict, # pylint: disable=unused-import
@@ -47,10 +48,43 @@
4748
from .stdfsaccess import StdFsAccess
4849
from .update import ALLUPDATES, UPDATES
4950
from .utils import (DEFAULT_TMP_PREFIX, add_sizes, json_dumps, onWindows,
50-
versionstring, windows_default_container_id)
51+
versionstring, windows_default_container_id,
52+
processes_to_kill)
5153
from .context import LoadingContext, RuntimeContext, getdefault
5254
from .builder import HasReqsHints
5355

56+
57+
def _terminate_processes():
58+
# type: () -> None
59+
"""Kill all spawned processes.
60+
61+
Processes to be killed must be appended to `utils.processes_to_kill`
62+
as they are spawned.
63+
64+
An important caveat: since there's no supported way to kill another
65+
thread in Python, this function cannot stop other threads from
66+
continuing to execute while it kills the processes that they've
67+
spawned. This may occasionally lead to unexpected behaviour.
68+
"""
69+
# It's possible that another thread will spawn a new task while
70+
# we're executing, so it's not safe to use a for loop here.
71+
while processes_to_kill:
72+
processes_to_kill.popleft().kill()
73+
74+
75+
def _signal_handler(signum, frame):
76+
# type: (int, Any) -> None
77+
"""Kill all spawned processes and exit.
78+
79+
Note that it's possible for another thread to spawn a process after
80+
all processes have been killed, but before Python exits.
81+
82+
Refer to the docstring for _terminate_processes() for other caveats.
83+
"""
84+
_terminate_processes()
85+
sys.exit(signum)
86+
87+
5488
def generate_example_input(inptype):
5589
# type: (Union[Text, Dict[Text, Any]]) -> Any
5690
defaults = {u'null': 'null',
@@ -680,5 +714,15 @@ def find_default_container(builder, # type: HasReqsHints
680714
return default_container
681715

682716

717+
def run(*args, **kwargs):
718+
# type: (...) -> None
719+
"""Run cwltool."""
720+
signal.signal(signal.SIGTERM, _signal_handler)
721+
try:
722+
sys.exit(main(*args, **kwargs))
723+
finally:
724+
_terminate_processes()
725+
726+
683727
if __name__ == "__main__":
684-
sys.exit(main(sys.argv[1:]))
728+
run(sys.argv[1:])

cwltool/sandboxjs.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import six
1515
from pkg_resources import resource_stream
1616

17-
from .utils import json_dumps, onWindows, subprocess
17+
from .utils import json_dumps, onWindows, subprocess, processes_to_kill
1818

1919
try:
2020
import queue # type: ignore
@@ -71,7 +71,7 @@ def new_js_proc(js_text, force_docker_pull=False):
7171
stdin=subprocess.PIPE,
7272
stdout=subprocess.PIPE,
7373
stderr=subprocess.PIPE)
74-
74+
processes_to_kill.append(nodejs)
7575
required_node_version = check_js_threshold_version(n)
7676
break
7777
except (subprocess.CalledProcessError, OSError):
@@ -95,6 +95,7 @@ def new_js_proc(js_text, force_docker_pull=False):
9595
"--sig-proxy=true", "--interactive",
9696
"--rm", nodeimg, "node", "--eval", js_text],
9797
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
98+
processes_to_kill.append(nodejs)
9899
docker = True
99100
except OSError as e:
100101
if e.errno == errno.ENOENT:
@@ -181,6 +182,7 @@ def terminate():
181182
timeout = 20
182183

183184
tm = threading.Timer(timeout, terminate)
185+
tm.daemon = True
184186
tm.start()
185187

186188
stdin_text = u""

cwltool/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import absolute_import
22

3+
import collections
34
import json
45
import os
56
import sys
@@ -9,8 +10,8 @@
910
import pkg_resources
1011
from functools import partial # pylint: disable=unused-import
1112
from typing import (IO, Any, AnyStr, Callable, # pylint: disable=unused-import
12-
Dict, Iterable, List, Optional, Text, Tuple, TypeVar,
13-
Union)
13+
Dict, Iterable, List, Optional, Text, Tuple, TypeVar, Union)
14+
from typing_extensions import Deque
1415

1516
import six
1617
from six.moves import urllib, zip_longest
@@ -32,6 +33,8 @@
3233

3334
DEFAULT_TMP_PREFIX = "tmp"
3435

36+
processes_to_kill = collections.deque() # type: Deque[subprocess.Popen]
37+
3538
def versionstring():
3639
# type: () -> Text
3740
'''

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ bagit==1.6.4
1010
mypy-extensions
1111
psutil
1212
subprocess32 >= 3.5.0; os.name=="posix"
13+
typing-extensions

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
'psutil',
6161
'prov == 1.5.1',
6262
'bagit >= 1.6.4',
63+
'typing-extensions',
6364
],
6465
extras_require={
6566
':os.name=="posix"': ['subprocess32 >= 3.5.0'],
@@ -72,7 +73,7 @@
7273
test_suite='tests',
7374
tests_require=['pytest', 'mock >= 2.0.0', 'arcp >= 0.2.0', 'rdflib-jsonld >= 0.4.0'],
7475
entry_points={
75-
'console_scripts': ["cwltool=cwltool.main:main"]
76+
'console_scripts': ["cwltool=cwltool.main:run"]
7677
},
7778
zip_safe=True,
7879
cmdclass={'egg_info': tagger},

0 commit comments

Comments
 (0)