Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backend/api/routes_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from services.hygraph_service import HygraphService
from services.circuit_breaker import CircuitOpenError

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/sync", tags=["sync"])
Expand Down Expand Up @@ -239,6 +240,16 @@ async def hygraph_pull(
raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "unsupported type"))
except HTTPException:
raise
except CircuitOpenError as exc:
hygraph_cb_trips_total.inc()
fallback_result = exc.fallback_result
if fallback_result is not None:
hygraph_cb_fallback_total.inc()
return {"ok": True, "data": fallback_result, "cached": True}
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=_error_envelope("SERVICE_UNAVAILABLE", "Hygraph unavailable", {"type": sync_type}),
)
except Exception as e: # noqa: BLE001
record_sync_failure(sync_type or "all")
record_sync_duration(sync_type or "all", (time.perf_counter() - start) * 1000)
Expand Down
73 changes: 73 additions & 0 deletions backend/services/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Simple Redis-backed cache helper with in-memory fallback."""

from __future__ import annotations

import json
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

try:
from redis import asyncio as redis_asyncio
except Exception: # pragma: no cover - optional dependency guard
redis_asyncio = None # type: ignore[assignment]
Comment on lines +10 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Narrow the exception handler scope.

The bare Exception catch is too broad and could mask import errors that should be debugged. Consider catching specific exceptions like ImportError or ModuleNotFoundError for clearer intent.

Apply this diff:

 try:
     from redis import asyncio as redis_asyncio
-except Exception:  # pragma: no cover - optional dependency guard
+except (ImportError, ModuleNotFoundError):  # pragma: no cover - optional dependency guard
     redis_asyncio = None  # type: ignore[assignment]
🤖 Prompt for AI Agents
In backend/services/cache.py around lines 10 to 13, the try/except currently
catches all Exceptions; narrow it to only import-related errors by catching
ImportError or ModuleNotFoundError (e.g., except (ImportError,
ModuleNotFoundError):) to avoid masking other runtime errors, preserve the
pragma no cover and the redis_asyncio = None # type: ignore[assignment]
fallback.



@dataclass
class RedisCache:
"""Minimal JSON cache built on top of Redis with TTL support."""

url: str
namespace: str = "hygraph"
use_memory_fallback: bool = True
_client: Optional["redis_asyncio.Redis"] = field(default=None, init=False, repr=False)
_memory_store: Dict[str, tuple[Any, Optional[float]]] = field(default_factory=dict, init=False, repr=False)

def __post_init__(self) -> None:
if redis_asyncio is not None:
try:
self._client = redis_asyncio.from_url(self.url, decode_responses=True)
except Exception:
self._client = None
Comment on lines +26 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add logging for Redis connection failures.

The __post_init__ method silently swallows connection errors. While the fallback to None is intentional, logging the failure would improve observability when Redis is misconfigured or unavailable.

Consider adding a logger and logging connection failures:

+import logging
+
+logger = logging.getLogger(__name__)
+
 def __post_init__(self) -> None:
     if redis_asyncio is not None:
         try:
             self._client = redis_asyncio.from_url(self.url, decode_responses=True)
-        except Exception:
+        except Exception as e:
+            logger.warning("Failed to connect to Redis at %s: %s. Using memory fallback.", self.url, e)
             self._client = None
🤖 Prompt for AI Agents
In backend/services/cache.py around lines 26 to 31, the __post_init__ currently
swallows exceptions when creating the Redis client; add a module-level logger
(import logging and getLogger(__name__)) and update the except block to log the
failure (include the exception details via exc_info=True or logging.exception
and include the Redis URL or a masked form for context) before setting
self._client = None so the fallback behavior remains but failures are
observable.


def _prefixed(self, key: str) -> str:
return f"{self.namespace}:{key}" if self.namespace else key

async def get_json(self, key: str) -> Any | None:
full_key = self._prefixed(key)
if self._client is not None:
try:
raw = await self._client.get(full_key)
except Exception:
raw = None
if raw is not None:
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
Comment on lines +38 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Narrow exception handling and add logging.

The bare except Exception on lines 41 and 46 catches all exceptions, including programming errors. Consider:

  1. Catching specific Redis exceptions (e.g., ConnectionError, TimeoutError)
  2. Logging when falling back to memory cache for observability

If you narrow the exception types and add logging, it will be easier to diagnose issues in production:

         if self._client is not None:
             try:
                 raw = await self._client.get(full_key)
-            except Exception:
+            except Exception as e:
+                logger.debug("Redis get failed for key %s: %s", full_key, e)
                 raw = None
             if raw is not None:
                 try:
                     return json.loads(raw)
                 except json.JSONDecodeError:
+                    logger.warning("Invalid JSON in Redis cache for key %s", full_key)
                     return None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self._client is not None:
try:
raw = await self._client.get(full_key)
except Exception:
raw = None
if raw is not None:
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
if self._client is not None:
try:
raw = await self._client.get(full_key)
except Exception as e:
logger.debug("Redis get failed for key %s: %s", full_key, e)
raw = None
if raw is not None:
try:
return json.loads(raw)
except json.JSONDecodeError:
logger.warning("Invalid JSON in Redis cache for key %s", full_key)
return None
🤖 Prompt for AI Agents
In backend/services/cache.py around lines 38 to 47, replace the two broad
"except Exception" blocks with targeted exception handling: catch the Redis
client-specific errors (e.g., redis.exceptions.ConnectionError,
redis.exceptions.TimeoutError or your client’s equivalent) when calling
self._client.get(full_key) and log the exception with context before falling
back to the in-memory cache; for the JSON decode branch catch
json.JSONDecodeError only and log the raw payload and decode error before
returning None. Ensure the module has a logger (e.g., using
logging.getLogger(__name__)) and include the exception instance in the log calls
for observability.

if not self.use_memory_fallback:
return None
payload = self._memory_store.get(full_key)
if payload is None:
return None
value, expires_at = payload
if expires_at is not None and expires_at < time.monotonic():
self._memory_store.pop(full_key, None)
return None
return value

async def set_json(self, key: str, value: Any, *, ttl: Optional[int] = None) -> None:
full_key = self._prefixed(key)
dumped = json.dumps(value)
if self._client is not None:
try:
await self._client.set(full_key, dumped, ex=ttl)
except Exception:
# Fall back to memory cache if Redis write fails.
pass
Comment on lines +62 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add logging for Redis write failures.

Silent exception swallowing on line 65 makes it difficult to diagnose Redis write failures. Add logging to track when the fallback is being used.

Apply this diff:

         if self._client is not None:
             try:
                 await self._client.set(full_key, dumped, ex=ttl)
-            except Exception:
+            except Exception as e:
+                logger.debug("Redis set failed for key %s: %s", full_key, e)
                 # Fall back to memory cache if Redis write fails.
                 pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self._client is not None:
try:
await self._client.set(full_key, dumped, ex=ttl)
except Exception:
# Fall back to memory cache if Redis write fails.
pass
if self._client is not None:
try:
await self._client.set(full_key, dumped, ex=ttl)
except Exception as e:
logger.debug("Redis set failed for key %s: %s", full_key, e)
# Fall back to memory cache if Redis write fails.
pass
🤖 Prompt for AI Agents
In backend/services/cache.py around lines 62 to 67, the except block currently
swallows all exceptions when Redis set fails; change it to log the failure and
that the code is falling back to the in-memory cache. In the except block, call
the appropriate logger (use self._logger if available, otherwise obtain module
logger via logging.getLogger(__name__)) to record a descriptive message
including the full_key and ttl and the exception details (use logger.exception
or logger.error(..., exc_info=True)), then continue to fall back to memory cache
as before.

if not self.use_memory_fallback:
return
expires_at = None
if ttl is not None:
expires_at = time.monotonic() + ttl
self._memory_store[full_key] = (value, expires_at)
108 changes: 108 additions & 0 deletions backend/services/circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Simple circuit breaker implementation for Hygraph integrations."""

