Skip to content

Commit 89595f0

Browse files
committed
fix: add streaming response validation and error handling utilities
1 parent 051ab20 commit 89595f0

File tree

6 files changed

+156
-13
lines changed

6 files changed

+156
-13
lines changed

rest_old.py

Whitespace-only changes.

src/a2a/client/errors.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Custom exceptions for the A2A client."""
22

3+
from collections.abc import Mapping
4+
35
from a2a.types import JSONRPCErrorResponse
46

57

@@ -10,16 +12,27 @@ class A2AClientError(Exception):
1012
class A2AClientHTTPError(A2AClientError):
1113
"""Client exception for HTTP errors received from the server."""
1214

13-
def __init__(self, status_code: int, message: str):
15+
def __init__(
16+
self,
17+
status: int,
18+
message: str,
19+
body: str | None = None,
20+
headers: Mapping[str, str] | None = None,
21+
):
1422
"""Initializes the A2AClientHTTPError.
1523
1624
Args:
17-
status_code: The HTTP status code of the response.
25+
status: The HTTP status code of the response.
1826
message: A descriptive error message.
27+
body: The raw response body, if available.
28+
headers: The HTTP response headers.
1929
"""
20-
self.status_code = status_code
30+
self.status = status
31+
self.status_code = status
2132
self.message = message
22-
super().__init__(f'HTTP Error {status_code}: {message}')
33+
self.body = body
34+
self.headers = dict(headers or {})
35+
super().__init__(f'HTTP {status} - {message}')
2336

2437

2538
class A2AClientJSONError(A2AClientError):
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""Shared helpers for handling streaming HTTP responses."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
7+
from typing import Any
8+
9+
import httpx # noqa: TC002
10+
11+
from httpx_sse import EventSource # noqa: TC002
12+
13+
from a2a.client.errors import A2AClientHTTPError
14+
15+
16+
SUCCESS_STATUS_MIN = 200
17+
SUCCESS_STATUS_MAX = 300
18+
19+
20+
async def ensure_streaming_response(event_source: EventSource) -> None:
21+
"""Validate the initial streaming response before attempting SSE parsing."""
22+
response = event_source.response
23+
if not SUCCESS_STATUS_MIN <= response.status_code < SUCCESS_STATUS_MAX:
24+
error = await _build_http_error(response)
25+
raise error
26+
27+
if not _has_event_stream_content_type(response):
28+
error = await _build_content_type_error(response)
29+
raise error
30+
31+
32+
async def _build_http_error(response: httpx.Response) -> A2AClientHTTPError:
33+
body_text = await _read_body(response)
34+
json_payload: Any | None
35+
try:
36+
json_payload = response.json()
37+
except (json.JSONDecodeError, ValueError):
38+
json_payload = None
39+
40+
message = _extract_message(response, json_payload, body_text)
41+
return A2AClientHTTPError(
42+
response.status_code,
43+
message,
44+
body=body_text,
45+
headers=dict(response.headers),
46+
)
47+
48+
49+
async def _build_content_type_error(
50+
response: httpx.Response,
51+
) -> A2AClientHTTPError:
52+
body_text = await _read_body(response)
53+
content_type = response.headers.get('content-type', None)
54+
descriptor = content_type or 'missing'
55+
message = f'Unexpected Content-Type {descriptor!r} for streaming response'
56+
return A2AClientHTTPError(
57+
response.status_code,
58+
message,
59+
body=body_text,
60+
headers=dict(response.headers),
61+
)
62+
63+
64+
async def _read_body(response: httpx.Response) -> str | None:
65+
await response.aread()
66+
text = response.text
67+
return text if text else None
68+
69+
70+
def _extract_message(
71+
response: httpx.Response,
72+
json_payload: Any | None,
73+
body_text: str | None,
74+
) -> str:
75+
message: str | None = None
76+
if isinstance(json_payload, dict):
77+
title = _coerce_str(json_payload.get('title'))
78+
detail = _coerce_str(json_payload.get('detail'))
79+
if title and detail:
80+
message = f'{title}: {detail}'
81+
else:
82+
for key in ('message', 'detail', 'error', 'title'):
83+
value = _coerce_str(json_payload.get(key))
84+
if value:
85+
message = value
86+
break
87+
elif isinstance(json_payload, list):
88+
# Some APIs return a list of error descriptions—prefer the first string entry.
89+
for item in json_payload:
90+
value = _coerce_str(item)
91+
if value:
92+
message = value
93+
break
94+
95+
if not message and body_text:
96+
stripped = body_text.strip()
97+
if stripped:
98+
message = stripped
99+
100+
if not message:
101+
reason = getattr(response, 'reason_phrase', '') or ''
102+
message = reason or 'HTTP error'
103+
104+
return message
105+
106+
107+
def _coerce_str(value: Any) -> str | None:
108+
if isinstance(value, str):
109+
stripped = value.strip()
110+
return stripped or None
111+
return None
112+
113+
114+
def _has_event_stream_content_type(response: httpx.Response) -> bool:
115+
content_type = response.headers.get('content-type', '')
116+
return 'text/event-stream' in content_type.lower()

src/a2a/client/transports/jsonrpc.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
A2AClientTimeoutError,
1818
)
1919
from a2a.client.middleware import ClientCallContext, ClientCallInterceptor
20+
from a2a.client.transports._streaming_utils import ensure_streaming_response
2021
from a2a.client.transports.base import ClientTransport
2122
from a2a.types import (
2223
AgentCard,
@@ -161,23 +162,30 @@ async def send_message_streaming(
161162
json=payload,
162163
**modified_kwargs,
163164
) as event_source:
165+
http_response = event_source.response
164166
try:
167+
await ensure_streaming_response(event_source)
165168
async for sse in event_source.aiter_sse():
166-
response = SendStreamingMessageResponse.model_validate(
167-
json.loads(sse.data)
169+
stream_response = (
170+
SendStreamingMessageResponse.model_validate(
171+
json.loads(sse.data)
172+
)
168173
)
169-
if isinstance(response.root, JSONRPCErrorResponse):
170-
raise A2AClientJSONRPCError(response.root)
171-
yield response.root.result
174+
if isinstance(stream_response.root, JSONRPCErrorResponse):
175+
raise A2AClientJSONRPCError(stream_response.root)
176+
yield stream_response.root.result
172177
except SSEError as e:
173178
raise A2AClientHTTPError(
174-
400, f'Invalid SSE response or protocol error: {e}'
179+
http_response.status_code,
180+
f'Invalid SSE response or protocol error: {e}',
181+
headers=dict(http_response.headers),
175182
) from e
176183
except json.JSONDecodeError as e:
177184
raise A2AClientJSONError(str(e)) from e
178185
except httpx.RequestError as e:
179186
raise A2AClientHTTPError(
180-
503, f'Network communication error: {e}'
187+
503,
188+
f'Network communication error: {e}',
181189
) from e
182190

183191
async def _send_request(

src/a2a/client/transports/rest.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from a2a.client.card_resolver import A2ACardResolver
1313
from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError
1414
from a2a.client.middleware import ClientCallContext, ClientCallInterceptor
15+
from a2a.client.transports._streaming_utils import ensure_streaming_response
1516
from a2a.client.transports.base import ClientTransport
1617
from a2a.grpc import a2a_pb2
1718
from a2a.types import (
@@ -139,20 +140,25 @@ async def send_message_streaming(
139140
json=payload,
140141
**modified_kwargs,
141142
) as event_source:
143+
http_response = event_source.response
142144
try:
145+
await ensure_streaming_response(event_source)
143146
async for sse in event_source.aiter_sse():
144147
event = a2a_pb2.StreamResponse()
145148
Parse(sse.data, event)
146149
yield proto_utils.FromProto.stream_response(event)
147150
except SSEError as e:
148151
raise A2AClientHTTPError(
149-
400, f'Invalid SSE response or protocol error: {e}'
152+
http_response.status_code,
153+
f'Invalid SSE response or protocol error: {e}',
154+
headers=dict(http_response.headers),
150155
) from e
151156
except json.JSONDecodeError as e:
152157
raise A2AClientJSONError(str(e)) from e
153158
except httpx.RequestError as e:
154159
raise A2AClientHTTPError(
155-
503, f'Network communication error: {e}'
160+
503,
161+
f'Network communication error: {e}',
156162
) from e
157163

158164
async def _send_request(self, request: httpx.Request) -> dict[str, Any]:

tmp_old_jsonrpc.py

Whitespace-only changes.

0 commit comments

Comments
 (0)