Skip to content

Commit 21d175c

Browse files
authored
Add Hygraph circuit breaker client and caching (#108)
* Add Hygraph circuit breaker client and caching * Handle half-open failures with fallback
1 parent 0a22cab commit 21d175c

File tree

6 files changed

+461
-31
lines changed

6 files changed

+461
-31
lines changed

backend/api/routes_sync.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from opentelemetry import trace
3333
from opentelemetry.trace import Status, StatusCode
3434
from services.hygraph_service import HygraphService
35+
from services.circuit_breaker import CircuitOpenError
3536

3637
logger = logging.getLogger(__name__)
3738
router = APIRouter(prefix="/api/sync", tags=["sync"])
@@ -239,6 +240,16 @@ async def hygraph_pull(
239240
raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "unsupported type"))
240241
except HTTPException:
241242
raise
243+
except CircuitOpenError as exc:
244+
hygraph_cb_trips_total.inc()
245+
fallback_result = exc.fallback_result
246+
if fallback_result is not None:
247+
hygraph_cb_fallback_total.inc()
248+
return {"ok": True, "data": fallback_result, "cached": True}
249+
raise HTTPException(
250+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
251+
detail=_error_envelope("SERVICE_UNAVAILABLE", "Hygraph unavailable", {"type": sync_type}),
252+
)
242253
except Exception as e: # noqa: BLE001
243254
record_sync_failure(sync_type or "all")
244255
record_sync_duration(sync_type or "all", (time.perf_counter() - start) * 1000)