from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, TypeVar


T = TypeVar("T")


class CircuitOpenError(RuntimeError):
"""Raised when the circuit breaker is OPEN and calls are blocked."""

def __init__(self, message: str, fallback_result: Any | None = None) -> None:
super().__init__(message)
self.fallback_result = fallback_result


@dataclass
class CircuitBreaker:
"""Dataclass based circuit breaker with simple HALF_OPEN support."""

failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_success_threshold: int = 1
time_provider: Callable[[], float] = time.monotonic
state: str = field(default="CLOSED", init=False)
failure_count: int = field(default=0, init=False)
success_count: int = field(default=0, init=False)
opened_at: Optional[float] = field(default=None, init=False)

CLOSED: str = "CLOSED"
OPEN: str = "OPEN"
HALF_OPEN: str = "HALF_OPEN"
Comment on lines +34 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Mark breaker state sentinels as ClassVar constants.

Declaring CLOSED/OPEN/HALF_OPEN without ClassVar makes them dataclass fields, so they leak into __init__, equality, and asdict. Anyone instantiating CircuitBreaker(OPEN="foo") (even inadvertently via config) will corrupt the state machine. Mark them as ClassVar (and import it) so they stay true class constants.

-from typing import Any, Callable, Optional, TypeVar
+from typing import Any, Callable, ClassVar, Optional, TypeVar
@@
-    CLOSED: str = "CLOSED"
-    OPEN: str = "OPEN"
-    HALF_OPEN: str = "HALF_OPEN"
+    CLOSED: ClassVar[str] = "CLOSED"
+    OPEN: ClassVar[str] = "OPEN"
+    HALF_OPEN: ClassVar[str] = "HALF_OPEN"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CLOSED: str = "CLOSED"
OPEN: str = "OPEN"
HALF_OPEN: str = "HALF_OPEN"
from typing import Any, Callable, ClassVar, Optional, TypeVar
- CLOSED: str = "CLOSED"
- OPEN: str = "OPEN"
CLOSED: ClassVar[str] = "CLOSED"
OPEN: ClassVar[str] = "OPEN"
HALF_OPEN: ClassVar[str] = "HALF_OPEN"
🤖 Prompt for AI Agents
In backend/services/circuit_breaker.py around lines 34 to 36, the state sentinel
attributes CLOSED, OPEN, and HALF_OPEN are currently plain annotated strings
which makes them dataclass fields; change their annotations to ClassVar[str] and
add an import for ClassVar from typing so these stay true class-level constants
(e.g., from typing import ClassVar) and are excluded from the
dataclass-generated __init__, equality, and asdict behavior.


