Skip to content

Commit cfbc5bd

Browse files
committed
Merging usage of wait and timeout for actions
The original design of `wait` and `timeout` was to distinguish between actions that return immediately and actions that are scheduled. This mechanism was, however, never used and resulted in a misinterpretation in the force-kill PR aiidateam#6793, introducing a bug that was fixed in PR aiidateam#6870. The mechanism of `wait` and `timeout` was also never correctly implemented. In this PR we rectify the logic and simplify it by handling immediate and scheduled actions the same way. Related commits: aiida-core 8388018, cd0d15c and plumpy 1b6ecb8. One can specify a `timeout <= 0` to express that the action should not wait for a response, while one can specify `timeout==float('inf')` (the default value) to wait until a response has been received without a timeout.
1 parent e768b70 commit cfbc5bd

File tree

5 files changed

+65
-105
lines changed

5 files changed

+65
-105
lines changed

src/aiida/cmdline/commands/cmd_process.py

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@
2424
verdi daemon start
2525
"""
2626

27+
ACTION_TIMEOUT = OverridableOption(
28+
'-t',
29+
'--timeout',
30+
type=click.FLOAT,
31+
default=float('inf'),
32+
show_default=True,
33+
help='Time in seconds to wait for a response before timing out. '
34+
'If timeout <= 0 the command does not wait for response.',
35+
)
36+
2737

2838
def valid_projections():
2939
"""Return list of valid projections for the ``--project`` option of ``verdi process list``.
@@ -319,15 +329,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
319329
@verdi_process.command('kill')
320330
@arguments.PROCESSES()
321331
@options.ALL(help='Kill all processes if no specific processes are specified.')
322-
@OverridableOption(
323-
'-t',
324-
'--timeout',
325-
type=click.FLOAT,
326-
default=5.0,
327-
show_default=True,
328-
help='Time in seconds to wait for a response of the kill task before timing out.',
329-
)()
330-
@options.WAIT()
332+
@ACTION_TIMEOUT()
331333
@OverridableOption(
332334
'-F',
333335
'--force-kill',
@@ -337,7 +339,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
337339
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
338340
)()
339341
@decorators.with_dbenv()
340-
def process_kill(processes, all_entries, timeout, wait, force_kill):
342+
def process_kill(processes, all_entries, timeout, force_kill):
341343
"""Kill running processes.
342344
343345
Kill one or multiple running processes."""
@@ -367,7 +369,6 @@ def process_kill(processes, all_entries, timeout, wait, force_kill):
367369
force_kill=force_kill,
368370
all_entries=all_entries,
369371
timeout=timeout,
370-
wait=wait,
371372
)
372373
except control.ProcessTimeoutException as exception:
373374
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -379,10 +380,9 @@ def process_kill(processes, all_entries, timeout, wait, force_kill):
379380
@verdi_process.command('pause')
380381
@arguments.PROCESSES()
381382
@options.ALL(help='Pause all active processes if no specific processes are specified.')
382-
@options.TIMEOUT()
383-
@options.WAIT()
383+
@ACTION_TIMEOUT()
384384
@decorators.with_dbenv()
385-
def process_pause(processes, all_entries, timeout, wait):
385+
def process_pause(processes, all_entries, timeout):
386386
"""Pause running processes.
387387
388388
Pause one or multiple running processes."""
@@ -403,7 +403,6 @@ def process_pause(processes, all_entries, timeout, wait):
403403
msg_text='Paused through `verdi process pause`',
404404
all_entries=all_entries,
405405
timeout=timeout,
406-
wait=wait,
407406
)
408407
except control.ProcessTimeoutException as exception:
409408
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -415,10 +414,9 @@ def process_pause(processes, all_entries, timeout, wait):
415414
@verdi_process.command('play')
416415
@arguments.PROCESSES()
417416
@options.ALL(help='Play all paused processes if no specific processes are specified.')
418-
@options.TIMEOUT()
419-
@options.WAIT()
417+
@ACTION_TIMEOUT()
420418
@decorators.with_dbenv()
421-
def process_play(processes, all_entries, timeout, wait):
419+
def process_play(processes, all_entries, timeout):
422420
"""Play (unpause) paused processes.
423421
424422
Play (unpause) one or multiple paused processes."""
@@ -434,7 +432,7 @@ def process_play(processes, all_entries, timeout, wait):
434432

435433
with capture_logging() as stream:
436434
try:
437-
control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait)
435+
control.play_processes(processes, all_entries=all_entries, timeout=timeout)
438436
except control.ProcessTimeoutException as exception:
439437
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
440438

src/aiida/cmdline/params/options/main.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@
110110
'USER_LAST_NAME',
111111
'VERBOSITY',
112112
'VISUALIZATION_FORMAT',
113-
'WAIT',
114113
'WITH_ELEMENTS',
115114
'WITH_ELEMENTS_EXCLUSIVE',
116115
'active_process_states',
@@ -675,12 +674,6 @@ def set_log_level(ctx, _param, value):
675674
help='Time in seconds to wait for a response before timing out.',
676675
)
677676

678-
WAIT = OverridableOption(
679-
'--wait/--no-wait',
680-
default=False,
681-
help='Wait for the action to be completed otherwise return as soon as it is scheduled.',
682-
)
683-
684677
FORMULA_MODE = OverridableOption(
685678
'-f',
686679
'--formula-mode',

src/aiida/engine/processes/control.py

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> Non
104104

105105

106106
def play_processes(
107-
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False
107+
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0
108108
) -> None:
109109
"""Play (unpause) paused processes.
110110
@@ -113,7 +113,6 @@ def play_processes(
113113
:param processes: List of processes to play.
114114
:param all_entries: Play all paused processes.
115115
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
116-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
117116
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
118117
"""
119118
if not get_daemon_client().is_daemon_running:
@@ -130,7 +129,7 @@ def play_processes(
130129
return
131130

132131
controller = get_manager().get_process_controller()
133-
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout, wait)
132+
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout)
134133

