Skip to content

Commit 0e74a40

Browse files
committed
Merging usage of wait and timeout for actions (aiidateam#6902)
The original design of `wait` and `timeout` was to distinguish between actions that return immediately and actions that are scheduled. This mechanism was, however, never used and resulted in a misinterpretation in the force-kill PR aiidateam#6793, introducing a bug that was fixed in PR aiidateam#6870. The mechanism of `wait` and `timeout` was also never correctly implemented. In this PR we rectify the logic and simplify it by handling immediate and scheduled actions the same way. Related commits in aiida-core 8388018, cd0d15c and plumpy 1b6ecb8. One can specify a `timeout <= 0` to express that the action should not wait for a response, while one can specify `timeout==float('inf')` (the default value) to wait until a response has been received without a timeout.
1 parent cf029fb commit 0e74a40

File tree

5 files changed

+65
-105
lines changed

5 files changed

+65
-105
lines changed

src/aiida/cmdline/commands/cmd_process.py

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,16 @@
2525
verdi daemon start
2626
"""
2727

28+
ACTION_TIMEOUT = OverridableOption(
29+
'-t',
30+
'--timeout',
31+
type=click.FLOAT,
32+
default=float('inf'),
33+
show_default=True,
34+
help='Time in seconds to wait for a response before timing out. '
35+
'If timeout <= 0 the command does not wait for response.',
36+
)
37+
2838

2939
def valid_projections():
3040
"""Return list of valid projections for the ``--project`` option of ``verdi process list``.
@@ -320,15 +330,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
320330
@verdi_process.command('kill')
321331
@arguments.PROCESSES()
322332
@options.ALL(help='Kill all processes if no specific processes are specified.')
323-
@OverridableOption(
324-
'-t',
325-
'--timeout',
326-
type=click.FLOAT,
327-
default=5.0,
328-
show_default=True,
329-
help='Time in seconds to wait for a response of the kill task before timing out.',
330-
)()
331-
@options.WAIT()
333+
@ACTION_TIMEOUT()
332334
@OverridableOption(
333335
'-F',
334336
'--force',
@@ -338,7 +340,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
338340
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
339341
)()
340342
@decorators.with_dbenv()
341-
def process_kill(processes, all_entries, timeout, wait, force):
343+
def process_kill(processes, all_entries, timeout, force):
342344
"""Kill running processes.
343345
344346
Kill one or multiple running processes."""
@@ -368,7 +370,6 @@ def process_kill(processes, all_entries, timeout, wait, force):
368370
force=force,
369371
all_entries=all_entries,
370372
timeout=timeout,
371-
wait=wait,
372373
)
373374
except control.ProcessTimeoutException as exception:
374375
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -380,10 +381,9 @@ def process_kill(processes, all_entries, timeout, wait, force):
380381
@verdi_process.command('pause')
381382
@arguments.PROCESSES()
382383
@options.ALL(help='Pause all active processes if no specific processes are specified.')
383-
@options.TIMEOUT()
384-
@options.WAIT()
384+
@ACTION_TIMEOUT()
385385
@decorators.with_dbenv()
386-
def process_pause(processes, all_entries, timeout, wait):
386+
def process_pause(processes, all_entries, timeout):
387387
"""Pause running processes.
388388
389389
Pause one or multiple running processes."""
@@ -404,7 +404,6 @@ def process_pause(processes, all_entries, timeout, wait):
404404
msg_text='Paused through `verdi process pause`',
405405
all_entries=all_entries,
406406
timeout=timeout,
407-
wait=wait,
408407
)
409408
except control.ProcessTimeoutException as exception:
410409
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -416,10 +415,9 @@ def process_pause(processes, all_entries, timeout, wait):
416415
@verdi_process.command('play')
417416
@arguments.PROCESSES()
418417
@options.ALL(help='Play all paused processes if no specific processes are specified.')
419-
@options.TIMEOUT()
420-
@options.WAIT()
418+
@ACTION_TIMEOUT()
421419
@decorators.with_dbenv()
422-
def process_play(processes, all_entries, timeout, wait):
420+
def process_play(processes, all_entries, timeout):
423421
"""Play (unpause) paused processes.
424422
425423
Play (unpause) one or multiple paused processes."""
@@ -435,7 +433,7 @@ def process_play(processes, all_entries, timeout, wait):
435433

436434
with capture_logging() as stream:
437435
try:
438-
control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait)
436+
control.play_processes(processes, all_entries=all_entries, timeout=timeout)
439437
except control.ProcessTimeoutException as exception:
440438
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
441439

src/aiida/cmdline/params/options/main.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@
125125
'USER_LAST_NAME',
126126
'VERBOSITY',
127127
'VISUALIZATION_FORMAT',
128-
'WAIT',
129128
'WITH_ELEMENTS',
130129
'WITH_ELEMENTS_EXCLUSIVE',
131130
'active_process_states',
@@ -690,12 +689,6 @@ def set_log_level(ctx, _param, value):
690689
help='Time in seconds to wait for a response before timing out.',
691690
)
692691

693-
WAIT = OverridableOption(
694-
'--wait/--no-wait',
695-
default=False,
696-
help='Wait for the action to be completed otherwise return as soon as it is scheduled.',
697-
)
698-
699692
FORMULA_MODE = OverridableOption(
700693
'-f',
701694
'--formula-mode',

src/aiida/engine/processes/control.py

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> Non
104104

105105

106106
def play_processes(
107-
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False
107+
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0
108108
) -> None:
109109
"""Play (unpause) paused processes.
110110
@@ -113,7 +113,6 @@ def play_processes(
113113
:param processes: List of processes to play.
114114
:param all_entries: Play all paused processes.
115115
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
116-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
117116
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
118117
"""
119118
if not get_daemon_client().is_daemon_running:
@@ -130,7 +129,7 @@ def play_processes(
130129
return
131130

132131
controller = get_manager().get_process_controller()
133-
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout, wait)
132+
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout)
134133

