|
1 | | -from typing import Optional, Sequence, Tuple |
| 1 | +from typing import List, Sequence, Tuple |
2 | 2 |
|
3 | 3 | from parsl.errors import ParslError |
4 | 4 |
|
@@ -29,35 +29,77 @@ def __str__(self) -> str: |
29 | 29 | return self.reason |
30 | 30 |
|
31 | 31 |
|
32 | | -class DependencyError(DataFlowException): |
33 | | - """Error raised if an app cannot run because there was an error |
34 | | - in a dependency. |
| 32 | +class PropagatedException(DataFlowException): |
| 33 | + """Error raised if an app fails because there was an error |
| 34 | + in a related task. This is intended to be subclassed for |
| 35 | + dependency and join_app errors. |
35 | 36 |
|
36 | 37 | Args: |
37 | | - - dependent_exceptions_tids: List of exceptions and identifiers for |
38 | | - dependencies which failed. The identifier might be a task ID or |
39 | | - the repr of a non-DFK Future. |
| 38 | + - dependent_exceptions_tids: List of exceptions and brief descriptions |
| 39 | + for dependencies which failed. The description might be a task ID or |
| 40 | + the repr of a non-AppFuture. |
40 | 41 | - task_id: Task ID of the task that failed because of the dependency error |
41 | 42 | """ |
42 | 43 |
|
43 | | - def __init__(self, dependent_exceptions_tids: Sequence[Tuple[Exception, str]], task_id: int) -> None: |
| 44 | + def __init__(self, |
| 45 | + dependent_exceptions_tids: Sequence[Tuple[BaseException, str]], |
| 46 | + task_id: int, |
| 47 | + *, |
| 48 | + failure_description: str) -> None: |
44 | 49 | self.dependent_exceptions_tids = dependent_exceptions_tids |
45 | 50 | self.task_id = task_id |
| 51 | + self._failure_description = failure_description |
| 52 | + |
| 53 | + (cause, cause_sequence) = self._find_any_root_cause() |
| 54 | + self.__cause__ = cause |
| 55 | + self._cause_sequence = cause_sequence |
46 | 56 |
|
47 | 57 | def __str__(self) -> str: |
48 | | - deps = ", ".join(tid for _exc, tid in self.dependent_exceptions_tids) |
49 | | - return f"Dependency failure for task {self.task_id} with failed dependencies from {deps}" |
| 58 | + sequence_text = " <- ".join(self._cause_sequence) |
| 59 | + return f"{self._failure_description} for task {self.task_id}. " \ |
| 60 | + f"The representative cause is via {sequence_text}" |
| 61 | + |
| 62 | + def _find_any_root_cause(self) -> Tuple[BaseException, List[str]]: |
| 63 | + """Looks recursively through self.dependent_exceptions_tids to find |
| 64 | + an exception that caused this propagated error, that is not itself |
| 65 | + a propagated error. |
| 66 | + """ |
| 67 | + e: BaseException = self |
| 68 | + dep_ids = [] |
| 69 | + while isinstance(e, PropagatedException) and len(e.dependent_exceptions_tids) >= 1: |
| 70 | + id_txt = e.dependent_exceptions_tids[0][1] |
| 71 | + assert isinstance(id_txt, str) |
| 72 | + # if there are several causes for this exception, label that |
| 73 | + # there are more so that we know that the representative fail |
| 74 | + # sequence is not the full story. |
| 75 | + if len(e.dependent_exceptions_tids) > 1: |
| 76 | + id_txt += " (+ others)" |
| 77 | + dep_ids.append(id_txt) |
| 78 | + e = e.dependent_exceptions_tids[0][0] |
| 79 | + return e, dep_ids |
| 80 | + |
| 81 | + |
| 82 | +class DependencyError(PropagatedException): |
| 83 | + """Error raised if an app cannot run because there was an error |
| 84 | + in a dependency. There can be several exceptions (one from each |
| 85 | + dependency) and DependencyError collects them all together. |
50 | 86 |
|
| 87 | + Args: |
| 88 | + - dependent_exceptions_tids: List of exceptions and brief descriptions |
| 89 | + for dependencies which failed. The description might be a task ID or |
| 90 | + the repr of a non-AppFuture. |
| 91 | + - task_id: Task ID of the task that failed because of the dependency error |
| 92 | + """ |
| 93 | + def __init__(self, dependent_exceptions_tids: Sequence[Tuple[BaseException, str]], task_id: int) -> None: |
| 94 | + super().__init__(dependent_exceptions_tids, task_id, |
| 95 | + failure_description="Dependency failure") |
51 | 96 |
|
52 | | -class JoinError(DataFlowException): |
| 97 | + |
| 98 | +class JoinError(PropagatedException): |
53 | 99 | """Error raised if apps joining into a join_app raise exceptions. |
54 | 100 | There can be several exceptions (one from each joining app), |
55 | 101 | and JoinError collects them all together. |
56 | 102 | """ |
57 | | - def __init__(self, dependent_exceptions_tids: Sequence[Tuple[BaseException, Optional[str]]], task_id: int) -> None: |
58 | | - self.dependent_exceptions_tids = dependent_exceptions_tids |
59 | | - self.task_id = task_id |
60 | | - |
61 | | - def __str__(self) -> str: |
62 | | - dep_tids = [tid for (exception, tid) in self.dependent_exceptions_tids] |
63 | | - return "Join failure for task {} with failed join dependencies from tasks {}".format(self.task_id, dep_tids) |
| 103 | + def __init__(self, dependent_exceptions_tids: Sequence[Tuple[BaseException, str]], task_id: int) -> None: |
| 104 | + super().__init__(dependent_exceptions_tids, task_id, |
| 105 | + failure_description="Join failure") |
0 commit comments