|
27 | 27 | from clusterfuzz._internal.bot.webserver import http_server |
28 | 28 | from clusterfuzz._internal.metrics import logs |
29 | 29 | from clusterfuzz._internal.metrics import monitoring_metrics |
| 30 | +from clusterfuzz._internal.protos import uworker_msg_pb2 |
30 | 31 | from clusterfuzz._internal.system import environment |
31 | 32 |
|
32 | 33 | # Define an alias to appease pylint. |
@@ -74,12 +75,15 @@ class _MetricRecorder(contextlib.AbstractContextManager): |
74 | 75 | Members: |
75 | 76 | start_time_ns (int): The time at which this recorder was constructed, in |
76 | 77 | nanoseconds since the Unix epoch. |
| 78 | + utask_main_failure: this class stores the uworker_output.ErrorType |
| 79 | + object returned by utask_main, and uses it to emmit a metric. |
77 | 80 | """ |
78 | 81 |
|
79 | 82 | def __init__(self, subtask: _Subtask): |
80 | 83 | self.start_time_ns = time.time_ns() |
81 | 84 | self._subtask = subtask |
82 | 85 | self._labels = None |
| 86 | + self.utask_main_failure = None |
83 | 87 |
|
84 | 88 | if subtask == _Subtask.PREPROCESS: |
85 | 89 | self._preprocess_start_time_ns = self.start_time_ns |
@@ -138,6 +142,30 @@ def __exit__(self, _exc_type, _exc_value, _traceback): |
138 | 142 | monitoring_metrics.UTASK_SUBTASK_E2E_DURATION_SECS.add( |
139 | 143 | e2e_duration_secs, self._labels) |
140 | 144 |
|
| 145 | + # The only case where a task might fail without throwing, is in |
| 146 | + # utask_main, by returning an ErrorType proto which indicates |
| 147 | + # failure. |
| 148 | + outcome = 'error' if _exc_type or self.utask_main_failure else 'success' |
| 149 | + monitoring_metrics.TASK_OUTCOME_COUNT.increment({ |
| 150 | + **self._labels, 'outcome': outcome |
| 151 | + }) |
| 152 | + if outcome == "success": |
| 153 | + error_condition = 'N/A' |
| 154 | + elif _exc_type: |
| 155 | + error_condition = 'UNHANDLED_EXCEPTION' |
| 156 | + else: |
| 157 | + error_condition = uworker_msg_pb2.ErrorType.Name( # pylint: disable=no-member |
| 158 | + self.utask_main_failure) |
| 159 | + # Get rid of job as a label, so we can have another metric to make |
| 160 | + # error conditions more explicit, respecting the 30k distinct |
| 161 | + # labels limit recommended by gcp. |
| 162 | + trimmed_labels = self._labels |
| 163 | + del trimmed_labels['job'] |
| 164 | + trimmed_labels['outcome'] = outcome |
| 165 | + trimmed_labels['error_condition'] = error_condition |
| 166 | + monitoring_metrics.TASK_OUTCOME_COUNT_BY_ERROR_TYPE.increment( |
| 167 | + trimmed_labels) |
| 168 | + |
141 | 169 |
|
142 | 170 | def ensure_uworker_env_type_safety(uworker_env): |
143 | 171 | """Converts all values in |uworker_env| to str types. |
@@ -226,6 +254,8 @@ def uworker_main_no_io(utask_module, serialized_uworker_input): |
226 | 254 | return None |
227 | 255 |
|
228 | 256 | # NOTE: Keep this in sync with `uworker_main()`. |
| 257 | + if uworker_output.error_type != uworker_msg_pb2.ErrorType.NO_ERROR: # pylint: disable=no-member |
| 258 | + recorder.utask_main_failure = uworker_output.error_type |
229 | 259 | uworker_output.bot_name = environment.get_value('BOT_NAME', '') |
230 | 260 | uworker_output.platform_id = environment.get_platform_id() |
231 | 261 |
|
@@ -306,6 +336,9 @@ def uworker_main(input_download_url) -> None: |
306 | 336 | logs.info('Starting utask_main: %s.' % utask_module) |
307 | 337 | uworker_output = utask_module.utask_main(uworker_input) |
308 | 338 |
|
| 339 | + if uworker_output.error_type != uworker_msg_pb2.ErrorType.NO_ERROR: # pylint: disable=no-member |
| 340 | + recorder.utask_main_failure = uworker_output.error_type |
| 341 | + |
309 | 342 | # NOTE: Keep this in sync with `uworker_main_no_io()`. |
310 | 343 | uworker_output.bot_name = environment.get_value('BOT_NAME', '') |
311 | 344 | uworker_output.platform_id = environment.get_platform_id() |
|
0 commit comments