@@ -389,47 +389,46 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
389389 else :
390390 if task_record ['from_memo' ]:
391391 self ._complete_task_result (task_record , States .memo_done , res )
392+ elif not task_record ['join' ]:
393+ self ._complete_task_result (task_record , States .exec_done , res )
392394 else :
393- if not task_record ['join' ]:
394- self ._complete_task_result (task_record , States .exec_done , res )
395+ # This is a join task, and the original task's function code has
396+ # completed. That means that the future returned by that code
397+ # will be available inside the executor future, so we can now
398+ # record the inner app ID in monitoring, and add a completion
399+ # listener to that inner future.
400+
401+ joinable = future .result ()
402+
403+ # Fail with a TypeError if the joinapp python body returned
404+ # something we can't join on.
405+ if isinstance (joinable , Future ):
406+ self .update_task_state (task_record , States .joining )
407+ task_record ['joins' ] = joinable
408+ task_record ['join_lock' ] = threading .Lock ()
409+ self ._send_task_log_info (task_record )
410+ joinable .add_done_callback (partial (self .handle_join_update , task_record ))
411+ elif joinable == []: # got a list, but it had no entries, and specifically, no Futures.
412+ self .update_task_state (task_record , States .joining )
413+ task_record ['joins' ] = joinable
414+ task_record ['join_lock' ] = threading .Lock ()
415+ self ._send_task_log_info (task_record )
416+ self .handle_join_update (task_record , None )
417+ elif isinstance (joinable , list ) and [j for j in joinable if not isinstance (j , Future )] == []:
418+ self .update_task_state (task_record , States .joining )
419+ task_record ['joins' ] = joinable
420+ task_record ['join_lock' ] = threading .Lock ()
421+ self ._send_task_log_info (task_record )
422+ for inner_future in joinable :
423+ inner_future .add_done_callback (partial (self .handle_join_update , task_record ))
395424 else :
396- # This is a join task, and the original task's function code has
397- # completed. That means that the future returned by that code
398- # will be available inside the executor future, so we can now
399- # record the inner app ID in monitoring, and add a completion
400- # listener to that inner future.
401-
402- joinable = future .result ()
403-
404- # Fail with a TypeError if the joinapp python body returned
405- # something we can't join on.
406- if isinstance (joinable , Future ):
407- self .update_task_state (task_record , States .joining )
408- task_record ['joins' ] = joinable
409- task_record ['join_lock' ] = threading .Lock ()
410- self ._send_task_log_info (task_record )
411- joinable .add_done_callback (partial (self .handle_join_update , task_record ))
412- elif joinable == []: # got a list, but it had no entries, and specifically, no Futures.
413- self .update_task_state (task_record , States .joining )
414- task_record ['joins' ] = joinable
415- task_record ['join_lock' ] = threading .Lock ()
416- self ._send_task_log_info (task_record )
417- self .handle_join_update (task_record , None )
418- elif isinstance (joinable , list ) and [j for j in joinable if not isinstance (j , Future )] == []:
419- self .update_task_state (task_record , States .joining )
420- task_record ['joins' ] = joinable
421- task_record ['join_lock' ] = threading .Lock ()
422- self ._send_task_log_info (task_record )
423- for inner_future in joinable :
424- inner_future .add_done_callback (partial (self .handle_join_update , task_record ))
425- else :
426- self .update_task_state (task_record , States .failed )
427- task_record ['time_returned' ] = datetime .datetime .now ()
428- self ._send_task_log_info (task_record )
429- self .memoizer .update_memo (task_record )
430- with task_record ['app_fu' ]._update_lock :
431- task_record ['app_fu' ].set_exception (
432- TypeError (f"join_app body must return a Future or list of Futures, got { joinable } of type { type (joinable )} " ))
425+ self .update_task_state (task_record , States .failed )
426+ task_record ['time_returned' ] = datetime .datetime .now ()
427+ self ._send_task_log_info (task_record )
428+ self .memoizer .update_memo (task_record )
429+ with task_record ['app_fu' ]._update_lock :
430+ task_record ['app_fu' ].set_exception (
431+ TypeError (f"join_app body must return a Future or list of Futures, got { joinable } of type { type (joinable )} " ))
433432
434433 self ._log_std_streams (task_record )
435434
0 commit comments