backend/services/cache.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Simple Redis-backed cache helper with in-memory fallback."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import time
7+
from dataclasses import dataclass, field
8+
from typing import Any, Dict, Optional
9+
10+
try:
11+
from redis import asyncio as redis_asyncio
12+
except Exception: # pragma: no cover - optional dependency guard
13+
redis_asyncio = None # type: ignore[assignment]
14+
15+
16+
@dataclass
17+
class RedisCache:
18+
"""Minimal JSON cache built on top of Redis with TTL support."""
19+
20+
url: str
21+
namespace: str = "hygraph"
22+
use_memory_fallback: bool = True
23+
_client: Optional["redis_asyncio.Redis"] = field(default=None, init=False, repr=False)
24+
_memory_store: Dict[str, tuple[Any, Optional[float]]] = field(default_factory=dict, init=False, repr=False)
25+
26+
def __post_init__(self) -> None:
27+
if redis_asyncio is not None:
28+
try:
29+
self._client = redis_asyncio.from_url(self.url, decode_responses=True)
30+
except Exception:
31+
self._client = None
32+
33+
def _prefixed(self, key: str) -> str:
34+
return f"{self.namespace}:{key}" if self.namespace else key
35+
36+
async def get_json(self, key: str) -> Any | None:
37+
full_key = self._prefixed(key)
38+
if self._client is not None:
39+
try:
40+
raw = await self._client.get(full_key)
41+
except Exception:
42+
raw = None
43+
if raw is not None:
44+
try:
45+
return json.loads(raw)
46+
except json.JSONDecodeError:
47+
return None
48+
if not self.use_memory_fallback:
49+
return None
50+
payload = self._memory_store.get(full_key)
51+
if payload is None:
52+
return None
53+
value, expires_at = payload
54+
if expires_at is not None and expires_at < time.monotonic():
55+
self._memory_store.pop(full_key, None)
56+
return None
57+
return value
58+
59+
async def set_json(self, key: str, value: Any, *, ttl: Optional[int] = None) -> None:
60+
full_key = self._prefixed(key)
61+
dumped = json.dumps(value)
62+
if self._client is not None:
63+
try:
64+
await self._client.set(full_key, dumped, ex=ttl)
65+
except Exception:
66+
# Fall back to memory cache if Redis write fails.
67+
pass
68+
if not self.use_memory_fallback:
69+
return
70+
expires_at = None
71+
if ttl is not None:
72+
expires_at = time.monotonic() + ttl
73+
self._memory_store[full_key] = (value, expires_at)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Simple circuit breaker implementation for Hygraph integrations."""
2+
3+
from __future__ import annotations
4+
5+
import time
6+
from dataclasses import dataclass, field
7+
from typing import Any, Callable, Optional, TypeVar
8+
9+
10+
T = TypeVar("T")
11+
12+
13+
class CircuitOpenError(RuntimeError):
14+
"""Raised when the circuit breaker is OPEN and calls are blocked."""
15+
16+
def __init__(self, message: str, fallback_result: Any | None = None) -> None:
17+
super().__init__(message)
18+
self.fallback_result = fallback_result
19+
20+
21+
@dataclass
22+
class CircuitBreaker:
23+
"""Dataclass based circuit breaker with simple HALF_OPEN support."""
24+
25+
failure_threshold: int = 5
26+
recovery_timeout: float = 30.0
27+
half_open_success_threshold: int = 1
28+
time_provider: Callable[[], float] = time.monotonic
29+
state: str = field(default="CLOSED", init=False)
30+
failure_count: int = field(default=0, init=False)
31+
success_count: int = field(default=0, init=False)
32+
opened_at: Optional[float] = field(default=None, init=False)
33+
34+
CLOSED: str = "CLOSED"
35+
OPEN: str = "OPEN"
36+
HALF_OPEN: str = "HALF_OPEN"
37+
38+
def _transition_to_open(self) -> None:
39+
self.state = self.OPEN
40+
self.opened_at = self.time_provider()
41+
self.failure_count = 0
42+
self.success_count = 0
43+
44+
def _transition_to_half_open(self) -> None:
45+
self.state = self.HALF_OPEN
46+
self.opened_at = None
47+
self.failure_count = 0
48+
self.success_count = 0
49+
50+
def _transition_to_closed(self) -> None:
51+
self.state = self.CLOSED
52+
self.opened_at = None
53+
self.failure_count = 0
54+
self.success_count = 0
55+
56+
def _ready_for_half_open(self) -> bool:
57+
if self.opened_at is None:
58+
return False
59+
return (self.time_provider() - self.opened_at) >= self.recovery_timeout
60+
61+
def _record_failure(self) -> None:
62+
if self.state == self.HALF_OPEN:
63+
self._transition_to_open()
64+
return
65+
self.failure_count += 1
66+
if self.failure_count >= self.failure_threshold:
67+
self._transition_to_open()
68+
69+
def _record_success(self) -> None:
70+
if self.state == self.HALF_OPEN:
71+
self.success_count += 1
72+
if self.success_count >= self.half_open_success_threshold:
73+
self._transition_to_closed()
74+
return
75+
elif self.state == self.OPEN:
76+
# Should not happen, but close proactively.
77+
self._transition_to_closed()
78+
else:
79+
self.failure_count = 0
80+
81+
def call(
82+
self,
83+
func: Callable[..., T],
84+
*args: Any,
85+
fallback: Optional[Callable[[], T]] = None,
86+
**kwargs: Any,
87+
) -> T:
88+
"""Execute ``func`` guarding access through the circuit breaker."""
89+
90+
if self.state == self.OPEN:
91+
if self._ready_for_half_open():
92+
self._transition_to_half_open()
93+
else:
94+
fallback_result: T | None = fallback() if fallback else None
95+
raise CircuitOpenError("Circuit breaker is open", fallback_result)
96+
97+
try:
98+
result = func(*args, **kwargs)
99+
except Exception as exc:
100+
was_half_open = self.state == self.HALF_OPEN
101+
self._record_failure()
102+
if was_half_open:
103+
fallback_result: T | None = fallback() if fallback else None
104+
raise CircuitOpenError("Circuit breaker is open", fallback_result) from exc
105+
raise
106+
else:
107+
self._record_success()
108+
return result

backend/services/hygraph_client.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""Hygraph GraphQL client with circuit breaker protection."""
2+
3+
from __future__ import annotations
4+
5+
import httpx
6+
from typing import Any, Callable, Dict, Optional
7+
8+
from services.circuit_breaker import CircuitBreaker, CircuitOpenError
9+
10+
11+
class HygraphGraphQLError(RuntimeError):
12+
"""Raised when Hygraph returns GraphQL errors."""
13+
14+
def __init__(self, errors: Any) -> None:
15+
super().__init__("Hygraph GraphQL error")
16+
self.errors = errors
17+
18+
19+
class HygraphClient:
20+
"""HTTPX based client for Hygraph GraphQL requests."""
21+
22+
def __init__(
23+
self,
24+
endpoint: str,
25+
token: str,
26+
*,
27+
timeout: float = 6.0,
28+
breaker: Optional[CircuitBreaker] = None,
29+
client: Optional[httpx.Client] = None,
30+
) -> None:
31+
if not endpoint:
32+
raise ValueError("Hygraph endpoint must be configured")
33+
self._endpoint = endpoint
34+
self._token = token
35+
self._client = client or httpx.Client(timeout=timeout)
36+
self._breaker = breaker or CircuitBreaker()
37+
self._client.headers.setdefault("Content-Type", "application/json")
38+
self._client.headers.setdefault("Accept", "application/json")
39+
if token:
40+
self._client.headers.setdefault("Authorization", f"Bearer {token}")
41+
42+
@property
43+
def breaker(self) -> CircuitBreaker:
44+
return self._breaker
45+
46+
def execute(
47+
self,
48+
query: str,
49+
variables: Optional[Dict[str, Any]] = None,
50+
*,
51+
fallback: Optional[Callable[[], Any]] = None,
52+
) -> Dict[str, Any]:
53+
"""Run the GraphQL ``query`` through the circuit breaker."""
54+
55+
def _request() -> Dict[str, Any]:
56+
response = self._client.post(
57+
self._endpoint,
58+
json={"query": query, "variables": variables or {}},
59+
)
60+
response.raise_for_status()
61+
payload = response.json()
62+
errors = payload.get("errors")
63+
if errors:
64+
raise HygraphGraphQLError(errors)
65+
data = payload.get("data")
66+
if not isinstance(data, dict):
67+
return {}
68+
return data
69+
70+
try:
71+
return self._breaker.call(_request, fallback=fallback)
72+
except CircuitOpenError:
73+
# Re-raise so callers can handle fallback semantics.
74+
raise
75+
76+
def close(self) -> None:
77+
self._client.close()

0 commit comments

Comments
 (0)