Commit 2b206b5

TODO: factor out a complete_exception method, like complete_task, and make all of this happen inside those?

TODO: should these run under the app_fu update lock? A bunch of other completion code is not locked, so probably not. In general this lock does not protect against two completion actions happening at once.

Split update_memo into notify-exception and notify-result forms that run before the future is set, so that checkpointing can run at that point rather than racing with the user workflow.

This is needed to address an existing race condition: task_exit tasks are not necessarily checkpointed by the time the workflow observes that they have finished. It is debatable whether checkpointing before completion is the intended functionality, but I think it should be. Otherwise what is the behaviour: checkpointed a little bit later? This manifests as awkwardness in the implementation of later checkpoint methods too.

Moving more checkpoint code into update_memo also makes the checkpoint/memo API unification cleaner. See PR #NNNN, which discusses the app future being used internally when it should only be user-facing.
1 parent 7cc81ac commit 2b206b5
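
The ordering argument in the commit message can be illustrated with a small, self-contained sketch. This is not Parsl code: `ToyMemoizer`, `complete_task`, and the dictionary used as a checkpoint store are hypothetical stand-ins, and only `concurrent.futures.Future` is a real API. The sketch assumes the intent is "notify the memoizer first, then set the future", so that anything observing the future as done also sees the checkpoint.

```python
from concurrent.futures import Future
from typing import Any, Dict, List


class ToyMemoizer:
    """Hypothetical stand-in for a memoizer/checkpointer; not Parsl's class."""

    def __init__(self) -> None:
        self.checkpointed: Dict[int, Any] = {}

    def update_memo_result(self, task_id: int, result: Any) -> None:
        # Record the result before anyone can observe the future as done.
        self.checkpointed[task_id] = result


def complete_task(memoizer: ToyMemoizer, task_id: int, fut: Future, result: Any) -> None:
    # Notify the memoizer first, then publish the result to the user.
    memoizer.update_memo_result(task_id, result)
    fut.set_result(result)


memoizer = ToyMemoizer()
fut: Future = Future()
observed: List[bool] = []

# This callback plays the role of the user workflow observing completion:
# it records whether the checkpoint already exists at that moment.
fut.add_done_callback(lambda f: observed.append(1 in memoizer.checkpointed))

complete_task(memoizer, 1, fut, "value")
```

If the two calls in `complete_task` were swapped, the callback would run before the checkpoint entry existed, which is the kind of race the commit message describes.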

2 files changed, +9 -3 lines changed

parsl/dataflow/dflow.py

Lines changed: 2 additions & 2 deletions
@@ -539,7 +539,7 @@ def _complete_task_result(self, task_record: TaskRecord, new_state: States, resu
         logger.info(f"Task {task_record['id']} completed ({old_state.name} -> {new_state.name})")
         task_record['time_returned'] = datetime.datetime.now()
 
-        self.memoizer.update_memo(task_record)
+        self.memoizer.update_memo_result(task_record, result)
 
         self._send_task_log_info(task_record)
 
@@ -558,7 +558,7 @@ def _complete_task_exception(self, task_record: TaskRecord, new_state: States, e
         logger.info(f"Task {task_record['id']} failed ({old_state.name} -> {new_state.name})")
         task_record['time_returned'] = datetime.datetime.now()
 
-        self.memoizer.update_memo(task_record)
+        self.memoizer.update_memo_exception(task_record, exception)
 
         self._send_task_log_info(task_record)
 

parsl/dataflow/memoization.py

Lines changed: 7 additions & 1 deletion
@@ -283,7 +283,13 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
         assert isinstance(result, Future) or result is None
         return result
 
-    def update_memo(self, task: TaskRecord) -> None:
+    def update_memo_result(self, task: TaskRecord, r: Any) -> None:
+        self._update_memo(task)
+
+    def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
+        self._update_memo(task)
+
+    def _update_memo(self, task: TaskRecord) -> None:
         """Updates the memoization lookup table with the result from a task.
         This doesn't move any values around but associates the memoization
         hashsum with the completed (by success or failure) AppFuture.
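
The shape of this change, two public entry points that currently ignore their extra argument and delegate to one private helper, can be sketched in miniature. Everything below is a hypothetical toy (the dict-based task record, `memo_table`, `CheckpointingMemoizer`, and `checkpoint_log` are assumptions for illustration, not Parsl's `Memoizer` or `TaskRecord`). It shows one way a later change could hook result-specific checkpointing into `update_memo_result` without touching the exception path.

```python
from typing import Any, Dict, List, Tuple


class ToyMemoizer:
    """Hypothetical miniature of the pattern in this diff; not Parsl's Memoizer."""

    def __init__(self) -> None:
        self.memo_table: Dict[str, int] = {}

    def update_memo_result(self, task: Dict[str, Any], r: Any) -> None:
        # The result is accepted but unused for now, as in the diff.
        self._update_memo(task)

    def update_memo_exception(self, task: Dict[str, Any], e: BaseException) -> None:
        # The exception is accepted but unused for now, as in the diff.
        self._update_memo(task)

    def _update_memo(self, task: Dict[str, Any]) -> None:
        # Shared bookkeeping: associate the hashsum with the completed task.
        self.memo_table[task["hashsum"]] = task["id"]


class CheckpointingMemoizer(ToyMemoizer):
    """Hypothetical subclass that uses the result-specific entry point."""

    def __init__(self) -> None:
        super().__init__()
        self.checkpoint_log: List[Tuple[int, Any]] = []

    def update_memo_result(self, task: Dict[str, Any], r: Any) -> None:
        super().update_memo_result(task, r)
        # Only successful results are checkpointed in this toy policy.
        self.checkpoint_log.append((task["id"], r))


m = CheckpointingMemoizer()
m.update_memo_result({"id": 1, "hashsum": "abc"}, 42)
m.update_memo_exception({"id": 2, "hashsum": "def"}, ValueError("boom"))
```

Both completions update the memo table, but only the successful one reaches the checkpoint log, which is the kind of per-outcome behaviour the split makes possible.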
