66import msgpack
77import re
88import websockets
9- import queue
109
1110from .common import get_base_url , get_data_stream_url , get_credentials , URL
1211from .entity import Entity
@@ -59,7 +58,6 @@ def __init__(self,
5958 self ._running = False
6059 self ._loop = None
6160 self ._raw_data = raw_data
62- self ._stop_stream_queue = queue .Queue ()
6361 self ._handlers = {
6462 'trades' : {},
6563 'quotes' : {},
@@ -113,26 +111,14 @@ async def close(self):
113111
114112 async def stop_ws (self ):
115113 self ._should_run = False
116- if self ._stop_stream_queue .empty ():
117- self ._stop_stream_queue .put_nowait ({"should_stop" : True })
114+ await self .close ()
118115
119116 async def _consume (self ):
120117 while True :
121- if not self ._stop_stream_queue .empty ():
122- self ._stop_stream_queue .get (timeout = 1 )
123- await self .close ()
124- break
125- else :
126- try :
127- r = await asyncio .wait_for (self ._ws .recv (), 5 )
128- msgs = msgpack .unpackb (r )
129- for msg in msgs :
130- await self ._dispatch (msg )
131- except asyncio .TimeoutError :
132- # ws.recv is hanging when no data is received. by using
133- # wait_for we break when no data is received, allowing us
134- # to break the loop when needed
135- pass
118+ r = await self ._ws .recv ()
119+ msgs = msgpack .unpackb (r )
120+ for msg in msgs :
121+ await self ._dispatch (msg )
136122
137123 def _cast (self , msg_type , msg ):
138124 result = msg
@@ -230,14 +216,10 @@ async def _run_forever(self):
230216 v for k , v in self ._handlers .items ()
231217 if k not in ("cancelErrors" , "corrections" )
232218 ):
233- if not self ._stop_stream_queue .empty ():
234- # the ws was signaled to stop before starting the loop so
235- # we break
236- self ._stop_stream_queue .get (timeout = 1 )
219+ if not self ._should_run :
237220 return
238221 await asyncio .sleep (0.1 )
239222 log .info (f'started { self ._name } stream' )
240- self ._should_run = True
241223 self ._running = False
242224 while True :
243225 try :
@@ -253,10 +235,10 @@ async def _run_forever(self):
253235 self ._running = True
254236 await self ._consume ()
255237 except websockets .WebSocketException as wse :
256- await self .close ()
257- self . _running = False
258- log .warn ('data websocket error, restarting connection: ' +
259- str (wse ))
238+ if self ._should_run :
239+ await self . close ()
240+ log .warn ('data websocket error, restarting connection: ' +
241+ str (wse ))
260242 except Exception as e :
261243 log .exception ('error during websocket '
262244 'communication: {}' .format (str (e )))
@@ -621,7 +603,6 @@ def __init__(self,
621603 self ._running = False
622604 self ._loop = None
623605 self ._raw_data = raw_data
624- self ._stop_stream_queue = queue .Queue ()
625606 self ._should_run = True
626607 self ._websocket_params = websocket_params
627608
@@ -686,31 +667,18 @@ async def _start_ws(self):
686667
687668 async def _consume (self ):
688669 while True :
689- if not self ._stop_stream_queue .empty ():
690- self ._stop_stream_queue .get (timeout = 1 )
691- await self .close ()
692- break
693- else :
694- try :
695- r = await asyncio .wait_for (self ._ws .recv (), 5 )
696- msg = json .loads (r )
697- await self ._dispatch (msg )
698- except asyncio .TimeoutError :
699- # ws.recv is hanging when no data is received. by using
700- # wait_for we break when no data is received, allowing us
701- # to break the loop when needed
702- pass
670+ r = await self ._ws .recv ()
671+ msg = json .loads (r )
672+ await self ._dispatch (msg )
703673
704674 async def _run_forever (self ):
705675 self ._loop = asyncio .get_running_loop ()
706676 # do not start the websocket connection until we subscribe to something
707677 while not self ._trade_updates_handler :
708- if not self ._stop_stream_queue .empty ():
709- self ._stop_stream_queue .get (timeout = 1 )
678+ if not self ._should_run :
710679 return
711680 await asyncio .sleep (0.1 )
712681 log .info ('started trading stream' )
713- self ._should_run = True
714682 self ._running = False
715683 while True :
716684 try :
@@ -723,10 +691,10 @@ async def _run_forever(self):
723691 self ._running = True
724692 await self ._consume ()
725693 except websockets .WebSocketException as wse :
726- await self .close ()
727- self . _running = False
728- log .warn ('trading stream websocket error, restarting ' +
729- ' connection: ' + str (wse ))
694+ if self ._should_run :
695+ await self . close ()
696+ log .warn ('trading stream websocket error, restarting ' +
697+ ' connection: ' + str (wse ))
730698 except Exception as e :
731699 log .exception ('error during websocket '
732700 'communication: {}' .format (str (e )))
@@ -741,8 +709,7 @@ async def close(self):
741709
742710 async def stop_ws (self ):
743711 self ._should_run = False
744- if self ._stop_stream_queue .empty ():
745- self ._stop_stream_queue .put_nowait ({"should_stop" : True })
712+ await self .close ()
746713
747714 def stop (self ):
748715 if self ._loop .is_running ():
0 commit comments