diff --git a/binance/ws/websocket_api.py b/binance/ws/websocket_api.py index e2800094d..8542b62db 100644 --- a/binance/ws/websocket_api.py +++ b/binance/ws/websocket_api.py @@ -21,25 +21,26 @@ def __init__(self, url: str, tld: str = "com", testnet: bool = False): def _handle_message(self, msg): """Override message handling to support request-response""" parsed_msg = super()._handle_message(msg) + self._log.debug(f"Received message: {parsed_msg}") if parsed_msg is None: return None - req_id, exception, throwError = None, None, False + req_id, exception = None, None if "id" in parsed_msg: req_id = parsed_msg["id"] if "status" in parsed_msg: if parsed_msg["status"] != 200: - throwError = True exception = BinanceAPIException( parsed_msg, parsed_msg["status"], self.json_dumps(parsed_msg["error"]) ) if req_id is not None and req_id in self._responses: - if throwError and exception is not None: + if exception is not None: self._responses[req_id].set_exception(exception) else: self._responses[req_id].set_result(parsed_msg) - elif throwError and exception is not None: + elif exception is not None: raise exception - return parsed_msg + else: + self._log.warning(f"WS api receieved unknown message: {parsed_msg}") async def _ensure_ws_connection(self) -> None: """Ensure WebSocket connection is established and ready diff --git a/tests/test_ws_api.py b/tests/test_ws_api.py index 20a098875..dfdbad1cc 100644 --- a/tests/test_ws_api.py +++ b/tests/test_ws_api.py @@ -1,4 +1,5 @@ import json +import sys import re import pytest import asyncio @@ -114,16 +115,35 @@ async def test_testnet_url(): async def test_message_handling(clientAsync): """Test message handling with various message types""" # Test valid message - valid_msg = {"id": "123", "result": {"test": "data"}} - result = clientAsync.ws_api._handle_message(json.dumps(valid_msg)) + future = asyncio.Future() + clientAsync.ws_api._responses["123"] = future + valid_msg = {"id": "123", "status": 200, "result": {"test": "data"}} + clientAsync.ws_api._handle_message(json.dumps(valid_msg)) + result = await clientAsync.ws_api._responses["123"] assert result == valid_msg + +@pytest.mark.asyncio +async def test_message_handling_raise_exception(clientAsync): + with pytest.raises(BinanceAPIException): + future = asyncio.Future() + clientAsync.ws_api._responses["123"] = future + valid_msg = {"id": "123", "status": 400, "error": {"code": "0", "msg": "error message"}} + clientAsync.ws_api._handle_message(json.dumps(valid_msg)) + await future +@pytest.mark.asyncio +async def test_message_handling_raise_exception_without_id(clientAsync): + with pytest.raises(BinanceAPIException): + future = asyncio.Future() + clientAsync.ws_api._responses["123"] = future + valid_msg = {"id": "123", "status": 400, "error": {"code": "0", "msg": "error message"}} + clientAsync.ws_api._handle_message(json.dumps(valid_msg)) + await future + +@pytest.mark.asyncio +async def test_message_handling_invalid_json(clientAsync): + with pytest.raises(json.JSONDecodeError): + clientAsync.ws_api._handle_message("invalid json") - # Test message without ID - no_id_msg = {"data": "test"} - result = clientAsync.ws_api._handle_message(json.dumps(no_id_msg)) - assert result == no_id_msg - - # Test invalid JSON with pytest.raises(json.JSONDecodeError): clientAsync.ws_api._handle_message("invalid json") @@ -151,3 +171,51 @@ async def test_cleanup_on_exit(clientAsync): # Check cleanup assert "test" not in clientAsync.ws_api._responses assert future.exception() is not None + + +@pytest.mark.asyncio +async def test_ws_queue_overflow(clientAsync): + """WebSocket API should not overflow queue""" + # + original_size = clientAsync.ws_api.MAX_QUEUE_SIZE + clientAsync.ws_api.MAX_QUEUE_SIZE = 1 + + try: + # Request multiple order books concurrently + symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT"] + tasks = [clientAsync.ws_get_order_book(symbol=symbol) for symbol in symbols] + + # Execute all requests concurrently and wait for results + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Check that we got valid responses or expected overflow errors + valid_responses = [r for r in results if not isinstance(r, Exception)] + assert len(valid_responses) == len(symbols), "Should get at least one valid response" + + for result in valid_responses: + assert_ob(result) + + finally: + # Restore original queue size + clientAsync.ws_api.MAX_QUEUE_SIZE = original_size + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+") +@pytest.mark.asyncio +async def test_ws_api_with_stream(clientAsync): + """Test combining WebSocket API requests with stream listening""" + from binance import BinanceSocketManager + + # Create socket manager and trade socket + bm = BinanceSocketManager(clientAsync) + ts = bm.trade_socket("BTCUSDT") + + async with ts: + # Make WS API request while stream is active + order_book = await clientAsync.ws_get_order_book(symbol="BTCUSDT") + assert_ob(order_book) + + # Verify we can still receive stream data + trade = await ts.recv() + assert "s" in trade # Symbol + assert "p" in trade # Price + assert "q" in trade # Quantity