@@ -45,6 +45,7 @@ def __init__(self,
4545 'dailyBars' : {},
4646 }
4747 self ._name = 'data'
48+ self ._run_forever_flag = False
4849
4950 async def _connect (self ):
5051 self ._ws = await websockets .connect (
@@ -79,6 +80,7 @@ async def close(self):
7980 await self ._ws .close ()
8081 self ._ws = None
8182 self ._running = False
83+ self ._run_forever_flag = False
8284
8385 async def stop_ws (self ):
8486 self ._stop_stream_queue .put_nowait ({"should_stop" : True })
@@ -191,7 +193,8 @@ async def _run_forever(self):
191193 await asyncio .sleep (0.1 )
192194 log .info (f'started { self ._name } stream' )
193195 self ._running = False
194- while True :
196+ self ._run_forever_flag = True
197+ while self ._run_forever_flag :
195198 try :
196199 if not self ._running :
197200 log .info ("starting websocket connection" )
@@ -376,6 +379,7 @@ def __init__(self,
376379 self ._ws = None
377380 self ._running = False
378381 self ._stop_stream_queue = queue .Queue ()
382+ self ._run_forever_flag = False
379383
380384 async def _connect (self ):
381385 self ._ws = await websockets .connect (self ._endpoint )
@@ -443,7 +447,8 @@ async def _run_forever(self):
443447 await asyncio .sleep (0.1 )
444448 log .info ('started trading stream' )
445449 self ._running = False
446- while True :
450+ self ._run_forever_flag = True
451+ while self ._run_forever_flag :
447452 try :
448453 if not self ._running :
449454 log .info ("starting websocket connection" )
@@ -466,6 +471,7 @@ async def close(self):
466471 await self ._ws .close ()
467472 self ._ws = None
468473 self ._running = False
474+ self ._run_forever_flag = False
469475
470476 async def stop_ws (self ):
471477 self ._stop_stream_queue .put_nowait ({"should_stop" : True })
@@ -648,6 +654,22 @@ def run(self):
648654 print ('keyboard interrupt, bye' )
649655 pass
650656
657+ async def _close (self ):
658+ await asyncio .gather (
659+ self .stop_ws (),
660+ self ._trading_ws .close (),
661+ self ._data_ws .close (),
662+ self ._crypto_ws .close ()
663+ )
664+
665+ def close (self ):
666+ loop = asyncio .get_event_loop ()
667+ try :
668+ loop .run_until_complete (self ._close ())
669+ except KeyboardInterrupt :
670+ print ('keyboard interrupt, bye' )
671+ pass
672+
651673 async def stop_ws (self ):
652674 """
653675 Signal the ws connections to stop listenning to api stream.
0 commit comments