Skip to content

Commit 24d2a6f

Browse files
authored
Implement exception version of complete_task_result factorisation (#4006)
The new complete_task_exception and the older complete_task_result share a lot of code -- but upcoming PRs in my current checkpoint work will make them behave a bit differently later, so I don't want to merge them now and unmerge them later. The order of task completion operations is not preserved by this PR: before this PR, different instances of the exception completion boiler plate ran the same operations in different orders. I don't expect that to be a problem, but if you've bisected to this PR with a strange ordering/race condition, there's a clue. This PR removes a comment from one instance of the boilerplate that asks a question fixed by a previous PR #4004 that also accidentally introduced the question while tidying it. # Changed Behaviour none ## Type of change - Code maintenance/cleanup
1 parent 341994e commit 24d2a6f

File tree

1 file changed

+26
-26
lines changed

1 file changed

+26
-26
lines changed

parsl/dataflow/dflow.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -352,14 +352,8 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
352352
task_record['fail_cost'] += 1
353353

354354
if isinstance(e, DependencyError):
355-
# was this sending two task log infos? if so would I see the row twice in the monitoring db?
356-
self.update_task_state(task_record, States.dep_fail)
357355
logger.info("Task {} failed due to dependency failure so skipping retries".format(task_id))
358-
task_record['time_returned'] = datetime.datetime.now()
359-
self._send_task_log_info(task_record)
360-
self.memoizer.update_memo(task_record)
361-
with task_record['app_fu']._update_lock:
362-
task_record['app_fu'].set_exception(e)
356+
self._complete_task_exception(task_record, States.dep_fail, e)
363357

364358
elif task_record['fail_cost'] <= self._config.retries:
365359

@@ -379,12 +373,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
379373
else:
380374
logger.exception("Task {} failed after {} retry attempts".format(task_id,
381375
task_record['try_id']))
382-
self.update_task_state(task_record, States.failed)
383-
task_record['time_returned'] = datetime.datetime.now()
384-
self._send_task_log_info(task_record)
385-
self.memoizer.update_memo(task_record)
386-
with task_record['app_fu']._update_lock:
387-
task_record['app_fu'].set_exception(e)
376+
self._complete_task_exception(task_record, States.failed, e)
388377

389378
else:
390379
if task_record['from_memo']:
@@ -422,13 +411,10 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
422411
for inner_future in joinable:
423412
inner_future.add_done_callback(partial(self.handle_join_update, task_record))
424413
else:
425-
self.update_task_state(task_record, States.failed)
426-
task_record['time_returned'] = datetime.datetime.now()
427-
self._send_task_log_info(task_record)
428-
self.memoizer.update_memo(task_record)
429-
with task_record['app_fu']._update_lock:
430-
task_record['app_fu'].set_exception(
431-
TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}"))
414+
self._complete_task_exception(
415+
task_record,
416+
States.failed,
417+
TypeError(f"join_app body must return a Future or list of Futures, got {joinable} of type {type(joinable)}"))
432418

433419
self._log_std_streams(task_record)
434420

@@ -499,12 +485,7 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
499485
# no need to update the fail cost because join apps are never
500486
# retried
501487

502-
self.update_task_state(task_record, States.failed)
503-
task_record['time_returned'] = datetime.datetime.now()
504-
self.memoizer.update_memo(task_record)
505-
with task_record['app_fu']._update_lock:
506-
task_record['app_fu'].set_exception(e)
507-
self._send_task_log_info(task_record)
488+
self._complete_task_exception(task_record, States.failed, e)
508489

509490
else:
510491
# all the joinables succeeded, so construct a result:
@@ -565,6 +546,25 @@ def _complete_task_result(self, task_record: TaskRecord, new_state: States, resu
565546
with task_record['app_fu']._update_lock:
566547
task_record['app_fu'].set_result(result)
567548

549+
def _complete_task_exception(self, task_record: TaskRecord, new_state: States, exception: BaseException) -> None:
550+
"""Set a task into a failure state
551+
"""
552+
assert new_state in FINAL_STATES
553+
assert new_state in FINAL_FAILURE_STATES
554+
old_state = task_record['status']
555+
556+
self.update_task_state(task_record, new_state)
557+
558+
logger.info(f"Task {task_record['id']} failed ({old_state.name} -> {new_state.name})")
559+
task_record['time_returned'] = datetime.datetime.now()
560+
561+
self.memoizer.update_memo(task_record)
562+
563+
self._send_task_log_info(task_record)
564+
565+
with task_record['app_fu']._update_lock:
566+
task_record['app_fu'].set_exception(exception)
567+
568568
def update_task_state(self, task_record: TaskRecord, new_state: States) -> None:
569569
"""Updates a task record state, and recording an appropriate change
570570
to task state counters.

0 commit comments

Comments
 (0)