diff --git a/binance/ws/depthcache.py b/binance/ws/depthcache.py index 5b5484a3f..a25a81b92 100644 --- a/binance/ws/depthcache.py +++ b/binance/ws/depthcache.py @@ -185,6 +185,7 @@ async def __aenter__(self): return self async def __aexit__(self, *args, **kwargs): + self._log.debug(f"Exiting depth cache manager for {self._symbol}") await self._socket.__aexit__(*args, **kwargs) async def recv(self): @@ -192,8 +193,9 @@ async def recv(self): while not dc: try: res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT) + self._log.debug(f"Received message: {res}") except Exception as e: - self._log.warning(e) + self._log.warning(f"Exception recieving message: {e.__class__.__name__} (e) ") else: dc = await self._depth_event(res) return dc @@ -203,7 +205,7 @@ async def _init_cache(self): :return: """ - + self._log.debug(f"Initialising depth cache for {self._symbol}") # initialise or clear depth cache self._depth_cache = DepthCache(self._symbol, conv_type=self._conv_type) @@ -228,16 +230,15 @@ async def _depth_event(self, msg): :return: """ + self._log.debug(f"Received depth event: {msg}") if not msg: return None if "e" in msg and msg["e"] == "error": - # close the socket - await self.close() - - # notify the user by returning a None value - return None + # notify user by return msg with error + self._log.error(f"Error in depth event restarting cache: {msg}") + return msg return await self._process_depth_message(msg) diff --git a/binance/ws/reconnecting_websocket.py b/binance/ws/reconnecting_websocket.py index 3445a42fd..27a0112f2 100644 --- a/binance/ws/reconnecting_websocket.py +++ b/binance/ws/reconnecting_websocket.py @@ -202,7 +202,6 @@ async def _read_loop(self): self.ws.recv(), timeout=self.TIMEOUT ) res = self._handle_message(res) - print(self._queue.qsize()) self._log.debug(f"Received message: {res}") if res: if self._queue.qsize() < self.max_queue_size: @@ -216,6 +215,11 @@ async def _read_loop(self): # _no_message_received_reconnect except asyncio.CancelledError as e: self._log.debug(f"_read_loop cancelled error {e}") + await self._queue.put({ + "e": "error", + "type": f"{e.__class__.__name__}", + "m": f"{e}", + }) break except ( asyncio.IncompleteReadError, @@ -236,7 +240,7 @@ async def _read_loop(self): Exception, ) as e: # reports errors and break the loop - self._log.error(f"Unknown exception ({e})") + self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})") await self._queue.put({ "e": "error", "type": e.__class__.__name__, diff --git a/binance/ws/threaded_stream.py b/binance/ws/threaded_stream.py index 5f43fe969..47993e0c6 100755 --- a/binance/ws/threaded_stream.py +++ b/binance/ws/threaded_stream.py @@ -59,13 +59,19 @@ async def start_listener(self, socket, path: str, callback): except asyncio.TimeoutError: ... continue + except Exception as e: + self._log.error(f"Error receiving message: {e}") + msg = { + "e": "error", + "type": e.__class__.__name__, + "m": f"{e}", + } + if not msg: + continue # Handle both async and sync callbacks + if asyncio.iscoroutinefunction(callback): + asyncio.create_task(callback(msg)) else: - if not msg: - continue # Handle both async and sync callbacks - if asyncio.iscoroutinefunction(callback): - asyncio.create_task(callback(msg)) - else: - callback(msg) + callback(msg) del self._socket_running[path] def run(self): diff --git a/docs/depth_cache.rst b/docs/depth_cache.rst index 6cb97f9e9..8edb4b381 100755 --- a/docs/depth_cache.rst +++ b/docs/depth_cache.rst @@ -153,6 +153,31 @@ Websocket Errors ---------------- If the underlying websocket is disconnected and is unable to reconnect None is returned for the depth_cache parameter. +If the underlying websocket is disconnected an error msg is passed to the callback and to recv() containing the error message. +In the case the BinanceWebsocketClosed is returned, the websocket will attempt to reconnect 5 times before returning a BinanceUnableToConnect error. +Example: + +.. code:: python + + depth_cache = await dcm.recv() + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + if type == 'BinanceWebsocketClosed': + # ignore as attempts to reconnect + continue + break + +.. code:: python + def handle_depth_cache(depth_cache): + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + type = depth_cache.get('type') + if type == 'BinanceWebsocketClosed': + # Automatically attempts to reconnect + return + dcm.stop() + return + # handle non error cases here Examples -------- diff --git a/examples/depth_cache_example.py b/examples/depth_cache_example.py new file mode 100644 index 000000000..a179f745f --- /dev/null +++ b/examples/depth_cache_example.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 + +import os +import sys + +root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(root) + +import asyncio +import logging +from binance import AsyncClient +from binance.ws.depthcache import DepthCacheManager + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +async def main(): + # Initialize the client + client = await AsyncClient.create() + + # Symbol to monitor + symbol = 'BTCUSDT' + + # Create a depth cache manager instance + async with DepthCacheManager( + client=client, + symbol=symbol, + ) as dcm: + logger.info(f"Started depth cache for {symbol}") + + # Monitor depth cache updates for 1 minute + for _ in range(100): # 6 iterations * 10 seconds = 1 minute + depth_cache = await dcm.recv() + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + if type == 'BinanceWebsocketClosed': + # ignore as attempts to reconnect + continue + break + + # Get current bids and asks + bids = depth_cache.get_bids()[:5] # Top 5 bids + asks = depth_cache.get_asks()[:5] # Top 5 asks + + logger.info("Top 5 bids:") + for bid in bids: + logger.info(f"Price: {bid[0]}, Quantity: {bid[1]}") + + logger.info("Top 5 asks:") + for ask in asks: + logger.info(f"Price: {ask[0]}, Quantity: {ask[1]}") + + logger.info(f"Last update time: {depth_cache.update_time}") + + # Close the client + await client.close_connection() + +if __name__ == '__main__': + # Run the async example + asyncio.run(main()) diff --git a/examples/depth_cache_threaded_example.py b/examples/depth_cache_threaded_example.py new file mode 100644 index 000000000..6b20e5aac --- /dev/null +++ b/examples/depth_cache_threaded_example.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import os +import sys + +root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(root) + +import logging +from binance.ws.depthcache import ThreadedDepthCacheManager + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +def main(): + dcm = ThreadedDepthCacheManager() + dcm.start() + + def handle_depth_cache(depth_cache): + if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error': + logger.error(f"Received depth cache error in callback: {depth_cache}") + type = depth_cache.get('type') + if type == 'BinanceWebsocketClosed': + # Automatically attempts to reconnect + return + logger.error(f"Error received - Closing depth cache: {depth_cache}") + dcm.stop() + return + + logger.info(f"symbol {depth_cache.symbol}") + logger.info(depth_cache.get_bids()[:5]) + + dcm.start_depth_cache(handle_depth_cache, symbol='BNBBTC') + dcm.join() + + +if __name__ == "__main__": + main()