Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion backend/api/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,16 @@
"sync_records_upserted_total",
"Records upserted during Hygraph sync",
labelnames=("type",),
)
)

# Counts circuit-breaker "open" events observed while syncing from Hygraph,
# labelled by sync type via the single "type" label.
hygraph_cb_trips_total = Counter(
    "hygraph_cb_trips_total",
    "Hygraph circuit breaker open events",
    labelnames=("type",),
)

# Counts responses served from cached data while the circuit is open,
# labelled by sync type via the single "type" label.
hygraph_cb_fallback_total = Counter(
    "hygraph_cb_fallback_total",
    "Hygraph responses served from cache while circuit open",
    labelnames=("type",),
)
116 changes: 81 additions & 35 deletions backend/api/routes_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
sync_success_total,
sync_failure_total,
sync_records_upserted_total,
hygraph_cb_trips_total,
hygraph_cb_fallback_total,
)
from services.circuit_breaker import CircuitOpenError
from services.hygraph_service import HygraphService

logger = logging.getLogger(__name__)
Expand All @@ -44,20 +47,22 @@ def get_hygraph_service() -> HygraphService:


def _error_envelope(code: str, message: str, details: Optional[dict] = None) -> Dict[str, Any]:
"""
Builds a standardized error envelope for API responses.

Parameters:
code (str): Machine-readable error code identifying the error.
message (str): Human-readable error message describing the failure.
details (Optional[dict]): Additional contextual information; defaults to an empty dict when not provided.

Returns:
Dict[str, Any]: Dictionary with keys `ok` (False) and `error` containing `code`, `message`, and `details`.
"""
"""Build a standardized error envelope for API responses."""

return {"ok": False, "error": {"code": code, "message": message, "details": details or {}}}


def _processed_value(payload: Any) -> int:
if isinstance(payload, dict):
value = payload.get("processed", 0)
else:
value = payload
try:
return int(value or 0)
except Exception: # noqa: BLE001
return 0


@router.post(
"/hygraph",
status_code=status.HTTP_202_ACCEPTED,
Expand All @@ -68,14 +73,8 @@ async def hygraph_webhook(
background: BackgroundTasks,
db: Session = Depends(get_db),
) -> Dict[str, Any]:
"""
Webhook receiver:
- HMAC validated (dependency)
- Single size guard (2MB) already enforced by dependency; body/raw set on request.state
- DB dedup via SyncEvent(event_id, body_sha256 unique)
- 202 fast-ack with background processing (pull_all)
- Structured JSON log line and Prometheus counters
"""
"""Webhook receiver for Hygraph change notifications."""

start = time.perf_counter()
raw = getattr(request.state, "raw_body", b"")
body_sha = getattr(request.state, "body_sha256", "")
Expand Down Expand Up @@ -104,18 +103,19 @@ async def hygraph_webhook(
return JSONResponse({"ok": True, "dedup": True}, status_code=200)

try:
payload = json.loads(raw) if raw else {}
_ = json.loads(raw) if raw else {}
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "Invalid JSON payload"))


async def _process(event_id_local: Optional[str], body_sha_local: str) -> None:
t0 = time.perf_counter()
try:
counts = await HygraphService.pull_all(db)
for t, c in counts.items():
sync_records_upserted_total.labels(t).inc(int(c or 0))
processed = _processed_value(c)
sync_records_upserted_total.labels(t).inc(processed)
sync_success_total.labels("all").inc()
# Log at WARNING so caplog sees it without level configuration
logger.warning(
"hygraph_webhook",
extra={
Expand All @@ -125,6 +125,33 @@ async def _process(event_id_local: Optional[str], body_sha_local: str) -> None:
"elapsed_ms": int((time.perf_counter() - t0) * 1000),
},
)
except CircuitOpenError as exc:
hygraph_cb_trips_total.labels("all").inc()
fallback_payload = exc.fallback or {}
if fallback_payload:
hygraph_cb_fallback_total.labels("all").inc()
for t, payload in fallback_payload.items():
processed = _processed_value(payload)
sync_records_upserted_total.labels(t).inc(processed)
logger.warning(
"hygraph_webhook_fallback",
extra={
"event_id": event_id_local,
"dedup": False,
"fallback": fallback_payload,
"elapsed_ms": int((time.perf_counter() - t0) * 1000),
},
)
else:
sync_failure_total.labels("all").inc()
logger.warning(
"hygraph_webhook_circuit_open",
extra={
"event_id": event_id_local,
"dedup": False,
"error": "Circuit breaker open with empty cache",
},
)
except Exception as e: # noqa: BLE001
sync_failure_total.labels("all").inc()
logger.exception(
Expand All @@ -137,7 +164,6 @@ async def _process(event_id_local: Optional[str], body_sha_local: str) -> None:
)

