Skip to content

Commit fddffca

Browse files
authored
DirectScheduler: Ensure killing child processes (aiidateam#6572)
The current implementation only issues a kill command for the parent process, but this can leave child processes orphaned. The child processes are now retrieved and added explicitly to the kill command.
1 parent d3e9333 commit fddffca

File tree

2 files changed

+59
-4
lines changed

2 files changed

+59
-4
lines changed

src/aiida/schedulers/plugins/direct.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
###########################################################################
99
"""Plugin for direct execution."""
1010

11+
from typing import Union
12+
1113
import aiida.schedulers
1214
from aiida.common.escaping import escape_for_bash
1315
from aiida.schedulers import SchedulerError
@@ -354,13 +356,28 @@ def _parse_submit_output(self, retval, stdout, stderr):
354356

355357
return stdout.strip()
356358

def _get_kill_command(self, jobid: Union[int, str]) -> str:
    """Return the command to kill the process with specified id and all its descendants.

    For the :py:class:`~aiida.schedulers.plugins.direct.DirectScheduler` the job id
    is simply the process id (PID) of the submitted process.

    :param jobid: The process id of the job to kill.
    :return: A string containing the kill command.
    """
    from psutil import NoSuchProcess, Process

    # Always kill at least the parent process id that was handed to us.
    jobids = [str(jobid)]

    try:
        # Collect the pids of all descendants so that killing the parent does not
        # leave orphaned child processes behind.
        process = Process(int(jobid))
        jobids.extend(str(child.pid) for child in process.children(recursive=True))
    except NoSuchProcess:
        # The process already terminated (or never existed): fall back to a plain
        # `kill <jobid>` so the caller still receives a well-formed command instead
        # of an unhandled exception from psutil.
        pass

    kill_command = f'kill {" ".join(jobids)}'

    self.logger.info(f'killing job {jobid}')

    return kill_command
364381

365382
def _parse_kill_output(self, retval, stdout, stderr):
366383
"""Parse the output of the kill command.

tests/schedulers/test_direct.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,41 @@ def test_submit_script_with_num_cores_per_mpiproc(scheduler, template):
7070
)
7171
result = scheduler.get_submit_script(template)
7272
assert f'export OMP_NUM_THREADS={num_cores_per_mpiproc}' in result
73+
74+
75+
@pytest.mark.timeout(timeout=10)
def test_kill_job(scheduler, tmpdir):
    """Check that ``kill_job`` terminates the process together with all its descendants.

    A child process is forked which launches ``bash`` running a sleep script, giving
    the process tree::

        current process     forked process      run script.sh
        python─────────────python───────────────────bash──────sleep
                           we kill this process     we check if still running

    After killing the forked process, both the ``bash`` and ``sleep`` descendants
    must stop running.
    """
    import multiprocessing
    import time

    from aiida.transports.plugins.local import LocalTransport
    from psutil import Process

    def launch_sleeping_script():
        import subprocess

        sleep_script = tmpdir / 'sleep.sh'
        sleep_script.write('sleep 100')
        # Blocks the forked process until the script finishes (or is killed).
        subprocess.run(['bash', sleep_script.strpath], check=False)

    worker = multiprocessing.Process(target=launch_sleeping_script)
    worker.start()

    # Poll until both descendants (bash and sleep) have spawned.
    while len(descendants := Process(worker.pid).children(recursive=True)) != 2:
        time.sleep(0.1)
    bash_process, sleep_process = descendants

    with LocalTransport() as transport:
        scheduler.set_transport(transport)
        scheduler.kill_job(worker.pid)

    # Wait for both descendants to actually disappear; the test timeout guards
    # against the kill having missed one of them.
    while bash_process.is_running() or sleep_process.is_running():
        time.sleep(0.1)

    worker.join()

0 commit comments

Comments
 (0)