From d713fdde14f5cf9275c0f35d1cfb26603977f96a Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Sun, 28 Dec 2025 17:43:35 +0000 Subject: [PATCH 1/8] Convert tool execution exception to runtime error, fixes #2204 --- src/agents/realtime/session.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py index a3cd1d3ea8..d44d363ec1 100644 --- a/src/agents/realtime/session.py +++ b/src/agents/realtime/session.py @@ -493,7 +493,12 @@ async def _handle_tool_call( ) ) else: - raise ModelBehaviorError(f"Tool {event.name} not found") + await self._put_event( + RealtimeError( + info=self._event_info, + error={"message": f"Tool {event.name} not found"}, + ) + ) @classmethod def _get_new_history( From 4c4bb2443a653cb7f80904541f97c719b87cc2cb Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Mon, 29 Dec 2025 10:34:47 +0000 Subject: [PATCH 2/8] Implement websocket connection configuration --- src/agents/realtime/model.py | 29 ++++++++++++++++ src/agents/realtime/openai_realtime.py | 47 +++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/src/agents/realtime/model.py b/src/agents/realtime/model.py index c207878cdf..870bb435bc 100644 --- a/src/agents/realtime/model.py +++ b/src/agents/realtime/model.py @@ -1,6 +1,8 @@ from __future__ import annotations import abc +import socket +from collections.abc import Awaitable from typing import Callable from typing_extensions import NotRequired, TypedDict @@ -104,6 +106,30 @@ async def on_event(self, event: RealtimeModelEvent) -> None: pass +SocketFactory = Callable[[], Awaitable[socket.socket]] +"""An async function that returns a configured `socket.socket`. +Used for setting low-level TCP Keepalive options or proxy configurations.""" + + +class TransportConfig(TypedDict): + """Low-level network transport configuration.""" + + ping_interval: NotRequired[float | None] + """Time in seconds between keepalive pings sent by the client. + Default is usually 20.0. Set to None to disable.""" + + ping_timeout: NotRequired[float | None] + """Time in seconds to wait for a pong response before disconnecting. + Set to None to enable 'Zombie Mode' (ignore network lag).""" + + connect_timeout: NotRequired[float] + """Time in seconds to wait for the connection handshake to complete.""" + + socket_factory: NotRequired[SocketFactory] + """An async function that returns a configured `socket.socket`. + Used for setting low-level TCP Keepalive options or proxy configurations.""" + + class RealtimeModelConfig(TypedDict): """Options for connecting to a realtime model.""" @@ -146,6 +172,9 @@ class RealtimeModelConfig(TypedDict): model name. This is used for SIP-originated calls that are accepted via the Realtime Calls API. """ + transport: NotRequired[TransportConfig] + """Low-level network transport configuration for timeouts and TCP socket configuration.""" + class RealtimeModel(abc.ABC): """Interface for connecting to a realtime model and sending/receiving events.""" diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index dedbb01952..c740f9ccbb 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -103,6 +103,7 @@ RealtimeModelListener, RealtimePlaybackState, RealtimePlaybackTracker, + TransportConfig, ) from .model_events import ( RealtimeModelAudioDoneEvent, @@ -244,15 +245,51 @@ async def connect(self, options: RealtimeModelConfig) -> None: raise UserError("API key is required but was not provided.") headers.update({"Authorization": f"Bearer {api_key}"}) - self._websocket = await websockets.connect( - url, - user_agent_header=_USER_AGENT, - additional_headers=headers, - max_size=None, # Allow any size of message + + self._websocket = await self._create_websocket_connection( + url=url, + headers=headers, + transport_config=options.get("transport"), ) self._websocket_task = asyncio.create_task(self._listen_for_messages()) await self._update_session_config(model_settings) + async def _create_websocket_connection( + self, + url: str, + headers: dict[str, str], + transport_config: TransportConfig | None = None, + ) -> ClientConnection: + """Create a WebSocket connection with the given configuration. + + Args: + url: The WebSocket URL to connect to. + headers: HTTP headers to include in the connection request. + transport_config: Optional low-level transport configuration. + + Returns: + A connected WebSocket client connection. + """ + connect_kwargs: dict[str, Any] = { + "user_agent_header": _USER_AGENT, + "additional_headers": headers, + "max_size": None, # Allow any size of message + } + + if transport_config: + if "ping_interval" in transport_config: + connect_kwargs["ping_interval"] = transport_config["ping_interval"] + if "ping_timeout" in transport_config: + connect_kwargs["ping_timeout"] = transport_config["ping_timeout"] + if "connect_timeout" in transport_config: + connect_kwargs["open_timeout"] = transport_config["connect_timeout"] + if "socket_factory" in transport_config: + socket_factory = transport_config["socket_factory"] + sock = await socket_factory() + connect_kwargs["sock"] = sock + + return await websockets.connect(url, **connect_kwargs) + async def _send_tracing_config( self, tracing_config: RealtimeModelTracingConfig | Literal["auto"] | None ) -> None: From 4dbe17676b7c2a5e06f8b8864cb3bd204449e674 Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Mon, 29 Dec 2025 14:08:23 +0000 Subject: [PATCH 3/8] Eliminate sock factory, it is not required. --- src/agents/realtime/model.py | 9 --------- src/agents/realtime/openai_realtime.py | 4 ---- 2 files changed, 13 deletions(-) diff --git a/src/agents/realtime/model.py b/src/agents/realtime/model.py index 870bb435bc..c373528538 100644 --- a/src/agents/realtime/model.py +++ b/src/agents/realtime/model.py @@ -106,11 +106,6 @@ async def on_event(self, event: RealtimeModelEvent) -> None: pass -SocketFactory = Callable[[], Awaitable[socket.socket]] -"""An async function that returns a configured `socket.socket`. -Used for setting low-level TCP Keepalive options or proxy configurations.""" - - class TransportConfig(TypedDict): """Low-level network transport configuration.""" @@ -125,10 +120,6 @@ class TransportConfig(TypedDict): connect_timeout: NotRequired[float] """Time in seconds to wait for the connection handshake to complete.""" - socket_factory: NotRequired[SocketFactory] - """An async function that returns a configured `socket.socket`. - Used for setting low-level TCP Keepalive options or proxy configurations.""" - class RealtimeModelConfig(TypedDict): """Options for connecting to a realtime model.""" diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index c740f9ccbb..ed587502e5 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -283,10 +283,6 @@ async def _create_websocket_connection( connect_kwargs["ping_timeout"] = transport_config["ping_timeout"] if "connect_timeout" in transport_config: connect_kwargs["open_timeout"] = transport_config["connect_timeout"] - if "socket_factory" in transport_config: - socket_factory = transport_config["socket_factory"] - sock = await socket_factory() - connect_kwargs["sock"] = sock return await websockets.connect(url, **connect_kwargs) From 717225deca11e93b15efc6efd58891ebe81e284b Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Mon, 5 Jan 2026 19:09:46 +0000 Subject: [PATCH 4/8] Remove unused import --- src/agents/realtime/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py index d44d363ec1..941acdf09c 100644 --- a/src/agents/realtime/session.py +++ b/src/agents/realtime/session.py @@ -8,7 +8,7 @@ from typing_extensions import assert_never from ..agent import Agent -from ..exceptions import ModelBehaviorError, UserError +from ..exceptions import UserError from ..handoffs import Handoff from ..logger import logger from ..run_context import RunContextWrapper, TContext From ef0c19f11c0a4d967a8bf6b14b282989b655c4af Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Tue, 6 Jan 2026 05:52:14 +0000 Subject: [PATCH 5/8] Fix unit tests. --- tests/realtime/test_session.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/realtime/test_session.py b/tests/realtime/test_session.py index 775e5418fe..47d3160600 100644 --- a/tests/realtime/test_session.py +++ b/tests/realtime/test_session.py @@ -1067,11 +1067,7 @@ async def test_handoff_tool_handling(self, mock_model): @pytest.mark.asyncio async def test_unknown_tool_handling(self, mock_model, mock_agent, mock_function_tool): - """Test that unknown tools raise an error""" - import pytest - - from agents.exceptions import ModelBehaviorError - + """Test that unknown tools emit a RealtimeError event""" # Set up agent to return different tool than what's called mock_function_tool.name = "known_tool" mock_agent.get_all_tools.return_value = [mock_function_tool] @@ -1083,9 +1079,14 @@ async def test_unknown_tool_handling(self, mock_model, mock_agent, mock_function name="unknown_tool", call_id="call_unknown", arguments="{}" ) - # Should raise an error for unknown tool - with pytest.raises(ModelBehaviorError, match="Tool unknown_tool not found"): - await session._handle_tool_call(tool_call_event) + # Should emit a RealtimeError event for unknown tool + await session._handle_tool_call(tool_call_event) + + # Should have emitted a RealtimeError event + assert session._event_queue.qsize() >= 1 + error_event = await session._event_queue.get() + assert isinstance(error_event, RealtimeError) + assert "Tool unknown_tool not found" in error_event.error.get("message", "") # Should not have called any tools mock_function_tool.on_invoke_tool.assert_not_called() From 26d39e93afdb8911ac946f80e6d13a87c06e2932 Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Tue, 6 Jan 2026 06:09:43 +0000 Subject: [PATCH 6/8] Remove unused imports. --- src/agents/realtime/model.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/agents/realtime/model.py b/src/agents/realtime/model.py index c373528538..0fd07af898 100644 --- a/src/agents/realtime/model.py +++ b/src/agents/realtime/model.py @@ -1,8 +1,6 @@ from __future__ import annotations import abc -import socket -from collections.abc import Awaitable from typing import Callable from typing_extensions import NotRequired, TypedDict From af06277620f0519a8c305f6720479c50c1fc2cca Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Wed, 7 Jan 2026 19:22:12 +0000 Subject: [PATCH 7/8] Added unit tests for the new configurations. --- tests/realtime/test_openai_realtime.py | 93 ++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/realtime/test_openai_realtime.py b/tests/realtime/test_openai_realtime.py index 8d7c57d7dc..450d52c3a4 100644 --- a/tests/realtime/test_openai_realtime.py +++ b/tests/realtime/test_openai_realtime.py @@ -299,6 +299,99 @@ async def test_connect_websocket_failure_propagates(self, model): assert model._websocket is None assert model._websocket_task is None + @pytest.mark.asyncio + async def test_connect_with_transport_config(self, model, mock_websocket): + """Test that transport configuration is properly passed to the WebSocket connection.""" + config = { + "api_key": "test-key", + "transport": { + "ping_interval": 30.0, + "ping_timeout": 10.0, + "connect_timeout": 5.0, + }, + } + + async def async_websocket(*args, **kwargs): + return mock_websocket + + with patch("websockets.connect", side_effect=async_websocket) as mock_connect: + with patch("asyncio.create_task") as mock_create_task: + mock_task = AsyncMock() + + def mock_create_task_func(coro): + coro.close() + return mock_task + + mock_create_task.side_effect = mock_create_task_func + + await model.connect(config) + + # Verify WebSocket connection called with correct transport params + mock_connect.assert_called_once() + kwargs = mock_connect.call_args.kwargs + assert kwargs["ping_interval"] == 30.0 + assert kwargs["ping_timeout"] == 10.0 + assert kwargs["open_timeout"] == 5.0 + + @pytest.mark.asyncio + async def test_connect_with_transport_config_disable_ping(self, model, mock_websocket): + """Test disabling ping in transport configuration.""" + config = { + "api_key": "test-key", + "transport": { + "ping_interval": None, + }, + } + + async def async_websocket(*args, **kwargs): + return mock_websocket + + with patch("websockets.connect", side_effect=async_websocket) as mock_connect: + with patch("asyncio.create_task") as mock_create_task: + mock_task = AsyncMock() + + def mock_create_task_func(coro): + coro.close() + return mock_task + + mock_create_task.side_effect = mock_create_task_func + + await model.connect(config) + + # Verify WebSocket connection called with correct transport params + mock_connect.assert_called_once() + kwargs = mock_connect.call_args.kwargs + assert kwargs["ping_interval"] is None + + @pytest.mark.asyncio + async def test_connect_with_empty_transport_config(self, model, mock_websocket): + """Test that empty transport configuration works without error.""" + config = { + "api_key": "test-key", + "transport": {}, + } + + async def async_websocket(*args, **kwargs): + return mock_websocket + + with patch("websockets.connect", side_effect=async_websocket) as mock_connect: + with patch("asyncio.create_task") as mock_create_task: + mock_task = AsyncMock() + + def mock_create_task_func(coro): + coro.close() + return mock_task + + mock_create_task.side_effect = mock_create_task_func + + await model.connect(config) + + mock_connect.assert_called_once() + kwargs = mock_connect.call_args.kwargs + assert "ping_interval" not in kwargs + assert "ping_timeout" not in kwargs + assert "open_timeout" not in kwargs + @pytest.mark.asyncio async def test_connect_already_connected_assertion(self, model, mock_websocket): """Test that connecting when already connected raises assertion error.""" From c4cb13e142852c03bb93990486178cab91a05ad6 Mon Sep 17 00:00:00 2001 From: Ali Gokalp Peker Date: Thu, 8 Jan 2026 09:26:35 +0000 Subject: [PATCH 8/8] Test suite for the TransportConfig --- tests/realtime/test_openai_realtime.py | 459 +++++++++++++++++++++++++ 1 file changed, 459 insertions(+) diff --git a/tests/realtime/test_openai_realtime.py b/tests/realtime/test_openai_realtime.py index 450d52c3a4..0bbeb38086 100644 --- a/tests/realtime/test_openai_realtime.py +++ b/tests/realtime/test_openai_realtime.py @@ -1,3 +1,4 @@ +import asyncio import json from types import SimpleNamespace from typing import Any, cast @@ -9,6 +10,7 @@ from agents import Agent from agents.exceptions import UserError from agents.handoffs import handoff +from agents.realtime.model import RealtimeModelConfig, TransportConfig from agents.realtime.model_events import ( RealtimeModelAudioEvent, RealtimeModelErrorEvent, @@ -977,3 +979,460 @@ async def test_handle_audio_delta_state_management(self, model): # Test that last audio item is tracked last_item = model._audio_state_tracker.get_last_audio_item() assert last_item == ("test_item", 5) + + +class TestTransportIntegration: + """Integration tests for transport configuration using a local WebSocket server.""" + + @pytest.mark.asyncio + async def test_connect_to_local_server(self): + """Test connecting to a real local server with transport config.""" + received_messages = [] + import asyncio + + async def handler(websocket): + try: + # Use async iteration for compatibility with newer websockets + async for message in websocket: + received_messages.append(json.loads(message)) + # Respond to session update + # We need to provide a minimally valid session object + response = { + "type": "session.updated", + "event_id": "event_123", + "session": { + "id": "sess_001", + "object": "realtime.session", + "model": "gpt-4o-realtime-preview", + "modalities": ["audio", "text"], + "instructions": "", + "voice": "alloy", + "input_audio_format": "pcm16", + "output_audio_format": "pcm16", + "input_audio_transcription": None, + "turn_detection": None, + "tools": [], + "tool_choice": "auto", + "temperature": 0.8, + "max_response_output_tokens": "inf", + }, + } + await websocket.send(json.dumps(response)) + except Exception: + pass + + # Create a model instance + model = OpenAIRealtimeWebSocketModel() + + # Start a local server + async with websockets.serve(handler, "127.0.0.1", 0) as server: + # Get the assigned port + assert server.sockets + + # Cast sockets to list to make mypy happy as Iterable isn't indexable directly + sockets = list(server.sockets) + port = sockets[0].getsockname()[1] + url = f"ws://127.0.0.1:{port}/v1/realtime" + + # Connect with transport config + transport: TransportConfig = { + "ping_interval": 0.5, + "ping_timeout": 0.5, + "connect_timeout": 1.0, + } + + config: RealtimeModelConfig = { + "api_key": "test-key", + "url": url, + "transport": transport, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + await model.connect(config) + + # Wait briefly for the connection logic to send session.update + # In a real scenario we'd wait for an event, but here we just sleep briefly + await asyncio.sleep(0.2) + + # Verify we are connected + assert model._websocket is not None + + # Verify the server received the session.update message + assert len(received_messages) > 0 + session_update = next( + (m for m in received_messages if m["type"] == "session.update"), None + ) + assert session_update is not None + + # Clean up + await model.close() + assert model._websocket is None + + @pytest.mark.asyncio + async def test_ping_timeout_success_when_server_responds_quickly(self): + """Test that connection stays alive when server responds to pings within timeout.""" + + async def responsive_handler(websocket): + # Server that responds normally - websockets library handles ping/pong automatically + async for _ in websocket: + pass + + model = OpenAIRealtimeWebSocketModel() + + async with websockets.serve(responsive_handler, "127.0.0.1", 0) as server: + sockets = list(server.sockets) + port = sockets[0].getsockname()[1] + url = f"ws://127.0.0.1:{port}/v1/realtime" + + # Client with reasonable ping settings - server responds quickly so this should work + transport: TransportConfig = { + "ping_interval": 0.1, # Send ping every 100ms + "ping_timeout": 1.0, # Allow 1 second for pong response (generous) + } + config: RealtimeModelConfig = { + "api_key": "test-key", + "url": url, + "transport": transport, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + await model.connect(config) + + # Wait for multiple ping/pong cycles + await asyncio.sleep(0.4) + + # Connection should still be open + assert model._websocket is not None + assert model._websocket.close_code is None + + await model.close() + + @pytest.mark.asyncio + async def test_ping_timeout_config_is_applied(self): + """Test that ping_timeout configuration is properly applied to connection. + + This test verifies the ping_timeout parameter is passed to the websocket + connection. Since the websockets library handles pong responses automatically, + we verify the configuration is applied rather than testing actual timeout behavior. + """ + from unittest.mock import AsyncMock, patch + + # Track what parameters were passed to websockets.connect + captured_kwargs_short: dict[str, Any] = {} + captured_kwargs_long: dict[str, Any] = {} + + async def capture_connect_short(*args, **kwargs): + captured_kwargs_short.update(kwargs) + mock_ws = AsyncMock() + mock_ws.close_code = None + mock_ws.__aiter__ = AsyncMock(return_value=iter([])) + return mock_ws + + async def capture_connect_long(*args, **kwargs): + captured_kwargs_long.update(kwargs) + mock_ws = AsyncMock() + mock_ws.close_code = None + mock_ws.__aiter__ = AsyncMock(return_value=iter([])) + return mock_ws + + # Test with short ping_timeout + model_short = OpenAIRealtimeWebSocketModel() + with patch("websockets.connect", side_effect=capture_connect_short): + transport_short: TransportConfig = { + "ping_interval": 0.1, + "ping_timeout": 0.05, # Very short timeout + } + config_short: RealtimeModelConfig = { + "api_key": "test-key", + "url": "ws://localhost:8080/v1/realtime", + "transport": transport_short, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + await model_short.connect(config_short) + + assert captured_kwargs_short.get("ping_interval") == 0.1 + assert captured_kwargs_short.get("ping_timeout") == 0.05 + + # Test with longer ping_timeout (use a fresh model) + model_long = OpenAIRealtimeWebSocketModel() + with patch("websockets.connect", side_effect=capture_connect_long): + transport_long: TransportConfig = { + "ping_interval": 5.0, + "ping_timeout": 10.0, # Longer timeout + } + config_long: RealtimeModelConfig = { + "api_key": "test-key", + "url": "ws://localhost:8080/v1/realtime", + "transport": transport_long, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + await model_long.connect(config_long) + + assert captured_kwargs_long.get("ping_interval") == 5.0 + assert captured_kwargs_long.get("ping_timeout") == 10.0 + + @pytest.mark.asyncio + async def test_ping_timeout_disabled_vs_enabled(self): + """Test that ping timeout can be disabled (None) vs enabled with a value.""" + from unittest.mock import AsyncMock, patch + + captured_kwargs_disabled: dict[str, Any] = {} + captured_kwargs_enabled: dict[str, Any] = {} + + async def capture_connect_disabled(*args, **kwargs): + captured_kwargs_disabled.update(kwargs) + mock_ws = AsyncMock() + mock_ws.close_code = None + mock_ws.__aiter__ = AsyncMock(return_value=iter([])) + return mock_ws + + async def capture_connect_enabled(*args, **kwargs): + captured_kwargs_enabled.update(kwargs) + mock_ws = AsyncMock() + mock_ws.close_code = None + mock_ws.__aiter__ = AsyncMock(return_value=iter([])) + return mock_ws + + # Test with ping disabled + model_disabled = OpenAIRealtimeWebSocketModel() + with patch("websockets.connect", side_effect=capture_connect_disabled): + transport_disabled: TransportConfig = { + "ping_interval": None, # Disable pings entirely + "ping_timeout": None, + } + config_disabled: RealtimeModelConfig = { + "api_key": "test-key", + "url": "ws://localhost:8080/v1/realtime", + "transport": transport_disabled, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + await model_disabled.connect(config_disabled) + + assert captured_kwargs_disabled.get("ping_interval") is None + assert captured_kwargs_disabled.get("ping_timeout") is None + + # Test with ping enabled (use a fresh model) + model_enabled = OpenAIRealtimeWebSocketModel() + with patch("websockets.connect", side_effect=capture_connect_enabled): + transport_enabled: TransportConfig = { + "ping_interval": 1.0, + "ping_timeout": 2.0, + } + config_enabled: RealtimeModelConfig = { + "api_key": "test-key", + "url": "ws://localhost:8080/v1/realtime", + "transport": transport_enabled, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + await model_enabled.connect(config_enabled) + + assert captured_kwargs_enabled.get("ping_interval") == 1.0 + assert captured_kwargs_enabled.get("ping_timeout") == 2.0 + + @pytest.mark.asyncio + async def test_connect_timeout_success_when_server_responds_quickly(self): + """Test that connection succeeds when server responds within timeout.""" + + async def quick_handler(websocket): + # Server that accepts connections immediately + async for _ in websocket: + pass + + model = OpenAIRealtimeWebSocketModel() + + async with websockets.serve(quick_handler, "127.0.0.1", 0) as server: + sockets = list(server.sockets) + port = sockets[0].getsockname()[1] + url = f"ws://127.0.0.1:{port}/v1/realtime" + + # Client with generous connect timeout - server is fast so this should work + transport: TransportConfig = { + "connect_timeout": 5.0, # 5 seconds is plenty for local connection + } + config: RealtimeModelConfig = { + "api_key": "test-key", + "url": url, + "transport": transport, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + await model.connect(config) + + # Should connect successfully + assert model._websocket is not None + assert model._websocket.close_code is None + + await model.close() + + @pytest.mark.asyncio + async def test_connect_timeout_with_delayed_server(self): + """Test connect timeout behavior with a server that has a defined handshake delay. + + Uses the same server with a fixed delay threshold to test both: + - Success: client timeout > server delay + - Failure: client timeout < server delay + """ + import base64 + import hashlib + + # Server handshake delay threshold (in seconds) + SERVER_HANDSHAKE_DELAY = 0.3 + + shutdown_event = asyncio.Event() + connections_attempted = [] + + async def delayed_websocket_server(reader, writer): + """A WebSocket server that delays the handshake by a fixed amount.""" + connections_attempted.append(True) + try: + # Read HTTP upgrade request + request = b"" + while b"\r\n\r\n" not in request: + chunk = await asyncio.wait_for(reader.read(1024), timeout=5.0) + if not chunk: + return + request += chunk + + # Extract Sec-WebSocket-Key + key = None + for line in request.decode().split("\r\n"): + if line.lower().startswith("sec-websocket-key:"): + key = line.split(":", 1)[1].strip() + break + + if not key: + writer.close() + return + + # Intentional delay before completing handshake + await asyncio.sleep(SERVER_HANDSHAKE_DELAY) + + # Generate accept key + GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + accept = base64.b64encode(hashlib.sha1((key + GUID).encode()).digest()).decode() + + # Send HTTP 101 Switching Protocols response + response = ( + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + f"Sec-WebSocket-Accept: {accept}\r\n" + "\r\n" + ) + writer.write(response.encode()) + await writer.drain() + + # Keep connection open until shutdown + await shutdown_event.wait() + + except asyncio.TimeoutError: + pass + except Exception: + pass + finally: + writer.close() + + server = await asyncio.start_server(delayed_websocket_server, "127.0.0.1", 0) + port = server.sockets[0].getsockname()[1] + url = f"ws://127.0.0.1:{port}/v1/realtime" + + try: + # Test 1: FAILURE - Client timeout < server delay + # Client gives up before server completes handshake + model_fail = OpenAIRealtimeWebSocketModel() + transport_fail: TransportConfig = { + "connect_timeout": 0.1, # 100ms < 300ms server delay → will timeout + } + config_fail: RealtimeModelConfig = { + "api_key": "test-key", + "url": url, + "transport": transport_fail, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + with pytest.raises((TimeoutError, asyncio.TimeoutError)): + await model_fail.connect(config_fail) + + # Verify connection was attempted + assert len(connections_attempted) >= 1 + + # Test 2: SUCCESS - Client timeout > server delay + # Client waits long enough for server to complete handshake + model_success = OpenAIRealtimeWebSocketModel() + transport_success: TransportConfig = { + "connect_timeout": 1.0, # 1000ms > 300ms server delay → will succeed + } + config_success: RealtimeModelConfig = { + "api_key": "test-key", + "url": url, + "transport": transport_success, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + await model_success.connect(config_success) + + # Verify successful connection + assert model_success._websocket is not None + assert model_success._websocket.close_code is None + + await model_success.close() + + finally: + shutdown_event.set() + server.close() + await server.wait_closed() + + @pytest.mark.asyncio + async def test_ping_interval_comparison_fast_vs_slow(self): + """Test that faster ping intervals detect issues sooner than slower ones.""" + + connection_durations: dict[str, float] = {} + + async def handler(websocket): + # Simple handler that stays connected + async for _ in websocket: + pass + + async def test_with_ping_interval(interval: float, label: str): + model = OpenAIRealtimeWebSocketModel() + + async with websockets.serve(handler, "127.0.0.1", 0) as server: + sockets = list(server.sockets) + port = sockets[0].getsockname()[1] + url = f"ws://127.0.0.1:{port}/v1/realtime" + + transport: TransportConfig = { + "ping_interval": interval, + "ping_timeout": 2.0, # Same timeout for both + } + config: RealtimeModelConfig = { + "api_key": "test-key", + "url": url, + "transport": transport, + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + start = asyncio.get_event_loop().time() + await model.connect(config) + + # Let it run for a bit + await asyncio.sleep(0.3) + + end = asyncio.get_event_loop().time() + connection_durations[label] = end - start + + # Both should stay connected with valid server + assert model._websocket is not None + assert model._websocket.close_code is None + + await model.close() + + # Test with fast ping interval + await test_with_ping_interval(0.05, "fast") + + # Test with slow ping interval + await test_with_ping_interval(0.5, "slow") + + # Both should have completed successfully + assert "fast" in connection_durations + assert "slow" in connection_durations