background.add_task(_process, event_id, body_sha)
# Explicit background attachment ensures Starlette runs it before TestClient returns
return JSONResponse({"ok": True, "accepted": True}, status_code=202, background=background)


Expand All @@ -146,12 +172,8 @@ async def hygraph_pull(
body: Dict[str, Any] = Body(...),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
"""
Admin pull:
- Auth via Bearer token (constant-time compare)
- Accepts "type" or "sync_type" + optional "page_size"
- Validates positive page_size and caps inside service (≤200)
"""
"""Admin-triggered Hygraph syncs supporting manual pull types."""

sync_type = str((body.get("type") or body.get("sync_type") or "")).lower().strip()
page_size_raw = body.get("page_size")
page_size: Optional[int] = None
Expand All @@ -163,31 +185,55 @@ async def hygraph_pull(
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "page_size must be a positive integer"))


try:
if sync_type == "materials":
counts = await HygraphService.pull_materials(db, page_size=page_size)
processed = _processed_value(counts)
sync_success_total.labels("materials").inc()
sync_records_upserted_total.labels("materials").inc(int(counts.get("processed", 0)))
sync_records_upserted_total.labels("materials").inc(processed)
elif sync_type == "modules":
counts = await HygraphService.pull_modules(db, page_size=page_size)
processed = _processed_value(counts)
sync_success_total.labels("modules").inc()
sync_records_upserted_total.labels("modules").inc(int(counts.get("processed", 0)))
sync_records_upserted_total.labels("modules").inc(processed)
elif sync_type == "systems":
counts = await HygraphService.pull_systems(db, page_size=page_size)
processed = _processed_value(counts)
sync_success_total.labels("systems").inc()
sync_records_upserted_total.labels("systems").inc(int(counts.get("processed", 0)))
sync_records_upserted_total.labels("systems").inc(processed)
elif sync_type == "all":
counts = await HygraphService.pull_all(db, page_size=page_size)
for t, c in counts.items():
for t, payload in counts.items():
processed = _processed_value(payload)
sync_success_total.labels(t).inc()
sync_records_upserted_total.labels(t).inc(int(c or 0))
sync_records_upserted_total.labels(t).inc(processed)
else:
raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "unsupported type"))
except CircuitOpenError as exc:
circuit_type = sync_type or "all"
hygraph_cb_trips_total.labels(circuit_type).inc()
fallback_payload = exc.fallback
if fallback_payload:
hygraph_cb_fallback_total.labels(circuit_type).inc()
return {
"ok": True,
"breaker_open": True,
"data": fallback_payload,
}
raise HTTPException(
status_code=503,
detail=_error_envelope(
"SERVICE_UNAVAILABLE",
"Hygraph circuit breaker open and no cached data available",
{"type": circuit_type},
),
)
except HTTPException:
raise
except Exception as e: # noqa: BLE001
sync_failure_total.labels(sync_type or "all").inc()
logger.exception("hygraph_pull_failure", extra={"type": sync_type, "error": str(e)})
raise HTTPException(status_code=500, detail=_error_envelope("INTERNAL", "sync failed", {"type": sync_type}))

return {"ok": True, "data": counts}
return {"ok": True, "data": counts}
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"alembic>=1.13.2",
"psycopg[binary]>=3.2.1",
"prometheus-fastapi-instrumentator",
"httpx>=0.27.0",
"sentry-sdk[fastapi,httpx,opentelemetry]==2.42.0",
]

