@@ -51,6 +51,7 @@ def __init__(self,
5151 self ._secret_key = secret_key
5252 self ._ws = None
5353 self ._running = False
54+ self ._loop = None
5455 self ._raw_data = raw_data
5556 self ._stop_stream_queue = queue .Queue ()
5657 self ._handlers = {
@@ -179,7 +180,9 @@ def _subscribe(self, handler, symbols, handlers):
179180 for symbol in symbols :
180181 handlers [symbol ] = handler
181182 if self ._running :
182- asyncio .get_event_loop ().run_until_complete (self ._subscribe_all ())
183+ asyncio .run_coroutine_threadsafe (
184+ self ._subscribe_all (), self ._loop
185+ ).result ()
183186
184187 async def _subscribe_all (self ):
185188 if any (
@@ -210,6 +213,7 @@ async def _unsubscribe(self,
210213 }))
211214
212215 async def _run_forever (self ):
216+ self ._loop = asyncio .get_running_loop ()
213217 # do not start the websocket connection until we subscribe to something
214218 while not any (
215219 v for k , v in self ._handlers .items ()
@@ -262,32 +266,42 @@ def subscribe_daily_bars(self, handler, *symbols):
262266
263267 def unsubscribe_trades (self , * symbols ):
264268 if self ._running :
265- asyncio .get_event_loop ().run_until_complete (
266- self ._unsubscribe (trades = symbols ))
269+ asyncio .run_coroutine_threadsafe (
270+ self ._unsubscribe (trades = symbols ),
271+ self ._loop ).result ()
267272 for symbol in symbols :
268273 del self ._handlers ['trades' ][symbol ]
269274
270275 def unsubscribe_quotes (self , * symbols ):
271276 if self ._running :
272- asyncio .get_event_loop ().run_until_complete (
273- self ._unsubscribe (quotes = symbols ))
277+ asyncio .run_coroutine_threadsafe (
278+ self ._unsubscribe (quotes = symbols ),
279+ self ._loop ).result ()
274280 for symbol in symbols :
275281 del self ._handlers ['quotes' ][symbol ]
276282
277283 def unsubscribe_bars (self , * symbols ):
278284 if self ._running :
279- asyncio .get_event_loop ().run_until_complete (
280- self ._unsubscribe (bars = symbols ))
285+ asyncio .run_coroutine_threadsafe (
286+ self ._unsubscribe (bars = symbols ),
287+ self ._loop ).result ()
281288 for symbol in symbols :
282289 del self ._handlers ['bars' ][symbol ]
283290
284291 def unsubscribe_daily_bars (self , * symbols ):
285292 if self ._running :
286- asyncio .get_event_loop ().run_until_complete (
287- self ._unsubscribe (daily_bars = symbols ))
293+ asyncio .run_coroutine_threadsafe (
294+ self ._unsubscribe (daily_bars = symbols ),
295+ self ._loop ).result ()
288296 for symbol in symbols :
289297 del self ._handlers ['dailyBars' ][symbol ]
290298
299+ def stop (self ):
300+ if self ._loop .is_running ():
301+ asyncio .run_coroutine_threadsafe (
302+ self .stop_ws (),
303+ self ._loop ).result ()
304+
291305
292306class DataStream (_DataStream ):
293307 def __init__ (self ,
@@ -386,15 +400,17 @@ def subscribe_lulds(self, handler, *symbols):
386400
387401 def unsubscribe_statuses (self , * symbols ):
388402 if self ._running :
389- asyncio .get_event_loop ().run_until_complete (
390- self ._unsubscribe (statuses = symbols ))
403+ asyncio .run_coroutine_threadsafe (
404+ self ._unsubscribe (statuses = symbols ),
405+ self ._loop ).result ()
391406 for symbol in symbols :
392407 del self ._handlers ['statuses' ][symbol ]
393408
394409 def unsubscribe_lulds (self , * symbols ):
395410 if self ._running :
396- asyncio .get_event_loop ().run_until_complete (
397- self ._unsubscribe (lulds = symbols ))
411+ asyncio .run_coroutine_threadsafe (
412+ self ._unsubscribe (lulds = symbols ),
413+ self ._loop ).result ()
398414 for symbol in symbols :
399415 del self ._handlers ['lulds' ][symbol ]
400416
@@ -484,8 +500,9 @@ def subscribe_news(self, handler, *symbols):
484500
485501 def unsubscribe_news (self , * symbols ):
486502 if self ._running :
487- asyncio .get_event_loop ().run_until_complete (
488- self ._unsubscribe (news = symbols ))
503+ asyncio .run_coroutine_threadsafe (
504+ self ._unsubscribe (news = symbols ),
505+ self ._loop ).result ()
489506 for symbol in symbols :
490507 del self ._handlers ['news' ][symbol ]
491508
@@ -503,6 +520,7 @@ def __init__(self,
503520 self ._trade_updates_handler = None
504521 self ._ws = None
505522 self ._running = False
523+ self ._loop = None
506524 self ._raw_data = raw_data
507525 self ._stop_stream_queue = queue .Queue ()
508526 self ._should_run = True
@@ -550,8 +568,9 @@ def subscribe_trade_updates(self, handler):
550568 _ensure_coroutine (handler )
551569 self ._trade_updates_handler = handler
552570 if self ._running :
553- asyncio .get_event_loop ().run_until_complete (
554- self ._subscribe_trade_updates ())
571+ asyncio .run_coroutine_threadsafe (
572+ self ._subscribe_trade_updates (),
573+ self ._loop ).result ()
555574
556575 async def _start_ws (self ):
557576 await self ._connect ()
@@ -577,6 +596,7 @@ async def _consume(self):
577596 pass
578597
579598 async def _run_forever (self ):
599+ self ._loop = asyncio .get_running_loop ()
580600 # do not start the websocket connection until we subscribe to something
581601 while not self ._trade_updates_handler :
582602 if not self ._stop_stream_queue .empty ():
@@ -590,7 +610,7 @@ async def _run_forever(self):
590610 try :
591611 if not self ._should_run :
592612 log .info ("Trading stream stopped" )
593- break
613+ return
594614 if not self ._running :
595615 log .info ("starting trading websocket connection" )
596616 await self ._start_ws ()
@@ -618,6 +638,12 @@ async def stop_ws(self):
618638 if self ._stop_stream_queue .empty ():
619639 self ._stop_stream_queue .put_nowait ({"should_stop" : True })
620640
641+ def stop (self ):
642+ if self ._loop .is_running ():
643+ asyncio .run_coroutine_threadsafe (
644+ self .stop_ws (),
645+ self ._loop ).result ()
646+
621647
622648class Stream :
623649 def __init__ (self ,
@@ -630,25 +656,25 @@ def __init__(self,
630656 crypto_exchanges : Optional [List [str ]] = None ):
631657 self ._key_id , self ._secret_key , _ = get_credentials (key_id , secret_key )
632658 self ._base_url = base_url or get_base_url ()
633- self ._data_steam_url = data_stream_url or get_data_stream_url ()
659+ self ._data_stream_url = data_stream_url or get_data_stream_url ()
634660
635661 self ._trading_ws = TradingStream (self ._key_id ,
636662 self ._secret_key ,
637663 self ._base_url ,
638664 raw_data )
639665 self ._data_ws = DataStream (self ._key_id ,
640666 self ._secret_key ,
641- self ._data_steam_url ,
667+ self ._data_stream_url ,
642668 raw_data ,
643669 data_feed .lower ())
644670 self ._crypto_ws = CryptoDataStream (self ._key_id ,
645671 self ._secret_key ,
646- self ._data_steam_url ,
672+ self ._data_stream_url ,
647673 raw_data ,
648674 crypto_exchanges )
649675 self ._news_ws = NewsDataStream (self ._key_id ,
650676 self ._secret_key ,
651- self ._data_steam_url ,
677+ self ._data_stream_url ,
652678 raw_data )
653679
654680 def subscribe_trade_updates (self , handler ):
@@ -836,9 +862,8 @@ async def _run_forever(self):
836862 self ._news_ws ._run_forever ())
837863
838864 def run (self ):
839- loop = asyncio .get_event_loop ()
840865 try :
841- loop . run_until_complete (self ._run_forever ())
866+ asyncio . run (self ._run_forever ())
842867 except KeyboardInterrupt :
843868 print ('keyboard interrupt, bye' )
844869 pass
@@ -859,6 +884,16 @@ async def stop_ws(self):
859884 if self ._news_ws :
860885 await self ._news_ws .stop_ws ()
861886
887+ def stop (self ):
888+ if self ._trading_ws :
889+ self ._trading_ws .stop ()
890+ if self ._data_ws :
891+ self ._data_ws .stop ()
892+ if self ._crypto_ws :
893+ self ._crypto_ws .stop ()
894+ if self ._news_ws :
895+ self ._news_ws .stop ()
896+
862897 def is_open (self ):
863898 """
864899 Checks if either of the websockets is open
0 commit comments