Skip to content

Commit 34915a0

Browse files
committed
Merge wait and timeout to one CLI option for verdi process {kill|play|pause} (aiidateam#6902)
The original design of `wait` and `timeout` was to distinguish between actions that return immediately and actions that are scheduled. This mechanism was, however, never used, and it led to a misinterpretation in the force-kill PR aiidateam#6793 that introduced a bug, which was fixed in PR #6870. The mechanism of `wait` and `timeout` was also never correctly implemented. In this PR we rectify the logic and simplify it by handling immediate and scheduled actions the same way. My interpretation of the original logic (seen in commit 8388018) is that it unwraps the future once: if the result is another future then it is a scheduled action, otherwise it is an immediate one. Then 1b6ecb8 in plumpy introduced a double wrapping of the return value for `play`, `pause` and `kill`, which I think is correct as these are scheduled actions. The problem appears in cd0d15c where, on the aiida-core side, the first round of unwrapping is always done until the actual (non-future) result is reached by using `unwrap_kiwi_future`. This makes the usage of `wait` completely obsolete, as we always get the final non-future result in the first step. Also, the `timeout` was not passed to `result`, which made it a blocking action for scheduled tasks (the `timeout` is only applied to the first layer of futures in `futures.as_completed`, which is meaningless since the first layer is always a future and returns immediately). This is fixed by simplifying the two-step procedure (unwrap once and, if the result is a future, unwrap again) to one step: unwrap until a non-future result is obtained, subject to the timeout. One can specify `timeout == 0` to express that the action should not wait for a response, while one can specify `timeout == float('inf')` (the default value) to wait until a response has been received without a timeout.
1 parent 2af9480 commit 34915a0

File tree

5 files changed

+91
-122
lines changed

5 files changed

+91
-122
lines changed

src/aiida/cmdline/commands/cmd_process.py

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,16 @@
2525
verdi daemon start
2626
"""
2727

28+
ACTION_TIMEOUT = OverridableOption(
29+
'-t',
30+
'--timeout',
31+
type=click.FloatRange(0, float('inf')),
32+
default=float('inf'),
33+
show_default=True,
34+
help='Time in seconds to wait for a response before timing out. '
35+
'If timeout is 0 the command does not wait for response.',
36+
)
37+
2838

2939
def valid_projections():
3040
"""Return list of valid projections for the ``--project`` option of ``verdi process list``.
@@ -320,15 +330,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
320330
@verdi_process.command('kill')
321331
@arguments.PROCESSES()
322332
@options.ALL(help='Kill all processes if no specific processes are specified.')
323-
@OverridableOption(
324-
'-t',
325-
'--timeout',
326-
type=click.FLOAT,
327-
default=5.0,
328-
show_default=True,
329-
help='Time in seconds to wait for a response of the kill task before timing out.',
330-
)()
331-
@options.WAIT()
333+
@ACTION_TIMEOUT()
332334
@OverridableOption(
333335
'-F',
334336
'--force',
@@ -338,7 +340,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
338340
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
339341
)()
340342
@decorators.with_dbenv()
341-
def process_kill(processes, all_entries, timeout, wait, force):
343+
def process_kill(processes, all_entries, timeout, force):
342344
"""Kill running processes.
343345
344346
Kill one or multiple running processes."""
@@ -368,7 +370,6 @@ def process_kill(processes, all_entries, timeout, wait, force):
368370
force=force,
369371
all_entries=all_entries,
370372
timeout=timeout,
371-
wait=wait,
372373
)
373374
except control.ProcessTimeoutException as exception:
374375
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -380,10 +381,9 @@ def process_kill(processes, all_entries, timeout, wait, force):
380381
@verdi_process.command('pause')
381382
@arguments.PROCESSES()
382383
@options.ALL(help='Pause all active processes if no specific processes are specified.')
383-
@options.TIMEOUT()
384-
@options.WAIT()
384+
@ACTION_TIMEOUT()
385385
@decorators.with_dbenv()
386-
def process_pause(processes, all_entries, timeout, wait):
386+
def process_pause(processes, all_entries, timeout):
387387
"""Pause running processes.
388388
389389
Pause one or multiple running processes."""
@@ -404,7 +404,6 @@ def process_pause(processes, all_entries, timeout, wait):
404404
msg_text='Paused through `verdi process pause`',
405405
all_entries=all_entries,
406406
timeout=timeout,
407-
wait=wait,
408407
)
409408
except control.ProcessTimeoutException as exception:
410409
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -416,10 +415,9 @@ def process_pause(processes, all_entries, timeout, wait):
416415
@verdi_process.command('play')
417416
@arguments.PROCESSES()
418417
@options.ALL(help='Play all paused processes if no specific processes are specified.')
419-
@options.TIMEOUT()
420-
@options.WAIT()
418+
@ACTION_TIMEOUT()
421419
@decorators.with_dbenv()
422-
def process_play(processes, all_entries, timeout, wait):
420+
def process_play(processes, all_entries, timeout):
423421
"""Play (unpause) paused processes.
424422
425423
Play (unpause) one or multiple paused processes."""
@@ -435,7 +433,7 @@ def process_play(processes, all_entries, timeout, wait):
435433

