Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions binance/ws/websocket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,26 @@ def __init__(self, url: str, tld: str = "com", testnet: bool = False):
def _handle_message(self, msg):
"""Override message handling to support request-response"""
parsed_msg = super()._handle_message(msg)
self._log.debug(f"Received message: {parsed_msg}")
if parsed_msg is None:
return None
req_id, exception, throwError = None, None, False
req_id, exception = None, None
if "id" in parsed_msg:
req_id = parsed_msg["id"]
if "status" in parsed_msg:
if parsed_msg["status"] != 200:
throwError = True
exception = BinanceAPIException(
parsed_msg, parsed_msg["status"], self.json_dumps(parsed_msg["error"])
)
if req_id is not None and req_id in self._responses:
if throwError and exception is not None:
if exception is not None:
self._responses[req_id].set_exception(exception)
else:
self._responses[req_id].set_result(parsed_msg)
elif throwError and exception is not None:
elif exception is not None:
raise exception
return parsed_msg
else:
self._log.warning(f"WS api receieved unknown message: {parsed_msg}")

async def _ensure_ws_connection(self) -> None:
"""Ensure WebSocket connection is established and ready
Expand Down
84 changes: 76 additions & 8 deletions tests/test_ws_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import sys
import re
import pytest
import asyncio
Expand Down Expand Up @@ -114,16 +115,35 @@ async def test_testnet_url():
async def test_message_handling(clientAsync):
"""Test message handling with various message types"""
# Test valid message
valid_msg = {"id": "123", "result": {"test": "data"}}
result = clientAsync.ws_api._handle_message(json.dumps(valid_msg))
future = asyncio.Future()
clientAsync.ws_api._responses["123"] = future
valid_msg = {"id": "123", "status": 200, "result": {"test": "data"}}
clientAsync.ws_api._handle_message(json.dumps(valid_msg))
result = await clientAsync.ws_api._responses["123"]
assert result == valid_msg

@pytest.mark.asyncio
async def test_message_handling_raise_exception(clientAsync):
with pytest.raises(BinanceAPIException):
future = asyncio.Future()
clientAsync.ws_api._responses["123"] = future
valid_msg = {"id": "123", "status": 400, "error": {"code": "0", "msg": "error message"}}
clientAsync.ws_api._handle_message(json.dumps(valid_msg))
await future
@pytest.mark.asyncio
async def test_message_handling_raise_exception_without_id(clientAsync):
with pytest.raises(BinanceAPIException):
future = asyncio.Future()
clientAsync.ws_api._responses["123"] = future
valid_msg = {"id": "123", "status": 400, "error": {"code": "0", "msg": "error message"}}
clientAsync.ws_api._handle_message(json.dumps(valid_msg))
await future

@pytest.mark.asyncio
async def test_message_handling_invalid_json(clientAsync):
with pytest.raises(json.JSONDecodeError):
clientAsync.ws_api._handle_message("invalid json")

# Test message without ID
no_id_msg = {"data": "test"}
result = clientAsync.ws_api._handle_message(json.dumps(no_id_msg))
assert result == no_id_msg

# Test invalid JSON
with pytest.raises(json.JSONDecodeError):
clientAsync.ws_api._handle_message("invalid json")

Expand Down Expand Up @@ -151,3 +171,51 @@ async def test_cleanup_on_exit(clientAsync):
# Check cleanup
assert "test" not in clientAsync.ws_api._responses
assert future.exception() is not None


@pytest.mark.asyncio
async def test_ws_queue_overflow(clientAsync):
"""WebSocket API should not overflow queue"""
#
original_size = clientAsync.ws_api.MAX_QUEUE_SIZE
clientAsync.ws_api.MAX_QUEUE_SIZE = 1

try:
# Request multiple order books concurrently
symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
tasks = [clientAsync.ws_get_order_book(symbol=symbol) for symbol in symbols]

# Execute all requests concurrently and wait for results
results = await asyncio.gather(*tasks, return_exceptions=True)

# Check that we got valid responses or expected overflow errors
valid_responses = [r for r in results if not isinstance(r, Exception)]
assert len(valid_responses) == len(symbols), "Should get at least one valid response"

for result in valid_responses:
assert_ob(result)

finally:
# Restore original queue size
clientAsync.ws_api.MAX_QUEUE_SIZE = original_size

@pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+")
@pytest.mark.asyncio
async def test_ws_api_with_stream(clientAsync):
"""Test combining WebSocket API requests with stream listening"""
from binance import BinanceSocketManager

# Create socket manager and trade socket
bm = BinanceSocketManager(clientAsync)
ts = bm.trade_socket("BTCUSDT")

async with ts:
# Make WS API request while stream is active
order_book = await clientAsync.ws_get_order_book(symbol="BTCUSDT")
assert_ob(order_book)

# Verify we can still receive stream data
trade = await ts.recv()
assert "s" in trade # Symbol
assert "p" in trade # Price
assert "q" in trade # Quantity
Loading