Skip to content

Commit 895a6ec

Browse files
committed
split update_memo into two notify-exception and notify-result forms, that run before the future is set and which
means checkpointing can run in that place rather than racing with the user workflow. this is needed to address an existing race condition - that task_exit tasks aren't checkpointed by the time the workflow observes they have finished. it's debatable whether that is actually the intended functionality or not, but i think it should be. otherwise what's the behaviour? a little bit later? this manifests as awkwardness in implementation in later checkpoint methods too. and moving more checkpoint code into update_memo makes the checkpoint/memo API unification cleaner.
1 parent b71cf8b commit 895a6ec

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

parsl/dataflow/dflow.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
357357
logger.info("Task {} failed due to dependency failure so skipping retries".format(task_id))
358358
task_record['time_returned'] = datetime.datetime.now()
359359
self._send_task_log_info(task_record)
360-
self.memoizer.update_memo(task_record)
360+
self.memoizer.update_memo_exception(task_record, e)
361361
with task_record['app_fu']._update_lock:
362362
task_record['app_fu'].set_exception(e)
363363

@@ -383,7 +383,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
383383
self.update_task_state(task_record, States.failed)
384384
task_record['time_returned'] = datetime.datetime.now()
385385
self._send_task_log_info(task_record)
386-
self.memoizer.update_memo(task_record)
386+
self.memoizer.update_memo_exception(task_record, e)
387387
with task_record['app_fu']._update_lock:
388388
task_record['app_fu'].set_exception(e)
389389

@@ -430,10 +430,10 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
430430
self.update_task_state(task_record, States.failed)
431431
task_record['time_returned'] = datetime.datetime.now()
432432
self._send_task_log_info(task_record)
433-
self.memoizer.update_memo(task_record)
433+
ex = TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}")
434+
self.memoizer.update_memo_exception(task_record, ex)
434435
with task_record['app_fu']._update_lock:
435-
task_record['app_fu'].set_exception(
436-
TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}"))
436+
task_record['app_fu'].set_exception(ex)
437437

438438
self._log_std_streams(task_record)
439439

@@ -506,7 +506,7 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
506506

507507
self.update_task_state(task_record, States.failed)
508508
task_record['time_returned'] = datetime.datetime.now()
509-
self.memoizer.update_memo(task_record)
509+
self.memoizer.update_memo_exception(task_record, e)
510510
with task_record['app_fu']._update_lock:
511511
task_record['app_fu'].set_exception(e)
512512

@@ -564,7 +564,7 @@ def _complete_task(self, task_record: TaskRecord, new_state: States, result: Any
564564
logger.info(f"Task {task_record['id']} completed ({old_state.name} -> {new_state.name})")
565565
task_record['time_returned'] = datetime.datetime.now()
566566

567-
self.memoizer.update_memo(task_record)
567+
self.memoizer.update_memo_result(task_record, result)
568568
with task_record['app_fu']._update_lock:
569569
task_record['app_fu'].set_result(result)
570570

parsl/dataflow/memoization.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,13 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
286286
assert isinstance(result, Future) or result is None
287287
return result
288288

289-
def update_memo(self, task: TaskRecord) -> None:
289+
def update_memo_result(self, task: TaskRecord, r: Any) -> None:
290+
self._update_memo(task)
291+
292+
def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
293+
self._update_memo(task)
294+
295+
def _update_memo(self, task: TaskRecord) -> None:
290296
"""Updates the memoization lookup table with the result from a task.
291297
This doesn't move any values around but associates the memoization
292298
hashsum with the completed (by success or failure) AppFuture.

0 commit comments

Comments
 (0)