Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions src/aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@
verdi daemon start
"""

ACTION_TIMEOUT = OverridableOption(
'-t',
'--timeout',
type=click.FloatRange(0, float('inf')),
default=float('inf'),
show_default=True,
help='Time in seconds to wait for a response before timing out. '
'If timeout is 0 the command does not wait for response.',
)


def valid_projections():
"""Return list of valid projections for the ``--project`` option of ``verdi process list``.
Expand Down Expand Up @@ -320,15 +330,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
@verdi_process.command('kill')
@arguments.PROCESSES()
@options.ALL(help='Kill all processes if no specific processes are specified.')
@OverridableOption(
'-t',
'--timeout',
type=click.FLOAT,
default=5.0,
show_default=True,
help='Time in seconds to wait for a response of the kill task before timing out.',
)()
@options.WAIT()
@ACTION_TIMEOUT()
@OverridableOption(
'-F',
'--force',
Expand All @@ -338,7 +340,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
)()
@decorators.with_dbenv()
def process_kill(processes, all_entries, timeout, wait, force):
def process_kill(processes, all_entries, timeout, force):
"""Kill running processes.

Kill one or multiple running processes."""
Expand Down Expand Up @@ -368,7 +370,6 @@ def process_kill(processes, all_entries, timeout, wait, force):
force=force,
all_entries=all_entries,
timeout=timeout,
wait=wait,
)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
Expand All @@ -380,10 +381,9 @@ def process_kill(processes, all_entries, timeout, wait, force):
@verdi_process.command('pause')
@arguments.PROCESSES()
@options.ALL(help='Pause all active processes if no specific processes are specified.')
@options.TIMEOUT()
@options.WAIT()
@ACTION_TIMEOUT()
@decorators.with_dbenv()
def process_pause(processes, all_entries, timeout, wait):
def process_pause(processes, all_entries, timeout):
"""Pause running processes.

Pause one or multiple running processes."""
Expand All @@ -404,7 +404,6 @@ def process_pause(processes, all_entries, timeout, wait):
msg_text='Paused through `verdi process pause`',
all_entries=all_entries,
timeout=timeout,
wait=wait,
)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
Expand All @@ -416,10 +415,9 @@ def process_pause(processes, all_entries, timeout, wait):
@verdi_process.command('play')
@arguments.PROCESSES()
@options.ALL(help='Play all paused processes if no specific processes are specified.')
@options.TIMEOUT()
@options.WAIT()
@ACTION_TIMEOUT()
@decorators.with_dbenv()
def process_play(processes, all_entries, timeout, wait):
def process_play(processes, all_entries, timeout):
"""Play (unpause) paused processes.

Play (unpause) one or multiple paused processes."""
Expand All @@ -435,7 +433,7 @@ def process_play(processes, all_entries, timeout, wait):

with capture_logging() as stream:
try:
control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait)
control.play_processes(processes, all_entries=all_entries, timeout=timeout)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')

Expand Down
7 changes: 0 additions & 7 deletions src/aiida/cmdline/params/options/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@
'USER_LAST_NAME',
'VERBOSITY',
'VISUALIZATION_FORMAT',
'WAIT',
'WITH_ELEMENTS',
'WITH_ELEMENTS_EXCLUSIVE',
'active_process_states',
Expand Down Expand Up @@ -690,12 +689,6 @@ def set_log_level(ctx, _param, value):
help='Time in seconds to wait for a response before timing out.',
)

WAIT = OverridableOption(
'--wait/--no-wait',
default=False,
help='Wait for the action to be completed otherwise return as soon as it is scheduled.',
)

FORMULA_MODE = OverridableOption(
'-f',
'--formula-mode',
Expand Down
110 changes: 44 additions & 66 deletions src/aiida/engine/processes/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@


def play_processes(
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0
) -> None:
"""Play (unpause) paused processes.

Expand All @@ -113,7 +113,6 @@
:param processes: List of processes to play.
:param all_entries: Play all paused processes.
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
"""
if not get_daemon_client().is_daemon_running:
Expand All @@ -130,7 +129,7 @@
return

controller = get_manager().get_process_controller()
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout, wait)
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout)


