Skip to content

Commit 342e2c8

Browse files
committed
Split update_memo into two forms, notify-exception and notify-result (update_memo_exception and update_memo_result), that run before the future is set, which
means checkpointing can run at that point rather than racing with the user workflow. This is needed to address an existing race condition: task_exit tasks are not guaranteed to be checkpointed by the time the workflow observes that they have finished. It is debatable whether that guarantee is actually the intended functionality, but I think it should be; otherwise, what is the behaviour — that the checkpoint happens "a little bit later"? This race also manifests as awkwardness in the implementation of later checkpoint methods. In addition, moving more checkpoint code into update_memo makes the checkpoint/memo API unification cleaner.
1 parent a797dd8 commit 342e2c8

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

parsl/dataflow/dflow.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
370370
logger.info("Task {} failed due to dependency failure so skipping retries".format(task_id))
371371
task_record['time_returned'] = datetime.datetime.now()
372372
self._send_task_log_info(task_record)
373-
self.memoizer.update_memo(task_record)
373+
self.memoizer.update_memo_exception(task_record, e)
374374
with task_record['app_fu']._update_lock:
375375
task_record['app_fu'].set_exception(e)
376376

@@ -396,7 +396,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
396396
self.update_task_state(task_record, States.failed)
397397
task_record['time_returned'] = datetime.datetime.now()
398398
self._send_task_log_info(task_record)
399-
self.memoizer.update_memo(task_record)
399+
self.memoizer.update_memo_exception(task_record, e)
400400
with task_record['app_fu']._update_lock:
401401
task_record['app_fu'].set_exception(e)
402402

@@ -443,10 +443,10 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
443443
self.update_task_state(task_record, States.failed)
444444
task_record['time_returned'] = datetime.datetime.now()
445445
self._send_task_log_info(task_record)
446-
self.memoizer.update_memo(task_record)
446+
ex = TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}")
447+
self.memoizer.update_memo_exception(task_record, ex)
447448
with task_record['app_fu']._update_lock:
448-
task_record['app_fu'].set_exception(
449-
TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}"))
449+
task_record['app_fu'].set_exception(ex)
450450

451451
self._log_std_streams(task_record)
452452

@@ -519,7 +519,7 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
519519

520520
self.update_task_state(task_record, States.failed)
521521
task_record['time_returned'] = datetime.datetime.now()
522-
self.memoizer.update_memo(task_record)
522+
self.memoizer.update_memo_exception(task_record, e)
523523
with task_record['app_fu']._update_lock:
524524
task_record['app_fu'].set_exception(e)
525525

@@ -588,7 +588,7 @@ def _complete_task(self, task_record: TaskRecord, new_state: States, result: Any
588588
logger.info(f"Task {task_record['id']} completed ({old_state.name} -> {new_state.name})")
589589
task_record['time_returned'] = datetime.datetime.now()
590590

591-
self.memoizer.update_memo(task_record)
591+
self.memoizer.update_memo_result(task_record, result)
592592
with task_record['app_fu']._update_lock:
593593
task_record['app_fu'].set_result(result)
594594

parsl/dataflow/memoization.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,13 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
260260
assert isinstance(result, Future) or result is None
261261
return result
262262

263-
def update_memo(self, task: TaskRecord) -> None:
263+
def update_memo_result(self, task: TaskRecord, r: Any) -> None:
264+
self._update_memo(task)
265+
266+
def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
267+
self._update_memo(task)
268+
269+
def _update_memo(self, task: TaskRecord) -> None:
264270
"""Updates the memoization lookup table with the result from a task.
265271
This doesn't move any values around but associates the memoization
266272
hashsum with the completed (by success or failure) AppFuture.

0 commit comments

Comments
 (0)