Skip to content

Commit d18981e

Browse files
authored
feat: throw readloopclosed error if trying to connect once read loop is already closed (#1593)
* add timeout to jobs * lint * reverse timeout change * fix failing tests
1 parent 5735603 commit d18981e

File tree

4 files changed

+33
-4
lines changed

4 files changed

+33
-4
lines changed

binance/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ class BinanceWebsocketClosed(Exception):
8181
"""Raised when websocket connection is closed."""
8282
pass
8383

84+
class ReadLoopClosed(Exception):
85+
"""Raised when trying to read from read loop but already closed"""
86+
pass
87+
8488
class NotImplementedException(Exception):
8589
def __init__(self, value):
8690
message = f"Not implemented: {value}"

binance/ws/reconnecting_websocket.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
BinanceWebsocketClosed,
3737
BinanceWebsocketUnableToConnect,
3838
BinanceWebsocketQueueOverflow,
39+
ReadLoopClosed,
3940
)
4041
from binance.helpers import get_loop
4142
from binance.ws.constants import WSListenerState
@@ -247,6 +248,8 @@ async def _read_loop(self):
247248
"m": f"{e}",
248249
})
249250
break
251+
except Exception as e:
252+
self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})")
250253
finally:
251254
self._handle_read_loop = None # Signal the coro is stopped
252255
self._reconnects = 0
@@ -272,6 +275,10 @@ async def _run_reconnect(self):
272275
async def recv(self):
273276
res = None
274277
while not res:
278+
if not self._handle_read_loop:
279+
raise ReadLoopClosed(
280+
"Read loop has been closed, please reset the websocket connection and listen to the message error."
281+
)
275282
try:
276283
res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT)
277284
except asyncio.TimeoutError:

tests/test_async_client_ws_futures_requests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async def test_ws_futures_create_get_edit_cancel_order_with_orjson(futuresClient
6868
type="LIMIT",
6969
timeInForce="GTC",
7070
quantity=0.1,
71-
price=str(float(ticker["bidPrice"]) + 2),
71+
price=str(float(ticker["bidPrice"]) + 5),
7272
)
7373
assert_contract_order(futuresClientAsync, order)
7474
order = await futuresClientAsync.ws_futures_edit_order(
@@ -101,7 +101,7 @@ async def test_ws_futures_create_get_edit_cancel_order_without_orjson(futuresCli
101101
type="LIMIT",
102102
timeInForce="GTC",
103103
quantity=0.1,
104-
price=str(float(ticker["bidPrice"]) + 2),
104+
price=str(float(ticker["bidPrice"]) + 5),
105105
)
106106
assert_contract_order(futuresClientAsync, order)
107107
order = await futuresClientAsync.ws_futures_edit_order(

tests/test_reconnecting_websocket.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
import pytest
33
import gzip
44
import json
5-
from unittest.mock import patch, create_autospec
5+
from unittest.mock import patch, create_autospec, Mock
66
from binance.ws.reconnecting_websocket import ReconnectingWebsocket
77
from binance.ws.constants import WSListenerState
8-
from binance.exceptions import BinanceWebsocketUnableToConnect
8+
from binance.exceptions import BinanceWebsocketUnableToConnect, ReadLoopClosed
99
from websockets import WebSocketClientProtocol # type: ignore
1010
from websockets.protocol import State
1111
import asyncio
@@ -77,6 +77,8 @@ async def test_handle_message_invalid_json():
7777
async def test_recv_message():
7878
ws = ReconnectingWebsocket(url="wss://test.url")
7979
await ws._queue.put({"test": "data"})
80+
# Simulate the read loop being active
81+
ws._handle_read_loop = Mock()
8082
result = await ws.recv()
8183
assert result == {"test": "data"}
8284

@@ -206,3 +208,19 @@ async def test_connect_fails_to_connect_after_disconnect():
206208
async def delayed_return():
207209
await asyncio.sleep(0.1) # 100 ms delay
208210
return '{"e": "value"}'
211+
212+
213+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Requires Python 3.8+")
214+
@pytest.mark.asyncio
215+
async def test_recv_read_loop_closed():
216+
"""Test that recv() raises ReadLoopClosed when read loop is closed."""
217+
ws = ReconnectingWebsocket(url="wss://test.url")
218+
219+
# Simulate read loop being closed by setting _handle_read_loop to None
220+
ws._handle_read_loop = None
221+
222+
with pytest.raises(ReadLoopClosed) as exc_info:
223+
await ws.recv()
224+
225+
assert "Read loop has been closed" in str(exc_info.value)
226+
assert "please reset the websocket connection" in str(exc_info.value)

0 commit comments

Comments
 (0)