def pause_processes(
Expand All @@ -139,7 +138,6 @@
msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`',
all_entries: bool = False,
timeout: float = 5.0,
wait: bool = False,
) -> None:
"""Pause running processes.

Expand All @@ -148,7 +146,6 @@
:param processes: List of processes to play.
:param all_entries: Pause all playing processes.
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
"""
if not get_daemon_client().is_daemon_running:
Expand All @@ -166,7 +163,7 @@

controller = get_manager().get_process_controller()
action = functools.partial(controller.pause_process, msg_text=msg_text)
_perform_actions(processes, action, 'pause', 'pausing', timeout, wait)
_perform_actions(processes, action, 'pause', 'pausing', timeout)


def kill_processes(
Expand All @@ -176,7 +173,6 @@
force: bool = False,
all_entries: bool = False,
timeout: float = 5.0,
wait: bool = False,
) -> None:
"""Kill running processes.

Expand All @@ -185,7 +181,6 @@
:param processes: List of processes to play.
:param all_entries: Kill all active processes.
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
"""
if not get_daemon_client().is_daemon_running:
Expand All @@ -203,7 +198,7 @@

controller = get_manager().get_process_controller()
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force)
_perform_actions(processes, action, 'kill', 'killing', timeout, wait)
_perform_actions(processes, action, 'kill', 'killing', timeout)


def _perform_actions(
Expand All @@ -212,7 +207,6 @@
infinitive: str,
present: str,
timeout: t.Optional[float] = None,
wait: bool = False,
**kwargs: t.Any,
) -> None:
"""Perform an action on a list of processes.
Expand All @@ -223,7 +217,6 @@
:param present: The present tense of the verb that represents the action.
:param past: The past tense of the verb that represents the action.
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
:param kwargs: Keyword arguments that will be passed to the method ``action``.
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
"""
Expand All @@ -236,86 +229,71 @@

try:
future = action(process.pk, **kwargs)
LOGGER.report(f'Request to {infinitive} Process<{process.pk}> sent.')
except communications.UnroutableError:
LOGGER.error(f'Process<{process.pk}> is unreachable.')
else:
futures[future] = process

_resolve_futures(futures, infinitive, present, wait, timeout)
_resolve_futures(futures, infinitive, present, timeout)


def _resolve_futures(
futures: dict[concurrent.futures.Future, ProcessNode],
infinitive: str,
present: str,
wait: bool = False,
timeout: t.Optional[float] = None,
) -> None:
"""Process a mapping of futures representing an action on an active process.

This function will echo the correct information strings based on the outcomes of the futures and the given verb
conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a
timeout to put a maximum wait time on the actions.
conjugations. The function waits for any pending actions to be completed. By specifying a timeout the function
aborts after the specified time and cancels pending actions.

:param futures: The map of action futures and the corresponding processes.
:param infinitive: The infinitive form of the action verb.
:param present: The present tense form of the action verb.
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
:param timeout: If None or float('inf') it waits until the actions are completed otherwise it waits for response the
amount in seconds.
"""
scheduled = {}

def handle_result(result):
if result is True:
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
elif result is False:
LOGGER.error(f'problem {present} Process<{process.pk}>')
elif isinstance(result, kiwipy.Future):
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
scheduled[result] = process
else:
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')

try:
for future, process in futures.items():
# unwrap is need here since LoopCommunicator will also wrap a future
unwrapped = unwrap_kiwi_future(future)
try:
result = unwrapped.result(timeout=timeout)
except communications.TimeoutError:
cancelled = unwrapped.cancel()
if cancelled:
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
else:
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
except Exception as exception:
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
else:
if isinstance(result, kiwipy.Future):
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
scheduled[result] = process
else:
handle_result(result)

if not wait or not scheduled:
return

LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in scheduled.values()])}")
if not timeout or not futures:
if futures:
LOGGER.report(

Check warning on line 261 in src/aiida/engine/processes/control.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/control.py#L260-L261

Added lines #L260 - L261 were not covered by tests
f"Request to {infinitive} process(es) {','.join([str(proc.pk) for proc in futures.values()])}"
' sent. Skipping waiting for response.'
)
return

Check warning on line 265 in src/aiida/engine/processes/control.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/control.py#L265

Added line #L265 was not covered by tests

for future in concurrent.futures.as_completed(scheduled.keys(), timeout=timeout):
process = scheduled[future]
LOGGER.report(f"Waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}.")

# Ensure that when futures are only are completed if they return an actual value (not a future)
unwrapped_futures = {unwrap_kiwi_future(future): process for future, process in futures.items()}
try:
# future does not interpret float('inf') correctly by changing it to None we get the intended behavior
for future in concurrent.futures.as_completed(
unwrapped_futures.keys(), timeout=None if timeout == float('inf') else timeout
):
process = unwrapped_futures[future]
try:
result = future.result()
except Exception as exception:
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
LOGGER.error(f'Failed to {infinitive} Process<{process.pk}>: {exception}')
else:
handle_result(result)

if result is True:
LOGGER.report(f'Request to {infinitive} Process<{process.pk}> processed.')
elif result is False:
LOGGER.error(f'Problem {present} Process<{process.pk}>')
else:
LOGGER.error(f'Got unexpected response when {present} Process<{process.pk}>: {result}')

Check warning on line 287 in src/aiida/engine/processes/control.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/control.py#L287

Added line #L287 was not covered by tests
except concurrent.futures.TimeoutError:
raise ProcessTimeoutException(
f'timed out trying to {infinitive} processes {futures.values()}\n'
'This could be because the daemon workers are too busy to respond, please try again later.\n'
'If the problem persists, make sure the daemon and RabbitMQ are running properly by restarting them.\n'
'If the processes remain unresponsive, as a last resort, try reviving them with ``revive_processes``.'
)
# We cancel the tasks that are not done
undone_futures = {future: process for future, process in unwrapped_futures.items() if not future.done()}
if not undone_futures:
LOGGER.error(f'Call to {infinitive} timed out but already done.')

Check warning on line 292 in src/aiida/engine/processes/control.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/control.py#L292

Added line #L292 was not covered by tests
for future, process in undone_futures.items():
if not future.done():
cancelled = future.cancel()
if cancelled:
LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
else:
LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')

Check warning on line 299 in src/aiida/engine/processes/control.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/control.py#L299

Added line #L299 was not covered by tests
Loading