436434
with capture_logging() as stream:
437435
try:
438-
control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait)
436+
control.play_processes(processes, all_entries=all_entries, timeout=timeout)
439437
except control.ProcessTimeoutException as exception:
440438
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
441439

src/aiida/cmdline/params/options/main.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@
125125
'USER_LAST_NAME',
126126
'VERBOSITY',
127127
'VISUALIZATION_FORMAT',
128-
'WAIT',
129128
'WITH_ELEMENTS',
130129
'WITH_ELEMENTS_EXCLUSIVE',
131130
'active_process_states',
@@ -690,12 +689,6 @@ def set_log_level(ctx, _param, value):
690689
help='Time in seconds to wait for a response before timing out.',
691690
)
692691

693-
WAIT = OverridableOption(
694-
'--wait/--no-wait',
695-
default=False,
696-
help='Wait for the action to be completed otherwise return as soon as it is scheduled.',
697-
)
698-
699692
FORMULA_MODE = OverridableOption(
700693
'-f',
701694
'--formula-mode',

src/aiida/engine/processes/control.py

Lines changed: 44 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> Non
104104

105105

106106
def play_processes(
107-
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False
107+
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0
108108
) -> None:
109109
"""Play (unpause) paused processes.
110110
@@ -113,7 +113,6 @@ def play_processes(
113113
:param processes: List of processes to play.
114114
:param all_entries: Play all paused processes.
115115
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
116-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
117116
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
118117
"""
119118
if not get_daemon_client().is_daemon_running:
@@ -130,7 +129,7 @@ def play_processes(
130129
return
131130

132131
controller = get_manager().get_process_controller()
133-
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout, wait)
132+
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout)
134133

135134

