|
25 | 25 | import logging |
26 | 26 | from datetime import datetime, timezone |
27 | 27 | from functools import wraps |
28 | | -from typing import Any, Callable, Optional, TypedDict |
| 28 | +from typing import Any, Callable, List, Optional, TypedDict |
29 | 29 |
|
30 | 30 | import kopf |
31 | 31 |
|
@@ -271,6 +271,41 @@ def _init_handler_starttime( |
271 | 271 | "ref": self.ref, |
272 | 272 | } |
273 | 273 |
|
| 274 | + def _get_failed_dependent_handlers( |
| 275 | + self, annotations: dict, logger: logging.Logger |
| 276 | + ) -> List[str]: |
| 277 | + """ |
| 278 | + Determine which dependent handlers have failed based on kopf annotations. |
| 279 | +
|
| 280 | + This inspects the progress annotations stored by kopf for each dependency in |
| 281 | + ``self.depends_on`` and returns a list of handlers that did not succeed. |
| 282 | +
|
| 283 | + Used by subhandlers that need to decide whether to perform cleanup or rollback |
| 284 | + actions depending on the outcome of previous handlers. |
| 285 | + """ |
| 286 | + failed_handlers: List[str] = [] |
| 287 | + logger.info("Checking dependent handler status...") |
| 288 | + |
| 289 | + for dep in self.depends_on: |
| 290 | + key = kopf.AnnotationsProgressStorage( |
| 291 | + v1=False, prefix=KOPF_STATE_STORE_PREFIX |
| 292 | + ).make_v2_key(dep) |
| 293 | + |
| 294 | + status_str = annotations.get(key) |
| 295 | + logger.info(f"[{dep}] annotation raw value: {status_str}") |
| 296 | + if not status_str: |
| 297 | + continue |
| 298 | + |
| 299 | + try: |
| 300 | + parsed = json.loads(status_str) |
| 301 | + if not parsed.get("success"): |
| 302 | + failed_handlers.append(dep) |
| 303 | + except json.JSONDecodeError as e: |
| 304 | + logger.warning(f"Failed to parse status annotation for {dep}: {e}") |
| 305 | + |
| 306 | + logger.info(f"Failed handlers detected: {failed_handlers}") |
| 307 | + return failed_handlers |
| 308 | + |
274 | 309 |
|
275 | 310 | async def send_webhook_notification( |
276 | 311 | namespace: str, |
|
0 commit comments