135134

136135
def pause_processes(
@@ -139,7 +138,6 @@ def pause_processes(
139138
msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`',
140139
all_entries: bool = False,
141140
timeout: float = 5.0,
142-
wait: bool = False,
143141
) -> None:
144142
"""Pause running processes.
145143
@@ -148,7 +146,6 @@ def pause_processes(
148146
:param processes: List of processes to play.
149147
:param all_entries: Pause all playing processes.
150148
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
151-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
152149
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
153150
"""
154151
if not get_daemon_client().is_daemon_running:
@@ -166,7 +163,7 @@ def pause_processes(
166163

167164
controller = get_manager().get_process_controller()
168165
action = functools.partial(controller.pause_process, msg_text=msg_text)
169-
_perform_actions(processes, action, 'pause', 'pausing', timeout, wait)
166+
_perform_actions(processes, action, 'pause', 'pausing', timeout)
170167

171168

172169
def kill_processes(
@@ -176,7 +173,6 @@ def kill_processes(
176173
force_kill: bool = False,
177174
all_entries: bool = False,
178175
timeout: float = 5.0,
179-
wait: bool = False,
180176
) -> None:
181177
"""Kill running processes.
182178
@@ -185,7 +181,6 @@ def kill_processes(
185181
:param processes: List of processes to play.
186182
:param all_entries: Kill all active processes.
187183
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
188-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
189184
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
190185
"""
191186
if not get_daemon_client().is_daemon_running:
@@ -203,7 +198,7 @@ def kill_processes(
203198

204199
controller = get_manager().get_process_controller()
205200
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force_kill)
206-
_perform_actions(processes, action, 'kill', 'killing', timeout, wait)
201+
_perform_actions(processes, action, 'kill', 'killing', timeout)
207202

208203

209204
def _perform_actions(
@@ -212,7 +207,6 @@ def _perform_actions(
212207
infinitive: str,
213208
present: str,
214209
timeout: t.Optional[float] = None,
215-
wait: bool = False,
216210
**kwargs: t.Any,
217211
) -> None:
218212
"""Perform an action on a list of processes.
@@ -223,7 +217,6 @@ def _perform_actions(
223217
:param present: The present tense of the verb that represents the action.
224218
:param past: The past tense of the verb that represents the action.
225219
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
226-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
227220
:param kwargs: Keyword arguments that will be passed to the method ``action``.
228221
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
229222
"""
@@ -241,77 +234,53 @@ def _perform_actions(
241234
else:
242235
futures[future] = process
243236

244-
_resolve_futures(futures, infinitive, present, wait, timeout)
237+
_resolve_futures(futures, infinitive, present, timeout)
245238

246239

247240
def _resolve_futures(
248241
futures: dict[concurrent.futures.Future, ProcessNode],
249242
infinitive: str,
250243
present: str,
251-
wait: bool = False,
252244
timeout: t.Optional[float] = None,
253245
) -> None:
254246
"""Process a mapping of futures representing an action on an active process.
255247
256248
This function will echo the correct information strings based on the outcomes of the futures and the given verb
257249
conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a
258-
timeout to put a maximum wait time on the actions.
250+
timeout to put a maximum wait time on the actions. TODO fix docstring
259251
260252
:param futures: The map of action futures and the corresponding processes.
261253
:param infinitive: The infinitive form of the action verb.
262254
:param present: The present tense form of the action verb.
263-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
264255
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
265256
"""
266-
scheduled = {}
267-
268-
def handle_result(result):
269-
if result is True:
270-
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
271-
elif result is False:
272-
LOGGER.error(f'problem {present} Process<{process.pk}>')
273-
elif isinstance(result, kiwipy.Future):
274-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
275-
scheduled[result] = process
276-
else:
277-
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
257+
if not timeout:
258+
return
259+
260+
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}")
278261

279262
try:
280263
for future, process in futures.items():
281-
# unwrap is need here since LoopCommunicator will also wrap a future
264+
# we unwrap to the end
282265
unwrapped = unwrap_kiwi_future(future)
283266
try:
284-
result = unwrapped.result(timeout=timeout)
267+
# future does not interpret float('inf') correctly by changing it to None we get the intended behavior
268+
result = unwrapped.result(timeout=None if timeout == float('inf') else timeout)
285269
except communications.TimeoutError:
286-
cancelled = unwrapped.cancel()
270+
cancelled = future.cancel()
287271
if cancelled:
288272
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
289273
else:
290274
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
291275
except Exception as exception:
292276
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
293277
else:
294-
if isinstance(result, kiwipy.Future):
295-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
296-
scheduled[result] = process
278+
if result is True:
279+
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
280+
elif result is False:
281+
LOGGER.error(f'problem {present} Process<{process.pk}>')
297282
else:
298-
handle_result(result)
299-
300-
if not wait or not scheduled:
301-
return
302-
303-
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in scheduled.values()])}")
304-
305-
for future in concurrent.futures.as_completed(scheduled.keys(), timeout=timeout):
306-
process = scheduled[future]
307-
308-
try:
309-
result = future.result()
310-
except Exception as exception:
311-
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
312-
else:
313-
handle_result(result)
314-
283+
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
315284
except concurrent.futures.TimeoutError:
316285
raise ProcessTimeoutException(
317286
f'timed out trying to {infinitive} processes {futures.values()}\n'

0 commit comments

Comments
 (0)