
Commit 7b08343

add broader retry on http client

1 parent 4fd8e40

File tree: 4 files changed (+75, -64 lines)


cookbook/evals_and_experiments/parea_evaluation_deepdive.ipynb

Lines changed: 1 addition & 1 deletion
@@ -705,7 +705,7 @@
     "\n",
     "\n",
     "dataset = to_simple_dictionary(comments_df)\n",
-    "dataset[0]"
+    "dataset"
    ]
   },
   {

cookbook/parea_llm_proxy/deployments/tracing_with_deployed_prompt.py

Lines changed: 8 additions & 8 deletions
@@ -8,7 +8,7 @@
 from dotenv import load_dotenv
 
 from parea import Parea, get_current_trace_id, trace
-from parea.schemas import Completion, CompletionResponse, LLMInputs, Message, Role
+from parea.schemas import Completion, CompletionResponse, FeedbackRequest, LLMInputs, Message, Role
 
 load_dotenv()
 
@@ -101,10 +101,10 @@ def deployed_argument_chain_tags_metadata(query: str, additional_description: str
         additional_description="Provide a concise, few sentence argument on why coffee is good for you.",
     )
     print(json.dumps(asdict(result2), indent=2))
-    # p.record_feedback(
-    #     FeedbackRequest(
-    #         trace_id=trace_id,
-    #         score=0.7,  # 0.0 (bad) to 1.0 (good)
-    #         target="Coffee is wonderful. End of story.",
-    #     )
-    # )
+    p.record_feedback(
+        FeedbackRequest(
+            trace_id=trace_id,
+            score=0.7,  # 0.0 (bad) to 1.0 (good)
+            target="Coffee is wonderful. End of story.",
+        )
+    )
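
Note: uncommenting the feedback call is why FeedbackRequest joins the schema imports above. A minimal sketch of the end-to-end pattern this example now exercises, for context (the answer function and its tuple return are hypothetical; Parea, trace, get_current_trace_id, and FeedbackRequest are the SDK names imported in the diff):

import os

from parea import Parea, get_current_trace_id, trace
from parea.schemas import FeedbackRequest

p = Parea(api_key=os.getenv("PAREA_API_KEY"))


@trace
def answer(query: str):
    # Capture the trace id while the trace is still open so feedback can be
    # attached to this exact call afterwards.
    return "Coffee is wonderful. End of story.", get_current_trace_id()


reply, trace_id = answer("Whether coffee is good for you.")
p.record_feedback(
    FeedbackRequest(
        trace_id=trace_id,
        score=0.7,  # 0.0 (bad) to 1.0 (good)
    )
)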

parea/api_client.py

Lines changed: 65 additions & 54 deletions
@@ -1,60 +1,17 @@
-from typing import Any, AsyncIterable, Callable, Dict, List, Optional
+from typing import Any, AsyncIterable, Dict, List, Optional
 
-import asyncio
 import json
+import logging
 import os
-import time
-from functools import wraps
 from importlib import metadata as importlib_metadata
 
 import httpx
 from dotenv import load_dotenv
+from tenacity import retry, stop_after_attempt, wait_exponential
 
 load_dotenv()
 
-MAX_RETRIES = 8
-BACKOFF_FACTOR = 0.5
-
-
-def retry_on_502(func: Callable[..., Any]) -> Callable[..., Any]:
-    """
-    A decorator to retry a function or coroutine on encountering a 502 error.
-    Parameters:
-    - func: The function or coroutine to be decorated.
-    Returns:
-    - A wrapper function that incorporates retry logic.
-    """
-
-    @wraps(func)
-    async def async_wrapper(*args, **kwargs):
-        for retry in range(MAX_RETRIES):
-            try:
-                return await func(*args, **kwargs)
-            except httpx.HTTPError as e:
-                if not _should_retry(e, retry):
-                    raise
-                await asyncio.sleep(BACKOFF_FACTOR * (2**retry))
-
-    @wraps(func)
-    def sync_wrapper(*args, **kwargs):
-        for retry in range(MAX_RETRIES):
-            try:
-                return func(*args, **kwargs)
-            except httpx.HTTPError as e:
-                if not _should_retry(e, retry):
-                    raise
-                time.sleep(BACKOFF_FACTOR * (2**retry))
-
-    def _should_retry(error, current_retry):
-        """Determines if the function should retry on error."""
-        is_502_error = isinstance(error, httpx.HTTPStatusError) and error.response.status_code == 502
-        is_last_retry = current_retry == MAX_RETRIES - 1
-        return not is_last_retry and (isinstance(error, (httpx.ConnectError, httpx.ReadError, httpx.RemoteProtocolError)) or is_502_error)
-
-    if asyncio.iscoroutinefunction(func):
-        return async_wrapper
-    else:
-        return sync_wrapper
+logger = logging.getLogger()
 
 
 class HTTPClient:
@@ -87,7 +44,7 @@ def _get_headers(self, api_key: Optional[str] = None) -> Dict[str, str]:
             headers["x-sdk-integrations"] = ",".join(self.integrations)
         return headers
 
-    @retry_on_502
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=4, max=10))
     def request(
         self,
         method: str,
@@ -109,8 +66,20 @@
                 # update the error message to include the validation errors
                 e.args = (f"{e.args[0]}: {e.response.json()}",)
             raise
+        except httpx.TimeoutException as e:
+            logger.error(
+                f"Timeout error for {e.request.method} {e.request.url}",
+                extra={"request_data": data, "request_params": params},
+            )
+            raise
+        except httpx.RequestError as e:
+            logger.error(
+                f"Request error for {e.request.method} {e.request.url}: {str(e)}",
+                extra={"request_data": data, "request_params": params},
+            )
+            raise
 
-    @retry_on_502
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=4, max=10))
     async def request_async(
         self,
         method: str,
@@ -128,10 +97,24 @@ async def request_async(
             response.raise_for_status()
             return response
         except httpx.HTTPStatusError as e:
-            print(f"HTTP Error {e.response.status_code} for {e.request.url}: {e.response.text}")
+            if e.response.status_code == 422:
+                # update the error message to include the validation errors
+                e.args = (f"{e.args[0]}: {e.response.json()}",)
+            raise
+        except httpx.TimeoutException as e:
+            logger.error(
+                f"Timeout error for {e.request.method} {e.request.url}",
+                extra={"request_data": data, "request_params": params},
+            )
+            raise
+        except httpx.RequestError as e:
+            logger.error(
+                f"Request error for {e.request.method} {e.request.url}: {str(e)}",
+                extra={"request_data": data, "request_params": params},
+            )
             raise
 
-    @retry_on_502
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=4, max=10))
     def stream_request(
         self,
         method: str,
@@ -151,10 +134,24 @@ def stream_request(
                 for chunk in response.iter_bytes(chunk_size):
                     yield parse_event_data(chunk)
         except httpx.HTTPStatusError as e:
-            print(f"HTTP Error {e.response.status_code} for {e.request.url}: {e.response.text}")
+            if e.response.status_code == 422:
+                # update the error message to include the validation errors
+                e.args = (f"{e.args[0]}: {e.response.json()}",)
+            raise
+        except httpx.TimeoutException as e:
+            logger.error(
+                f"Timeout error for {e.request.method} {e.request.url}",
+                extra={"request_data": data, "request_params": params},
+            )
+            raise
+        except httpx.RequestError as e:
+            logger.error(
+                f"Request error for {e.request.method} {e.request.url}: {str(e)}",
+                extra={"request_data": data, "request_params": params},
+            )
             raise
 
-    @retry_on_502
+    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=4, max=10))
     async def stream_request_async(
         self,
         method: str,
@@ -174,7 +171,21 @@ async def stream_request_async(
                 async for chunk in response.aiter_bytes(chunk_size):
                     yield parse_event_data(chunk)
         except httpx.HTTPStatusError as e:
-            print(f"HTTP Error {e.response.status_code} for {e.request.url}: {e.response.text}")
+            if e.response.status_code == 422:
+                # update the error message to include the validation errors
+                e.args = (f"{e.args[0]}: {e.response.json()}",)
+            raise
+        except httpx.TimeoutException as e:
+            logger.error(
+                f"Timeout error for {e.request.method} {e.request.url}",
+                extra={"request_data": data, "request_params": params},
+            )
+            raise
+        except httpx.RequestError as e:
+            logger.error(
+                f"Request error for {e.request.method} {e.request.url}: {str(e)}",
+                extra={"request_data": data, "request_params": params},
+            )
             raise
 
     def close(self):
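
Note: this file swaps the hand-rolled retry_on_502 decorator for tenacity, which by default retries on any raised exception, hence the "broader retry". A minimal sketch of what the new decorator does, using a hypothetical fetch helper (the decorator arguments mirror the ones added above):

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=4, max=10))
def fetch(url: str) -> httpx.Response:
    # Any exception raised here triggers a retry (not only 502s and connection
    # errors as before): at most 3 attempts, with an exponential wait clamped
    # to the 4-10 second range between attempts.
    response = httpx.get(url)
    response.raise_for_status()
    return response

Because each new except block in HTTPClient logs and then re-raises, timeouts and transport errors still reach the decorator and are retried before finally propagating to the caller.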

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry]
 name = "parea-ai"
 packages = [{ include = "parea" }]
-version = "0.2.209"
+version = "0.2.210"
 description = "Parea python sdk"
 readme = "README.md"
 authors = ["joel-parea-ai <[email protected]>"]