Expand Down
67 changes: 67 additions & 0 deletions backend/services/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Simple cache helper that prefers Redis but falls back to an in-memory store."""

from __future__ import annotations

import json
import os
import threading
import time
from typing import Any, Dict, Optional

try: # pragma: no cover - optional dependency
import redis # type: ignore
except Exception: # pragma: no cover - optional dependency
redis = None # type: ignore[assignment]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Address the pipeline failure by refining the type-ignore comment.

The pipeline failure at line 12 indicates that the broad # type: ignore comment may be unnecessary or should be more specific. Consider using a more precise suppression like # type: ignore[import-not-found] or remove it if the import can be typed correctly with TYPE_CHECKING guards.

-try:  # pragma: no cover - optional dependency
-    import redis  # type: ignore
-except Exception:  # pragma: no cover - optional dependency
-    redis = None  # type: ignore[assignment]
+try:  # pragma: no cover - optional dependency
+    import redis
+except ImportError:  # pragma: no cover - optional dependency
+    redis = None  # type: ignore[assignment]

Note: Also consider catching only ImportError rather than the broad Exception for clarity.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try: # pragma: no cover - optional dependency
import redis # type: ignore
except Exception: # pragma: no cover - optional dependency
redis = None # type: ignore[assignment]
try: # pragma: no cover - optional dependency
import redis
except ImportError: # pragma: no cover - optional dependency
redis = None # type: ignore[assignment]
🧰 Tools
🪛 GitHub Actions: CI

[error] 12-12: Typos/lint issues detected by ruff/typos; possibly unused type-ignore comments or redefinition.

🤖 Prompt for AI Agents
In backend/services/cache.py around lines 11 to 14, the import block uses a
broad Exception catch and an unspecific "# type: ignore" which triggered the
pipeline; change the except to catch ImportError only, and replace the blanket
"# type: ignore" on the import with a targeted suppression like "# type:
ignore[import-not-found]" (or remove it if you add a conditional TYPE_CHECKING
guard and proper typing), and keep the assignment to redis = None with a
specific "# type: ignore[assignment]" if mypy still complains.



class RedisCache:
"""Tiny wrapper that abstracts Redis availability."""

def __init__(self, url: str | None = None, *, namespace: str = "hygraph") -> None:
self._url = url or os.getenv("HYGRAPH_CACHE_URL") or os.getenv("REDIS_URL")
self._namespace = namespace
self._lock = threading.Lock()
self._memory: Dict[str, tuple[str, Optional[float]]] = {}
if redis is not None and self._url:
self._client = redis.Redis.from_url(self._url, decode_responses=True)
else: # pragma: no cover - exercised implicitly when redis missing
self._client = None

def _ns(self, key: str) -> str:
return f"{self._namespace}:{key}"

def get(self, key: str) -> Any | None:
namespaced = self._ns(key)
if self._client is not None:
raw = self._client.get(namespaced)
return json.loads(raw) if raw else None
with self._lock:
value = self._memory.get(namespaced)
if value is None:
return None
raw, expires_at = value
if expires_at is not None and expires_at < time.time():
del self._memory[namespaced]
return None
return json.loads(raw)

def set(self, key: str, value: Any, *, ttl: int | None = None) -> None:
payload = json.dumps(value)
namespaced = self._ns(key)
if self._client is not None:
if ttl is not None:
self._client.setex(namespaced, ttl, payload)
else:
self._client.set(namespaced, payload)
return
expires_at = (time.time() + ttl) if ttl else None
with self._lock:
self._memory[namespaced] = (payload, expires_at)

def clear(self, key: str) -> None:
namespaced = self._ns(key)
if self._client is not None:
self._client.delete(namespaced)
return

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge RedisCache does not degrade gracefully when Redis is unreachable

The cache helper advertises falling back to an in‑memory store, but get, set, and clear call the Redis client without handling redis.exceptions.RedisError. If Redis is configured but unavailable or times out, these methods will raise and propagate the connection error, causing sync endpoints to fail instead of transparently using the in‑process cache. Wrapping the Redis operations in try/except and falling back to _memory on failure would align the implementation with the intended resilience.

Useful? React with 👍 / 👎.

with self._lock:
self._memory.pop(namespaced, None)
84 changes: 84 additions & 0 deletions backend/services/circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Circuit breaker utility for wrapping outbound service calls."""

from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar


T = TypeVar("T")


class CircuitOpenError(RuntimeError):
"""Raised when the circuit breaker refuses to execute a call."""

def __init__(self, message: str = "Circuit breaker is open", *, fallback: Any | None = None) -> None:
super().__init__(message)
self.fallback = fallback


@dataclass
class CircuitBreaker:
"""Simple circuit breaker implementation with configurable thresholds."""

failure_threshold: int = 5
recovery_timeout: float = 30.0
success_threshold: int = 1
state: str = field(default="CLOSED", init=False)
failure_count: int = field(default=0, init=False)
success_count: int = field(default=0, init=False)
last_failure_time: float | None = field(default=None, init=False)

CLOSED: str = field(default="CLOSED", init=False)
OPEN: str = field(default="OPEN", init=False)
HALF_OPEN: str = field(default="HALF_OPEN", init=False)

def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
"""Execute ``func`` respecting the breaker state machine."""

now = time.monotonic()
if self.state == self.OPEN:
if self.last_failure_time is None or (now - self.last_failure_time) < self.recovery_timeout:
raise CircuitOpenError()
self.state = self.HALF_OPEN
self.success_count = 0

try:
result = func(*args, **kwargs)
except Exception: # noqa: BLE001
self._record_failure()
raise

self._record_success()
return result

def _record_failure(self) -> None:
now = time.monotonic()
if self.state == self.HALF_OPEN:
self._trip(now)
return

self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self._trip(now)

def _record_success(self) -> None:
if self.state == self.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self._reset()
else:
self._reset()

def _trip(self, when: float) -> None:
self.state = self.OPEN
self.last_failure_time = when
self.failure_count = 0
self.success_count = 0

def _reset(self) -> None:
self.state = self.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
Loading
Loading