diff --git a/backend/api/routes_sync.py b/backend/api/routes_sync.py index 07309c89..e2f37709 100644 --- a/backend/api/routes_sync.py +++ b/backend/api/routes_sync.py @@ -32,6 +32,7 @@ from opentelemetry import trace from opentelemetry.trace import Status, StatusCode from services.hygraph_service import HygraphService +from services.circuit_breaker import CircuitOpenError logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/sync", tags=["sync"]) @@ -239,6 +240,16 @@ async def hygraph_pull( raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "unsupported type")) except HTTPException: raise + except CircuitOpenError as exc: + hygraph_cb_trips_total.inc() + fallback_result = exc.fallback_result + if fallback_result is not None: + hygraph_cb_fallback_total.inc() + return {"ok": True, "data": fallback_result, "cached": True} + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=_error_envelope("SERVICE_UNAVAILABLE", "Hygraph unavailable", {"type": sync_type}), + ) except Exception as e: # noqa: BLE001 record_sync_failure(sync_type or "all") record_sync_duration(sync_type or "all", (time.perf_counter() - start) * 1000) diff --git a/backend/services/cache.py b/backend/services/cache.py new file mode 100644 index 00000000..e4ac06b4 --- /dev/null +++ b/backend/services/cache.py @@ -0,0 +1,73 @@ +"""Simple Redis-backed cache helper with in-memory fallback.""" + +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +try: + from redis import asyncio as redis_asyncio +except Exception: # pragma: no cover - optional dependency guard + redis_asyncio = None # type: ignore[assignment] + + +@dataclass +class RedisCache: + """Minimal JSON cache built on top of Redis with TTL support.""" + + url: str + namespace: str = "hygraph" + use_memory_fallback: bool = True + _client: Optional["redis_asyncio.Redis"] = 
field(default=None, init=False, repr=False) + _memory_store: Dict[str, tuple[Any, Optional[float]]] = field(default_factory=dict, init=False, repr=False) + + def __post_init__(self) -> None: + if redis_asyncio is not None: + try: + self._client = redis_asyncio.from_url(self.url, decode_responses=True) + except Exception: + self._client = None + + def _prefixed(self, key: str) -> str: + return f"{self.namespace}:{key}" if self.namespace else key + + async def get_json(self, key: str) -> Any | None: + full_key = self._prefixed(key) + if self._client is not None: + try: + raw = await self._client.get(full_key) + except Exception: + raw = None + if raw is not None: + try: + return json.loads(raw) + except json.JSONDecodeError: + return None + if not self.use_memory_fallback: + return None + payload = self._memory_store.get(full_key) + if payload is None: + return None + value, expires_at = payload + if expires_at is not None and expires_at < time.monotonic(): + self._memory_store.pop(full_key, None) + return None + return value + + async def set_json(self, key: str, value: Any, *, ttl: Optional[int] = None) -> None: + full_key = self._prefixed(key) + dumped = json.dumps(value) + if self._client is not None: + try: + await self._client.set(full_key, dumped, ex=ttl) + except Exception: + # Fall back to memory cache if Redis write fails. 
+ pass + if not self.use_memory_fallback: + return + expires_at = None + if ttl is not None: + expires_at = time.monotonic() + ttl + self._memory_store[full_key] = (value, expires_at) diff --git a/backend/services/circuit_breaker.py b/backend/services/circuit_breaker.py new file mode 100644 index 00000000..a8c5b042 --- /dev/null +++ b/backend/services/circuit_breaker.py @@ -0,0 +1,108 @@ +"""Simple circuit breaker implementation for Hygraph integrations.""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Optional, TypeVar + + +T = TypeVar("T") + + +class CircuitOpenError(RuntimeError): + """Raised when the circuit breaker is OPEN and calls are blocked.""" + + def __init__(self, message: str, fallback_result: Any | None = None) -> None: + super().__init__(message) + self.fallback_result = fallback_result + + +@dataclass +class CircuitBreaker: + """Dataclass based circuit breaker with simple HALF_OPEN support.""" + + failure_threshold: int = 5 + recovery_timeout: float = 30.0 + half_open_success_threshold: int = 1 + time_provider: Callable[[], float] = time.monotonic + state: str = field(default="CLOSED", init=False) + failure_count: int = field(default=0, init=False) + success_count: int = field(default=0, init=False) + opened_at: Optional[float] = field(default=None, init=False) + + CLOSED: str = "CLOSED" + OPEN: str = "OPEN" + HALF_OPEN: str = "HALF_OPEN" + + def _transition_to_open(self) -> None: + self.state = self.OPEN + self.opened_at = self.time_provider() + self.failure_count = 0 + self.success_count = 0 + + def _transition_to_half_open(self) -> None: + self.state = self.HALF_OPEN + self.opened_at = None + self.failure_count = 0 + self.success_count = 0 + + def _transition_to_closed(self) -> None: + self.state = self.CLOSED + self.opened_at = None + self.failure_count = 0 + self.success_count = 0 + + def _ready_for_half_open(self) -> bool: + if self.opened_at is None: + return 
False + return (self.time_provider() - self.opened_at) >= self.recovery_timeout + + def _record_failure(self) -> None: + if self.state == self.HALF_OPEN: + self._transition_to_open() + return + self.failure_count += 1 + if self.failure_count >= self.failure_threshold: + self._transition_to_open() + + def _record_success(self) -> None: + if self.state == self.HALF_OPEN: + self.success_count += 1 + if self.success_count >= self.half_open_success_threshold: + self._transition_to_closed() + return + elif self.state == self.OPEN: + # Should not happen, but close proactively. + self._transition_to_closed() + else: + self.failure_count = 0 + + def call( + self, + func: Callable[..., T], + *args: Any, + fallback: Optional[Callable[[], T]] = None, + **kwargs: Any, + ) -> T: + """Execute ``func`` guarding access through the circuit breaker.""" + + if self.state == self.OPEN: + if self._ready_for_half_open(): + self._transition_to_half_open() + else: + fallback_result: T | None = fallback() if fallback else None + raise CircuitOpenError("Circuit breaker is open", fallback_result) + + try: + result = func(*args, **kwargs) + except Exception as exc: + was_half_open = self.state == self.HALF_OPEN + self._record_failure() + if was_half_open: + fallback_result: T | None = fallback() if fallback else None + raise CircuitOpenError("Circuit breaker is open", fallback_result) from exc + raise + else: + self._record_success() + return result diff --git a/backend/services/hygraph_client.py b/backend/services/hygraph_client.py new file mode 100644 index 00000000..6c55d4d4 --- /dev/null +++ b/backend/services/hygraph_client.py @@ -0,0 +1,77 @@ +"""Hygraph GraphQL client with circuit breaker protection.""" + +from __future__ import annotations + +import httpx +from typing import Any, Callable, Dict, Optional + +from services.circuit_breaker import CircuitBreaker, CircuitOpenError + + +class HygraphGraphQLError(RuntimeError): + """Raised when Hygraph returns GraphQL errors.""" + + def 
__init__(self, errors: Any) -> None: + super().__init__("Hygraph GraphQL error") + self.errors = errors + + +class HygraphClient: + """HTTPX based client for Hygraph GraphQL requests.""" + + def __init__( + self, + endpoint: str, + token: str, + *, + timeout: float = 6.0, + breaker: Optional[CircuitBreaker] = None, + client: Optional[httpx.Client] = None, + ) -> None: + if not endpoint: + raise ValueError("Hygraph endpoint must be configured") + self._endpoint = endpoint + self._token = token + self._client = client or httpx.Client(timeout=timeout) + self._breaker = breaker or CircuitBreaker() + self._client.headers.setdefault("Content-Type", "application/json") + self._client.headers.setdefault("Accept", "application/json") + if token: + self._client.headers.setdefault("Authorization", f"Bearer {token}") + + @property + def breaker(self) -> CircuitBreaker: + return self._breaker + + def execute( + self, + query: str, + variables: Optional[Dict[str, Any]] = None, + *, + fallback: Optional[Callable[[], Any]] = None, + ) -> Dict[str, Any]: + """Run the GraphQL ``query`` through the circuit breaker.""" + + def _request() -> Dict[str, Any]: + response = self._client.post( + self._endpoint, + json={"query": query, "variables": variables or {}}, + ) + response.raise_for_status() + payload = response.json() + errors = payload.get("errors") + if errors: + raise HygraphGraphQLError(errors) + data = payload.get("data") + if not isinstance(data, dict): + return {} + return data + + try: + return self._breaker.call(_request, fallback=fallback) + except CircuitOpenError: + # Re-raise so callers can handle fallback semantics. 
+ raise + + def close(self) -> None: + self._client.close() diff --git a/backend/services/hygraph_service.py b/backend/services/hygraph_service.py index e4c7baba..cbe7091e 100644 --- a/backend/services/hygraph_service.py +++ b/backend/services/hygraph_service.py @@ -1,21 +1,45 @@ -"""Hygraph sync service with signature validation and idempotency (H4).""" +"""Hygraph sync service with caching and circuit breaker integration.""" from __future__ import annotations -import hmac +import asyncio import hashlib +import hmac import time from typing import Any, Dict, Optional +from api.config import get_settings +from services.cache import RedisCache +from services.circuit_breaker import CircuitOpenError +from services.hygraph_client import HygraphClient, HygraphGraphQLError + + +_settings = get_settings() +_cache = RedisCache(url=getattr(_settings, "redis_url", "redis://localhost:6379/0")) +_client: Optional[HygraphClient] = None + + +def _get_client() -> HygraphClient: + global _client + if _client is None: + if not _settings.hygraph_endpoint: + raise RuntimeError("Hygraph endpoint is not configured") + _client = HygraphClient( + endpoint=_settings.hygraph_endpoint, + token=_settings.hygraph_token, + ) + return _client + class HygraphService: + CACHE_TTL_SECONDS = 300 + MAX_PAGE_SIZE = 200 + def __init__(self, webhook_secret: str | None = None) -> None: self._secret = webhook_secret or "" - # naive idempotency memory store; replace with persistent store in prod self._seen_events: set[str] = set() def verify_signature(self, payload: bytes, signature_header: str) -> bool: - """Validate HMAC-SHA256 signature from Hygraph webhook.""" if not self._secret: return False expected = hmac.new(self._secret.encode("utf-8"), payload, hashlib.sha256).hexdigest() @@ -32,49 +56,136 @@ def is_idempotent(self, event_id: str) -> bool: return True def sync(self, event: Dict[str, Any]) -> Dict[str, Any]: - """Process incoming Hygraph event. - - MVP: returns a structured acknowledgement. 
Real sync logic will fetch - content via GraphQL and upsert into local DB. - """ return { "status": "accepted", "received_at": int(time.time()), "event": event, } - @staticmethod - async def pull_materials(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"processed": 0} + @classmethod + def _clamp_page_size(cls, page_size: Optional[int]) -> Optional[int]: + if page_size is None: + return None + return max(1, min(cls.MAX_PAGE_SIZE, page_size)) - @staticmethod - async def pull_modules(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"processed": 0} + @classmethod + def _cache_key(cls, resource: str, page_size: Optional[int]) -> str: + suffix = f":{page_size}" if page_size else "" + return f"{resource}{suffix}" @staticmethod - async def pull_systems(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"processed": 0} + def _extract_items(payload: Dict[str, Any], key: str) -> list[Any]: + data = payload.get(key) + if isinstance(data, list): + return [item for item in data if item is not None] + if isinstance(data, dict): + if "nodes" in data and isinstance(data["nodes"], list): + return [item for item in data["nodes"] if item is not None] + if "edges" in data and isinstance(data["edges"], list): + items = [] + for edge in data["edges"]: + if isinstance(edge, dict): + node = edge.get("node") + if node is not None: + items.append(node) + return items + return [] - @staticmethod - async def pull_all(db, page_size: Optional[int] = None) -> Dict[str, int]: - # Placeholder; tests will monkeypatch this - return {"materials": 0, "modules": 0, "systems": 0} + @classmethod + async def _fetch_collection( + cls, + resource: str, + query: str, + *, + page_size: Optional[int] = None, + ) -> Dict[str, Any]: + client = _get_client() + size = cls._clamp_page_size(page_size) + cache_key = 
cls._cache_key(resource, size) + cached_result = await _cache.get_json(cache_key) - @staticmethod - def _hygraph_primary_expr(session, json_column) -> Any: - """Return a SQL expression extracting hygraph_primary from JSON. + fallback = None + if cached_result is not None: + def _fallback() -> Dict[str, Any]: + copy = dict(cached_result) + copy["from_cache"] = True + return copy - Works for SQLite (json_extract) and PostgreSQL (->> operator). + fallback = _fallback + + try: + data = await asyncio.to_thread( + client.execute, + query, + {"first": size} if size else None, + fallback=fallback, + ) + except CircuitOpenError: + raise + except HygraphGraphQLError: + raise + result = { + "processed": len(cls._extract_items(data, resource)), + "data": data, + "from_cache": False, + } + await _cache.set_json(cache_key, result, ttl=cls.CACHE_TTL_SECONDS) + return result + + @classmethod + async def pull_materials(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + query = """ + query Materials($first: Int) { + materials(first: $first) { + id + updatedAt + } + } + """ + return await cls._fetch_collection("materials", query, page_size=page_size) + + @classmethod + async def pull_modules(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + query = """ + query Modules($first: Int) { + modules(first: $first) { + id + updatedAt + } + } + """ + return await cls._fetch_collection("modules", query, page_size=page_size) + + @classmethod + async def pull_systems(cls, db, page_size: Optional[int] = None) -> Dict[str, Any]: + query = """ + query Systems($first: Int) { + systems(first: $first) { + id + updatedAt + } + } """ + return await cls._fetch_collection("systems", query, page_size=page_size) + + @classmethod + async def pull_all(cls, db, page_size: Optional[int] = None) -> Dict[str, int]: + materials = await cls.pull_materials(db, page_size=page_size) + modules = await cls.pull_modules(db, page_size=page_size) + systems = await cls.pull_systems(db, 
page_size=page_size) + return { + "materials": int(materials.get("processed", 0)), + "modules": int(modules.get("processed", 0)), + "systems": int(systems.get("processed", 0)), + } + + @staticmethod + def _hygraph_primary_expr(session, json_column) -> Any: from sqlalchemy import func + bind = session.get_bind() dialect = getattr(bind, "dialect", None) name = getattr(dialect, "name", "") if dialect else "" if name == "postgresql": - # json_column ->> 'hygraph_primary' return json_column["hygraph_primary"].astext - # SQLite/others - return func.json_extract(json_column, "$.hygraph_primary") \ No newline at end of file + return func.json_extract(json_column, "$.hygraph_primary") diff --git a/docs/hygraph-runbook.md b/docs/hygraph-runbook.md new file mode 100644 index 00000000..2579fb66 --- /dev/null +++ b/docs/hygraph-runbook.md @@ -0,0 +1,50 @@ +# Hygraph Circuit Breaker Runbook + +This runbook documents the operational steps for managing the Hygraph integration, including warming the cache, validating breaker behaviour, and observing the related Prometheus metrics. + +## Prerequisites + +* Backend application running with access to Redis (defaults to `redis://localhost:6379/0`). +* Administrative API write token and Hygraph credentials configured via environment variables. +* `curl` (or similar HTTP client) available for manual checks. + +## Warming the Cache + +1. Ensure Redis is reachable: + ```bash + redis-cli -u "${REDIS_URL:-redis://localhost:6379/0}" ping + ``` +2. 
Trigger a pull for each catalog type so that fresh data is stored in the cache: + ```bash + AUTH_HEADER="Authorization: Bearer ${API_WRITE_TOKEN}" + curl -X POST http://localhost:8000/api/sync/hygraph/pull \ + -H "Content-Type: application/json" \ + -H "$AUTH_HEADER" \ + -d '{"type": "materials"}' + curl -X POST http://localhost:8000/api/sync/hygraph/pull \ + -H "Content-Type: application/json" \ + -H "$AUTH_HEADER" \ + -d '{"type": "modules"}' + curl -X POST http://localhost:8000/api/sync/hygraph/pull \ + -H "Content-Type: application/json" \ + -H "$AUTH_HEADER" \ + -d '{"type": "systems"}' + ``` +3. Verify the result by repeating one of the calls: while Hygraph is reachable the payload includes `"from_cache": false` (live API), and it switches to `true` only when the cached fallback is served. + +## Exercising Breaker States + +1. **Closed → Open transition:** Temporarily simulate Hygraph failure (for example by pointing `HYGRAPH_ENDPOINT` to `http://localhost:9`). Re-run the pull call repeatedly to accumulate failures. Once the configured failure threshold is reached (5 consecutive failures by default) the circuit opens; the next request is short-circuited, `hygraph_cb_trips_total` increments, and the response is a `503 Service Unavailable` when no cached payload is available for that type. +2. **Open fallback:** With the endpoint still failing, repeat the request for a type whose cache entry was warmed beforehand and has not yet expired (entries are kept for 300 seconds). The API should return the cached payload (`"cached": true` in the response) and increment `hygraph_cb_fallback_total`. +3. **Half-open recovery:** Restore the Hygraph endpoint and wait longer than the breaker timeout (30 seconds by default). The next successful request will close the circuit and return live data (`"from_cache": false`). + +## Observability + +* Scrape `/metrics` to confirm the following counters: + * `hygraph_cb_trips_total` – total number of times the circuit has opened. + * `hygraph_cb_fallback_total` – count of responses served from the cached fallback. + * Existing sync counters (`sync_success_total`, `sync_failure_total`, `sync_records_upserted_total`) continue to track per-type outcomes. 
+ +* Background webhook synchronisation (`POST /api/sync/hygraph`) surfaces identical breaker behaviour in logs and metrics when the admin pull uses the cached fallback. + +Keep the cache warm before scheduled maintenance on Hygraph to ensure the fallback path remains fresh.