def _transition_to_open(self) -> None:
self.state = self.OPEN
self.opened_at = self.time_provider()
self.failure_count = 0
self.success_count = 0

def _transition_to_half_open(self) -> None:
self.state = self.HALF_OPEN
self.opened_at = None
self.failure_count = 0
self.success_count = 0

def _transition_to_closed(self) -> None:
self.state = self.CLOSED
self.opened_at = None
self.failure_count = 0
self.success_count = 0

def _ready_for_half_open(self) -> bool:
if self.opened_at is None:
return False
return (self.time_provider() - self.opened_at) >= self.recovery_timeout

def _record_failure(self) -> None:
if self.state == self.HALF_OPEN:
self._transition_to_open()
return
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self._transition_to_open()

def _record_success(self) -> None:
if self.state == self.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.half_open_success_threshold:
self._transition_to_closed()
return
elif self.state == self.OPEN:
# Should not happen, but close proactively.
self._transition_to_closed()
else:
self.failure_count = 0

Comment on lines +21 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Circuit breaker is not thread-safe.

In a concurrent web environment, multiple request threads will access and modify state, failure_count, success_count, and opened_at without synchronization. This causes data races:

  • Lost counter increments (two threads both read 4, both write 5, final value is 5 instead of 6)
  • Inconsistent state transitions (multiple threads calling _transition_to_open() concurrently)
  • Race between reading and writing opened_at