136135
def pause_processes(
@@ -139,7 +138,6 @@ def pause_processes(
139138
msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`',
140139
all_entries: bool = False,
141140
timeout: float = 5.0,
142-
wait: bool = False,
143141
) -> None:
144142
"""Pause running processes.
145143
@@ -148,7 +146,6 @@ def pause_processes(
148146
:param processes: List of processes to play.
149147
:param all_entries: Pause all playing processes.
150148
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
151-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
152149
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
153150
"""
154151
if not get_daemon_client().is_daemon_running:
@@ -166,7 +163,7 @@ def pause_processes(
166163

167164
controller = get_manager().get_process_controller()
168165
action = functools.partial(controller.pause_process, msg_text=msg_text)
169-
_perform_actions(processes, action, 'pause', 'pausing', timeout, wait)
166+
_perform_actions(processes, action, 'pause', 'pausing', timeout)
170167

171168

172169
def kill_processes(
@@ -176,7 +173,6 @@ def kill_processes(
176173
force: bool = False,
177174
all_entries: bool = False,
178175
timeout: float = 5.0,
179-
wait: bool = False,
180176
) -> None:
181177
"""Kill running processes.
182178
@@ -185,7 +181,6 @@ def kill_processes(
185181
:param processes: List of processes to play.
186182
:param all_entries: Kill all active processes.
187183
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
188-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
189184
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
190185
"""
191186
if not get_daemon_client().is_daemon_running:
@@ -203,7 +198,7 @@ def kill_processes(
203198

204199
controller = get_manager().get_process_controller()
205200
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force)
206-
_perform_actions(processes, action, 'kill', 'killing', timeout, wait)
201+
_perform_actions(processes, action, 'kill', 'killing', timeout)
207202

208203

209204
def _perform_actions(
@@ -212,7 +207,6 @@ def _perform_actions(
212207
infinitive: str,
213208
present: str,
214209
timeout: t.Optional[float] = None,
215-
wait: bool = False,
216210
**kwargs: t.Any,
217211
) -> None:
218212
"""Perform an action on a list of processes.
@@ -223,7 +217,6 @@ def _perform_actions(
223217
:param present: The present tense of the verb that represents the action.
224218
:param past: The past tense of the verb that represents the action.
225219
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
226-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
227220
:param kwargs: Keyword arguments that will be passed to the method ``action``.
228221
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
229222
"""
@@ -236,86 +229,71 @@ def _perform_actions(
236229

237230
try:
238231
future = action(process.pk, **kwargs)
232+
LOGGER.report(f'Request to {infinitive} Process<{process.pk}> sent.')
239233
except communications.UnroutableError:
240234
LOGGER.error(f'Process<{process.pk}> is unreachable.')
241235
else:
242236
futures[future] = process
243237

244-
_resolve_futures(futures, infinitive, present, wait, timeout)
238+
_resolve_futures(futures, infinitive, present, timeout)
245239

246240

247241
def _resolve_futures(
248242
futures: dict[concurrent.futures.Future, ProcessNode],
249243
infinitive: str,
250244
present: str,
251-
wait: bool = False,
252245
timeout: t.Optional[float] = None,
253246
) -> None:
254247
"""Process a mapping of futures representing an action on an active process.
255248
256249
This function will echo the correct information strings based on the outcomes of the futures and the given verb
257-
conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a
258-
timeout to put a maximum wait time on the actions.
250+
conjugations. The function waits for any pending actions to be completed. By specifying a timeout the function
251+
aborts after the specified time and cancels pending actions.
259252
260253
:param futures: The map of action futures and the corresponding processes.
261254
:param infinitive: The infinitive form of the action verb.
262255
:param present: The present tense form of the action verb.
263-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
264-
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
256+
:param timeout: If None or float('inf') it waits until the actions are completed otherwise it waits for response the
257+
amount in seconds.
265258
"""
266-
scheduled = {}
267-
268-
def handle_result(result):
269-
if result is True:
270-
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
271-
elif result is False:
272-
LOGGER.error(f'problem {present} Process<{process.pk}>')
273-
elif isinstance(result, kiwipy.Future):
274-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
275-
scheduled[result] = process
276-
else:
277-
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
278-
279-
try:
280-
for future, process in futures.items():
281-
# unwrap is need here since LoopCommunicator will also wrap a future
282-
unwrapped = unwrap_kiwi_future(future)
283-
try:
284-
result = unwrapped.result(timeout=timeout)
285-
except communications.TimeoutError:
286-
cancelled = unwrapped.cancel()
287-
if cancelled:
288-
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
289-
else:
290-
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
291-
except Exception as exception:
292-
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
293-
else:
294-
if isinstance(result, kiwipy.Future):
295-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
296-
scheduled[result] = process
297-
else:
298-
handle_result(result)
299-
300-
if not wait or not scheduled:
301-
return
302-
303-
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in scheduled.values()])}")
259+
if not timeout or not futures:
260+
if futures:
261+
LOGGER.report(
262+
f"Request to {infinitive} process(es) {','.join([str(proc.pk) for proc in futures.values()])}"
263+
' sent. Skipping waiting for response.'
264+
)
265+
return
304266

305-
for future in concurrent.futures.as_completed(scheduled.keys(), timeout=timeout):
306-
process = scheduled[future]
267+
LOGGER.report(f"Waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}.")
307268

269+
# Ensure that futures are only considered completed once they return an actual value (not another future)
270+
unwrapped_futures = {unwrap_kiwi_future(future): process for future, process in futures.items()}
271+
try:
272+
# future does not interpret float('inf') correctly; by changing it to None we get the intended behavior
273+
for future in concurrent.futures.as_completed(
274+
unwrapped_futures.keys(), timeout=None if timeout == float('inf') else timeout
275+
):
276+
process = unwrapped_futures[future]
308277
try:
309278
result = future.result()
310279
except Exception as exception:
311-
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
280+
LOGGER.error(f'Failed to {infinitive} Process<{process.pk}>: {exception}')
312281
else:
313-
handle_result(result)
314-
282+
if result is True:
283+
LOGGER.report(f'Request to {infinitive} Process<{process.pk}> processed.')
284+
elif result is False:
285+
LOGGER.error(f'Problem {present} Process<{process.pk}>')
286+
else:
287+
LOGGER.error(f'Got unexpected response when {present} Process<{process.pk}>: {result}')
315288
except concurrent.futures.TimeoutError:
316-
raise ProcessTimeoutException(
317-
f'timed out trying to {infinitive} processes {futures.values()}\n'
318-
'This could be because the daemon workers are too busy to respond, please try again later.\n'
319-
'If the problem persists, make sure the daemon and RabbitMQ are running properly by restarting them.\n'
320-
'If the processes remain unresponsive, as a last resort, try reviving them with ``revive_processes``.'
321-
)
289+
# We cancel the tasks that are not done
290+
undone_futures = {future: process for future, process in unwrapped_futures.items() if not future.done()}
291+
if not undone_futures:
292+
LOGGER.error(f'Call to {infinitive} timed out but already done.')
293+
for future, process in undone_futures.items():
294+
if not future.done():
295+
cancelled = future.cancel()
296+
if cancelled:
297+
LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
298+
else:
299+
LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')

0 commit comments

Comments
 (0)