From b0b9129129371b4b5c400e747d2f75b7df8a2977 Mon Sep 17 00:00:00 2001 From: Shayan Date: Thu, 16 Oct 2025 11:22:22 -0700 Subject: [PATCH 1/2] Add Hygraph circuit breaker with caching and fallback metrics --- backend/api/metrics.py | 14 ++- backend/api/routes_sync.py | 116 +++++++++++------ backend/pyproject.toml | 1 + backend/services/cache.py | 67 ++++++++++ backend/services/circuit_breaker.py | 84 +++++++++++++ backend/services/hygraph_client.py | 69 +++++++++++ backend/services/hygraph_service.py | 186 ++++++++++++++++++++++------ docs/backend-development.md | 21 ++++ 8 files changed, 486 insertions(+), 72 deletions(-) create mode 100644 backend/services/cache.py create mode 100644 backend/services/circuit_breaker.py create mode 100644 backend/services/hygraph_client.py diff --git a/backend/api/metrics.py b/backend/api/metrics.py index 1b484b9e..b4a84579 100644 --- a/backend/api/metrics.py +++ b/backend/api/metrics.py @@ -18,4 +18,16 @@ "sync_records_upserted_total", "Records upserted during Hygraph sync", labelnames=("type",), -) \ No newline at end of file +) + +hygraph_cb_trips_total = Counter( + "hygraph_cb_trips_total", + "Hygraph circuit breaker open events", + labelnames=("type",), +) + +hygraph_cb_fallback_total = Counter( + "hygraph_cb_fallback_total", + "Hygraph responses served from cache while circuit open", + labelnames=("type",), +) diff --git a/backend/api/routes_sync.py b/backend/api/routes_sync.py index e15b6365..fe87e9d8 100644 --- a/backend/api/routes_sync.py +++ b/backend/api/routes_sync.py @@ -27,7 +27,10 @@ sync_success_total, sync_failure_total, sync_records_upserted_total, + hygraph_cb_trips_total, + hygraph_cb_fallback_total, ) +from services.circuit_breaker import CircuitOpenError from services.hygraph_service import HygraphService logger = logging.getLogger(__name__) @@ -44,20 +47,22 @@ def get_hygraph_service() -> HygraphService: def _error_envelope(code: str, message: str, details: Optional[dict] = None) -> Dict[str, Any]: - """ - Builds a standardized error envelope for API responses. - - Parameters: - code (str): Machine-readable error code identifying the error. - message (str): Human-readable error message describing the failure. - details (Optional[dict]): Additional contextual information; defaults to an empty dict when not provided. - - Returns: - Dict[str, Any]: Dictionary with keys `ok` (False) and `error` containing `code`, `message`, and `details`. - """ + """Build a standardized error envelope for API responses.""" + return {"ok": False, "error": {"code": code, "message": message, "details": details or {}}} +def _processed_value(payload: Any) -> int: + if isinstance(payload, dict): + value = payload.get("processed", 0) + else: + value = payload + try: + return int(value or 0) + except Exception: # noqa: BLE001 + return 0 + + @router.post( "/hygraph", status_code=status.HTTP_202_ACCEPTED, @@ -68,14 +73,8 @@ async def hygraph_webhook( background: BackgroundTasks, db: Session = Depends(get_db), ) -> Dict[str, Any]: - """ - Webhook receiver: - - HMAC validated (dependency) - - Single size guard (2MB) already enforced by dependency; body/raw set on request.state - - DB dedup via SyncEvent(event_id, body_sha256 unique) - - 202 fast-ack with background processing (pull_all) - - Structured JSON log line and Prometheus counters - """ + """Webhook receiver for Hygraph change notifications.""" + start = time.perf_counter() raw = getattr(request.state, "raw_body", b"") body_sha = getattr(request.state, "body_sha256", "") @@ -104,18 +103,19 @@ async def hygraph_webhook( return JSONResponse({"ok": True, "dedup": True}, status_code=200) try: - payload = json.loads(raw) if raw else {} + _ = json.loads(raw) if raw else {} except json.JSONDecodeError: raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "Invalid JSON payload")) + async def _process(event_id_local: Optional[str], body_sha_local: str) -> None: t0 = time.perf_counter() try: counts = await HygraphService.pull_all(db) for t, c in counts.items(): - sync_records_upserted_total.labels(t).inc(int(c or 0)) + processed = _processed_value(c) + sync_records_upserted_total.labels(t).inc(processed) sync_success_total.labels("all").inc() - # Log at WARNING so caplog sees it without level configuration logger.warning( "hygraph_webhook", extra={ @@ -125,6 +125,33 @@ async def _process(event_id_local: Optional[str], body_sha_local: str) -> None: "elapsed_ms": int((time.perf_counter() - t0) * 1000), }, ) + except CircuitOpenError as exc: + hygraph_cb_trips_total.labels("all").inc() + fallback_payload = exc.fallback or {} + if fallback_payload: + hygraph_cb_fallback_total.labels("all").inc() + for t, payload in fallback_payload.items(): + processed = _processed_value(payload) + sync_records_upserted_total.labels(t).inc(processed) + logger.warning( + "hygraph_webhook_fallback", + extra={ + "event_id": event_id_local, + "dedup": False, + "fallback": fallback_payload, + "elapsed_ms": int((time.perf_counter() - t0) * 1000), + }, + ) + else: + sync_failure_total.labels("all").inc() + logger.warning( + "hygraph_webhook_circuit_open", + extra={ + "event_id": event_id_local, + "dedup": False, + "error": "Circuit breaker open with empty cache", + }, + ) except Exception as e: # noqa: BLE001 sync_failure_total.labels("all").inc() logger.exception( @@ -137,7 +164,6 @@ async def _process(event_id_local: Optional[str], body_sha_local: str) -> None: ) background.add_task(_process, event_id, body_sha) - # Explicit background attachment ensures Starlette runs it before TestClient returns return JSONResponse({"ok": True, "accepted": True}, status_code=202, background=background) @@ -146,12 +172,8 @@ async def hygraph_pull( body: Dict[str, Any] = Body(...), db: Session = Depends(get_db), ) -> Dict[str, Any]: - """ - Admin pull: - - Auth via Bearer token (constant-time compare) - - Accepts "type" or "sync_type" + optional "page_size" - - Validates positive page_size and caps inside service (≤200) - """ + """Admin-triggered Hygraph syncs supporting manual pull types.""" + sync_type = str((body.get("type") or body.get("sync_type") or "")).lower().strip() page_size_raw = body.get("page_size") page_size: Optional[int] = None @@ -163,26 +185,50 @@ async def hygraph_pull( except (TypeError, ValueError): raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "page_size must be a positive integer")) + try: if sync_type == "materials": counts = await HygraphService.pull_materials(db, page_size=page_size) + processed = _processed_value(counts) sync_success_total.labels("materials").inc() - sync_records_upserted_total.labels("materials").inc(int(counts.get("processed", 0))) + sync_records_upserted_total.labels("materials").inc(processed) elif sync_type == "modules": counts = await HygraphService.pull_modules(db, page_size=page_size) + processed = _processed_value(counts) sync_success_total.labels("modules").inc() - sync_records_upserted_total.labels("modules").inc(int(counts.get("processed", 0))) + sync_records_upserted_total.labels("modules").inc(processed) elif sync_type == "systems": counts = await HygraphService.pull_systems(db, page_size=page_size) + processed = _processed_value(counts) sync_success_total.labels("systems").inc() - sync_records_upserted_total.labels("systems").inc(int(counts.get("processed", 0))) + sync_records_upserted_total.labels("systems").inc(processed) elif sync_type == "all": counts = await HygraphService.pull_all(db, page_size=page_size) - for t, c in counts.items(): + for t, payload in counts.items(): + processed = _processed_value(payload) sync_success_total.labels(t).inc() - sync_records_upserted_total.labels(t).inc(int(c or 0)) + sync_records_upserted_total.labels(t).inc(processed) else: raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "unsupported type")) + except CircuitOpenError as exc: + circuit_type = sync_type or "all" + hygraph_cb_trips_total.labels(circuit_type).inc() + fallback_payload = exc.fallback + if fallback_payload: + hygraph_cb_fallback_total.labels(circuit_type).inc() + return { + "ok": True, + "breaker_open": True, + "data": fallback_payload, + } + raise HTTPException( + status_code=503, + detail=_error_envelope( + "SERVICE_UNAVAILABLE", + "Hygraph circuit breaker open and no cached data available", + {"type": circuit_type}, + ), + ) except HTTPException: raise except Exception as e: # noqa: BLE001 @@ -190,4 +236,4 @@ async def hygraph_pull( logger.exception("hygraph_pull_failure", extra={"type": sync_type, "error": str(e)}) raise HTTPException(status_code=500, detail=_error_envelope("INTERNAL", "sync failed", {"type": sync_type})) - return {"ok": True, "data": counts} \ No newline at end of file + return {"ok": True, "data": counts} diff --git a/backend/pyproject.toml b/backend/pyproject.toml index ef751d75..86c212e6 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "alembic>=1.13.2", "psycopg[binary]>=3.2.1", "prometheus-fastapi-instrumentator", + "httpx>=0.27.0", "sentry-sdk[fastapi,httpx,opentelemetry]==2.42.0", ] diff --git a/backend/services/cache.py b/backend/services/cache.py new file mode 100644 index 00000000..5627086e --- /dev/null +++ b/backend/services/cache.py @@ -0,0 +1,67 @@ +"""Simple cache helper that prefers Redis but falls back to an in-memory store.""" + +from __future__ import annotations + +import json +import os +import threading +import time +from typing import Any, Dict, Optional + +try: # pragma: no cover - optional dependency + import redis # type: ignore +except Exception: # pragma: no cover - optional dependency + redis = None # type: ignore[assignment] + + +class RedisCache: + """Tiny wrapper that abstracts Redis availability.""" + + def __init__(self, url: str | None = None, *, namespace: str = "hygraph") -> None: + self._url = url or os.getenv("HYGRAPH_CACHE_URL") or os.getenv("REDIS_URL") + self._namespace = namespace + self._lock = threading.Lock() + self._memory: Dict[str, tuple[str, Optional[float]]] = {} + if redis is not None and self._url: + self._client = redis.Redis.from_url(self._url, decode_responses=True) + else: # pragma: no cover - exercised implicitly when redis missing + self._client = None + + def _ns(self, key: str) -> str: + return f"{self._namespace}:{key}" + + def get(self, key: str) -> Any | None: + namespaced = self._ns(key) + if self._client is not None: + raw = self._client.get(namespaced) + return json.loads(raw) if raw else None + with self._lock: + value = self._memory.get(namespaced) + if value is None: + return None + raw, expires_at = value + if expires_at is not None and expires_at < time.time(): + del self._memory[namespaced] + return None + return json.loads(raw) + + def set(self, key: str, value: Any, *, ttl: int | None = None) -> None: + payload = json.dumps(value) + namespaced = self._ns(key) + if self._client is not None: + if ttl is not None: + self._client.setex(namespaced, ttl, payload) + else: + self._client.set(namespaced, payload) + return + expires_at = (time.time() + ttl) if ttl else None + with self._lock: + self._memory[namespaced] = (payload, expires_at) + + def clear(self, key: str) -> None: + namespaced = self._ns(key) + if self._client is not None: + self._client.delete(namespaced) + return + with self._lock: + self._memory.pop(namespaced, None) diff --git a/backend/services/circuit_breaker.py b/backend/services/circuit_breaker.py new file mode 100644 index 00000000..d9b17d53 --- /dev/null +++ b/backend/services/circuit_breaker.py @@ -0,0 +1,84 @@ +"""Circuit breaker utility for wrapping outbound service calls.""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import Any, Callable, TypeVar + + +T = TypeVar("T") + + +class CircuitOpenError(RuntimeError): + """Raised when the circuit breaker refuses to execute a call.""" + + def __init__(self, message: str = "Circuit breaker is open", *, fallback: Any | None = None) -> None: + super().__init__(message) + self.fallback = fallback + + +@dataclass +class CircuitBreaker: + """Simple circuit breaker implementation with configurable thresholds.""" + + failure_threshold: int = 5 + recovery_timeout: float = 30.0 + success_threshold: int = 1 + state: str = field(default="CLOSED", init=False) + failure_count: int = field(default=0, init=False) + success_count: int = field(default=0, init=False) + last_failure_time: float | None = field(default=None, init=False) + + CLOSED: str = field(default="CLOSED", init=False) + OPEN: str = field(default="OPEN", init=False) + HALF_OPEN: str = field(default="HALF_OPEN", init=False) + + def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: + """Execute ``func`` respecting the breaker state machine.""" + + now = time.monotonic() + if self.state == self.OPEN: + if self.last_failure_time is None or (now - self.last_failure_time) < self.recovery_timeout: + raise CircuitOpenError() + self.state = self.HALF_OPEN + self.success_count = 0 + + try: + result = func(*args, **kwargs) + except Exception: # noqa: BLE001 + self._record_failure() + raise + + self._record_success() + return result + + def _record_failure(self) -> None: + now = time.monotonic() + if self.state == self.HALF_OPEN: + self._trip(now) + return + + self.failure_count += 1 + if self.failure_count >= self.failure_threshold: + self._trip(now) + + def _record_success(self) -> None: + if self.state == self.HALF_OPEN: + self.success_count += 1 + if self.success_count >= self.success_threshold: + self._reset() + else: + self._reset() + + def _trip(self, when: float) -> None: + self.state = self.OPEN + self.last_failure_time = when + self.failure_count = 0 + self.success_count = 0 + + def _reset(self) -> None: + self.state = self.CLOSED + self.failure_count = 0 + self.success_count = 0 + self.last_failure_time = None diff --git a/backend/services/hygraph_client.py b/backend/services/hygraph_client.py new file mode 100644 index 00000000..5abfa837 --- /dev/null +++ b/backend/services/hygraph_client.py @@ -0,0 +1,69 @@ +"""Hygraph GraphQL client with circuit breaker protection.""" + +from __future__ import annotations + +from typing import Any, Callable, Dict, Optional + +import httpx + +from services.circuit_breaker import CircuitBreaker, CircuitOpenError + + +class HygraphClient: + """HTTPX-based GraphQL client guarded by a circuit breaker.""" + + def __init__( + self, + *, + endpoint: str, + token: str | None = None, + timeout: float = 6.0, + breaker: CircuitBreaker | None = None, + client: httpx.Client | None = None, + ) -> None: + self._endpoint = endpoint + self._token = token or "" + self._breaker = breaker or CircuitBreaker() + self._client = client or httpx.Client(timeout=timeout) + + def close(self) -> None: + self._client.close() + + def execute( + self, + query: str, + *, + variables: Optional[Dict[str, Any]] = None, + fallback: Callable[[], Any] | None = None, + ) -> Dict[str, Any]: + """Execute a GraphQL query and return the ``data`` payload.""" + + def _do_request() -> Dict[str, Any]: + payload = {"query": query, "variables": variables or {}} + headers = {"Content-Type": "application/json"} + if self._token: + headers["Authorization"] = f"Bearer {self._token}" + response = self._client.post(self._endpoint, json=payload, headers=headers) + response.raise_for_status() + body = response.json() + if isinstance(body, dict) and body.get("errors"): + raise RuntimeError("Hygraph GraphQL responded with errors") + data = body.get("data") if isinstance(body, dict) else None + if not isinstance(data, dict): + raise RuntimeError("Hygraph GraphQL response missing data") + return data + + try: + return self._breaker.call(_do_request) + except CircuitOpenError as exc: + fallback_payload = None + if fallback is not None: + try: + fallback_payload = fallback() + except Exception: # noqa: BLE001 + fallback_payload = None + raise CircuitOpenError(str(exc), fallback=fallback_payload) from exc + + @property + def breaker(self) -> CircuitBreaker: + return self._breaker diff --git a/backend/services/hygraph_service.py b/backend/services/hygraph_service.py index e4c7baba..505c0c7e 100644 --- a/backend/services/hygraph_service.py +++ b/backend/services/hygraph_service.py @@ -1,21 +1,41 @@ -"""Hygraph sync service with signature validation and idempotency (H4).""" +"""Hygraph sync service with signature validation, caching, and circuit breaker support.""" from __future__ import annotations -import hmac +import asyncio import hashlib +import hmac import time from typing import Any, Dict, Optional +from api.config import get_settings +from services.cache import RedisCache +from services.circuit_breaker import CircuitOpenError +from services.hygraph_client import HygraphClient + +_SETTINGS = get_settings() +_CLIENT = HygraphClient(endpoint=_SETTINGS.hygraph_endpoint, token=_SETTINGS.hygraph_token) +_CACHE = RedisCache(namespace="hygraph") +_CACHE_TTL_SECONDS = 300 +_DEFAULT_PAGE_SIZE = 50 +_MAX_PAGE_SIZE = 200 + + +def _fallback_envelope(resource: str, items: list[dict[str, Any]] | list[Any]) -> Dict[str, Any]: + return { + "resource": resource, + "items": items, + "processed": len(items), + "cached": True, + } + class HygraphService: def __init__(self, webhook_secret: str | None = None) -> None: self._secret = webhook_secret or "" - # naive idempotency memory store; replace with persistent store in prod self._seen_events: set[str] = set() def verify_signature(self, payload: bytes, signature_header: str) -> bool: - """Validate HMAC-SHA256 signature from Hygraph webhook.""" if not self._secret: return False expected = hmac.new(self._secret.encode("utf-8"), payload, hashlib.sha256).hexdigest() @@ -32,49 +52,143 @@ def is_idempotent(self, event_id: str) -> bool: return True def sync(self, event: Dict[str, Any]) -> Dict[str, Any]: - """Process incoming Hygraph event. - - MVP: returns a structured acknowledgement. Real sync logic will fetch - content via GraphQL and upsert into local DB. - """ return { "status": "accepted", "received_at": int(time.time()), "event": event, } - @staticmethod - async def pull_materials(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"processed": 0} - - @staticmethod - async def pull_modules(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"processed": 0} - - @staticmethod - async def pull_systems(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"processed": 0} - - @staticmethod - async def pull_all(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"materials": 0, "modules": 0, "systems": 0} - @staticmethod def _hygraph_primary_expr(session, json_column) -> Any: - """Return a SQL expression extracting hygraph_primary from JSON. - - Works for SQLite (json_extract) and PostgreSQL (->> operator). - """ from sqlalchemy import func + bind = session.get_bind() dialect = getattr(bind, "dialect", None) name = getattr(dialect, "name", "") if dialect else "" if name == "postgresql": - # json_column ->> 'hygraph_primary' return json_column["hygraph_primary"].astext - # SQLite/others - return func.json_extract(json_column, "$.hygraph_primary") \ No newline at end of file + return func.json_extract(json_column, "$.hygraph_primary") + + @classmethod + def _normalize_page_size(cls, page_size: Optional[int]) -> int: + if page_size is None: + return _DEFAULT_PAGE_SIZE + return max(1, min(int(page_size), _MAX_PAGE_SIZE)) + + @classmethod + async def _pull_resource( + cls, + resource: str, + query: str, + *, + page_size: Optional[int] = None, + ) -> Dict[str, Any]: + limit = cls._normalize_page_size(page_size) + cached_items = _CACHE.get(resource) or [] + fallback_payload: Dict[str, Any] | None = None + + def _fallback() -> Dict[str, Any] | None: + nonlocal fallback_payload + if fallback_payload is None: + if cached_items: + fallback_payload = _fallback_envelope(resource, cached_items) + else: + fallback_payload = None + return fallback_payload + + try: + data = await asyncio.to_thread( + _CLIENT.execute, + query, + variables={"first": limit}, + fallback=_fallback, + ) + except CircuitOpenError as err: + payload = err.fallback if err.fallback is not None else _fallback() + raise CircuitOpenError(str(err), fallback=payload) from err + + items = data.get(resource) if isinstance(data, dict) else None + if isinstance(items, dict): + if isinstance(items.get("nodes"), list): + items = items["nodes"] + elif isinstance(items.get("edges"), list): + edge_nodes = [] + for edge in items["edges"]: + if isinstance(edge, dict) and isinstance(edge.get("node"), dict): + edge_nodes.append(edge["node"]) + items = edge_nodes + else: + items = [] + if not isinstance(items, list): + items = [] + _CACHE.set(resource, items, ttl=_CACHE_TTL_SECONDS) + return { + "resource": resource, + "items": items, + "processed": len(items), + "cached": False, + } + + @classmethod + async def pull_materials(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + query = """ + query Materials($first: Int!) { + materials(first: $first) { + id + updatedAt + } + } + """ + return await cls._pull_resource("materials", query, page_size=page_size) + + @classmethod + async def pull_modules(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + query = """ + query Modules($first: Int!) { + modules(first: $first) { + id + updatedAt + } + } + """ + return await cls._pull_resource("modules", query, page_size=page_size) + + @classmethod + async def pull_systems(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + query = """ + query Systems($first: Int!) { + systems(first: $first) { + id + updatedAt + } + } + """ + return await cls._pull_resource("systems", query, page_size=page_size) + + @classmethod + async def pull_all(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + try: + materials = await cls.pull_materials(db, page_size=page_size) + modules = await cls.pull_modules(db, page_size=page_size) + systems = await cls.pull_systems(db, page_size=page_size) + except CircuitOpenError as err: + fallback_raw = err.fallback + if isinstance(fallback_raw, dict) and "resource" in fallback_raw: + fallback = {fallback_raw["resource"]: fallback_raw} + elif isinstance(fallback_raw, dict): + fallback = fallback_raw + else: + fallback = {} + if not fallback: + fallback = {} + for key in ("materials", "modules", "systems"): + cached = _CACHE.get(key) or [] + if cached: + fallback[key] = _fallback_envelope(key, cached) + raise CircuitOpenError("Hygraph circuit open", fallback=fallback) from err + + return { + "materials": materials["processed"], + "modules": modules["processed"], + "systems": systems["processed"], + } diff --git a/docs/backend-development.md b/docs/backend-development.md index 0592502c..87d1ccb2 100644 --- a/docs/backend-development.md +++ b/docs/backend-development.md @@ -173,3 +173,24 @@ FastAPI automatically generates interactive API documentation. When the server i - Swagger UI: http://localhost:8000/docs - ReDoc: http://localhost:8000/redoc + +## Hygraph Circuit Breaker Runbook + +The Hygraph service layer now caches payloads in Redis (or an in-memory fallback) and guards outbound GraphQL traffic with a circuit breaker. Use the following checklist when operating the integration: + +### Warming the Cache + +1. Ensure the Redis instance defined by `HYGRAPH_CACHE_URL`/`REDIS_URL` is reachable from the backend container. +2. Temporarily disable the circuit breaker by resetting it so fresh responses flow from Hygraph. +3. Trigger each sync endpoint with a small page size to populate cached datasets: + - `POST /api/sync/hygraph/pull` with `{"type": "materials"}` + - `POST /api/sync/hygraph/pull` with `{"type": "modules"}` + - `POST /api/sync/hygraph/pull` with `{"type": "systems"}` +4. Confirm the Redis keys `hygraph:materials`, `hygraph:modules`, and `hygraph:systems` contain JSON payloads with the expected counts. + +### Exercising Breaker States + +1. **Closed → Open**: Stop outbound connectivity (e.g., block the Hygraph host with `iptables` or override DNS). Issue a pull request; after `failure_threshold` attempts (default 5) the breaker opens and the Prometheus counter `hygraph_cb_trips_total` increments. +2. **Open → Fallback**: While the breaker is open, hit the same endpoint again. The API should return cached results with `breaker_open=true` and increment `hygraph_cb_fallback_total`. +3. **Half-Open → Closed**: Restore connectivity and wait `recovery_timeout` seconds (default 30). The next request is allowed through; if it succeeds, the breaker closes and live data refreshes the cache. +4. Monitor `/metrics` to verify the counters and ensure `sync_failure_total` remains flat when serving cached responses. From 8f09a212e3958dc78df94594765373fe961672ab Mon Sep 17 00:00:00 2001 From: Shayan Date: Thu, 16 Oct 2025 12:07:26 -0700 Subject: [PATCH 2/2] Fix fallback handling and cache resiliency --- backend/services/cache.py | 36 +++++++++++++++++++------ backend/services/hygraph_service.py | 41 ++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/backend/services/cache.py b/backend/services/cache.py index 5627086e..e261f8ff 100644 --- a/backend/services/cache.py +++ b/backend/services/cache.py @@ -13,6 +13,14 @@ except Exception: # pragma: no cover - optional dependency redis = None # type: ignore[assignment] +if redis is not None: # pragma: no cover - runtime behaviour depends on redis + RedisError = redis.exceptions.RedisError # type: ignore[attr-defined] +else: # pragma: no cover - exercised implicitly when redis missing + class RedisError(Exception): + """Fallback Redis error type when redis-py is unavailable.""" + + pass + class RedisCache: """Tiny wrapper that abstracts Redis availability.""" @@ -33,8 +41,12 @@ def _ns(self, key: str) -> str: def get(self, key: str) -> Any | None: namespaced = self._ns(key) if self._client is not None: - raw = self._client.get(namespaced) - return json.loads(raw) if raw else None + try: + raw = self._client.get(namespaced) + except RedisError: + self._client = None + else: + return json.loads(raw) if raw else None with self._lock: value = self._memory.get(namespaced) if value is None: @@ -49,11 +61,15 @@ def set(self, key: str, value: Any, *, ttl: int | None = None) -> None: payload = json.dumps(value) namespaced = self._ns(key) if self._client is not None: - if ttl is not None: - self._client.setex(namespaced, ttl, payload) + try: + if ttl is not None: + self._client.setex(namespaced, ttl, payload) + else: + self._client.set(namespaced, payload) + except RedisError: + self._client = None else: - self._client.set(namespaced, payload) - return + return expires_at = (time.time() + ttl) if ttl else None with self._lock: self._memory[namespaced] = (payload, expires_at) @@ -61,7 +77,11 @@ def set(self, key: str, value: Any, *, ttl: int | None = None) -> None: def clear(self, key: str) -> None: namespaced = self._ns(key) if self._client is not None: - self._client.delete(namespaced) - return + try: + self._client.delete(namespaced) + except RedisError: + self._client = None + else: + return with self._lock: self._memory.pop(namespaced, None) diff --git a/backend/services/hygraph_service.py b/backend/services/hygraph_service.py index 505c0c7e..27b2e452 100644 --- a/backend/services/hygraph_service.py +++ b/backend/services/hygraph_service.py @@ -173,18 +173,39 @@ async def pull_all(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: systems = await cls.pull_systems(db, page_size=page_size) except CircuitOpenError as err: fallback_raw = err.fallback + fallback: dict[str, dict[str, Any]] = {} if isinstance(fallback_raw, dict) and "resource" in fallback_raw: - fallback = {fallback_raw["resource"]: fallback_raw} + resource_name = fallback_raw.get("resource") + if isinstance(resource_name, str): + fallback[resource_name] = fallback_raw elif isinstance(fallback_raw, dict): - fallback = fallback_raw - else: - fallback = {} - if not fallback: - fallback = {} - for key in ("materials", "modules", "systems"): - cached = _CACHE.get(key) or [] - if cached: - fallback[key] = _fallback_envelope(key, cached) + for key, value in fallback_raw.items(): + if isinstance(key, str) and isinstance(value, dict): + fallback[key] = value + + successful_results = { + "materials": locals().get("materials"), + "modules": locals().get("modules"), + "systems": locals().get("systems"), + } + + for key in ("materials", "modules", "systems"): + if key in fallback and isinstance(fallback[key], dict): + continue + + result = successful_results.get(key) + if isinstance(result, dict): + items = result.get("items") + if not isinstance(items, list): + items = [] + fallback[key] = _fallback_envelope(key, items) + continue + + cached_items = _CACHE.get(key) or [] + if not isinstance(cached_items, list): + cached_items = [] + fallback[key] = _fallback_envelope(key, cached_items) + raise CircuitOpenError("Hygraph circuit open", fallback=fallback) from err return {