@@ -257,34 +257,36 @@ def _resolve_futures(
257257 if not timeout :
258258 return
259259
260- LOGGER .report (f"waiting for process(es) { ',' .join ([str (proc .pk ) for proc in futures .values ()])} " )
260+ LOGGER .report (f"Waiting for process(es) { ',' .join ([str (proc .pk ) for proc in futures .values ()])} " )
261261
262+ # Ensure that when futures are only are completed if they return an actual value (not a future)
263+ unwrapped_futures = {unwrap_kiwi_future (future ): process for future , process in futures .items ()}
262264 try :
263- for future , process in futures .items ():
264- # we unwrap to the end
265- unwrapped = unwrap_kiwi_future (future )
265+ # future does not interpret float('inf') correctly by changing it to None we get the intended behavior
266+ for future in concurrent .futures .as_completed (
267+ unwrapped_futures .keys (), timeout = None if timeout == float ('inf' ) else timeout
268+ ):
269+ process = unwrapped_futures [future ]
266270 try :
267- # future does not interpret float('inf') correctly by changing it to None we get the intended behavior
268- result = unwrapped .result (timeout = None if timeout == float ('inf' ) else timeout )
269- except communications .TimeoutError :
270- cancelled = future .cancel ()
271- if cancelled :
272- LOGGER .error (f'call to { infinitive } Process<{ process .pk } > timed out and was cancelled.' )
273- else :
274- LOGGER .error (f'call to { infinitive } Process<{ process .pk } > timed out but could not be cancelled.' )
271+ result = future .result ()
275272 except Exception as exception :
276- LOGGER .error (f'failed to { infinitive } Process<{ process .pk } >: { exception } ' )
273+ LOGGER .error (f'Failed to { infinitive } Process<{ process .pk } >: { exception } ' )
277274 else :
278275 if result is True :
279- LOGGER .report (f'request to { infinitive } Process<{ process .pk } > sent' )
276+ LOGGER .report (f'Request to { infinitive } Process<{ process .pk } > sent' )
280277 elif result is False :
281- LOGGER .error (f'problem { present } Process<{ process .pk } >' )
278+ LOGGER .error (f'Problem { present } Process<{ process .pk } >' )
282279 else :
283- LOGGER .error (f'got unexpected response when { present } Process<{ process .pk } >: { result } ' )
280+ LOGGER .error (f'Got unexpected response when { present } Process<{ process .pk } >: { result } ' )
284281 except concurrent .futures .TimeoutError :
285- raise ProcessTimeoutException (
286- f'timed out trying to { infinitive } processes { futures .values ()} \n '
287- 'This could be because the daemon workers are too busy to respond, please try again later.\n '
288- 'If the problem persists, make sure the daemon and RabbitMQ are running properly by restarting them.\n '
289- 'If the processes remain unresponsive, as a last resort, try reviving them with ``revive_processes``.'
290- )
282+ # We cancel the tasks that are not done
283+ undone_futures = {future : process for future , process in unwrapped_futures .items () if not future .done ()}
284+ if not undone_futures :
285+ LOGGER .error (f'Call to { infinitive } timed out but already done.' )
286+ for future , process in undone_futures .items ():
287+ if not future .done ():
288+ cancelled = future .cancel ()
289+ if cancelled :
290+ LOGGER .error (f'Call to { infinitive } Process<{ process .pk } > timed out and was cancelled.' )
291+ else :
292+ LOGGER .error (f'Call to { infinitive } Process<{ process .pk } > timed out but could not be cancelled.' )
0 commit comments