For production use with concurrent requests, add a threading.Lock to protect all state access and transitions.

Recommended: Add parameter validation.

Consider validating that failure_threshold, half_open_success_threshold are positive integers and recovery_timeout is positive to prevent edge cases.

Apply this diff to add thread safety:

 import time
+import threading
 from dataclasses import dataclass, field
 from typing import Any, Callable, Optional, TypeVar
 
 @dataclass
 class CircuitBreaker:
     """Dataclass based circuit breaker with simple HALF_OPEN support."""
 
     failure_threshold: int = 5
     recovery_timeout: float = 30.0
     half_open_success_threshold: int = 1
     time_provider: Callable[[], float] = time.monotonic
     state: str = field(default="CLOSED", init=False)
     failure_count: int = field(default=0, init=False)
     success_count: int = field(default=0, init=False)
     opened_at: Optional[float] = field(default=None, init=False)
+    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)
 
     CLOSED: str = "CLOSED"
     OPEN: str = "OPEN"
     HALF_OPEN: str = "HALF_OPEN"
+
+    def __post_init__(self) -> None:
+        if self.failure_threshold <= 0:
+            raise ValueError("failure_threshold must be positive")
+        if self.half_open_success_threshold <= 0:
+            raise ValueError("half_open_success_threshold must be positive")
+        if self.recovery_timeout <= 0:
+            raise ValueError("recovery_timeout must be positive")

Then wrap all state-modifying operations in the call() method with the lock.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@dataclass
class CircuitBreaker:
"""Dataclass based circuit breaker with simple HALF_OPEN support."""
failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_success_threshold: int = 1
time_provider: Callable[[], float] = time.monotonic
state: str = field(default="CLOSED", init=False)
failure_count: int = field(default=0, init=False)
success_count: int = field(default=0, init=False)
opened_at: Optional[float] = field(default=None, init=False)
CLOSED: str = "CLOSED"
OPEN: str = "OPEN"
HALF_OPEN: str = "HALF_OPEN"
def _transition_to_open(self) -> None:
self.state = self.OPEN
self.opened_at = self.time_provider()
self.failure_count = 0
self.success_count = 0
def _transition_to_half_open(self) -> None:
self.state = self.HALF_OPEN
self.opened_at = None
self.failure_count = 0
self.success_count = 0
def _transition_to_closed(self) -> None:
self.state = self.CLOSED
self.opened_at = None
self.failure_count = 0
self.success_count = 0
def _ready_for_half_open(self) -> bool:
if self.opened_at is None:
return False
return (self.time_provider() - self.opened_at) >= self.recovery_timeout
def _record_failure(self) -> None:
if self.state == self.HALF_OPEN:
self._transition_to_open()
return
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self._transition_to_open()
def _record_success(self) -> None:
if self.state == self.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.half_open_success_threshold:
self._transition_to_closed()
return
elif self.state == self.OPEN:
# Should not happen, but close proactively.
self._transition_to_closed()
else:
self.failure_count = 0
import time
import threading
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, TypeVar
@dataclass
class CircuitBreaker:
"""Dataclass based circuit breaker with simple HALF_OPEN support."""
failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_success_threshold: int = 1
time_provider: Callable[[], float] = time.monotonic
state: str = field(default="CLOSED", init=False)
failure_count: int = field(default=0, init=False)
success_count: int = field(default=0, init=False)
opened_at: Optional[float] = field(default=None, init=False)
_lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)
CLOSED: str = "CLOSED"
OPEN: str = "OPEN"
HALF_OPEN: str = "HALF_OPEN"
def __post_init__(self) -> None:
if self.failure_threshold <= 0:
raise ValueError("failure_threshold must be positive")
if self.half_open_success_threshold <= 0:
raise ValueError("half_open_success_threshold must be positive")
if self.recovery_timeout <= 0:
raise ValueError("recovery_timeout must be positive")
def _transition_to_open(self) -> None:
- self.state = self.OPEN
- self.opened_at = self.time_provider()
- self.failure_count = 0
with self._lock:
self.state = self.OPEN
self.opened_at = self.time_provider()
self.failure_count = 0
self.success_count = 0
def _transition_to_half_open(self) -> None:
- self.state = self.HALF_OPEN
- self.opened_at = None
- self.failure_count = 0
with self._lock:
self.state = self.HALF_OPEN
self.opened_at = None
self.failure_count = 0
self.success_count = 0
def _transition_to_closed(self) -> None:
- self.state = self.CLOSED
- self.opened_at = None
- self.failure_count = 0
with self._lock:
self.state = self.CLOSED
self.opened_at = None
self.failure_count = 0
self.success_count = 0
def _ready_for_half_open(self) -> bool:
- if self.opened_at is None:
- return False
with self._lock:
if self.opened_at is None:
return False
return (self.time_provider() - self.opened_at) >= self.recovery_timeout
def _record_failure(self) -> None:
- if self.state == self.HALF_OPEN:
- self._transition_to_open()
- return
- self.failure_count += 1
- if self.failure_count >= self.failure_threshold:
with self._lock:
if self.state == self.HALF_OPEN:
self._transition_to_open()
return
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self._transition_to_open()
def _record_success(self) -> None:
- if self.state == self.HALF_OPEN:
- self.success_count += 1
- if self.success_count >= self.half_open_success_threshold:
- self._transition_to_closed()
- return
- elif self.state == self.OPEN:
- # Should not happen, but close proactively.
- self._transition_to_closed()
- else:
with self._lock:
if self.state == self.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.half_open_success_threshold:
self._transition_to_closed()
return
elif self.state == self.OPEN:
# Should not happen, but close proactively.
self._transition_to_closed()
else:
self.failure_count = 0

