|
7 | 7 | from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch |
8 | 8 |
|
9 | 9 | import pytest |
10 | | -from reactivex.subject import Subject |
| 10 | +from reactivex.subject import BehaviorSubject, Subject |
11 | 11 |
|
12 | 12 | from pybricksdev.ble.pybricks import PYBRICKS_COMMAND_EVENT_UUID |
13 | 13 | from pybricksdev.connections.pybricks import ( |
@@ -243,26 +243,51 @@ async def test_pybricks_hub_usb_write_gatt_char_timeout(self): |
243 | 243 | # Make _response_queue.get() block indefinitely |
244 | 244 | hub._response_queue.get = AsyncMock(side_effect=asyncio.Event().wait) |
245 | 245 |
|
246 | | - mock_observable = MagicMock(spec=Subject) |
| 246 | + mock_observable = MagicMock(spec=BehaviorSubject) |
| 247 | + mock_observable.value = ConnectionState.CONNECTED |
| 248 | + hub.connection_state_observable = mock_observable |
247 | 249 |
|
248 | | - def mock_subscribe_side_effect(on_next_callback, *args, **kwargs): |
249 | | - mock_subscription = MagicMock() |
250 | | - mock_subscription.dispose = MagicMock() |
251 | | - return mock_subscription |
| 250 | + # Simulate a timeout while the hub is still connected |
| 251 | + with patch( |
| 252 | + "asyncio.wait_for", side_effect=asyncio.TimeoutError("Test-induced timeout") |
| 253 | + ): |
| 254 | + with pytest.raises( |
| 255 | + asyncio.TimeoutError, match="Timeout waiting for USB response" |
| 256 | + ): |
| 257 | + await hub.write_gatt_char( |
| 258 | + PYBRICKS_COMMAND_EVENT_UUID, b"test_data", True |
| 259 | + ) |
252 | 260 |
|
253 | | - mock_observable.subscribe = MagicMock(side_effect=mock_subscribe_side_effect) |
254 | | - type(hub.connection_state_observable).value = PropertyMock( |
255 | | - return_value=ConnectionState.CONNECTED |
| 261 | + hub._ep_out.write.assert_called_once_with( |
| 262 | + bytes([PybricksUsbOutEpMessageType.COMMAND]) + b"test_data" |
256 | 263 | ) |
| 264 | + |
| 265 | + @pytest.mark.asyncio |
| 266 | + async def test_pybricks_hub_usb_write_gatt_char_timeout_disconnected(self): |
| 267 | + """Test write_gatt_char when a timeout occurs and hub is already disconnected. |
| 268 | +
|
| 269 | + This test documents the FIXME case where race_disconnect() doesn't work properly |
| 270 | + for USB connections because pyusb doesn't provide reliable disconnect detection. |
| 271 | + In this case, we might get a timeout while the hub is already disconnected, |
| 272 | + but the disconnect event wasn't received in time to cancel the wait operation. |
| 273 | + """ |
| 274 | + hub = PybricksHubUSB(MagicMock()) |
| 275 | + |
| 276 | + hub._ep_out = MagicMock() |
| 277 | + hub._response_queue = AsyncMock() |
| 278 | + # Make _response_queue.get() block indefinitely |
| 279 | + hub._response_queue.get = AsyncMock(side_effect=asyncio.Event().wait) |
| 280 | + |
| 281 | + mock_observable = MagicMock(spec=BehaviorSubject) |
| 282 | + mock_observable.value = ConnectionState.DISCONNECTED |
257 | 283 | hub.connection_state_observable = mock_observable |
258 | 284 |
|
259 | | - # The method has a hardcoded timeout of 5.0s. |
260 | | - # We can patch asyncio.wait_for to speed up the test. |
| 285 | + # Simulate a timeout while the hub is already disconnected |
261 | 286 | with patch( |
262 | 287 | "asyncio.wait_for", side_effect=asyncio.TimeoutError("Test-induced timeout") |
263 | 288 | ): |
264 | 289 | with pytest.raises( |
265 | | - asyncio.TimeoutError, match="Timeout waiting for USB response" |
| 290 | + RuntimeError, match="Hub disconnected while waiting for response" |
266 | 291 | ): |
267 | 292 | await hub.write_gatt_char( |
268 | 293 | PYBRICKS_COMMAND_EVENT_UUID, b"test_data", True |
|
0 commit comments