Skip to content

Commit cb9556e

Browse files
authored
Pull more completion code into _complete_task and rename (#4001)
This PR adds _send_task_log_info as a mandatory part of task completion, which is already manually enforced. In one place, around line 500, one _send_task_log_info is split into two: one which then disappears into complete_task_result, and the other which remains in place for the non-factored error handling path. This PR renames _complete_task to _complete_task_result to reflect that it is for the result path not the exception path (taking naming from the Future concepts of the same name), and to open up the way for factorisation of the exception path into _complete_task_exception. # Changed Behaviour none ## Type of change - Code maintenance/cleanup
1 parent 48d652b commit cb9556e

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

parsl/dataflow/dflow.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -386,12 +386,10 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
386386

387387
else:
388388
if task_record['from_memo']:
389-
self._complete_task(task_record, States.memo_done, res)
390-
self._send_task_log_info(task_record)
389+
self._complete_task_result(task_record, States.memo_done, res)
391390
else:
392391
if not task_record['join']:
393-
self._complete_task(task_record, States.exec_done, res)
394-
self._send_task_log_info(task_record)
392+
self._complete_task_result(task_record, States.exec_done, res)
395393
else:
396394
# This is a join task, and the original task's function code has
397395
# completed. That means that the future returned by that code
@@ -505,6 +503,7 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
505503
self.memoizer.update_memo(task_record)
506504
with task_record['app_fu']._update_lock:
507505
task_record['app_fu'].set_exception(e)
506+
self._send_task_log_info(task_record)
508507

509508
else:
510509
# all the joinables succeeded, so construct a result:
@@ -517,12 +516,10 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
517516
res.append(future.result())
518517
else:
519518
raise TypeError(f"Unknown joinable type {type(joinable)}")
520-
self._complete_task(task_record, States.exec_done, res)
519+
self._complete_task_result(task_record, States.exec_done, res)
521520

522521
self._log_std_streams(task_record)
523522

524-
self._send_task_log_info(task_record)
525-
526523
def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
527524
"""This function is called as a callback when an AppFuture
528525
is in its final state.
@@ -548,7 +545,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
548545
self.wipe_task(task_id)
549546
return
550547

551-
def _complete_task(self, task_record: TaskRecord, new_state: States, result: Any) -> None:
548+
def _complete_task_result(self, task_record: TaskRecord, new_state: States, result: Any) -> None:
552549
"""Set a task into a completed state
553550
"""
554551
assert new_state in FINAL_STATES
@@ -561,6 +558,9 @@ def _complete_task(self, task_record: TaskRecord, new_state: States, result: Any
561558
task_record['time_returned'] = datetime.datetime.now()
562559

563560
self.memoizer.update_memo(task_record)
561+
562+
self._send_task_log_info(task_record)
563+
564564
with task_record['app_fu']._update_lock:
565565
task_record['app_fu'].set_result(result)
566566

0 commit comments

Comments
 (0)