Skip to content

Commit 29c903a

Browse files
📝 Add docstrings to codex/implement-circuit-breaker-and-hygraph-client-m5f9cc
Docstrings generation was requested by @shayancoin. * #124 (comment) The following files were modified: * `backend/api/routes_sync.py` * `backend/services/cache.py` * `backend/services/circuit_breaker.py` * `backend/services/hygraph_client.py` * `backend/services/hygraph_service.py`
1 parent 8f09a21 commit 29c903a

File tree

5 files changed

+345
-10
lines changed

5 files changed

+345
-10
lines changed

‎backend/api/routes_sync.py‎

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,31 @@ def get_hygraph_service() -> HygraphService:
4747

4848

4949
def _error_envelope(code: str, message: str, details: Optional[dict] = None) -> Dict[str, Any]:
50-
"""Build a standardized error envelope for API responses."""
50+
"""
51+
Create a standardized error envelope used for API error responses.
52+
53+
Parameters:
54+
code (str): Machine-readable error code.
55+
message (str): Human-readable error message.
56+
details (Optional[dict]): Additional structured error details; if omitted, an empty dict is used.
57+
58+
Returns:
59+
Dict[str, Any]: A dictionary of the form {"ok": False, "error": {"code": code, "message": message, "details": details}}.
60+
"""
5161

5262
return {"ok": False, "error": {"code": code, "message": message, "details": details or {}}}
5363

5464

5565
def _processed_value(payload: Any) -> int:
66+
"""
67+
Normalize a Hygraph payload into an integer count of processed items.
68+
69+
Parameters:
70+
payload (Any): Either a numeric-like value or a dict containing a "processed" key; other shapes are accepted but treated as zero when not convertible.
71+
72+
Returns:
73+
int: Integer count extracted from the payload; returns 0 if the value is missing, falsy, or cannot be converted to an integer.
74+
"""
5675
if isinstance(payload, dict):
5776
value = payload.get("processed", 0)
5877
else:
@@ -73,7 +92,14 @@ async def hygraph_webhook(
7392
background: BackgroundTasks,
7493
db: Session = Depends(get_db),
7594
) -> Dict[str, Any]:
76-
"""Webhook receiver for Hygraph change notifications."""
95+
"""
96+
Receive Hygraph webhook notifications, persist a deduplicating sync event, and schedule background processing to pull and reconcile Hygraph data.
97+
98+
Persists a SyncEvent to detect duplicates; if the event is a duplicate returns a 200 response indicating deduplication. If the webhook payload is invalid JSON raises a 400 error. On a new event, schedules a background task that performs a full Hygraph pull and updates sync counters (or replays a circuit-breaker fallback payload if the pull is unavailable).
99+
100+
Returns:
101+
JSONResponse: On duplicate events `{"ok": True, "dedup": True}` with status 200; on accepted new events `{"ok": True, "accepted": True}` with status 202.
102+
"""
77103

