Skip to content

Commit 43f0766

Browse files
authored
Move updated bars to their own subscription (#574)
* Move updated bars to their own subscription * update unsubscribe * pep8 format fix
1 parent 294b105 commit 43f0766

File tree

2 files changed

+71
-20
lines changed

2 files changed

+71
-20
lines changed

alpaca_trade_api/stream.py

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@ def __init__(self,
5555
self._raw_data = raw_data
5656
self._stop_stream_queue = queue.Queue()
5757
self._handlers = {
58-
'trades': {},
59-
'quotes': {},
60-
'bars': {},
61-
'dailyBars': {},
58+
'trades': {},
59+
'quotes': {},
60+
'bars': {},
61+
'updatedBars': {},
62+
'dailyBars': {},
6263
}
6364
self._name = 'data'
6465
self._should_run = True
@@ -137,7 +138,7 @@ def _cast(self, msg_type, msg):
137138
quote_mapping_v2[k]: v
138139
for k, v in msg.items() if k in quote_mapping_v2
139140
})
140-
elif msg_type in ('b', 'd'):
141+
elif msg_type in ('b', 'u', 'd'):
141142
result = Bar({
142143
bar_mapping_v2[k]: v
143144
for k, v in msg.items() if k in bar_mapping_v2
@@ -164,6 +165,11 @@ async def _dispatch(self, msg):
164165
symbol, self._handlers['bars'].get('*', None))
165166
if handler:
166167
await handler(self._cast(msg_type, msg))
168+
elif msg_type == 'u':
169+
handler = self._handlers['updatedBars'].get(
170+
symbol, self._handlers['updatedBars'].get('*', None))
171+
if handler:
172+
await handler(self._cast(msg_type, msg))
167173
elif msg_type == 'd':
168174
handler = self._handlers['dailyBars'].get(
169175
symbol, self._handlers['dailyBars'].get('*', None))
@@ -201,15 +207,17 @@ async def _unsubscribe(self,
201207
trades=(),
202208
quotes=(),
203209
bars=(),
210+
updated_bars=(),
204211
daily_bars=()):
205-
if trades or quotes or bars or daily_bars:
212+
if trades or quotes or bars or updated_bars or daily_bars:
206213
await self._ws.send(
207214
msgpack.packb({
208-
'action': 'unsubscribe',
209-
'trades': trades,
210-
'quotes': quotes,
211-
'bars': bars,
212-
'dailyBars': daily_bars,
215+
'action': 'unsubscribe',
216+
'trades': trades,
217+
'quotes': quotes,
218+
'bars': bars,
219+
'updatedBars': updated_bars,
220+
'dailyBars': daily_bars,
213221
}))
214222

215223
async def _run_forever(self):
@@ -261,6 +269,9 @@ def subscribe_quotes(self, handler, *symbols):
261269
def subscribe_bars(self, handler, *symbols):
262270
self._subscribe(handler, symbols, self._handlers['bars'])
263271

272+
def subscribe_updated_bars(self, handler, *symbols):
273+
self._subscribe(handler, symbols, self._handlers['updatedBars'])
274+
264275
def subscribe_daily_bars(self, handler, *symbols):
265276
self._subscribe(handler, symbols, self._handlers['dailyBars'])
266277

@@ -288,6 +299,13 @@ def unsubscribe_bars(self, *symbols):
288299
for symbol in symbols:
289300
del self._handlers['bars'][symbol]
290301

302+
def unsubscribe_updated_bars(self, *symbols):
303+
if self._running:
304+
asyncio.get_event_loop().run_until_complete(
305+
self._unsubscribe(updated_bars=symbols))
306+
for symbol in symbols:
307+
del self._handlers['updatedBars'][symbol]
308+
291309
def unsubscribe_daily_bars(self, *symbols):
292310
if self._running:
293311
asyncio.run_coroutine_threadsafe(
@@ -377,19 +395,22 @@ async def _unsubscribe(self,
377395
trades=(),
378396
quotes=(),
379397
bars=(),
398+
updated_bars=(),
380399
daily_bars=(),
381400
statuses=(),
382401
lulds=()):
383-
if trades or quotes or bars or daily_bars or statuses or lulds:
402+
if (trades or quotes or bars or updated_bars or daily_bars or
403+
statuses or lulds):
384404
await self._ws.send(
385405
msgpack.packb({
386-
'action': 'unsubscribe',
387-
'trades': trades,
388-
'quotes': quotes,
389-
'bars': bars,
390-
'dailyBars': daily_bars,
391-
'statuses': statuses,
392-
'lulds': lulds,
406+
'action': 'unsubscribe',
407+
'trades': trades,
408+
'quotes': quotes,
409+
'bars': bars,
410+
'updatedBars': updated_bars,
411+
'dailyBars': daily_bars,
412+
'statuses': statuses,
413+
'lulds': lulds,
393414
}))
394415

395416
def subscribe_statuses(self, handler, *symbols):
@@ -702,6 +723,9 @@ def subscribe_quotes(self, handler, *symbols):
702723
def subscribe_bars(self, handler, *symbols):
703724
self._data_ws.subscribe_bars(handler, *symbols)
704725

726+
def subscribe_updated_bars(self, handler, *symbols):
727+
self._data_ws.subscribe_updated_bars(handler, *symbols)
728+
705729
def subscribe_daily_bars(self, handler, *symbols):
706730
self._data_ws.subscribe_daily_bars(handler, *symbols)
707731

@@ -720,6 +744,9 @@ def subscribe_crypto_quotes(self, handler, *symbols):
720744
def subscribe_crypto_bars(self, handler, *symbols):
721745
self._crypto_ws.subscribe_bars(handler, *symbols)
722746

747+
def subscribe_crypto_updated_bars(self, handler, *symbols):
748+
self._crypto_ws.subscribe_updated_bars(handler, *symbols)
749+
723750
def subscribe_crypto_daily_bars(self, handler, *symbols):
724751
self._crypto_ws.subscribe_daily_bars(handler, *symbols)
725752

@@ -751,6 +778,13 @@ def decorator(func):
751778

752779
return decorator
753780

781+
def on_updated_bar(self, *symbols):
782+
def decorator(func):
783+
self.subscribe_updated_bars(func, *symbols)
784+
return func
785+
786+
return decorator
787+
754788
def on_daily_bar(self, *symbols):
755789
def decorator(func):
756790
self.subscribe_daily_bars(func, *symbols)
@@ -807,6 +841,13 @@ def decorator(func):
807841

808842
return decorator
809843

844+
def on_crypto_updated_bar(self, *symbols):
845+
def decorator(func):
846+
self.subscribe_crypto_updated_bars(func, *symbols)
847+
return func
848+
849+
return decorator
850+
810851
def on_crypto_daily_bar(self, *symbols):
811852
def decorator(func):
812853
self.subscribe_crypto_daily_bars(func, *symbols)
@@ -832,6 +873,9 @@ def unsubscribe_quotes(self, *symbols):
832873
def unsubscribe_bars(self, *symbols):
833874
self._data_ws.unsubscribe_bars(*symbols)
834875

876+
def unsubscribe_updated_bars(self, *symbols):
877+
self._data_ws.unsubscribe_updated_bars(*symbols)
878+
835879
def unsubscribe_daily_bars(self, *symbols):
836880
self._data_ws.unsubscribe_daily_bars(*symbols)
837881

@@ -850,6 +894,9 @@ def unsubscribe_crypto_quotes(self, *symbols):
850894
def unsubscribe_crypto_bars(self, *symbols):
851895
self._crypto_ws.unsubscribe_bars(*symbols)
852896

897+
def unsubscribe_crypto_updated_bars(self, *symbols):
898+
self._crypto_ws.unsubscribe_updated_bars(*symbols)
899+
853900
def unsubscribe_crypto_daily_bars(self, *symbols):
854901
self._crypto_ws.unsubscribe_daily_bars(*symbols)
855902

@@ -901,7 +948,7 @@ def is_open(self):
901948
:return:
902949
"""
903950
open_ws = (self._trading_ws._ws or self._data_ws._ws
904-
or self._crypto_ws._ws or self._news_ws) # noqa
951+
or self._crypto_ws._ws or self._news_ws) # noqa
905952
if open_ws:
906953
return True
907954
return False

examples/websockets/v2_example.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ def main():
3434
async def _(bar):
3535
print('bar', bar)
3636

37+
@stream.on_updated_bar('MSFT')
38+
async def _(bar):
39+
print('updated bar', bar)
40+
3741
@stream.on_status("*")
3842
async def _(status):
3943
print('status', status)

0 commit comments

Comments
 (0)