From e989a2f6dcc1a6564919ec01ed59f29df5ef5806 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 20 May 2025 15:25:00 +0200 Subject: [PATCH 1/4] Merging usage of `wait` and `timeout` for actions The original design of `wait` and `timeout` was to distinguish between actions that immediately return and actions that scheduled. This mechanism was however never used and resulted in an misinterpretation in the force-kill PR #6793 introducing a bug fixed in PR #6870. The mechanism of `wait` and `timeout` was also never correctly implemented. In this PR we rectify the logic and simplify it by handling immediate and scheduled actions the same way. Related commits in aiida-core 83880185bd, cd0d15c79d and plumpy 1b6ecb8f One can specifiy a `timeout <= 0` to express that the action should not wait for a response while one can specify `timeout==float('inf')` (default value) to wait until a response has been received without a timeout. --- src/aiida/cmdline/commands/cmd_process.py | 36 ++++++------ src/aiida/cmdline/params/options/main.py | 7 --- src/aiida/engine/processes/control.py | 69 +++++++---------------- tests/cmdline/commands/test_process.py | 42 +++++++------- tests/engine/processes/test_control.py | 16 +++--- 5 files changed, 65 insertions(+), 105 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index a8dd349f2a..7fa4483ea7 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -25,6 +25,16 @@ verdi daemon start """ +ACTION_TIMEOUT = OverridableOption( + '-t', + '--timeout', + type=click.FLOAT, + default=float('inf'), + show_default=True, + help='Time in seconds to wait for a response before timing out. ' + 'If timeout <= 0 the command does not wait for response.', +) + def valid_projections(): """Return list of valid projections for the ``--project`` option of ``verdi process list``. @@ -320,15 +330,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes): @verdi_process.command('kill') @arguments.PROCESSES() @options.ALL(help='Kill all processes if no specific processes are specified.') -@OverridableOption( - '-t', - '--timeout', - type=click.FLOAT, - default=5.0, - show_default=True, - help='Time in seconds to wait for a response of the kill task before timing out.', -)() -@options.WAIT() +@ACTION_TIMEOUT() @OverridableOption( '-F', '--force', @@ -338,7 +340,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes): 'Note: This may lead to orphaned jobs on your HPC and should be used with caution.', )() @decorators.with_dbenv() -def process_kill(processes, all_entries, timeout, wait, force): +def process_kill(processes, all_entries, timeout, force): """Kill running processes. Kill one or multiple running processes.""" @@ -368,7 +370,6 @@ def process_kill(processes, all_entries, timeout, wait, force): force=force, all_entries=all_entries, timeout=timeout, - wait=wait, ) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -380,10 +381,9 @@ def process_kill(processes, all_entries, timeout, wait, force): @verdi_process.command('pause') @arguments.PROCESSES() @options.ALL(help='Pause all active processes if no specific processes are specified.') -@options.TIMEOUT() -@options.WAIT() +@ACTION_TIMEOUT() @decorators.with_dbenv() -def process_pause(processes, all_entries, timeout, wait): +def process_pause(processes, all_entries, timeout): """Pause running processes. Pause one or multiple running processes.""" @@ -404,7 +404,6 @@ def process_pause(processes, all_entries, timeout, wait): msg_text='Paused through `verdi process pause`', all_entries=all_entries, timeout=timeout, - wait=wait, ) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -416,10 +415,9 @@ def process_pause(processes, all_entries, timeout, wait): @verdi_process.command('play') @arguments.PROCESSES() @options.ALL(help='Play all paused processes if no specific processes are specified.') -@options.TIMEOUT() -@options.WAIT() +@ACTION_TIMEOUT() @decorators.with_dbenv() -def process_play(processes, all_entries, timeout, wait): +def process_play(processes, all_entries, timeout): """Play (unpause) paused processes. Play (unpause) one or multiple paused processes.""" @@ -435,7 +433,7 @@ def process_play(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait) + control.play_processes(processes, all_entries=all_entries, timeout=timeout) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') diff --git a/src/aiida/cmdline/params/options/main.py b/src/aiida/cmdline/params/options/main.py index e7688f0966..364711f21a 100644 --- a/src/aiida/cmdline/params/options/main.py +++ b/src/aiida/cmdline/params/options/main.py @@ -125,7 +125,6 @@ 'USER_LAST_NAME', 'VERBOSITY', 'VISUALIZATION_FORMAT', - 'WAIT', 'WITH_ELEMENTS', 'WITH_ELEMENTS_EXCLUSIVE', 'active_process_states', @@ -690,12 +689,6 @@ def set_log_level(ctx, _param, value): help='Time in seconds to wait for a response before timing out.', ) -WAIT = OverridableOption( - '--wait/--no-wait', - default=False, - help='Wait for the action to be completed otherwise return as soon as it is scheduled.', -) - FORMULA_MODE = OverridableOption( '-f', '--formula-mode', diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index a388f37084..b9e4dce8d3 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -104,7 +104,7 @@ def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> Non def play_processes( - processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False + processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0 ) -> None: """Play (unpause) paused processes. @@ -113,7 +113,6 @@ def play_processes( :param processes: List of processes to play. :param all_entries: Play all paused processes. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. - :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout. """ if not get_daemon_client().is_daemon_running: @@ -130,7 +129,7 @@ def play_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.play_process, 'play', 'playing', timeout, wait) + _perform_actions(processes, controller.play_process, 'play', 'playing', timeout) def pause_processes( @@ -139,7 +138,6 @@ def pause_processes( msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, - wait: bool = False, ) -> None: """Pause running processes. @@ -148,7 +146,6 @@ def pause_processes( :param processes: List of processes to play. :param all_entries: Pause all playing processes. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. - :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout. """ if not get_daemon_client().is_daemon_running: @@ -166,7 +163,7 @@ def pause_processes( controller = get_manager().get_process_controller() action = functools.partial(controller.pause_process, msg_text=msg_text) - _perform_actions(processes, action, 'pause', 'pausing', timeout, wait) + _perform_actions(processes, action, 'pause', 'pausing', timeout) def kill_processes( @@ -176,7 +173,6 @@ def kill_processes( force: bool = False, all_entries: bool = False, timeout: float = 5.0, - wait: bool = False, ) -> None: """Kill running processes. @@ -185,7 +181,6 @@ def kill_processes( :param processes: List of processes to play. :param all_entries: Kill all active processes. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. - :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout. """ if not get_daemon_client().is_daemon_running: @@ -203,7 +198,7 @@ def kill_processes( controller = get_manager().get_process_controller() action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force) - _perform_actions(processes, action, 'kill', 'killing', timeout, wait) + _perform_actions(processes, action, 'kill', 'killing', timeout) def _perform_actions( @@ -212,7 +207,6 @@ def _perform_actions( infinitive: str, present: str, timeout: t.Optional[float] = None, - wait: bool = False, **kwargs: t.Any, ) -> None: """Perform an action on a list of processes. @@ -223,7 +217,6 @@ def _perform_actions( :param present: The present tense of the verb that represents the action. :param past: The past tense of the verb that represents the action. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. - :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. :param kwargs: Keyword arguments that will be passed to the method ``action``. :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout. """ @@ -241,49 +234,40 @@ def _perform_actions( else: futures[future] = process - _resolve_futures(futures, infinitive, present, wait, timeout) + _resolve_futures(futures, infinitive, present, timeout) def _resolve_futures( futures: dict[concurrent.futures.Future, ProcessNode], infinitive: str, present: str, - wait: bool = False, timeout: t.Optional[float] = None, ) -> None: """Process a mapping of futures representing an action on an active process. This function will echo the correct information strings based on the outcomes of the futures and the given verb conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a - timeout to put a maximum wait time on the actions. + timeout to put a maximum wait time on the actions. TODO fix docstring :param futures: The map of action futures and the corresponding processes. :param infinitive: The infinitive form of the action verb. :param present: The present tense form of the action verb. - :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. """ - scheduled = {} - - def handle_result(result): - if result is True: - LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent') - elif result is False: - LOGGER.error(f'problem {present} Process<{process.pk}>') - elif isinstance(result, kiwipy.Future): - LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>') - scheduled[result] = process - else: - LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}') + if not timeout: + return + + LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}") try: for future, process in futures.items(): - # unwrap is need here since LoopCommunicator will also wrap a future + # we unwrap to the end unwrapped = unwrap_kiwi_future(future) try: - result = unwrapped.result(timeout=timeout) + # future does not interpret float('inf') correctly by changing it to None we get the intended behavior + result = unwrapped.result(timeout=None if timeout == float('inf') else timeout) except communications.TimeoutError: - cancelled = unwrapped.cancel() + cancelled = future.cancel() if cancelled: LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.') else: @@ -291,27 +275,12 @@ def handle_result(result): except Exception as exception: LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') else: - if isinstance(result, kiwipy.Future): - LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>') - scheduled[result] = process + if result is True: + LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent') + elif result is False: + LOGGER.error(f'problem {present} Process<{process.pk}>') else: - handle_result(result) - - if not wait or not scheduled: - return - - LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in scheduled.values()])}") - - for future in concurrent.futures.as_completed(scheduled.keys(), timeout=timeout): - process = scheduled[future] - - try: - result = future.result() - except Exception as exception: - LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') - else: - handle_result(result) - + LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}') except concurrent.futures.TimeoutError: raise ProcessTimeoutException( f'timed out trying to {infinitive} processes {futures.values()}\n' diff --git a/tests/cmdline/commands/test_process.py b/tests/cmdline/commands/test_process.py index d94f2a1400..1f81aac772 100644 --- a/tests/cmdline/commands/test_process.py +++ b/tests/cmdline/commands/test_process.py @@ -175,7 +175,7 @@ def make_a_builder(sleep_seconds=0): assert 'exponential_backoff_retry' in result # force kill the process - run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk), '-F']) await_condition(lambda: node.is_killed, timeout=kill_timeout) assert node.is_killed assert node.process_status == 'Force killed through `verdi process kill`' @@ -216,11 +216,11 @@ def make_a_builder(sleep_seconds=0): assert 'exponential_backoff_retry' in result # practice a normal kill, which should fail - result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait', '--timeout', '1.0']) + result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--timeout', '1.0']) assert f'Error: call to kill Process<{node.pk}> timed out' in result.stdout # force kill the process - result = run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait']) + result = run_cli_command(cmd_process.process_kill, [str(node.pk), '-F']) await_condition(lambda: node.is_killed, timeout=kill_timeout) assert node.process_status == 'Force killed through `verdi process kill`' @@ -259,7 +259,7 @@ def make_a_builder(sleep_seconds=0): ) # kill should start EBM and should successfully kill - run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk)]) await_condition(lambda: node.is_killed, timeout=kill_timeout) @@ -298,12 +298,12 @@ def make_a_builder(sleep_seconds=0): ) # kill should start EBM and be not successful in EBM - run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk)]) await_condition(lambda: not node.is_killed, timeout=kill_timeout) # kill should restart EBM and be not successful in EBM # this tests if the old task is cancelled and restarted successfully - run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk)]) await_condition( lambda: 'Found active scheduler job cancelation that will be rescheduled.' in get_process_function_report(node), @@ -311,7 +311,7 @@ def make_a_builder(sleep_seconds=0): ) # force kill should skip EBM and successfully kill the process - run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk), '-F']) await_condition(lambda: node.is_killed, timeout=kill_timeout) @@ -886,7 +886,7 @@ def test_process_pause(submit_and_await, run_cli_command): node = submit_and_await(WaitProcess, ProcessState.WAITING) assert not node.paused - run_cli_command(cmd_process.process_pause, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_pause, [str(node.pk)]) await_condition(lambda: node.paused) # Running without identifiers should except and print something @@ -902,10 +902,10 @@ def test_process_play(submit_and_await, run_cli_command): """Test the ``verdi process play`` command.""" node = submit_and_await(WaitProcess, ProcessState.WAITING) - run_cli_command(cmd_process.process_pause, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_pause, [str(node.pk)]) await_condition(lambda: node.paused) - run_cli_command(cmd_process.process_play, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_play, [str(node.pk)]) await_condition(lambda: not node.paused) # Running without identifiers should except and print something @@ -922,11 +922,11 @@ def test_process_play_all(submit_and_await, run_cli_command): node_one = submit_and_await(WaitProcess, ProcessState.WAITING) node_two = submit_and_await(WaitProcess, ProcessState.WAITING) - run_cli_command(cmd_process.process_pause, ['--all', '--wait']) + run_cli_command(cmd_process.process_pause, ['--all']) await_condition(lambda: node_one.paused) await_condition(lambda: node_two.paused) - run_cli_command(cmd_process.process_play, ['--all', '--wait']) + run_cli_command(cmd_process.process_play, ['--all']) await_condition(lambda: not node_one.paused) await_condition(lambda: not node_two.paused) @@ -954,32 +954,32 @@ def test_process_kill(submit_and_await, run_cli_command, aiida_code_installed): # Kill a paused process node = submit_and_await(builder, ProcessState.WAITING) - run_cli_command(cmd_process.process_pause, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_pause, [str(node.pk)]) await_condition(lambda: node.paused) assert node.process_status == 'Paused through `verdi process pause`' - run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk)]) await_condition(lambda: node.is_killed) assert node.process_status == 'Killed through `verdi process kill`' # Force kill a paused process node = submit_and_await(builder, ProcessState.WAITING) - run_cli_command(cmd_process.process_pause, [str(node.pk), '--wait']) + run_cli_command(cmd_process.process_pause, [str(node.pk)]) await_condition(lambda: node.paused) assert node.process_status == 'Paused through `verdi process pause`' - run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait']) + run_cli_command(cmd_process.process_kill, [str(node.pk), '-F']) await_condition(lambda: node.is_killed) assert node.process_status == 'Force killed through `verdi process kill`' # `verdi process kill --all` should kill all processes node_1 = submit_and_await(builder, ProcessState.WAITING) - run_cli_command(cmd_process.process_pause, [str(node_1.pk), '--wait']) + run_cli_command(cmd_process.process_pause, [str(node_1.pk)]) await_condition(lambda: node_1.paused) node_2 = submit_and_await(builder, ProcessState.WAITING) - run_cli_command(cmd_process.process_kill, ['--all', '--wait'], user_input='y') + run_cli_command(cmd_process.process_kill, ['--all'], user_input='y') await_condition(lambda: node_1.is_killed, timeout=kill_timeout) await_condition(lambda: node_2.is_killed, timeout=kill_timeout) assert node_1.process_status == 'Killed through `verdi process kill`' @@ -987,11 +987,11 @@ def test_process_kill(submit_and_await, run_cli_command, aiida_code_installed): # `verdi process kill --all -F` should Force kill all processes (running / not running) node_1 = submit_and_await(builder, ProcessState.WAITING) - run_cli_command(cmd_process.process_pause, [str(node_1.pk), '--wait']) + run_cli_command(cmd_process.process_pause, [str(node_1.pk)]) await_condition(lambda: node_1.paused) node_2 = submit_and_await(builder, ProcessState.WAITING) - run_cli_command(cmd_process.process_kill, ['--all', '--wait', '-F'], user_input='y') + run_cli_command(cmd_process.process_kill, ['--all', '-F'], user_input='y') await_condition(lambda: node_1.is_killed, timeout=kill_timeout) await_condition(lambda: node_2.is_killed, timeout=kill_timeout) assert node_1.process_status == 'Force killed through `verdi process kill`' @@ -1004,7 +1004,7 @@ def test_process_kill_all(submit_and_await, run_cli_command): """Test the ``verdi process kill --all`` command.""" node = submit_and_await(WaitProcess, ProcessState.WAITING) - run_cli_command(cmd_process.process_kill, ['--all', '--wait'], user_input='y') + run_cli_command(cmd_process.process_kill, ['--all'], user_input='y') await_condition(lambda: node.is_killed) assert node.process_status == 'Killed through `verdi process kill`' diff --git a/tests/engine/processes/test_control.py b/tests/engine/processes/test_control.py index 5bb9b8b7a6..4731b92936 100644 --- a/tests/engine/processes/test_control.py +++ b/tests/engine/processes/test_control.py @@ -35,7 +35,7 @@ def test_pause_processes(submit_and_await): node = submit_and_await(WaitProcess, ProcessState.WAITING) assert not node.paused - control.pause_processes([node], wait=True) + control.pause_processes([node], timeout=float('inf')) assert node.paused assert node.process_status == 'Paused through `aiida.engine.processes.control.pause_processes`' @@ -46,7 +46,7 @@ def test_pause_processes_all_entries(submit_and_await): node = submit_and_await(WaitProcess, ProcessState.WAITING) assert not node.paused - control.pause_processes(all_entries=True, wait=True) + control.pause_processes(all_entries=True, timeout=float('inf')) assert node.paused @@ -56,10 +56,10 @@ def test_play_processes(submit_and_await): node = submit_and_await(WaitProcess, ProcessState.WAITING) assert not node.paused - control.pause_processes([node], wait=True) + control.pause_processes([node], timeout=float('inf')) assert node.paused - control.play_processes([node], wait=True) + control.play_processes([node], timeout=float('inf')) assert not node.paused @@ -69,10 +69,10 @@ def test_play_processes_all_entries(submit_and_await): node = submit_and_await(WaitProcess, ProcessState.WAITING) assert not node.paused - control.pause_processes([node], wait=True) + control.pause_processes([node], timeout=float('inf')) assert node.paused - control.play_processes(all_entries=True, wait=True) + control.play_processes(all_entries=True, timeout=float('inf')) assert not node.paused @@ -81,7 +81,7 @@ def test_kill_processes(submit_and_await): """Test :func:`aiida.engine.processes.control.kill_processes`.""" node = submit_and_await(WaitProcess, ProcessState.WAITING) - control.kill_processes([node], wait=True) + control.kill_processes([node], timeout=float('inf')) assert node.is_terminated assert node.is_killed assert node.process_status == 'Killed through `aiida.engine.processes.control.kill_processes`' @@ -92,7 +92,7 @@ def test_kill_processes_all_entries(submit_and_await): """Test :func:`aiida.engine.processes.control.kill_processes` with ``all_entries=True``.""" node = submit_and_await(WaitProcess, ProcessState.WAITING) - control.kill_processes(all_entries=True, wait=True) + control.kill_processes(all_entries=True, timeout=float('inf')) assert node.is_terminated assert node.is_killed From ef5572222264df04945fa9fe2acd49a78ce0cdc3 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Jun 2025 09:09:36 +0200 Subject: [PATCH 2/4] Apply review as_completed --- src/aiida/engine/processes/control.py | 46 ++++++++++++++------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index b9e4dce8d3..0f531c03cf 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -257,34 +257,36 @@ def _resolve_futures( if not timeout: return - LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}") + LOGGER.report(f"Waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}") + # Ensure that when futures are only are completed if they return an actual value (not a future) + unwrapped_futures = {unwrap_kiwi_future(future): process for future, process in futures.items()} try: - for future, process in futures.items(): - # we unwrap to the end - unwrapped = unwrap_kiwi_future(future) + # future does not interpret float('inf') correctly by changing it to None we get the intended behavior + for future in concurrent.futures.as_completed( + unwrapped_futures.keys(), timeout=None if timeout == float('inf') else timeout + ): + process = unwrapped_futures[future] try: - # future does not interpret float('inf') correctly by changing it to None we get the intended behavior - result = unwrapped.result(timeout=None if timeout == float('inf') else timeout) - except communications.TimeoutError: - cancelled = future.cancel() - if cancelled: - LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.') - else: - LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.') + result = future.result() except Exception as exception: - LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}') + LOGGER.error(f'Failed to {infinitive} Process<{process.pk}>: {exception}') else: if result is True: - LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent') + LOGGER.report(f'Request to {infinitive} Process<{process.pk}> sent') elif result is False: - LOGGER.error(f'problem {present} Process<{process.pk}>') + LOGGER.error(f'Problem {present} Process<{process.pk}>') else: - LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}') + LOGGER.error(f'Got unexpected response when {present} Process<{process.pk}>: {result}') except concurrent.futures.TimeoutError: - raise ProcessTimeoutException( - f'timed out trying to {infinitive} processes {futures.values()}\n' - 'This could be because the daemon workers are too busy to respond, please try again later.\n' - 'If the problem persists, make sure the daemon and RabbitMQ are running properly by restarting them.\n' - 'If the processes remain unresponsive, as a last resort, try reviving them with ``revive_processes``.' - ) + # We cancel the tasks that are not done + undone_futures = {future: process for future, process in unwrapped_futures.items() if not future.done()} + if not undone_futures: + LOGGER.error(f'Call to {infinitive} timed out but already done.') + for future, process in undone_futures.items(): + if not future.done(): + cancelled = future.cancel() + if cancelled: + LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out and was cancelled.') + else: + LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.') From 053fa546c0c15b4cc0fd938bd7217c33dd255531 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Jun 2025 10:36:41 +0200 Subject: [PATCH 3/4] Apply testing review from Timo --- src/aiida/cmdline/commands/cmd_process.py | 4 ++-- src/aiida/engine/processes/control.py | 12 +++++++++--- tests/cmdline/commands/test_process.py | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index 7fa4483ea7..9d7d67e171 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -28,11 +28,11 @@ ACTION_TIMEOUT = OverridableOption( '-t', '--timeout', - type=click.FLOAT, + type=click.FloatRange(0, float('inf')), default=float('inf'), show_default=True, help='Time in seconds to wait for a response before timing out. ' - 'If timeout <= 0 the command does not wait for response.', + 'If timeout is 0 the command does not wait for response.', ) diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 0f531c03cf..9dbb93dc5c 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -229,6 +229,7 @@ def _perform_actions( try: future = action(process.pk, **kwargs) + LOGGER.report(f'Request to {infinitive} Process<{process.pk}> sent.') except communications.UnroutableError: LOGGER.error(f'Process<{process.pk}> is unreachable.') else: @@ -254,10 +255,15 @@ def _resolve_futures( :param present: The present tense form of the action verb. :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. """ - if not timeout: + if not timeout or not futures: + if futures: + LOGGER.report( + f"Request to {infinitive} process(es) {','.join([str(proc.pk) for proc in futures.values()])}" + ' sent. Skipping waiting for response.' + ) return - LOGGER.report(f"Waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}") + LOGGER.report(f"Waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}.") # Ensure that when futures are only are completed if they return an actual value (not a future) unwrapped_futures = {unwrap_kiwi_future(future): process for future, process in futures.items()} @@ -273,7 +279,7 @@ def _resolve_futures( LOGGER.error(f'Failed to {infinitive} Process<{process.pk}>: {exception}') else: if result is True: - LOGGER.report(f'Request to {infinitive} Process<{process.pk}> sent') + LOGGER.report(f'Request to {infinitive} Process<{process.pk}> processed.') elif result is False: LOGGER.error(f'Problem {present} Process<{process.pk}>') else: diff --git a/tests/cmdline/commands/test_process.py b/tests/cmdline/commands/test_process.py index 1f81aac772..7567b18c84 100644 --- a/tests/cmdline/commands/test_process.py +++ b/tests/cmdline/commands/test_process.py @@ -217,7 +217,7 @@ def make_a_builder(sleep_seconds=0): # practice a normal kill, which should fail result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--timeout', '1.0']) - assert f'Error: call to kill Process<{node.pk}> timed out' in result.stdout + assert f'Error: Call to kill Process<{node.pk}> timed out' in result.stdout # force kill the process result = run_cli_command(cmd_process.process_kill, [str(node.pk), '-F']) From c4f67f19fd07d82f2b2fcd964df549e419535de0 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Jun 2025 13:56:27 +0200 Subject: [PATCH 4/4] fix todo --- src/aiida/engine/processes/control.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 9dbb93dc5c..447dbf4f3c 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -247,13 +247,14 @@ def _resolve_futures( """Process a mapping of futures representing an action on an active process. This function will echo the correct information strings based on the outcomes of the futures and the given verb - conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a - timeout to put a maximum wait time on the actions. TODO fix docstring + conjugations. The function waits for any pending actions to be completed. By specifying a timeout the function + aborts after the specified time and cancels pending actions. :param futures: The map of action futures and the corresponding processes. :param infinitive: The infinitive form of the action verb. :param present: The present tense form of the action verb. - :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds. + :param timeout: If None or float('inf') it waits until the actions are completed otherwise it waits for response the + amount in seconds. """ if not timeout or not futures: if futures: