diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index e70547f094..af5eb1e31d 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -582,7 +582,6 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override except TransportTaskException as exception: raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}') except plumpy.process_states.KillInterruption as exception: - await self._kill_job(node, transport_queue) node.set_process_status(str(exception)) return self.retrieve(monitor_result=self._monitor_result) except (plumpy.futures.CancelledError, asyncio.CancelledError): diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index edbeca8704..cba3b3a211 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -51,7 +51,9 @@ from aiida.common.lang import classproperty, override from aiida.common.links import LinkType from aiida.common.log import LOG_LEVEL_REPORT +from aiida.engine.utils import InterruptableFuture from aiida.orm.implementation.utils import clean_value +from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode from aiida.orm.utils import serialize from .builder import ProcessBuilder @@ -72,6 +74,7 @@ class Process(PlumpyProcess): have full provenance saved in the database. 
""" + _cancelling_scheduler_job: asyncio.Task | None = None _node_class = orm.ProcessNode _spec_class = ProcessSpec @@ -336,10 +339,37 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b """ self.node.logger.info(f'Request to kill Process<{self.node.pk}>') - had_been_terminated = self.has_terminated() + if self.killed(): + # Already killed + return True + + if self.has_terminated(): + # Can't kill + return False + + # Cancel scheduler job + if not force_kill and isinstance(self.node, CalcJobNode): + if self._killing: + self._killing.cancel() + + if self._cancelling_scheduler_job: + self._cancelling_scheduler_job.cancel() + self.node.logger.report('Found active scheduler job cancelation that will be rescheduled.') + + from .calcjobs.tasks import task_kill_job + + coro = self._launch_task(task_kill_job, self.node, self.runner.transport) + self._cancelling_scheduler_job = asyncio.create_task(coro) + try: + self.loop.run_until_complete(self._cancelling_scheduler_job) + except Exception as exc: + self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}') + return False result = super().kill(msg_text, force_kill) + had_been_terminated = self.has_terminated() + # Only kill children if we could be killed ourselves if result is not False and not had_been_terminated: killing = [] @@ -374,6 +404,22 @@ def done(done_future: plumpy.futures.Future): return result + async def _launch_task(self, coro, *args, **kwargs): + """Launch a coroutine as a task, making sure to make it interruptable.""" + import functools + + from aiida.engine.utils import interruptable_task + + self._task: Union[InterruptableFuture, None] + + task_fn = functools.partial(coro, *args, **kwargs) + try: + self._task = interruptable_task(task_fn) + result = await self._task + return result + finally: + self._task = None + @override def out(self, output_port: str, value: Any = None) -> None: """Attach output to output port. 
diff --git a/tests/cmdline/commands/test_process.py b/tests/cmdline/commands/test_process.py index 411f013397..40c902f1d1 100644 --- a/tests/cmdline/commands/test_process.py +++ b/tests/cmdline/commands/test_process.py @@ -25,7 +25,8 @@ from aiida.common.log import LOG_LEVEL_REPORT from aiida.engine import Process, ProcessState from aiida.engine.processes import control as process_control -from aiida.orm import CalcJobNode, Group, WorkChainNode, WorkflowNode, WorkFunctionNode +from aiida.engine.utils import exponential_backoff_retry +from aiida.orm import CalcJobNode, Group, Int, WorkChainNode, WorkflowNode, WorkFunctionNode from tests.utils.processes import WaitProcess FuncArgs = tuple[t.Any, ...] @@ -53,6 +54,7 @@ def start_daemon_worker_in_foreground_and_redirect_streams( try: pid = os.getpid() + # For easier debugging you can change these to stdout sys.stdout = open(log_dir / f'worker-{pid}.out', 'w') sys.stderr = open(log_dir / f'worker-{pid}.err', 'w') start_daemon_worker(False, aiida_profile_name) @@ -72,10 +74,22 @@ def mock_open(_): raise Exception('Mock open exception') @staticmethod - async def mock_exponential_backoff_retry(*_, **__): + async def exponential_backoff_retry_fail_upload(fct: t.Callable[..., t.Any], *args, **kwargs): from aiida.common.exceptions import TransportTaskException - raise TransportTaskException + if 'do_upload' in fct.__name__: + raise TransportTaskException + else: + return await exponential_backoff_retry(fct, *args, **kwargs) + + @staticmethod + async def exponential_backoff_retry_fail_kill(fct: t.Callable[..., t.Any], *args, **kwargs): + from aiida.common.exceptions import TransportTaskException + + if 'do_kill' in fct.__name__: + raise TransportTaskException + else: + return await exponential_backoff_retry(fct, *args, **kwargs) @pytest.fixture(scope='function') @@ -138,7 +152,6 @@ def test_process_kill_failing_transport( A failure in opening a transport connection results in the EBM to be fired blocking a regular kill 
command. The force kill command will ignore the EBM and kill the process in any case.""" from aiida.cmdline.utils.common import get_process_function_report - from aiida.orm import Int code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash') @@ -179,7 +192,6 @@ def test_process_kill_failing_transport_failed_kill( """ from aiida.cmdline.utils.common import get_process_function_report - from aiida.orm import Int code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash') @@ -213,14 +225,14 @@ def make_a_builder(sleep_seconds=0): @pytest.mark.requires_rmq @pytest.mark.usefixtures('started_daemon_client') -def test_process_kill_failng_ebm( +def test_process_kill_failing_ebm_transport( fork_worker_context, submit_and_await, aiida_code_installed, run_cli_command, monkeypatch ): - """9) Kill a process that is paused after EBM (5 times failed). It should be possible to kill it normally. - # (e.g. in scenarios that transport is working again) - """ - from aiida.orm import Int + """Kill a process that is waiting after failed EBM during a transport task. + It should be possible to kill it normally. A process that failed upload (e.g. 
in scenarios where the transport is working + again) can still be killed. + """ code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash') def make_a_builder(sleep_seconds=0): @@ -232,7 +244,10 @@ def make_a_builder(sleep_seconds=0): kill_timeout = 10 - monkeypatch_args = ('aiida.engine.utils.exponential_backoff_retry', MockFunctions.mock_exponential_backoff_retry) + monkeypatch_args = ( + 'aiida.engine.utils.exponential_backoff_retry', + MockFunctions.exponential_backoff_retry_fail_upload, + ) with fork_worker_context(monkeypatch.setattr, monkeypatch_args): node = submit_and_await(make_a_builder(), ProcessState.WAITING) await_condition( @@ -241,7 +256,60 @@ def make_a_builder(sleep_seconds=0): timeout=kill_timeout, ) + # kill should start EBM and should successfully kill + run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + await_condition(lambda: node.is_killed, timeout=kill_timeout) + + +@pytest.mark.requires_rmq +@pytest.mark.usefixtures('started_daemon_client') +def test_process_kill_failing_ebm_kill( + fork_worker_context, submit_and_await, aiida_code_installed, run_cli_command, monkeypatch +): + """Kill a process that had previously failed with an EBM. + + Killing a process tries to gracefully cancel the job on the remote node. If there are connection problems it retries + it using the EBM. If this fails, another kill command can be sent to restart the cancelation of the scheduler job. 
+ """ + from aiida.cmdline.utils.common import get_process_function_report + + code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash') + + def make_a_builder(sleep_seconds=0): + builder = code.get_builder() + builder.x = Int(1) + builder.y = Int(1) + builder.metadata.options.sleep = sleep_seconds + return builder + + kill_timeout = 10 + + monkeypatch_args = ( + 'aiida.engine.utils.exponential_backoff_retry', + MockFunctions.exponential_backoff_retry_fail_kill, + ) + with fork_worker_context(monkeypatch.setattr, monkeypatch_args): + node = submit_and_await(make_a_builder(kill_timeout + 10), ProcessState.WAITING, timeout=kill_timeout) + await_condition( + lambda: node.process_status == 'Monitoring scheduler: job state RUNNING', + timeout=kill_timeout, + ) + + # kill should start EBM and be not successful in EBM run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + await_condition(lambda: not node.is_killed, timeout=kill_timeout) + + # kill should restart EBM and be not successful in EBM + # this tests if the old task is cancelled and restarted successfully + run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + await_condition( + lambda: 'Found active scheduler job cancelation that will be rescheduled.' + in get_process_function_report(node), + timeout=kill_timeout, + ) + + # force kill should skip EBM and successfully kill the process + run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait']) await_condition(lambda: node.is_killed, timeout=kill_timeout) @@ -758,8 +826,6 @@ def test_process_kill(submit_and_await, run_cli_command, aiida_code_installed): assert result.exit_code == ExitCode.USAGE_ERROR assert len(result.output_lines) > 0 - from aiida.orm import Int - code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash') builder = code.get_builder() builder.x = Int(2)