def call(
self,
func: Callable[..., T],
*args: Any,
fallback: Optional[Callable[[], T]] = None,
**kwargs: Any,
) -> T:
"""Execute ``func`` guarding access through the circuit breaker."""

if self.state == self.OPEN:
if self._ready_for_half_open():
self._transition_to_half_open()
else:
fallback_result: T | None = fallback() if fallback else None
raise CircuitOpenError("Circuit breaker is open", fallback_result)

try:
result = func(*args, **kwargs)
except Exception as exc:
was_half_open = self.state == self.HALF_OPEN
self._record_failure()
if was_half_open:
fallback_result: T | None = fallback() if fallback else None
raise CircuitOpenError("Circuit breaker is open", fallback_result) from exc
raise
Comment on lines +97 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Previous review comment appears incorrect.

The existing review claims that half-open failures don't invoke the fallback, but lines 102-104 clearly do invoke fallback() and raise CircuitOpenError when was_half_open is True. The half-open probe failure path correctly propagates fallback results.

Minor: Use Optional[T] for Python 3.9 compatibility.

Lines 94 and 103 use T | None syntax, which requires Python 3.10+. For broader compatibility, use Optional[T] instead.

Apply this diff:

-                fallback_result: T | None = fallback() if fallback else None
+                fallback_result: Optional[T] = fallback() if fallback else None

(Apply the same change on line 103)

Request verification: Document fallback exception behavior.

