
Commit a20fbb8
Mateusz authored and committed
feat: Enhance connector resilience and error handling
- Improve Gemini connector error handling in _accumulate_streaming_response for non-streaming requests where backend errors weren't properly propagated
- Add Cline connector support for unwrapping 'data' envelope in non-streaming responses
- Fix Anthropic converter handling when usage is None to prevent AttributeError
- Enhance OpenAI connector debug logging for non-streaming response troubleshooting
- Improve response parser to serialize responses when 'choices' key is missing instead of returning empty content (affects embeddings API and similar)
- Add comprehensive tests for error handling scenarios and edge cases
1 parent 104430b commit a20fbb8

9 files changed: +1047 −500 lines changed
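Four of the nine file diffs are reproduced below. The response-parser change described in the commit message (serializing the response when 'choices' is missing, rather than returning empty content) is not among them, so the following is a purely hypothetical sketch of that fallback, with every name invented for illustration:

import json
from typing import Any

def extract_content(response_json: dict[str, Any]) -> str:
    """Hypothetical fallback: if there is no 'choices' key (e.g. an
    embeddings response), return the serialized payload instead of an
    empty string."""
    choices = response_json.get("choices")
    if not choices:
        # Returning "" here would silently drop the body; serializing
        # keeps the payload visible to the caller for debugging.
        return json.dumps(response_json)
    return choices[0].get("message", {}).get("content", "")

# An embeddings-style response has no 'choices' key at all.
embeddings_response = {"object": "list", "data": [{"embedding": [0.1, 0.2]}]}
print(extract_content(embeddings_response))  # serialized payload, not ""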

src/anthropic_converters.py

Lines changed: 2 additions & 2 deletions
@@ -256,7 +256,7 @@ def openai_to_anthropic_response(openai_response: Any) -> dict[str, Any]:
         # No choices and no explicit error - produce a message indicating
         # empty response. Use a clear message instead of empty string to
         # help debugging and prevent silent failures.
-        usage = oai_dict.get("usage", {})
+        usage = oai_dict.get("usage") or {}
         response = {
             "id": oai_dict.get("id", "msg_unk"),
             "type": "message",
@@ -282,7 +282,7 @@ def openai_to_anthropic_response(openai_response: Any) -> dict[str, Any]:
     choice = choices[0]
     message = choice.get("message", {})
     content_blocks = _build_content_blocks(choice, message)
-    usage = oai_dict.get("usage", {})
+    usage = oai_dict.get("usage") or {}

     # Map finish_reason to stop_reason
     finish_reason = choice.get("finish_reason")
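The `or {}` change guards against an explicit null: `dict.get(key, default)` only falls back to the default when the key is absent, so a backend payload containing `"usage": null` still yields `None`, and the later `usage.get(...)` raises AttributeError. A minimal standalone illustration (the payload is invented):

# Backend response that explicitly nulls out usage (invented payload).
oai_dict = {"id": "msg_123", "usage": None}

usage = oai_dict.get("usage", {})    # -> None: the key exists, default unused
# usage.get("input_tokens")          # would raise AttributeError

usage = oai_dict.get("usage") or {}  # -> {}: falsy None coerced to empty dict
print(usage.get("input_tokens", 0))  # -> 0, safe default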

src/connectors/cline.py

Lines changed: 92 additions & 0 deletions
@@ -140,6 +140,98 @@ async def initialize(self, **kwargs: Any) -> None:

         await super().initialize(**passthrough)

+    def _unwrap_cline_data_envelope(
+        self, response_json: dict[str, Any]
+    ) -> dict[str, Any]:
+        """
+        Unwrap Cline's non-standard 'data' envelope from responses.
+
+        Cline API wraps OpenAI-format responses in a 'data' key for non-streaming
+        requests. This method extracts the inner response to normalize it to
+        standard OpenAI format that the rest of the pipeline expects.
+        """
+        data_val = response_json.get("data")
+        if isinstance(data_val, dict):
+            # Only unwrap if the inner dict looks like a valid OpenAI response
+            if "choices" in data_val or "id" in data_val or "model" in data_val:
+                logger.debug(
+                    "Unwrapping Cline 'data' envelope - found keys: %s",
+                    list(data_val.keys())[:5],
+                )
+                return data_val
+        return response_json
+
+    async def _handle_non_streaming_response(
+        self,
+        url: str,
+        payload: dict[str, Any],
+        headers: dict[str, str] | None,
+        session_id: str,
+    ) -> ResponseEnvelope:
+        """
+        Override to handle Cline's non-standard response format.
+
+        Cline wraps responses in a 'data' envelope for non-streaming requests.
+        We unwrap this before passing to the parent handler.
+        """
+        from src.core.common.exceptions import ServiceUnavailableError
+        from src.core.security.loop_prevention import ensure_loop_guard_header
+
+        if not headers or not headers.get("Authorization"):
+            raise AuthenticationError(message="No auth credentials found")
+
+        guarded_headers = ensure_loop_guard_header(headers)
+
+        try:
+            response = await self.client.post(
+                url, json=payload, headers=guarded_headers
+            )
+        except httpx.RequestError as e:
+            logger.error(f"Cline request failed to {url}. Error: {e}")
+            raise ServiceUnavailableError(
+                message=f"Could not connect to Cline backend ({e})"
+            )
+
+        if int(response.status_code) >= 400:
+            try:
+                err = response.json()
+            except Exception:
+                err = response.text
+            raise HTTPException(status_code=response.status_code, detail=err)
+
+        response_json = response.json()
+
+        # Unwrap Cline's non-standard 'data' envelope
+        response_json = self._unwrap_cline_data_envelope(response_json)
+
+        # Debug log for troubleshooting
+        if logger.isEnabledFor(logging.DEBUG):
+            choices_count = len(response_json.get("choices", []))
+            response_id = response_json.get("id", "unknown")
+            response_model = response_json.get("model", "unknown")
+            logger.debug(
+                "Cline non-streaming response: id=%s model=%s choices_count=%d",
+                response_id,
+                response_model,
+                choices_count,
+            )
+
+        domain_response = self.translation_service.to_domain_response(
+            response_json, "openai"
+        )
+
+        try:
+            response_headers = dict(response.headers)
+        except Exception:
+            response_headers = {}
+
+        return ResponseEnvelope(
+            content=domain_response.model_dump(),
+            status_code=response.status_code,
+            headers=response_headers,
+            usage=domain_response.usage,
+        )
+
     async def chat_completions(
         self,
         request_data: DomainModel | InternalDTO | dict[str, Any],
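To see the unwrap heuristic in isolation, here is a standalone sketch of the same logic applied to sample payloads (the values are illustrative, not captured from the Cline API):

from typing import Any

def unwrap_data_envelope(response_json: dict[str, Any]) -> dict[str, Any]:
    """Standalone copy of the connector's unwrap heuristic above."""
    data_val = response_json.get("data")
    if isinstance(data_val, dict) and (
        "choices" in data_val or "id" in data_val or "model" in data_val
    ):
        return data_val
    return response_json

# Wrapped non-streaming response: the OpenAI-format body sits under 'data'.
wrapped = {
    "data": {
        "id": "chatcmpl-abc123",
        "model": "some-model",
        "choices": [{"message": {"role": "assistant", "content": "hi"}}],
    }
}
assert unwrap_data_envelope(wrapped)["id"] == "chatcmpl-abc123"

# A 'data' key that doesn't look like an OpenAI response is left untouched.
unrelated = {"data": {"rows": [1, 2, 3]}}
assert unwrap_data_envelope(unrelated) == unrelated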

src/connectors/gemini_base/connector.py

Lines changed: 48 additions & 1 deletion
@@ -1719,10 +1719,21 @@ async def _accumulate_streaming_response(
         finish_reason: str | None = None
         usage_data: dict[str, int] | None = None
         accumulated_reasoning: str = ""
+        error_data: dict[str, Any] | None = None

         def _process_openai_chunk(data: dict[str, Any]) -> None:
             """Process an OpenAI-style chunk and accumulate content."""
-            nonlocal accumulated_content, finish_reason, usage_data, accumulated_reasoning
+            nonlocal accumulated_content, finish_reason, usage_data, accumulated_reasoning, error_data
+
+            # Check for error in the chunk - this is critical for non-streaming
+            # requests where backend errors need to be properly propagated
+            if data.get("error"):
+                error_data = data.get("error")
+                # Also capture the finish_reason if present (usually "error")
+                choices = data.get("choices", [])
+                if choices and choices[0].get("finish_reason"):
+                    finish_reason = choices[0]["finish_reason"]
+                return

             choices = data.get("choices", [])
             if choices:
@@ -1830,6 +1841,42 @@ def _process_openai_chunk(data: dict[str, Any]) -> None:

         except Exception as e:
             logger.warning(f"Error accumulating streaming response: {e}", exc_info=True)
+            # Capture the exception as an error to propagate to the client
+            if error_data is None:
+                error_data = {
+                    "message": f"Error processing response: {e}",
+                    "type": "internal_error",
+                    "code": 500,
+                }
+
+        # If an error was encountered during streaming, return an error response.
+        # This is critical for non-streaming requests where the client waits for
+        # a complete response and needs to know about backend failures.
+        if error_data:
+            error_status_code = error_data.get("code", 500)
+            if isinstance(error_status_code, str):
+                try:
+                    error_status_code = int(error_status_code)
+                except ValueError:
+                    error_status_code = 500
+
+            error_response: dict[str, Any] = {
+                "id": f"chatcmpl-error-{uuid.uuid4().hex[:8]}",
+                "object": "chat.completion",
+                "created": int(time.time()),
+                "model": getattr(self, "backend_type", "gemini"),
+                "choices": [],
+                "error": error_data,
+            }
+            logger.warning(
+                f"Returning error response for non-streaming request: {error_data.get('message', 'Unknown error')}"
+            )
+            return ResponseEnvelope(
+                content=error_response,
+                headers=streaming_response.headers or {},
+                status_code=error_status_code,
+                usage=None,
+            )

         # Build OpenAI-style response
         message_content: dict[str, Any] = {
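The string-to-int coercion of the error code matters because some backends report "code" as a string. A small sketch of that path in isolation (the chunk shape follows the OpenAI streaming error convention; the values are invented):

from typing import Any

def extract_error_status(error_data: dict[str, Any]) -> int:
    """Mirrors the coercion above: accept int or numeric-string codes."""
    code = error_data.get("code", 500)
    if isinstance(code, str):
        try:
            code = int(code)
        except ValueError:
            code = 500
    return code

# Error chunk as a backend might emit it mid-stream (invented values).
chunk = {
    "error": {"message": "quota exceeded", "type": "rate_limit", "code": "429"},
    "choices": [{"finish_reason": "error"}],
}

assert extract_error_status(chunk["error"]) == 429      # numeric string coerced
assert extract_error_status({"code": 503}) == 503       # int passes through
assert extract_error_status({"code": "teapot"}) == 500  # junk falls back to 500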

src/connectors/openai.py

Lines changed: 20 additions & 1 deletion
@@ -632,8 +632,27 @@ async def _handle_non_streaming_response(
                 err = response.text
             raise HTTPException(status_code=response.status_code, detail=err)

+        response_json = response.json()
+        # Debug log raw response for non-streaming requests to help diagnose
+        # translation issues (e.g., Claude Code via Anthropic frontend)
+        if logger.isEnabledFor(logging.DEBUG):
+            choices_count = len(response_json.get("choices", []))
+            response_id = response_json.get("id", "unknown")
+            response_model = response_json.get("model", "unknown")
+            logger.debug(
+                "Non-streaming response from backend: id=%s model=%s choices_count=%d",
+                response_id,
+                response_model,
+                choices_count,
+            )
+            if choices_count == 0:
+                logger.debug(
+                    "Empty choices in non-streaming response - raw response: %s",
+                    str(response_json)[:500],
+                )
+
         domain_response = self.translation_service.to_domain_response(
-            response.json(), "openai"
+            response_json, "openai"
         )
         # Some tests use mocks that set response.headers to AsyncMock or
         # other non-dict types; defensively coerce to a dict and fall back
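A note on the logger.isEnabledFor(logging.DEBUG) guard used by both connectors: it skips building the log arguments (including str(response_json)[:500] on potentially large bodies) when DEBUG is off. A minimal self-contained sketch (the logger name is hypothetical):

import logging

logging.basicConfig(level=logging.DEBUG)  # enable DEBUG to see the diagnostics
logger = logging.getLogger("proxy.connectors.openai")  # hypothetical name

response_json: dict = {"id": "resp-1", "model": "m", "choices": []}

# Without the guard, str(response_json)[:500] would be computed even when
# DEBUG is disabled; with it, the work only happens if it will be logged.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(
        "Non-streaming response: id=%s choices=%d raw=%s",
        response_json.get("id", "unknown"),
        len(response_json.get("choices", [])),
        str(response_json)[:500],
    )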
