Skip to content

Commit 507f778

Browse files
committed
remove handle app update
wipe_task now happens in complete task, the moral successor of handle_app_update but it now happens *before* the user observes a task complete. this makes the task be able to be removed from the tasks table before the user observes the future is complete. you might say thats a new race condition: the user cannot see the task table entry at the moment of completion. but that was never strongly guaranteed: the callback to remove the entry could happen before or after the user observed the completion. now theres a stronger assertion: it will definitely happen before the user observes task completion via an AppFuture. TODO: are there already tests about this? When i removed wipe_task calls entirely, no per-config test failed... parsl/tests/test_python_apps/test_garbage_collect.py - this test was very slightly racey before and this was noticeable in some race-condition fuzzing work I've done
1 parent ebedfa9 commit 507f778

File tree

1 file changed

+4
-24
lines changed

1 file changed

+4
-24
lines changed

parsl/dataflow/dflow.py

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -502,29 +502,6 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
502502

503503
self._log_std_streams(task_record)
504504

505-
def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
506-
"""This function is called as a callback when an AppFuture
507-
is in its final state.
508-
509-
It will trigger post-app processing such as checkpointing.
510-
511-
Args:
512-
task_record : Task record
513-
future (Future) : The relevant app future (which should be
514-
consistent with the task structure 'app_fu' entry
515-
516-
"""
517-
518-
task_id = task_record['id']
519-
520-
if not task_record['app_fu'].done():
521-
logger.error("Internal consistency error: app_fu is not done for task {}".format(task_id))
522-
if not task_record['app_fu'] == future:
523-
logger.error("Internal consistency error: callback future is not the app_fu in task structure, for task {}".format(task_id))
524-
525-
self.wipe_task(task_id)
526-
return
527-
528505
def _complete_task_result(self, task_record: TaskRecord, new_state: States, result: Any) -> None:
529506
"""Set a task into a completed state
530507
"""
@@ -541,6 +518,8 @@ def _complete_task_result(self, task_record: TaskRecord, new_state: States, resu
541518

542519
self._send_task_log_info(task_record)
543520

521+
self.wipe_task(task_record['id'])
522+
544523
with task_record['app_fu']._update_lock:
545524
task_record['app_fu'].set_result(result)
546525

@@ -560,6 +539,8 @@ def _complete_task_exception(self, task_record: TaskRecord, new_state: States, e
560539

561540
self._send_task_log_info(task_record)
562541

542+
self.wipe_task(task_record['id'])
543+
563544
with task_record['app_fu']._update_lock:
564545
task_record['app_fu'].set_exception(exception)
565546

@@ -1051,7 +1032,6 @@ def submit(self,
10511032
task_record['func_name'],
10521033
waiting_message))
10531034

1054-
app_fu.add_done_callback(partial(self.handle_app_update, task_record))
10551035
self.update_task_state(task_record, States.pending)
10561036
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
10571037

0 commit comments

Comments
 (0)