From 6b9447f6b45f0f62f1bf129a371569588f42496a Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 18 Feb 2025 01:07:33 -0600 Subject: [PATCH 1/4] add timeout to jobs --- .github/workflows/python-app.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 2b0012d20..658baafff 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -16,6 +16,7 @@ permissions: jobs: lint: runs-on: ubuntu-latest + timeout-minutes: 5 steps: - uses: actions/checkout@v4 - name: Set up Python @@ -33,6 +34,7 @@ jobs: build: needs: lint runs-on: ubuntu-22.04 + timeout-minutes: 20 env: PROXY: "http://51.83.140.52:16301" TEST_TESTNET: "true" @@ -74,6 +76,7 @@ jobs: needs: build if: ${{ always() }} runs-on: ubuntu-latest + timeout-minutes: 5 steps: - name: Coveralls Finished uses: coverallsapp/github-action@v2 From 315b454e2cbc0829432a440e7663bbbe0f04548f Mon Sep 17 00:00:00 2001 From: Pablo Date: Sun, 20 Apr 2025 17:11:15 -0400 Subject: [PATCH 2/4] feat: error handling depth cache --- binance/ws/depthcache.py | 15 +++--- binance/ws/reconnecting_websocket.py | 8 +++- binance/ws/threaded_stream.py | 18 ++++--- docs/depth_cache.rst | 25 ++++++++++ examples/depth_cache_example.py | 60 ++++++++++++++++++++++++ examples/depth_cache_threaded_example.py | 38 +++++++++++++++ 6 files changed, 149 insertions(+), 15 deletions(-) create mode 100644 examples/depth_cache_example.py create mode 100644 examples/depth_cache_threaded_example.py diff --git a/binance/ws/depthcache.py b/binance/ws/depthcache.py index 5b5484a3f..a25a81b92 100644 --- a/binance/ws/depthcache.py +++ b/binance/ws/depthcache.py @@ -185,6 +185,7 @@ async def __aenter__(self): return self async def __aexit__(self, *args, **kwargs): + self._log.debug(f"Exiting depth cache manager for {self._symbol}") await self._socket.__aexit__(*args, **kwargs) async def recv(self): @@ -192,8 +193,9 @@ async def recv(self): while not dc: try: res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT) + self._log.debug(f"Received message: {res}") except Exception as e: - self._log.warning(e) + self._log.warning(f"Exception recieving message: {e.__class__.__name__} (e) ") else: dc = await self._depth_event(res) return dc @@ -203,7 +205,7 @@ async def _init_cache(self): :return: """ - + self._log.debug(f"Initialising depth cache for {self._symbol}") # initialise or clear depth cache self._depth_cache = DepthCache(self._symbol, conv_type=self._conv_type) @@ -228,16 +230,15 @@ async def _depth_event(self, msg): :return: """ + self._log.debug(f"Received depth event: {msg}") if not msg: return None if "e" in msg and msg["e"] == "error": - # close the socket - await self.close() - - # notify the user by returning a None value - return None + # notify user by return msg with error + self._log.error(f"Error in depth event restarting cache: {msg}") + return msg return await self._process_depth_message(msg) diff --git a/binance/ws/reconnecting_websocket.py b/binance/ws/reconnecting_websocket.py index 3445a42fd..27a0112f2 100644 --- a/binance/ws/reconnecting_websocket.py +++ b/binance/ws/reconnecting_websocket.py @@ -202,7 +202,6 @@ async def _read_loop(self): self.ws.recv(), timeout=self.TIMEOUT ) res = self._handle_message(res) - print(self._queue.qsize()) self._log.debug(f"Received message: {res}") if res: if self._queue.qsize() < self.max_queue_size: @@ -216,6 +215,11 @@ async def _read_loop(self): # _no_message_received_reconnect except asyncio.CancelledError as e: self._log.debug(f"_read_loop cancelled error {e}") + await self._queue.put({ + "e": "error", + "type": f"{e.__class__.__name__}", + "m": f"{e}", + }) break except ( asyncio.IncompleteReadError, @@ -236,7 +240,7 @@ async def _read_loop(self): Exception, ) as e: # reports errors and break the loop - self._log.error(f"Unknown exception ({e})") + self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})") await self._queue.put({ "e": "error", "type": e.__class__.__name__, diff --git a/binance/ws/threaded_stream.py b/binance/ws/threaded_stream.py index 5f43fe969..47993e0c6 100755 --- a/binance/ws/threaded_stream.py +++ b/binance/ws/threaded_stream.py @@ -59,13 +59,19 @@ async def start_listener(self, socket, path: str, callback): except asyncio.TimeoutError: ... continue + except Exception as e: + self._log.error(f"Error receiving message: {e}") + msg = { + "e": "error", + "type": e.__class__.__name__, + "m": f"{e}", + } + if not msg: + continue # Handle both async and sync callbacks + if asyncio.iscoroutinefunction(callback): + asyncio.create_task(callback(msg)) else: - if not msg: - continue # Handle both async and sync callbacks - if asyncio.iscoroutinefunction(callback): - asyncio.create_task(callback(msg)) - else: - callback(msg) + callback(msg) del self._socket_running[path] def run(self): diff --git a/docs/depth_cache.rst b/docs/depth_cache.rst index 6cb97f9e9..8edb4b381 100755 --- a/docs/depth_cache.rst +++ b/docs/depth_cache.rst @@ -153,6 +153,31 @@ Websocket Errors ---------------- If the underlying websocket is disconnected and is unable to reconnect None is returned for the depth_cache parameter. +If the underlying websocket is disconnected an error msg is passed to the callback and to recv() containing the error message. +In the case the BinanceWebsocketClosed is returned, the websocket will attempt to reconnect 5 times before returning a BinanceUnableToConnect error. +Example: + +.. code:: python + + depth_cache = await dcm.recv() + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + if type == 'BinanceWebsocketClosed': + # ignore as attempts to reconnect + continue + break + +.. code:: python + def handle_depth_cache(depth_cache): + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + type = depth_cache.get('type') + if type == 'BinanceWebsocketClosed': + # Automatically attempts to reconnect + return + dcm.stop() + return + # handle non error cases here Examples -------- diff --git a/examples/depth_cache_example.py b/examples/depth_cache_example.py new file mode 100644 index 000000000..99bf54cc9 --- /dev/null +++ b/examples/depth_cache_example.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 + +import os +import sys + +root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(root) + +import asyncio +import logging +from binance import AsyncClient +from binance.ws.depthcache import DepthCacheManager, DepthCache + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +async def main(): + # Initialize the client + client = await AsyncClient.create() + + # Symbol to monitor + symbol = 'BTCUSDT' + + # Create a depth cache manager instance + async with DepthCacheManager( + client=client, + symbol=symbol, + ) as dcm: + logger.info(f"Started depth cache for {symbol}") + + # Monitor depth cache updates for 1 minute + for _ in range(100): # 6 iterations * 10 seconds = 1 minute + depth_cache = await dcm.recv() + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + if type == 'BinanceWebsocketClosed': + # ignore as attempts to reconnect + continue + break + + # Get current bids and asks + bids = depth_cache.get_bids()[:5] # Top 5 bids + asks = depth_cache.get_asks()[:5] # Top 5 asks + + logger.info("Top 5 bids:") + for bid in bids: + logger.info(f"Price: {bid[0]}, Quantity: {bid[1]}") + + logger.info("Top 5 asks:") + for ask in asks: + logger.info(f"Price: {ask[0]}, Quantity: {ask[1]}") + + logger.info(f"Last update time: {depth_cache.update_time}") + + # Close the client + await client.close_connection() + +if __name__ == '__main__': + # Run the async example + asyncio.run(main()) diff --git a/examples/depth_cache_threaded_example.py b/examples/depth_cache_threaded_example.py new file mode 100644 index 000000000..050e76445 --- /dev/null +++ b/examples/depth_cache_threaded_example.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import os +import sys +import time + +root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(root) + +import logging +from binance.ws.depthcache import ThreadedDepthCacheManager + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +def main(): + dcm = ThreadedDepthCacheManager() + dcm.start() + + def handle_depth_cache(depth_cache): + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + type = depth_cache.get('type') + if type == 'BinanceWebsocketClosed': + # Automatically attempts to reconnect + return + dcm.stop() + return + + logger.info(f"symbol {depth_cache.symbol}") + logger.info(depth_cache.get_bids()[:5]) + + dcm.start_depth_cache(handle_depth_cache, symbol='BNBBTC') + dcm.join() + + +if __name__ == "__main__": + main() From 5962f279997fab1d2899525ee3af25422e475192 Mon Sep 17 00:00:00 2001 From: Pablo Date: Sun, 20 Apr 2025 17:17:02 -0400 Subject: [PATCH 3/4] imrpove example --- examples/depth_cache_threaded_example.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/depth_cache_threaded_example.py b/examples/depth_cache_threaded_example.py index 050e76445..2891fcad6 100644 --- a/examples/depth_cache_threaded_example.py +++ b/examples/depth_cache_threaded_example.py @@ -24,6 +24,7 @@ def handle_depth_cache(depth_cache): if type == 'BinanceWebsocketClosed': # Automatically attempts to reconnect return + logger.error(f"Error received - Closing depth cache: {depth_cache}") dcm.stop() return From c7e89b11417d28de20a3f7229f77585179d8ee00 Mon Sep 17 00:00:00 2001 From: Pablo Date: Sun, 20 Apr 2025 17:20:10 -0400 Subject: [PATCH 4/4] lint --- examples/depth_cache_example.py | 2 +- examples/depth_cache_threaded_example.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/depth_cache_example.py b/examples/depth_cache_example.py index 99bf54cc9..a179f745f 100644 --- a/examples/depth_cache_example.py +++ b/examples/depth_cache_example.py @@ -9,7 +9,7 @@ import asyncio import logging from binance import AsyncClient -from binance.ws.depthcache import DepthCacheManager, DepthCache +from binance.ws.depthcache import DepthCacheManager logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) diff --git a/examples/depth_cache_threaded_example.py b/examples/depth_cache_threaded_example.py index 2891fcad6..6b20e5aac 100644 --- a/examples/depth_cache_threaded_example.py +++ b/examples/depth_cache_threaded_example.py @@ -2,7 +2,6 @@ import os import sys -import time root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(root)