If the fallback callable itself raises an exception, that exception will propagate to the caller instead of being caught. Verify this is the intended behavior and consider documenting it in the docstring.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
result = func(*args, **kwargs)
except Exception as exc:
was_half_open = self.state == self.HALF_OPEN
self._record_failure()
if was_half_open:
fallback_result: T | None = fallback() if fallback else None
raise CircuitOpenError("Circuit breaker is open", fallback_result) from exc
raise
try:
result = func(*args, **kwargs)
except Exception as exc:
was_half_open = self.state == self.HALF_OPEN
self._record_failure()
if was_half_open:
fallback_result: Optional[T] = fallback() if fallback else None
raise CircuitOpenError("Circuit breaker is open", fallback_result) from exc
raise
🤖 Prompt for AI Agents
In backend/services/circuit_breaker.py around lines 94 to 105, replace the
Python-3.10-only union type T | None with typing.Optional[T] on lines 94 and 103
(add/import Optional from typing if not present) to ensure Python 3.9
compatibility; also update the docstring for the circuit breaker to state that
if the provided fallback callable raises an exception, that exception will
propagate to the caller (i.e., fallback exceptions are not caught), and confirm
this behavior is intended.

else:
self._record_success()
return result
77 changes: 77 additions & 0 deletions backend/services/hygraph_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Hygraph GraphQL client with circuit breaker protection."""

from __future__ import annotations

import httpx
from typing import Any, Callable, Dict, Optional

from services.circuit_breaker import CircuitBreaker, CircuitOpenError


class HygraphGraphQLError(RuntimeError):
"""Raised when Hygraph returns GraphQL errors."""

def __init__(self, errors: Any) -> None:
super().__init__("Hygraph GraphQL error")
self.errors = errors


class HygraphClient:
"""HTTPX based client for Hygraph GraphQL requests."""

def __init__(
self,
endpoint: str,
token: str,
*,
timeout: float = 6.0,
breaker: Optional[CircuitBreaker] = None,
client: Optional[httpx.Client] = None,
) -> None:
if not endpoint:
raise ValueError("Hygraph endpoint must be configured")
self._endpoint = endpoint
self._token = token
self._client = client or httpx.Client(timeout=timeout)
self._breaker = breaker or CircuitBreaker()
self._client.headers.setdefault("Content-Type", "application/json")
self._client.headers.setdefault("Accept", "application/json")
if token:
self._client.headers.setdefault("Authorization", f"Bearer {token}")

@property
def breaker(self) -> CircuitBreaker:
return self._breaker

def execute(
self,
query: str,
variables: Optional[Dict[str, Any]] = None,
*,
fallback: Optional[Callable[[], Any]] = None,
) -> Dict[str, Any]:
"""Run the GraphQL ``query`` through the circuit breaker."""

def _request() -> Dict[str, Any]:
response = self._client.post(
self._endpoint,
json={"query": query, "variables": variables or {}},
)
response.raise_for_status()
payload = response.json()
errors = payload.get("errors")
if errors:
raise HygraphGraphQLError(errors)
data = payload.get("data")
if not isinstance(data, dict):
return {}
return data

try:
return self._breaker.call(_request, fallback=fallback)
except CircuitOpenError:
# Re-raise so callers can handle fallback semantics.
raise

def close(self) -> None:
self._client.close()
Comment on lines +19 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider supporting async httpx.AsyncClient for better async integration.

While the current synchronous client works with asyncio.to_thread, native async support would eliminate thread overhead and improve performance. Since httpx provides httpx.AsyncClient with an identical API, you could:

  1. Add an async variant of HygraphClient (e.g., AsyncHygraphClient)
  2. Update the service to use the async client directly without asyncio.to_thread

This is optional and not required for correctness, but would be more idiomatic for async Python.

Example async client usage:

# In hygraph_client.py
class AsyncHygraphClient:
    def __init__(self, endpoint: str, token: str, *, timeout: float = 6.0, ...):
        self._client = httpx.AsyncClient(timeout=timeout)
        # ... rest of init
    
    async def execute(self, query: str, ...) -> Dict[str, Any]:
        async def _request() -> Dict[str, Any]:
            response = await self._client.post(...)
            # ... rest of logic
        return self._breaker.call(_request, fallback=fallback)

# In hygraph_service.py
async def _fetch_collection(...):
    # Remove asyncio.to_thread wrapper
    data = await client.execute(query, ...)

Loading
Loading