Skip to content

Commit 5d1d01d

Browse files
committed
adds self.run_forever_flag to streams
1 parent f9e25cd commit 5d1d01d

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

alpaca_trade_api/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
from .stream import Stream # noqa
44
from .stream2 import StreamConn # noqa
55

6-
__version__ = '1.4.0'
6+
__version__ = '1.5.0'

alpaca_trade_api/stream.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(self,
4545
'dailyBars': {},
4646
}
4747
self._name = 'data'
48+
self._run_forever_flag = False
4849

4950
async def _connect(self):
5051
self._ws = await websockets.connect(
@@ -79,6 +80,7 @@ async def close(self):
7980
await self._ws.close()
8081
self._ws = None
8182
self._running = False
83+
self._run_forever_flag = False
8284

8385
async def stop_ws(self):
8486
self._stop_stream_queue.put_nowait({"should_stop": True})
@@ -191,7 +193,8 @@ async def _run_forever(self):
191193
await asyncio.sleep(0.1)
192194
log.info(f'started {self._name} stream')
193195
self._running = False
194-
while True:
196+
self._run_forever_flag = True
197+
while self._run_forever_flag:
195198
try:
196199
if not self._running:
197200
log.info("starting websocket connection")
@@ -376,6 +379,7 @@ def __init__(self,
376379
self._ws = None
377380
self._running = False
378381
self._stop_stream_queue = queue.Queue()
382+
self._run_forever_flag = False
379383

380384
async def _connect(self):
381385
self._ws = await websockets.connect(self._endpoint)
@@ -443,7 +447,8 @@ async def _run_forever(self):
443447
await asyncio.sleep(0.1)
444448
log.info('started trading stream')
445449
self._running = False
446-
while True:
450+
self._run_forever_flag = True
451+
while self._run_forever_flag:
447452
try:
448453
if not self._running:
449454
log.info("starting websocket connection")
@@ -466,6 +471,7 @@ async def close(self):
466471
await self._ws.close()
467472
self._ws = None
468473
self._running = False
474+
self._run_forever_flag = False
469475

470476
async def stop_ws(self):
471477
self._stop_stream_queue.put_nowait({"should_stop": True})
@@ -648,6 +654,22 @@ def run(self):
648654
print('keyboard interrupt, bye')
649655
pass
650656

657+
async def _close(self):
658+
await asyncio.gather(
659+
self.stop_ws(),
660+
self._trading_ws.close(),
661+
self._data_ws.close(),
662+
self._crypto_ws.close()
663+
)
664+
665+
def close(self):
666+
loop = asyncio.get_event_loop()
667+
try:
668+
loop.run_until_complete(self._close())
669+
except KeyboardInterrupt:
670+
print('keyboard interrupt, bye')
671+
pass
672+
651673
async def stop_ws(self):
652674
"""
653675
Signal the ws connections to stop listenning to api stream.

0 commit comments

Comments
 (0)