
Commit 740a0e2

Report error categories (#50)

* Add yield_file_content helper function with a test
* Add get_error_category function to report the error category
* Prevent duplicate error messages for all failed workflow steps
* Report only the first error found per failed task. Error categories are sorted by priority: when a high-level error occurs, it may also make the lower-level error category markers appear in the log, so those need to be skipped.

1 parent dfcb2f5 commit 740a0e2

File tree

4 files changed: +98 -2 lines changed


cwl_airflow/utilities/cwl.py

Lines changed: 1 addition & 1 deletion

@@ -587,7 +587,7 @@ def execute_workflow_step(
     sys.stderr = _stderr

     if step_status != "success":
-        raise ValueError
+        raise ValueError("Failed to run workflow step")

     # To remove "http://commonwl.org/cwltool#generation": 0 (copied from cwltool)
     visit_class(step_outputs, ("File",), MutationManager().unset_generation)
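
The message given to the ValueError here is not cosmetic: when the step fails, the exception text ends up in the failed task's log, and get_error_category (added to report.py below) searches the log for exactly this string as one of its ERROR_MARKERS. A minimal sketch of that connection; the "ERROR - " prefix is an assumption about how Airflow formats the log line, not something taken from this commit:

    try:
        raise ValueError("Failed to run workflow step")
    except ValueError as err:
        log_line = f"ERROR - {err}"  # roughly how the message lands in the task log
        assert "Failed to run workflow step" in log_line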

cwl_airflow/utilities/helpers.py

Lines changed: 14 additions & 0 deletions

@@ -15,6 +15,20 @@
 from typing import MutableMapping, MutableSequence


+# is not actually used anywhere
+def yield_file_content(location):
+    """
+    Yields lines from a text file, with leading/trailing whitespace
+    (including the trailing \n) stripped. Lines that are empty or
+    contain only spaces/tabs are skipped.
+    """
+
+    with open(location, "r") as input_stream:
+        for line in input_stream:
+            if line.strip():
+                yield line.strip()
+
+
 def get_compressed(data_str, reset_position=None):
     """
     Converts character string "data_str" as "utf-8" into bytes ("utf-8"
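
A brief usage sketch for the new helper ("inputs.txt" is a hypothetical file path, not part of the commit): it yields each non-empty line of a text file with surrounding whitespace stripped, so callers never see blank or whitespace-only lines.

    from cwl_airflow.utilities.helpers import yield_file_content

    # "inputs.txt" is an assumed example file
    for line in yield_file_content("inputs.txt"):
        print(line)  # already stripped; blank and whitespace-only lines were skipped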

cwl_airflow/utilities/report.py

Lines changed: 65 additions & 1 deletion

@@ -6,6 +6,7 @@
 from airflow.models import Variable
 from airflow.utils.state import State
 from airflow.hooks.http_hook import HttpHook
+from airflow.configuration import conf


 CONN_ID = "process_report"
@@ -32,7 +33,67 @@ def sign_with_jwt(data):
     return data


+def get_error_category(context):
+    """
+    This function should be called only from the dag_run failure callback.
+    It relies heavily on the log files, so the logging level in airflow.cfg
+    shouldn't be lower than ERROR. We load the log file only for the latest
+    task retry, because get_error_category is called when the dag_run has
+    failed, so none of the previous task retries brought any positive
+    results. We load logs only for the actually failed tasks, not for the
+    upstream_failed ones. All error categories are sorted by priority from
+    the highest level to the lowest. We report only one (the highest, the
+    first found) error category per failed task. Error categories from all
+    failed tasks are combined and deduplicated. The "Failed to run workflow
+    step" category is additionally filled with the failed task ids. The
+    returned value is always a string.
+    """
+
+    ERROR_MARKERS = {
+        # the docker daemon is not running; the network is unavailable to
+        # pull the docker image, or the image doesn't exist
+        "Docker is not available for this tool": "Docker or Network problems. Contact support team",
+        # something took too many resources and Airflow killed the process,
+        # or the task was stopped externally
+        "ERROR - Received SIGTERM. Terminating subprocesses": "Workflow was stopped. Restart with the lower threads or memory parameters",
+        # cwltool exited with an error when executing a workflow step
+        "Failed to run workflow step": "Workflow step(s) {} failed. Contact support team"
+    }
+
+    dag_run = context["dag_run"]
+    failed_tis = dag_run.get_task_instances(state=State.FAILED)
+    log_handler = next(                                 # to get access to logs
+        (
+            h for h in logging.getLogger("airflow.task").handlers
+            if h.name == conf.get("core", "task_log_reader")
+        ),
+        None
+    )
+
+    categories = set()                                  # use a set to prevent duplicates
+
+    for ti in failed_tis:
+        ti.task = context["dag"].get_task(ti.task_id)   # for some reason, when retrieved from the DagRun, the "task" property needs to be set from the DAG
+        try:                                            # in case log files were deleted or are unavailable
+            logs, _ = log_handler.read(ti)              # logs is always a list
+            for marker, category in ERROR_MARKERS.items():
+                if marker in logs[-1]:                  # logs[-1] is a string with \n from the last task retry
+                    categories.add(category)
+                    break
+        except Exception as err:
+            logging.debug(f"Failed to define the error category for task {ti.task_id}. \n {err}")
+
+    if categories:
+        return ". ".join(categories).format(", ".join([ti.task_id for ti in failed_tis]))  # mainly to fill the placeholder with the failed task ids
+    return "Unknown error. Contact support team"
+
+
 def post_progress(context, from_task=None):
+    """
+    If the dag_run failed but this function was run from a task callback,
+    "error" will always be "". The "error" field is not "" only when this
+    function is called from the DAG callback, which makes it the last and
+    the only message with a meaningful error description.
+    """
+
     from_task = False if from_task is None else from_task
     try:
         dag_run = context["dag_run"]
@@ -44,7 +105,7 @@ def post_progress(context, from_task=None):
             "dag_id": dag_run.dag_id,
             "run_id": dag_run.run_id,
             "progress": int(len_tis_success / len_tis * 100),
-            "error": context["reason"] if dag_run.state == State.FAILED else ""
+            "error": get_error_category(context) if dag_run.state == State.FAILED and not from_task else ""
         }
     )
     http_hook.run(endpoint=ROUTES["progress"], json={"payload": data})
@@ -105,10 +166,12 @@ def task_on_success(context):


 def task_on_failure(context):
+    # no need to post progress as it hasn't changed
     post_status(context)


 def task_on_retry(context):
+    # no need to post progress as it hasn't changed
     post_status(context)


@@ -118,4 +181,5 @@ def dag_on_success(context):


 def dag_on_failure(context):
+    # we need to post progress, because we also report the error in it
     post_progress(context)
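
The final message assembly in get_error_category is easy to miss: str.format is called on the already-joined categories, so the "{}" placeholder from the "Workflow step(s) {} failed" category is filled with the comma-separated ids of all failed tasks, while categories without a placeholder pass through unchanged. A small sketch with hypothetical task ids:

    categories = {"Workflow step(s) {} failed. Contact support team"}
    failed_task_ids = ["align_reads", "sort_bam"]  # hypothetical ids
    message = ". ".join(categories).format(", ".join(failed_task_ids))
    # -> "Workflow step(s) align_reads, sort_bam failed. Contact support team"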

tests/test_helpers.py

Lines changed: 18 additions & 0 deletions

@@ -11,6 +11,7 @@

 from cwl_airflow.utilities.helpers import (
     CleanAirflowImport,
+    yield_file_content,
     load_yaml,
     remove_field_from_dict,
     get_compressed,
@@ -24,6 +25,23 @@
 tempfile.tempdir = "/private/tmp"


+@pytest.mark.parametrize(
+    "location, control_count",
+    [
+        (
+            path.join(DATA_FOLDER, "jobs", "bam-bedgraph-bigwig.json"),
+            11
+        )
+    ]
+)
+def test_yield_file_content(location, control_count):
+    count = 0
+    for _ in yield_file_content(location):
+        count += 1
+    assert control_count == count, \
+        "Failed to read the proper number of lines from the text file"
+
+
 @pytest.mark.parametrize(
     "location, control_md5sum",
     [
