Skip to content

Commit af4cef3

Browse files
authored
Realtime: forward exceptions from transport layer (#1107)
The model listens to websocket events in a separate task, and the session receives those events in on_event. So if there's e.g. a websocket or json exception, it's silently dropped. Similar to the streaming text agents, we should: - capture exceptions separately - raise them in `async for event in session` loop. Added tests to verify. --- [//]: # (BEGIN SAPLING FOOTER) * #1112 * #1111 * __->__ #1107 * #1106
1 parent bd988dc commit af4cef3

File tree

6 files changed

+438
-62
lines changed

6 files changed

+438
-62
lines changed

src/agents/realtime/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
class RealtimeClientMessage(TypedDict):
3030
type: str # explicitly required
3131
other_data: NotRequired[dict[str, Any]]
32+
"""Merged into the message body."""
3233

3334

3435
class RealtimeUserInputText(TypedDict):

src/agents/realtime/model_events.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ class RealtimeModelOtherEvent:
130130
type: Literal["other"] = "other"
131131

132132

133+
@dataclass
134+
class RealtimeModelExceptionEvent:
135+
"""Exception occurred during model operation."""
136+
137+
exception: Exception
138+
context: str | None = None
139+
140+
type: Literal["exception"] = "exception"
141+
142+
133143
# TODO (rm) Add usage events
134144

135145

@@ -147,4 +157,5 @@ class RealtimeModelOtherEvent:
147157
RealtimeModelTurnStartedEvent,
148158
RealtimeModelTurnEndedEvent,
149159
RealtimeModelOtherEvent,
160+
RealtimeModelExceptionEvent,
150161
]

src/agents/realtime/openai_realtime.py

Lines changed: 87 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
RealtimeModelAudioInterruptedEvent,
4040
RealtimeModelErrorEvent,
4141
RealtimeModelEvent,
42+
RealtimeModelExceptionEvent,
4243
RealtimeModelInputAudioTranscriptionCompletedEvent,
4344
RealtimeModelItemDeletedEvent,
4445
RealtimeModelItemUpdatedEvent,
@@ -130,48 +131,84 @@ async def _listen_for_messages(self):
130131

131132
try:
132133
async for message in self._websocket:
133-
parsed = json.loads(message)
134-
await self._handle_ws_event(parsed)
134+
try:
135+
parsed = json.loads(message)
136+
await self._handle_ws_event(parsed)
137+
except json.JSONDecodeError as e:
138+
await self._emit_event(
139+
RealtimeModelExceptionEvent(
140+
exception=e, context="Failed to parse WebSocket message as JSON"
141+
)
142+
)
143+
except Exception as e:
144+
await self._emit_event(
145+
RealtimeModelExceptionEvent(
146+
exception=e, context="Error handling WebSocket event"
147+
)
148+
)
135149

136-
except websockets.exceptions.ConnectionClosed:
137-
# TODO connection closed handling (event, cleanup)
138-
logger.warning("WebSocket connection closed")
150+
except websockets.exceptions.ConnectionClosedOK:
151+
# Normal connection closure - no exception event needed
152+
logger.info("WebSocket connection closed normally")
153+
except websockets.exceptions.ConnectionClosed as e:
154+
await self._emit_event(
155+
RealtimeModelExceptionEvent(
156+
exception=e, context="WebSocket connection closed unexpectedly"
157+
)
158+
)
139159
except Exception as e:
140-
logger.error(f"WebSocket error: {e}")
160+
await self._emit_event(
161+
RealtimeModelExceptionEvent(
162+
exception=e, context="WebSocket error in message listener"
163+
)
164+
)
141165

142166
async def send_event(self, event: RealtimeClientMessage) -> None:
143167
"""Send an event to the model."""
144168
assert self._websocket is not None, "Not connected"
145-
converted_event = {
146-
"type": event["type"],
147-
}
148169

149-
converted_event.update(event.get("other_data", {}))
170+
try:
171+
converted_event = {
172+
"type": event["type"],
173+
}
150174

151-
await self._websocket.send(json.dumps(converted_event))
175+
converted_event.update(event.get("other_data", {}))
176+
177+
await self._websocket.send(json.dumps(converted_event))
178+
except Exception as e:
179+
await self._emit_event(
180+
RealtimeModelExceptionEvent(
181+
exception=e, context=f"Failed to send event: {event.get('type', 'unknown')}"
182+
)
183+
)
152184

153185
async def send_message(
154186
self, message: RealtimeUserInput, other_event_data: dict[str, Any] | None = None
155187
) -> None:
156188
"""Send a message to the model."""
157-
message = (
158-
message
159-
if isinstance(message, dict)
160-
else {
161-
"type": "message",
162-
"role": "user",
163-
"content": [{"type": "input_text", "text": message}],
189+
try:
190+
message = (
191+
message
192+
if isinstance(message, dict)
193+
else {
194+
"type": "message",
195+
"role": "user",
196+
"content": [{"type": "input_text", "text": message}],
197+
}
198+
)
199+
other_data = {
200+
"item": message,
164201
}
165-
)
166-
other_data = {
167-
"item": message,
168-
}
169-
if other_event_data:
170-
other_data.update(other_event_data)
202+
if other_event_data:
203+
other_data.update(other_event_data)
171204

172-
await self.send_event({"type": "conversation.item.create", "other_data": other_data})
205+
await self.send_event({"type": "conversation.item.create", "other_data": other_data})
173206

174-
await self.send_event({"type": "response.create"})
207+
await self.send_event({"type": "response.create"})
208+
except Exception as e:
209+
await self._emit_event(
210+
RealtimeModelExceptionEvent(exception=e, context="Failed to send message")
211+
)
175212

176213
async def send_audio(self, audio: bytes, *, commit: bool = False) -> None:
177214
"""Send a raw audio chunk to the model.
@@ -182,17 +219,23 @@ async def send_audio(self, audio: bytes, *, commit: bool = False) -> None:
182219
detection, this can be used to indicate the turn is completed.
183220
"""
184221
assert self._websocket is not None, "Not connected"
185-
base64_audio = base64.b64encode(audio).decode("utf-8")
186-
await self.send_event(
187-
{
188-
"type": "input_audio_buffer.append",
189-
"other_data": {
190-
"audio": base64_audio,
191-
},
192-
}
193-
)
194-
if commit:
195-
await self.send_event({"type": "input_audio_buffer.commit"})
222+
223+
try:
224+
base64_audio = base64.b64encode(audio).decode("utf-8")
225+
await self.send_event(
226+
{
227+
"type": "input_audio_buffer.append",
228+
"other_data": {
229+
"audio": base64_audio,
230+
},
231+
}
232+
)
233+
if commit:
234+
await self.send_event({"type": "input_audio_buffer.commit"})
235+
except Exception as e:
236+
await self._emit_event(
237+
RealtimeModelExceptionEvent(exception=e, context="Failed to send audio")
238+
)
196239

197240
async def send_tool_output(
198241
self, tool_call: RealtimeModelToolCallEvent, output: str, start_response: bool
@@ -342,8 +385,13 @@ async def _handle_ws_event(self, event: dict[str, Any]):
342385
OpenAIRealtimeServerEvent
343386
).validate_python(event)
344387
except Exception as e:
345-
logger.error(f"Invalid event: {event} - {e}")
346-
# await self._emit_event(RealtimeModelErrorEvent(error=f"Invalid event: {event} - {e}"))
388+
event_type = event.get("type", "unknown") if isinstance(event, dict) else "unknown"
389+
await self._emit_event(
390+
RealtimeModelExceptionEvent(
391+
exception=e,
392+
context=f"Failed to validate server event: {event_type}",
393+
)
394+
)
347395
return
348396

349397
if parsed.type == "response.audio.delta":

src/agents/realtime/session.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def __init__(
8484
self._run_config = run_config or {}
8585
self._event_queue: asyncio.Queue[RealtimeSessionEvent] = asyncio.Queue()
8686
self._closed = False
87+
self._stored_exception: Exception | None = None
8788

8889
# Guardrails state tracking
8990
self._interrupted_by_guardrail = False
@@ -130,17 +131,20 @@ async def __aiter__(self) -> AsyncIterator[RealtimeSessionEvent]:
130131
"""Iterate over events from the session."""
131132
while not self._closed:
132133
try:
134+
# Check if there's a stored exception to raise
135+
if self._stored_exception is not None:
136+
# Clean up resources before raising
137+
await self._cleanup()
138+
raise self._stored_exception
139+
133140
event = await self._event_queue.get()
134141
yield event
135142
except asyncio.CancelledError:
136143
break
137144

138145
async def close(self) -> None:
139146
"""Close the session."""
140-
self._closed = True
141-
self._cleanup_guardrail_tasks()
142-
self._model.remove_listener(self)
143-
await self._model.close()
147+
await self._cleanup()
144148

145149
async def send_message(self, message: RealtimeUserInput) -> None:
146150
"""Send a message to the model."""
@@ -228,6 +232,9 @@ async def on_event(self, event: RealtimeModelEvent) -> None:
228232
info=self._event_info,
229233
)
230234
)
235+
elif event.type == "exception":
236+
# Store the exception to be raised in __aiter__
237+
self._stored_exception = event.exception
231238
elif event.type == "other":
232239
pass
233240
else:
@@ -403,3 +410,17 @@ def _cleanup_guardrail_tasks(self) -> None:
403410
if not task.done():
404411
task.cancel()
405412
self._guardrail_tasks.clear()
413+
414+
async def _cleanup(self) -> None:
415+
"""Clean up all resources and mark session as closed."""
416+
# Cancel and cleanup guardrail tasks
417+
self._cleanup_guardrail_tasks()
418+
419+
# Remove ourselves as a listener
420+
self._model.remove_listener(self)
421+
422+
# Close the model connection
423+
await self._model.close()
424+
425+
# Mark as closed
426+
self._closed = True

tests/realtime/test_openai_realtime.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -174,39 +174,34 @@ class TestEventHandlingRobustness(TestOpenAIRealtimeWebSocketModel):
174174

175175
@pytest.mark.asyncio
176176
async def test_handle_malformed_json_logs_error_continues(self, model):
177-
"""Test that malformed JSON is logged as error but doesn't crash."""
177+
"""Test that malformed JSON emits exception event but doesn't crash."""
178178
mock_listener = AsyncMock()
179179
model.add_listener(mock_listener)
180180

181181
# Malformed JSON should not crash the handler
182-
with patch("agents.realtime.openai_realtime.logger") as mock_logger:
183-
await model._handle_ws_event("invalid json {")
182+
await model._handle_ws_event("invalid json {")
184183

185-
# Should log error but not crash
186-
mock_logger.error.assert_called_once()
187-
assert "Invalid event" in mock_logger.error.call_args[0][0]
188-
189-
# Should not emit any events to listeners
190-
mock_listener.on_event.assert_not_called()
184+
# Should emit exception event to listeners
185+
mock_listener.on_event.assert_called_once()
186+
exception_event = mock_listener.on_event.call_args[0][0]
187+
assert exception_event.type == "exception"
188+
assert "Failed to validate server event: unknown" in exception_event.context
191189

192190
@pytest.mark.asyncio
193191
async def test_handle_invalid_event_schema_logs_error(self, model):
194-
"""Test that events with invalid schema are logged but don't crash."""
192+
"""Test that events with invalid schema emit exception events but don't crash."""
195193
mock_listener = AsyncMock()
196194
model.add_listener(mock_listener)
197195

198196
invalid_event = {"type": "response.audio.delta"} # Missing required fields
199197

200-
with patch("agents.realtime.openai_realtime.logger") as mock_logger:
201-
await model._handle_ws_event(invalid_event)
198+
await model._handle_ws_event(invalid_event)
202199

203-
# Should log validation error
204-
mock_logger.error.assert_called_once()
205-
error_msg = mock_logger.error.call_args[0][0]
206-
assert "Invalid event" in error_msg
207-
208-
# Should not emit events to listeners
209-
mock_listener.on_event.assert_not_called()
200+
# Should emit exception event to listeners
201+
mock_listener.on_event.assert_called_once()
202+
exception_event = mock_listener.on_event.call_args[0][0]
203+
assert exception_event.type == "exception"
204+
assert "Failed to validate server event: response.audio.delta" in exception_event.context
210205

211206
@pytest.mark.asyncio
212207
async def test_handle_unknown_event_type_ignored(self, model):

0 commit comments

Comments
 (0)