135134

136135
def pause_processes(
@@ -139,7 +138,6 @@ def pause_processes(
139138
msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`',
140139
all_entries: bool = False,
141140
timeout: float = 5.0,
142-
wait: bool = False,
143141
) -> None:
144142
"""Pause running processes.
145143
@@ -148,7 +146,6 @@ def pause_processes(
148146
:param processes: List of processes to play.
149147
:param all_entries: Pause all playing processes.
150148
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
151-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
152149
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
153150
"""
154151
if not get_daemon_client().is_daemon_running:
@@ -166,7 +163,7 @@ def pause_processes(
166163

167164
controller = get_manager().get_process_controller()
168165
action = functools.partial(controller.pause_process, msg_text=msg_text)
169-
_perform_actions(processes, action, 'pause', 'pausing', timeout, wait)
166+
_perform_actions(processes, action, 'pause', 'pausing', timeout)
170167

171168

172169
def kill_processes(
@@ -176,7 +173,6 @@ def kill_processes(
176173
force: bool = False,
177174
all_entries: bool = False,
178175
timeout: float = 5.0,
179-
wait: bool = False,
180176
) -> None:
181177
"""Kill running processes.
182178
@@ -185,7 +181,6 @@ def kill_processes(
185181
:param processes: List of processes to play.
186182
:param all_entries: Kill all active processes.
187183
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
188-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
189184
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
190185
"""
191186
if not get_daemon_client().is_daemon_running:
@@ -203,7 +198,7 @@ def kill_processes(
203198

204199
controller = get_manager().get_process_controller()
205200
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force)
206-
_perform_actions(processes, action, 'kill', 'killing', timeout, wait)
201+
_perform_actions(processes, action, 'kill', 'killing', timeout)
207202

208203

209204
def _perform_actions(
@@ -212,7 +207,6 @@ def _perform_actions(
212207
infinitive: str,
213208
present: str,
214209
timeout: t.Optional[float] = None,
215-
wait: bool = False,
216210
**kwargs: t.Any,
217211
) -> None:
218212
"""Perform an action on a list of processes.
@@ -223,7 +217,6 @@ def _perform_actions(
223217
:param present: The present tense of the verb that represents the action.
224218
:param past: The past tense of the verb that represents the action.
225219
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
226-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
227220
:param kwargs: Keyword arguments that will be passed to the method ``action``.
228221
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
229222
"""
@@ -241,77 +234,53 @@ def _perform_actions(
241234
else:
242235
futures[future] = process
243236

244-
_resolve_futures(futures, infinitive, present, wait, timeout)
237+
_resolve_futures(futures, infinitive, present, timeout)
245238

246239

247240
def _resolve_futures(
248241
futures: dict[concurrent.futures.Future, ProcessNode],
249242
infinitive: str,
250243
present: str,
251-
wait: bool = False,
252244
timeout: t.Optional[float] = None,
253245
) -> None:
254246
"""Process a mapping of futures representing an action on an active process.
255247
256248
This function will echo the correct information strings based on the outcomes of the futures and the given verb
257249
conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a
258-
timeout to put a maximum wait time on the actions.
250+
timeout to put a maximum wait time on the actions. TODO fix docstring
259251
260252
:param futures: The map of action futures and the corresponding processes.
261253
:param infinitive: The infinitive form of the action verb.
262254
:param present: The present tense form of the action verb.
263-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
264255
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
265256
"""
266-
scheduled = {}
267-
268-
def handle_result(result):
269-
if result is True:
270-
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
271-
elif result is False:
272-
LOGGER.error(f'problem {present} Process<{process.pk}>')
273-
elif isinstance(result, kiwipy.Future):
274-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
275-
scheduled[result] = process
276-
else:
277-
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
257+
if not timeout:
258+
return
259+
260+
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}")
278261

279262
try:
280263
for future, process in futures.items():
281-
# unwrap is need here since LoopCommunicator will also wrap a future
264+
# we unwrap to the end
282265
unwrapped = unwrap_kiwi_future(future)
283266
try:
284-
result = unwrapped.result(timeout=timeout)
267+
# future does not interpret float('inf') correctly by changing it to None we get the intended behavior
268+
result = unwrapped.result(timeout=None if timeout == float('inf') else timeout)
285269
except communications.TimeoutError:
286-
cancelled = unwrapped.cancel()
270+
cancelled = future.cancel()
287271
if cancelled:
288272
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
289273
else:
290274
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
291275
except Exception as exception:
292276
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
293277
else:
294-
if isinstance(result, kiwipy.Future):
295-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
296-
scheduled[result] = process
278+
if result is True:
279+
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
280+
elif result is False:
281+
LOGGER.error(f'problem {present} Process<{process.pk}>')
297282
else:
298-
handle_result(result)
299-
300-
if not wait or not scheduled:
301-
return
302-
303-
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in scheduled.values()])}")
304-
305-
for future in concurrent.futures.as_completed(scheduled.keys(), timeout=timeout):
306-
process = scheduled[future]
307-
308-
try:
309-
result = future.result()
310-
except Exception as exception:
311-
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
312-
else:
313-
handle_result(result)
314-
283+
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
315284
except concurrent.futures.TimeoutError:
316285
raise ProcessTimeoutException(
317286
f'timed out trying to {infinitive} processes {futures.values()}\n'

0 commit comments

Comments
 (0)