Skip to content

Commit b6d0fe5

Browse files
authored
Add force-kill option when killing a process (aiidateam#6793)
The force option allows skipping any action the process would normally take to properly release resources. This can result in orphaned resources (e.g. submitted jobs that are still running). To test the feature we have to monkeypatch the worker's code. We thus start a worker in a spawned process with a function that monkeypatches the corresponding modules. The spawned process has to do the monkeypatching by itself, so we pass a function `prepare_func_args` to the spawned process when it is initialized. For correct usage, the fixture `fork_worker_context` should be used: it returns a context manager that starts the worker with the AiiDA test profile and terminates it on exit. Furthermore, the `timeout` option of `verdi process kill` was not correctly passed to the inner blocking `kiwipy.Future`, resulting in a blocking call. This has been fixed. This change goes hand in hand with the change in PR aiidateam/plumpy/320.
1 parent 6eb17a7 commit b6d0fe5

File tree

12 files changed

+333
-40
lines changed

12 files changed

+333
-40
lines changed

environment.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ dependencies:
1616
- docstring_parser
1717
- get-annotations~=0.1
1818
- python-graphviz~=0.19
19+
- plumpy~=0.25.0
1920
- ipython>=7
2021
- jedi<0.19
2122
- jinja2~=3.0
2223
- kiwipy[rmq]~=0.8.4
2324
- importlib-metadata~=6.0
2425
- numpy~=1.21
2526
- paramiko~=3.0
26-
- plumpy~=0.24.0
2727
- pgsu~=0.3.0
2828
- psutil~=5.6
2929
- psycopg[binary]~=3.0

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ dependencies = [
2828
'docstring-parser',
2929
'get-annotations~=0.1;python_version<"3.10"',
3030
'graphviz~=0.19',
31+
'plumpy~=0.25.0',
3132
'ipython>=7',
3233
'jedi<0.19',
3334
'jinja2~=3.0',
3435
'kiwipy[rmq]~=0.8.4',
3536
'importlib-metadata~=6.0',
3637
'numpy~=1.21',
3738
'paramiko~=3.0',
38-
'plumpy~=0.24.0',
3939
'pgsu~=0.3.0',
4040
'psutil~=5.6',
4141
'psycopg[binary]~=3.0',

src/aiida/cmdline/commands/cmd_process.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from aiida.cmdline.commands.cmd_verdi import verdi
1414
from aiida.cmdline.params import arguments, options, types
15+
from aiida.cmdline.params.options.overridable import OverridableOption
1516
from aiida.cmdline.utils import decorators, echo
1617
from aiida.common.log import LOG_LEVELS, capture_logging
1718

@@ -318,10 +319,25 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
318319
@verdi_process.command('kill')
319320
@arguments.PROCESSES()
320321
@options.ALL(help='Kill all processes if no specific processes are specified.')
321-
@options.TIMEOUT()
322+
@OverridableOption(
323+
'-t',
324+
'--timeout',
325+
type=click.FLOAT,
326+
default=5.0,
327+
show_default=True,
328+
help='Time in seconds to wait for a response of the kill task before timing out.',
329+
)()
322330
@options.WAIT()
331+
@OverridableOption(
332+
'-F',
333+
'--force-kill',
334+
is_flag=True,
335+
default=False,
336+
help='Kills the process without waiting for a confirmation if the job has been killed.\n'
337+
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
338+
)()
323339
@decorators.with_dbenv()
324-
def process_kill(processes, all_entries, timeout, wait):
340+
def process_kill(processes, all_entries, timeout, wait, force_kill):
325341
"""Kill running processes.
326342
327343
Kill one or multiple running processes."""
@@ -338,11 +354,17 @@ def process_kill(processes, all_entries, timeout, wait):
338354
if all_entries:
339355
click.confirm('Are you sure you want to kill all processes?', abort=True)
340356

357+
if force_kill:
358+
echo.echo_warning('Force kill is enabled. This may lead to orphaned jobs on your HPC.')
359+
msg_text = 'Force killed through `verdi process kill`'
360+
else:
361+
msg_text = 'Killed through `verdi process kill`'
341362
with capture_logging() as stream:
342363
try:
343364
control.kill_processes(
344365
processes,
345-
msg_text='Killed through `verdi process kill`',
366+
msg_text=msg_text,
367+
force_kill=force_kill,
346368
all_entries=all_entries,
347369
timeout=timeout,
348370
wait=wait,

src/aiida/engine/daemon/client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,10 @@ def get_worker_info(self, timeout: int | None = None) -> dict[str, t.Any]:
477477
command = {'command': 'stats', 'properties': {'name': self.daemon_name}}
478478
return self.call_client(command, timeout=timeout)
479479

480+
def get_number_of_workers(self, timeout: int | None = None) -> int:
481+
"""Get number of workers."""
482+
return len(self.get_worker_info(timeout).get('info', []))
483+
480484
def get_daemon_info(self, timeout: int | None = None) -> dict[str, t.Any]:
481485
"""Get statistics about this daemon itself.
482486
@@ -531,7 +535,8 @@ def start_daemon(
531535
try:
532536
subprocess.check_output(command, env=env, stderr=subprocess.STDOUT)
533537
except subprocess.CalledProcessError as exception:
534-
raise DaemonException('The daemon failed to start.') from exception
538+
# CalledProcessError is not passing the subprocess stderr in its message so we add it in DaemonException
539+
raise DaemonException(f'The daemon failed to start with error:\n{exception.stdout.decode()}') from exception
535540

536541
if not wait:
537542
return

src/aiida/engine/daemon/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import logging
1313
import signal
1414
import sys
15+
from typing import Union
1516

1617
from aiida.common.log import configure_logging
1718
from aiida.engine.daemon.client import get_daemon_client
@@ -32,18 +33,20 @@ async def shutdown_worker(runner: Runner) -> None:
3233
task.cancel()
3334

3435
await asyncio.gather(*tasks, return_exceptions=True)
36+
3537
runner.close()
3638

3739
LOGGER.info('Daemon worker stopped')
3840

3941

40-
def start_daemon_worker(foreground: bool = False) -> None:
42+
def start_daemon_worker(foreground: bool = False, profile_name: Union[str, None] = None) -> None:
4143
"""Start a daemon worker for the currently configured profile.
4244
4345
:param foreground: If true, the logging will be configured to write to stdout, otherwise it will be configured to
4446
write to the daemon log file.
4547
"""
46-
daemon_client = get_daemon_client()
48+
49+
daemon_client = get_daemon_client(profile_name)
4750
configure_logging(with_orm=True, daemon=not foreground, daemon_log_file=daemon_client.daemon_log_file)
4851

4952
LOGGER.debug(f'sys.executable: {sys.executable}')

src/aiida/engine/processes/calcjobs/tasks.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
from aiida.common.datastructures import CalcJobState
2525
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
2626
from aiida.common.folders import SandboxFolder
27+
from aiida.engine import utils
2728
from aiida.engine.daemon import execmanager
2829
from aiida.engine.processes.exit_code import ExitCode
2930
from aiida.engine.transports import TransportQueue
30-
from aiida.engine.utils import InterruptableFuture, exponential_backoff_retry, interruptable_task
31+
from aiida.engine.utils import InterruptableFuture, interruptable_task
3132
from aiida.manage.configuration import get_config_option
3233
from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode
3334
from aiida.schedulers.datastructures import JobState
@@ -102,7 +103,7 @@ async def do_upload():
102103
try:
103104
logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
104105
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
105-
skip_submit = await exponential_backoff_retry(
106+
skip_submit = await utils.exponential_backoff_retry(
106107
do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
107108
)
108109
except PreSubmitException:
@@ -150,7 +151,7 @@ async def do_submit():
150151
try:
151152
logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
152153
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
153-
result = await exponential_backoff_retry(
154+
result = await utils.exponential_backoff_retry(
154155
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
155156
)
156157
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
@@ -208,7 +209,7 @@ async def do_update():
208209
try:
209210
logger.info(f'scheduled request to update CalcJob<{node.pk}>')
210211
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
211-
job_done = await exponential_backoff_retry(
212+
job_done = await utils.exponential_backoff_retry(
212213
do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
213214
)
214215
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
@@ -258,7 +259,7 @@ async def do_monitor():
258259
try:
259260
logger.info(f'scheduled request to monitor CalcJob<{node.pk}>')
260261
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
261-
monitor_result = await exponential_backoff_retry(
262+
monitor_result = await utils.exponential_backoff_retry(
262263
do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
263264
)
264265
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
@@ -326,7 +327,7 @@ async def do_retrieve():
326327
try:
327328
logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
328329
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
329-
result = await exponential_backoff_retry(
330+
result = await utils.exponential_backoff_retry(
330331
do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
331332
)
332333
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
@@ -371,7 +372,7 @@ async def do_stash():
371372
return await execmanager.stash_calculation(node, transport)
372373

373374
try:
374-
await exponential_backoff_retry(
375+
await utils.exponential_backoff_retry(
375376
do_stash,
376377
initial_interval,
377378
max_attempts,
@@ -419,7 +420,7 @@ async def do_kill():
419420

420421
try:
421422
logger.info(f'scheduled request to kill CalcJob<{node.pk}>')
422-
result = await exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
423+
result = await utils.exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
423424
except plumpy.process_states.Interruption:
424425
raise
425426
except Exception as exception:

src/aiida/engine/processes/control.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ def kill_processes(
173173
processes: list[ProcessNode] | None = None,
174174
*,
175175
msg_text: str = 'Killed through `aiida.engine.processes.control.kill_processes`',
176+
force_kill: bool = False,
176177
all_entries: bool = False,
177178
timeout: float = 5.0,
178179
wait: bool = False,
@@ -201,7 +202,7 @@ def kill_processes(
201202
return
202203

203204
controller = get_manager().get_process_controller()
204-
action = functools.partial(controller.kill_process, msg_text=msg_text)
205+
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force_kill)
205206
_perform_actions(processes, action, 'kill', 'killing', timeout, wait)
206207

207208

@@ -276,15 +277,17 @@ def handle_result(result):
276277
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
277278

278279
try:
279-
for future in concurrent.futures.as_completed(futures.keys(), timeout=timeout):
280-
process = futures[future]
281-
280+
for future, process in futures.items():
281+
# unwrap is need here since LoopCommunicator will also wrap a future
282+
unwrapped = unwrap_kiwi_future(future)
282283
try:
283-
# unwrap is need here since LoopCommunicator will also wrap a future
284-
unwrapped = unwrap_kiwi_future(future)
285-
result = unwrapped.result()
284+
result = unwrapped.result(timeout=timeout)
286285
except communications.TimeoutError:
287-
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out')
286+
cancelled = unwrapped.cancel()
287+
if cancelled:
288+
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
289+
else:
290+
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
288291
except Exception as exception:
289292
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
290293
else:

src/aiida/engine/processes/process.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ def load_instance_state(
329329

330330
self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state')
331331

332-
def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future]:
332+
def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[bool, plumpy.futures.Future]:
333333
"""Kill the process and all the children calculations it called
334334
335335
:param msg: message
@@ -338,7 +338,7 @@ def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future
338338

339339
had_been_terminated = self.has_terminated()
340340

341-
result = super().kill(msg_text)
341+
result = super().kill(msg_text, force_kill)
342342

343343
# Only kill children if we could be killed ourselves
344344
if result is not False and not had_been_terminated:

src/aiida/manage/tests/pytest_fixtures.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,7 @@ def _factory(
760760
f'Daemon <{started_daemon_client.profile.name}|{daemon_status}> log file content: \n'
761761
f'{daemon_log_file}'
762762
)
763+
time.sleep(0.1)
763764

764765
return node
765766

src/aiida/tools/pytest_fixtures/daemon.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ def factory(
155155
f'Daemon <{started_daemon_client.profile.name}|{daemon_status}> log file content: \n'
156156
f'{daemon_log_file}'
157157
)
158+
time.sleep(0.1)
158159

159160
return node
160161

0 commit comments

Comments
 (0)