Skip to content

Commit 9d61d8c

Browse files
authored
Tell memoizer the task result earlier, to fix race condition #3762 (#3979)
Prior to this PR, the memoizer was told about the result in the AppFuture result callback. By the time that callback runs, the user workflow code can already observe that the task has completed, and can also observe that the memoizer has not yet received a copy of the result: for example, by invoking the same app again immediately and seeing a duplicate execution.

The AppFuture callback shouldn't contain anything related to user-observable task completion, but this PR does not remove the other things that are in there.

This PR moves `update_memo` earlier, to before the AppFuture result is set, so that the memoizer is updated strictly before the future changes state.

# Changed Behaviour

Checkpointing behaviour will now be slightly less racy. In situations where that race condition would have fired, the user will now see fewer apps actually executed, and memoized results used instead. This is, I think, a very rare situation.

# Fixes

#3762

## Type of change

- Bug fix
1 parent 77eaa50 commit 9d61d8c

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

parsl/dataflow/dflow.py

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -375,6 +375,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
375375
logger.info("Task {} failed due to dependency failure so skipping retries".format(task_id))
376376
task_record['time_returned'] = datetime.datetime.now()
377377
self._send_task_log_info(task_record)
378+
self.memoizer.update_memo(task_record)
378379
with task_record['app_fu']._update_lock:
379380
task_record['app_fu'].set_exception(e)
380381

@@ -400,6 +401,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
400401
self.update_task_state(task_record, States.failed)
401402
task_record['time_returned'] = datetime.datetime.now()
402403
self._send_task_log_info(task_record)
404+
self.memoizer.update_memo(task_record)
403405
with task_record['app_fu']._update_lock:
404406
task_record['app_fu'].set_exception(e)
405407

@@ -446,6 +448,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
446448
self.update_task_state(task_record, States.failed)
447449
task_record['time_returned'] = datetime.datetime.now()
448450
self._send_task_log_info(task_record)
451+
self.memoizer.update_memo(task_record)
449452
with task_record['app_fu']._update_lock:
450453
task_record['app_fu'].set_exception(
451454
TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}"))
@@ -521,6 +524,7 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
521524

522525
self.update_task_state(task_record, States.failed)
523526
task_record['time_returned'] = datetime.datetime.now()
527+
self.memoizer.update_memo(task_record)
524528
with task_record['app_fu']._update_lock:
525529
task_record['app_fu'].set_exception(e)
526530

@@ -561,8 +565,6 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
561565
if not task_record['app_fu'] == future:
562566
logger.error("Internal consistency error: callback future is not the app_fu in task structure, for task {}".format(task_id))
563567

564-
self.memoizer.update_memo(task_record)
565-
566568
# Cover all checkpointing cases here:
567569
# Do we need to checkpoint now, or queue for later,
568570
# or do nothing?
@@ -591,6 +593,7 @@ def _complete_task(self, task_record: TaskRecord, new_state: States, result: Any
591593
logger.info(f"Task {task_record['id']} completed ({old_state.name} -> {new_state.name})")
592594
task_record['time_returned'] = datetime.datetime.now()
593595

596+
self.memoizer.update_memo(task_record)
594597
with task_record['app_fu']._update_lock:
595598
task_record['app_fu'].set_result(result)
596599

0 commit comments

Comments
 (0)