78104
start = time.perf_counter()
79105
raw = getattr(request.state, "raw_body", b"")
@@ -109,6 +135,15 @@ async def hygraph_webhook(
109135

110136

111137
async def _process(event_id_local: Optional[str], body_sha_local: str) -> None:
138+
"""
139+
Handle an asynchronous Hygraph sync triggered by a webhook: fetch counts, update Prometheus metrics, and log the outcome.
140+
141+
If the pull succeeds, increments per-type upsert counters and a global success counter and emits a warning log with timing and counts. If a CircuitOpenError occurs, increments the circuit-breaker trip metric; if the exception provides a fallback payload, increments the fallback metric, replays its counts into the upsert counters and logs fallback details; if no fallback is available, increments the global failure metric and logs that the circuit is open. Any other exception increments the global failure metric and logs the error.
142+
143+
Parameters:
144+
event_id_local (Optional[str]): Identifier of the originating sync event (may be None).
145+
body_sha_local (str): SHA256 of the webhook body used for traceability.
146+
"""
112147
t0 = time.perf_counter()
113148
try:
114149
counts = await HygraphService.pull_all(db)
@@ -172,7 +207,23 @@ async def hygraph_pull(
172207
body: Dict[str, Any] = Body(...),
173208
db: Session = Depends(get_db),
174209
) -> Dict[str, Any]:
175-
"""Admin-triggered Hygraph syncs supporting manual pull types."""
210+
"""
211+
Trigger an ad-hoc Hygraph sync for a specified resource type.
212+
213+
Parameters:
214+
body (dict): Request body that must include either `type` or `sync_type` (case-insensitive string) identifying the sync target:
215+
- "materials", "modules", "systems", or "all".
216+
Optionally accepts `page_size` (positive integer) to limit page size for the pull.
217+
db (Session): Database session.
218+
219+
Returns:
220+
dict: {"ok": True, "data": counts} where `counts` is the result of the pull operation — either a mapping of resource types to payloads/counts (for "all") or a payload/count for the requested type.
221+
222+
Raises:
223+
HTTPException (400): when `page_size` is invalid or `type` is unsupported.
224+
HTTPException (503): when the Hygraph circuit breaker is open and no fallback data is available.
225+
HTTPException (500): on other internal sync failures.
226+
"""
176227

177228
sync_type = str((body.get("type") or body.get("sync_type") or "")).lower().strip()
178229
page_size_raw = body.get("page_size")
@@ -236,4 +287,4 @@ async def hygraph_pull(
236287
logger.exception("hygraph_pull_failure", extra={"type": sync_type, "error": str(e)})
237288
raise HTTPException(status_code=500, detail=_error_envelope("INTERNAL", "sync failed", {"type": sync_type}))
238289

239-
return {"ok": True, "data": counts}
290+
return {"ok": True, "data": counts}

‎backend/services/cache.py‎

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ class RedisCache:
2626
"""Tiny wrapper that abstracts Redis availability."""
2727

2828
def __init__(self, url: str | None = None, *, namespace: str = "hygraph") -> None:
29+
"""
30+
Initialize the cache, preferring a Redis client when a URL is available and falling back to an in-memory store.
31+
32+
Parameters:
33+
url (str | None): Optional Redis connection URL; if not provided, the constructor will read HYGRAPH_CACHE_URL then REDIS_URL from the environment.
34+
namespace (str): Key prefix used to namespace all cache entries (default "hygraph").
35+
36+
Notes:
37+
- Creates a thread-safe in-memory store and associated lock for the fallback path.
38+
- When a Redis URL is available and the optional redis dependency is present, a Redis client is created and used for subsequent operations; otherwise the cache operates purely in-memory.
39+
"""
2940
self._url = url or os.getenv("HYGRAPH_CACHE_URL") or os.getenv("REDIS_URL")
3041
self._namespace = namespace
3142
self._lock = threading.Lock()
@@ -36,9 +47,28 @@ def __init__(self, url: str | None = None, *, namespace: str = "hygraph") -> Non
3647
self._client = None
3748

3849
def _ns(self, key: str) -> str:
50+
"""
51+
Compute the namespaced cache key.
52+
53+
Prefixes the provided key with the instance namespace and a colon.
54+
55+
Parameters:
56+
key (str): The cache key to namespace (without namespace prefix).
57+
58+
Returns:
59+
namespaced_key (str): The key prefixed with "<namespace>:".
60+
"""
3961
return f"{self._namespace}:{key}"
4062

4163
def get(self, key: str) -> Any | None:
64+
"""
65+
Retrieve a cached value for the given key, preferring Redis and falling back to the in-memory store.
66+
67+
If an in-memory entry has expired it is removed and treated as missing; Redis failures cause the client to be disabled and the in-memory store to be used thereafter.
68+
69+
Returns:
70+
The decoded Python object stored for the key, or `None` if not present or expired.
71+
"""
4272
namespaced = self._ns(key)
4373
if self._client is not None:
4474
try:
@@ -58,6 +88,16 @@ def get(self, key: str) -> Any | None:
5888
return json.loads(raw)
5989

6090
def set(self, key: str, value: Any, *, ttl: int | None = None) -> None:
91+
"""
92+
Store a value in the cache under the given key, preferring Redis and falling back to an in-memory store.
93+
94+
The value is serialized to JSON before storage. If a Redis client is available, the entry is written to Redis (using expiry when `ttl` is provided). On Redis errors the instance disables the Redis client and stores the entry in the local in-memory store with an expiration timestamp computed from `ttl`. Keys are stored using the instance namespace prefix.
95+
96+
Parameters:
97+
key (str): Cache key (will be namespaced).
98+
value (Any): Value to store; will be JSON-serialized.
99+
ttl (int | None): Optional time-to-live in seconds; when provided, the entry expires after `ttl` seconds.
100+
"""
61101
payload = json.dumps(value)
62102
namespaced = self._ns(key)
63103
if self._client is not None:
@@ -75,6 +115,12 @@ def set(self, key: str, value: Any, *, ttl: int | None = None) -> None:
75115
self._memory[namespaced] = (payload, expires_at)
76116

77117
def clear(self, key: str) -> None:
118+
"""
119+
Remove the cached entry for the given key from Redis if available; otherwise remove it from the in-memory fallback.
120+
121+
Parameters:
122+
key (str): The cache key (un-prefixed) to remove.
123+
"""
78124
namespaced = self._ns(key)
79125
if self._client is not None:
80126
try:
@@ -84,4 +130,4 @@ def clear(self, key: str) -> None:
84130
else:
85131
return
86132
with self._lock:
87-
self._memory.pop(namespaced, None)
133+
self._memory.pop(namespaced, None)

‎backend/services/circuit_breaker.py‎

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ class CircuitOpenError(RuntimeError):
1414
"""Raised when the circuit breaker refuses to execute a call."""
1515

1616
def __init__(self, message: str = "Circuit breaker is open", *, fallback: Any | None = None) -> None:
17+
"""
18+
Create a CircuitOpenError carrying an optional fallback value.
19+
20+
Parameters:
21+
message (str): Error message describing the open-circuit condition.
22+
fallback (Any | None): Optional value to be used as a fallback when the circuit is open; stored on the exception instance as `fallback`.
23+
"""
1724
super().__init__(message)
1825
self.fallback = fallback
1926

@@ -35,7 +42,23 @@ class CircuitBreaker:
3542
HALF_OPEN: str = field(default="HALF_OPEN", init=False)
3643

3744
def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
38-
"""Execute ``func`` respecting the breaker state machine."""
45+
"""
46+
Execute a callable while enforcing the circuit breaker's state transitions.
47+
48+
If the breaker is OPEN and the recovery timeout has not elapsed, a CircuitOpenError is raised; otherwise the breaker may transition to HALF_OPEN before executing the callable. Any exception raised by `func` is recorded as a failure and re-raised; a successful call is recorded as a success.
49+
50+
Parameters:
51+
func (Callable[..., T]): The callable to invoke.
52+
*args: Positional arguments forwarded to `func`.
53+
**kwargs: Keyword arguments forwarded to `func`.
54+
55+
Returns:
56+
T: The value returned by `func`.
57+
58+
Raises:
59+
CircuitOpenError: If the circuit is OPEN and not yet ready for recovery.
60+
Exception: Re-raises any exception thrown by `func`.
61+
"""
3962

4063
now = time.monotonic()
4164
if self.state == self.OPEN:
@@ -54,6 +77,11 @@ def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
5477
return result
5578

5679
def _record_failure(self) -> None:
80+
"""
81+
Record a failed call and update the circuit breaker's state.
82+
83+
If the circuit is in HALF_OPEN, immediately move it to OPEN and record the failure time. Otherwise, increment the consecutive failure count and, when that count reaches or exceeds `failure_threshold`, move the circuit to OPEN and record the failure time.
84+
"""
5785
now = time.monotonic()
5886
if self.state == self.HALF_OPEN:
5987
self._trip(now)
@@ -64,6 +92,11 @@ def _record_failure(self) -> None:
6492
self._trip(now)
6593

6694
def _record_success(self) -> None:
95+
"""
96+
Record a successful call and update the circuit state accordingly.
97+
98+
If the breaker is in HALF_OPEN, increment the consecutive success counter and close the circuit (clearing counters and last-failure timestamp) when the counter reaches or exceeds the configured success threshold. If the breaker is in any other state, close the circuit and clear counters and last-failure timestamp immediately.
99+
"""
67100
if self.state == self.HALF_OPEN:
68101
self.success_count += 1
69102
if self.success_count >= self.success_threshold:
@@ -72,13 +105,24 @@ def _record_success(self) -> None:
72105
self._reset()
73106

74107
def _trip(self, when: float) -> None:
108+
"""
109+
Transition the circuit to OPEN, record the trip timestamp, and reset failure/success counters.
110+
111+
Parameters:
112+
when (float): Monotonic timestamp indicating when the circuit was tripped.
113+
"""
75114
self.state = self.OPEN
76115
self.last_failure_time = when
77116
self.failure_count = 0
78117
self.success_count = 0
79118

80119
def _reset(self) -> None:
120+
"""
121+
Reset the circuit breaker to the CLOSED state and clear its counters and last failure timestamp.
122+
123+
Sets `state` to `CLOSED`, sets `failure_count` and `success_count` to 0, and sets `last_failure_time` to `None`.
124+
"""
81125
self.state = self.CLOSED
82126
self.failure_count = 0
83127
self.success_count = 0
84-
self.last_failure_time = None
128+
self.last_failure_time = None

‎backend/services/hygraph_client.py‎

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,27 @@ def __init__(
2121
breaker: CircuitBreaker | None = None,
2222
client: httpx.Client | None = None,
2323
) -> None:
24+
"""
25+
Initialize the HygraphClient with its GraphQL endpoint, authentication token, HTTP client, and circuit-breaker.
26+
27+
Parameters:
28+
endpoint (str): GraphQL endpoint URL (must be provided as a keyword argument).
29+
token (str | None): Optional bearer token; `None` is treated as an empty string (no Authorization header).
30+
timeout (float): Default HTTP client timeout in seconds; used only when `client` is not provided.
31+
breaker (CircuitBreaker | None): Optional CircuitBreaker instance; a new CircuitBreaker is created if omitted.
32+
client (httpx.Client | None): Optional preconfigured httpx.Client; when provided, `timeout` is ignored.
33+
"""
2434
self._endpoint = endpoint
2535
self._token = token or ""
2636
self._breaker = breaker or CircuitBreaker()
2737
self._client = client or httpx.Client(timeout=timeout)
2838

2939
def close(self) -> None:
40+
"""
41+
Close the underlying HTTP client.
42+
43+
Closes the internal httpx.Client and releases its associated network resources.
44+
"""
3045
self._client.close()
3146

3247
def execute(
@@ -36,9 +51,33 @@ def execute(
3651
variables: Optional[Dict[str, Any]] = None,
3752
fallback: Callable[[], Any] | None = None,
3853
) -> Dict[str, Any]:
39-
"""Execute a GraphQL query and return the ``data`` payload."""
54+
"""
55+
Execute the GraphQL `query` and return its `data` payload.
56+
57+
Parameters:
58+
query (str): The GraphQL query string.
59+
variables (Optional[Dict[str, Any]]): Variables for the GraphQL query; defaults to an empty dict when omitted.
60+
fallback (Optional[Callable[[], Any]]): Callable invoked only when the circuit is open to produce a fallback payload; exceptions raised by the fallback are ignored and treated as no fallback.
61+
62+
Returns:
63+
Dict[str, Any]: The `data` field from the GraphQL response.
64+
65+
Raises:
66+
RuntimeError: If the GraphQL response contains `errors` or the `data` field is missing or not a dict.
67+
CircuitOpenError: If the circuit breaker is open; the exception will include the fallback payload (if any).
68+
"""
4069

4170
def _do_request() -> Dict[str, Any]:
71+
"""
72+
Perform the GraphQL HTTP POST to the configured Hygraph endpoint and return the parsed `data` object.
73+
74+
Returns:
75+
dict: The GraphQL `data` payload.
76+
77+
Raises:
78+
httpx.HTTPStatusError: If the HTTP response status indicates failure.
79+
RuntimeError: If the GraphQL response contains an `errors` entry, or if the `data` field is missing or not a dictionary.
80+
"""
4281
payload = {"query": query, "variables": variables or {}}
4382
headers = {"Content-Type": "application/json"}
4483
if self._token:
@@ -66,4 +105,10 @@ def _do_request() -> Dict[str, Any]:
66105

67106
@property
68107
def breaker(self) -> CircuitBreaker:
69-
return self._breaker
108+
"""
109+
Expose the client's circuit breaker instance.
110+
111+
Returns:
112+
CircuitBreaker: The internal CircuitBreaker used to guard and monitor HTTP requests.
113+
"""
114+
return self._breaker

0 commit comments

Comments
 (0)