Skip to content

Commit e387db6

Browse files
author
Shlomi Kushchi
authored
Merge pull request #379 from alpacahq/advanced_streamconn_usage
Advanced streamconn usage
2 parents 788f035 + 07f1e9c commit e387db6

File tree

8 files changed

+234
-4
lines changed

8 files changed

+234
-4
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,14 @@ import logging
288288
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
289289
```
290290

291+
## Websocket best practices
292+
Under the examples folder you could find several examples to do the following:
293+
* Different subscriptions(channels) usage with alpaca/polygon streams
294+
* pause / resume connection
295+
* change subscriptions/channels of existing connection
296+
* ws disconnections handler (make sure we reconnect when the internal mechanism fails)
297+
298+
291299
# Polygon API Service
292300

293301
Alpaca's API key ID can be used to access Polygon API, the documentation for

alpaca_trade_api/polygon/streamconn.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,15 @@ def run(self, initial_channels=[]):
190190

191191
async def close(self):
192192
'''Close any open connections'''
193-
if self._consume_task:
194-
self._consume_task.cancel()
193+
await self.cancel_task()
195194
if self._ws is not None:
196195
await self._ws.close()
197196
self._ws = None
198197

198+
async def cancel_task(self):
199+
if self._consume_task:
200+
self._consume_task.cancel()
201+
199202
def _cast(self, subject, data):
200203
if subject == 'T':
201204
return Trade({trade_mapping[k]: v for k,

alpaca_trade_api/stream2.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import re
55
import traceback
66
from asyncio import CancelledError
7+
import queue
78

89
import websockets
910
from .common import get_base_url, get_data_url, get_credentials, URL
@@ -140,12 +141,15 @@ async def unsubscribe(self, channels):
140141
}))
141142

142143
async def close(self):
143-
if self._consume_task:
144-
self._consume_task.cancel()
144+
await self.cancel_task()
145145
if self._ws:
146146
await self._ws.close()
147147
self._ws = None
148148

149+
async def cancel_task(self):
150+
if self._consume_task:
151+
self._consume_task.cancel()
152+
149153
def _cast(self, channel, msg):
150154
if channel == 'account_updates':
151155
return Account(msg)
@@ -227,6 +231,7 @@ def __init__(
227231
self._data_stream = _data_stream
228232
self._debug = debug
229233
self._raw_data = raw_data
234+
self._stop_stream_queue = queue.Queue()
230235

231236
self.trading_ws = _StreamConn(self._key_id,
232237
self._secret_key,
@@ -337,6 +342,9 @@ def run(self, initial_channels: List[str] = []):
337342
logging.error(f"error while consuming ws messages: {m}")
338343
if self._debug:
339344
traceback.print_exc()
345+
if not self._stop_stream_queue.empty():
346+
self._stop_stream_queue.get()
347+
should_renew = False
340348
loop.run_until_complete(self.close(should_renew))
341349
if loop.is_running():
342350
loop.close()
@@ -370,6 +378,18 @@ async def close(self, renew):
370378
self._oauth,
371379
raw_data=self._raw_data)
372380

381+
async def stop_ws(self):
382+
"""
383+
Signal the ws connections to stop listenning to api stream.
384+
"""
385+
self._stop_stream_queue.put_nowait({"should_stop": True})
386+
if self.trading_ws is not None:
387+
logging.info("Stopping the trading websocket connection")
388+
await self.trading_ws.cancel_task()
389+
if self.data_ws is not None:
390+
logging.info("Stopping the data websocket connection")
391+
await self.data_ws.cancel_task()
392+
373393
def on(self, channel_pat, symbols=None):
374394
def decorator(func):
375395
self.register(channel_pat, func, symbols)

examples/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,13 @@ This shows a basic approach to opening a streaming connection for Polygon market
1717
## Martingale
1818

1919
This trading algorithm explores a strategy based on a gambling technique. Trading every few seconds, it maintains a position in the $SPY symbol of a size determined by the number of up or down candles it's experienced in a row. For a more complete explanation, please see [this post](https://forum.alpaca.markets/t/martingale-day-trading-with-the-alpaca-trading-api/).
20+
21+
## Websocket Best practices
22+
Under this folder you could find several examples to do the following:
23+
* Different subscriptions(channels) usage with alpaca/polygon streams
24+
* pause / resume connection
25+
* change subscriptions of existing connection
26+
* ws disconnections handler (make sure we reconnect)
27+
28+
29+
Use it to integrate with your own code.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""
2+
In this example code we wrap the ws connection to make sure we reconnect
3+
in case of ws disconnection.
4+
"""
5+
6+
import logging
7+
import threading
8+
import asyncio
9+
import time
10+
from alpaca_trade_api import StreamConn
11+
from alpaca_trade_api.common import URL
12+
13+
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
14+
15+
ALPACA_API_KEY = "<YOUR-API-KEY>"
16+
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"
17+
USE_POLYGON = False
18+
19+
20+
def run_connection(conn, channels):
21+
try:
22+
conn.run(channels)
23+
except Exception as e:
24+
print(f'Exception from websocket connection: {e}')
25+
finally:
26+
send_msg(f"Trying to re-establish connection")
27+
time.sleep(3)
28+
run_connection(conn, channels)
29+
30+
if __name__ == '__main__':
31+
channels = ['alpacadatav1/Q.GOOG']
32+
33+
conn = StreamConn(
34+
ALPACA_API_KEY,
35+
ALPACA_SECRET_KEY,
36+
base_url=URL('https://paper-api.alpaca.markets'),
37+
data_url=URL('https://data.alpaca.markets'),
38+
# data_url=URL('http://127.0.0.1:8765'),
39+
data_stream='polygon' if USE_POLYGON else 'alpacadatav1'
40+
)
41+
42+
43+
@conn.on(r'^AM\..+$')
44+
async def on_minute_bars(conn, channel, bar):
45+
print('bars', bar)
46+
47+
48+
@conn.on(r'Q\..+')
49+
async def on_quotes(conn, channel, quote):
50+
print('quote', quote)
51+
52+
53+
@conn.on(r'T\..+')
54+
async def on_trades(conn, channel, trade):
55+
print('trade', trade)
56+
57+
run_connection(conn, channels)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
In this example code we will show a pattern that allows a user to change
3+
the websocket subscriptions as they please.
4+
"""
5+
import logging
6+
import threading
7+
import asyncio
8+
import time
9+
from alpaca_trade_api import StreamConn
10+
from alpaca_trade_api.common import URL
11+
12+
ALPACA_API_KEY = "<YOUR-API-KEY>"
13+
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"
14+
USE_POLYGON = False
15+
16+
conn: StreamConn = None
17+
18+
def consumer_thread():
19+
20+
try:
21+
# make sure we have an event loop, if not create a new one
22+
loop = asyncio.get_event_loop()
23+
loop.set_debug(True)
24+
except RuntimeError:
25+
asyncio.set_event_loop(asyncio.new_event_loop())
26+
27+
global conn
28+
conn = StreamConn(
29+
ALPACA_API_KEY,
30+
ALPACA_SECRET_KEY,
31+
base_url=URL('https://paper-api.alpaca.markets'),
32+
data_url=URL('https://data.alpaca.markets'),
33+
# data_url=URL('http://127.0.0.1:8765'),
34+
data_stream='polygon' if USE_POLYGON else 'alpacadatav1'
35+
)
36+
37+
@conn.on(r'^AM\..+$')
38+
async def on_minute_bars(conn, channel, bar):
39+
print('bars', bar)
40+
41+
42+
@conn.on(r'Q\..+')
43+
async def on_quotes(conn, channel, quote):
44+
print('quote', quote)
45+
46+
47+
@conn.on(r'T\..+')
48+
async def on_trades(conn, channel, trade):
49+
print('trade', trade)
50+
51+
conn.run(['alpacadatav1/Q.GOOG'])
52+
53+
if __name__ == '__main__':
54+
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
55+
threading.Thread(target=consumer_thread).start()
56+
57+
loop = asyncio.get_event_loop()
58+
59+
time.sleep(5) # give the initial connection time to be established
60+
subscriptions = [['alpacadatav1/AM.TSLA'], ['alpacadatav1/Q.GOOG'],
61+
['alpacadatav1/T.AAPL']]
62+
63+
while 1:
64+
for channels in subscriptions:
65+
loop.run_until_complete(conn.subscribe(channels))
66+
if "AM." in channels[0]:
67+
time.sleep(60) # aggs are once every minute. give it time
68+
else:
69+
time.sleep(20)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""
2+
In this example code we will show how to shut the streamconn websocket
3+
connection down and then up again. it's the ability to stop/start the
4+
connection
5+
"""
6+
import logging
7+
import threading
8+
import asyncio
9+
import time
10+
from alpaca_trade_api import StreamConn
11+
from alpaca_trade_api.common import URL
12+
13+
ALPACA_API_KEY = "<YOUR-API-KEY>"
14+
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"
15+
USE_POLYGON = False
16+
17+
conn: StreamConn = None
18+
19+
def consumer_thread():
20+
21+
try:
22+
# make sure we have an event loop, if not create a new one
23+
loop = asyncio.get_event_loop()
24+
loop.set_debug(True)
25+
except RuntimeError:
26+
asyncio.set_event_loop(asyncio.new_event_loop())
27+
28+
global conn
29+
conn = StreamConn(
30+
ALPACA_API_KEY,
31+
ALPACA_SECRET_KEY,
32+
base_url=URL('https://paper-api.alpaca.markets'),
33+
data_url=URL('https://data.alpaca.markets'),
34+
# data_url=URL('http://127.0.0.1:8765'),
35+
data_stream='polygon' if USE_POLYGON else 'alpacadatav1'
36+
)
37+
38+
@conn.on(r'^AM\..+$')
39+
async def on_minute_bars(conn, channel, bar):
40+
print('bars', bar)
41+
42+
43+
@conn.on(r'Q\..+')
44+
async def on_quotes(conn, channel, quote):
45+
print('quote', quote)
46+
47+
48+
@conn.on(r'T\..+')
49+
async def on_trades(conn, channel, trade):
50+
print('trade', trade)
51+
52+
conn.run(['alpacadatav1/Q.GOOG'])
53+
54+
if __name__ == '__main__':
55+
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
56+
57+
loop = asyncio.get_event_loop()
58+
59+
while 1:
60+
threading.Thread(target=consumer_thread).start()
61+
time.sleep(5)
62+
loop.run_until_complete(conn.stop_ws())
63+
time.sleep(20)
File renamed without changes.

0 commit comments

Comments
 (0)