Skip to content

Commit c6c23d6

Browse files
committed
Apply review as_completed (aiidateam#6902)
1 parent a63a086 commit c6c23d6

File tree

1 file changed

+24
-22
lines changed

1 file changed

+24
-22
lines changed

src/aiida/engine/processes/control.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -257,34 +257,36 @@ def _resolve_futures(
257257
if not timeout:
258258
return
259259

260-
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}")
260+
LOGGER.report(f"Waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}")
261261

262+
# Ensure that when futures are only are completed if they return an actual value (not a future)
263+
unwrapped_futures = {unwrap_kiwi_future(future): process for future, process in futures.items()}
262264
try:
263-
for future, process in futures.items():
264-
# we unwrap to the end
265-
unwrapped = unwrap_kiwi_future(future)
265+
# future does not interpret float('inf') correctly by changing it to None we get the intended behavior
266+
for future in concurrent.futures.as_completed(
267+
unwrapped_futures.keys(), timeout=None if timeout == float('inf') else timeout
268+
):
269+
process = unwrapped_futures[future]
266270
try:
267-
# future does not interpret float('inf') correctly by changing it to None we get the intended behavior
268-
result = unwrapped.result(timeout=None if timeout == float('inf') else timeout)
269-
except communications.TimeoutError:
270-
cancelled = future.cancel()
271-
if cancelled:
272-
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
273-
else:
274-
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
271+
result = future.result()
275272
except Exception as exception:
276-
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
273+
LOGGER.error(f'Failed to {infinitive} Process<{process.pk}>: {exception}')
277274
else:
278275
if result is True:
279-
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
276+
LOGGER.report(f'Request to {infinitive} Process<{process.pk}> sent')
280277
elif result is False:
281-
LOGGER.error(f'problem {present} Process<{process.pk}>')
278+
LOGGER.error(f'Problem {present} Process<{process.pk}>')
282279
else:
283-
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
280+
LOGGER.error(f'Got unexpected response when {present} Process<{process.pk}>: {result}')
284281
except concurrent.futures.TimeoutError:
285-
raise ProcessTimeoutException(
286-
f'timed out trying to {infinitive} processes {futures.values()}\n'
287-
'This could be because the daemon workers are too busy to respond, please try again later.\n'
288-
'If the problem persists, make sure the daemon and RabbitMQ are running properly by restarting them.\n'
289-
'If the processes remain unresponsive, as a last resort, try reviving them with ``revive_processes``.'
290-
)
282+
# We cancel the tasks that are not done
283+
undone_futures = {future: process for future, process in unwrapped_futures.items() if not future.done()}
284+
if not undone_futures:
285+
LOGGER.error(f'Call to {infinitive} timed out but already done.')
286+
for future, process in undone_futures.items():
287+
if not future.done():
288+
cancelled = future.cancel()
289+
if cancelled:
290+
LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
291+
else:
292+
LOGGER.error(f'Call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')

0 commit comments

Comments
 (0)