Skip to content

Commit 7f0fb76

Browse files
committed
Fix Polygon handler registry and add symbol specification
1 parent 5ca968b commit 7f0fb76

File tree

2 files changed

+20
-17
lines changed

2 files changed

+20
-17
lines changed

alpaca_trade_api/polygon/stream2.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ def __init__(self, key_id=None):
1717
'wss://alpaca.socket.polygon.io/stocks'
1818
).rstrip('/')
1919
self._handlers = {}
20+
self._handler_symbols = {}
2021
self._ws = None
2122
self._retry = int(os.environ.get('APCA_RETRY_MAX', 3))
2223
self._retry_wait = int(os.environ.get('APCA_RETRY_WAIT', 3))
@@ -114,7 +115,7 @@ def run(self, initial_channels=[]):
114115
loop.run_until_complete(self.close())
115116

116117
async def close(self):
117-
'''Close any of open connections'''
118+
'''Close any open connections'''
118119
if self._ws is not None:
119120
await self._ws.close()
120121

@@ -168,15 +169,10 @@ def _cast(self, subject, data):
168169
async def _dispatch(self, channel, msg):
169170
for pat, handler in self._handlers.items():
170171
if pat.match(channel):
171-
ent = self._cast(channel, msg)
172-
await handler(self, channel, ent)
173-
174-
def on(self, channel_pat):
175-
def decorator(func):
176-
self.register(channel_pat, func)
177-
return func
178-
179-
return decorator
172+
handled_symbols = self._handler_symbols.get(handler)
173+
if handled_symbols is None or msg['sym'] in handled_symbols:
174+
ent = self._cast(channel, msg)
175+
await handler(self, channel, ent)
180176

181177
def register(self, channel_pat, func):
182178
if not asyncio.iscoroutinefunction(func):
@@ -188,4 +184,6 @@ def register(self, channel_pat, func):
188184
def deregister(self, channel_pat):
189185
if isinstance(channel_pat, str):
190186
channel_pat = re.compile(channel_pat)
187+
handler = self._handlers[channel_pat]
188+
del self._handler_symbols[handler]
191189
del self._handlers[channel_pat]

alpaca_trade_api/stream2.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def __init__(self, key_id=None, secret_key=None, base_url=None):
1313
base_url = re.sub(r'^http', 'ws', base_url or get_base_url())
1414
self._endpoint = base_url + '/stream'
1515
self._handlers = {}
16+
self._handler_symbols = {}
1617
self._base_url = base_url
1718
self._ws = None
1819
self.polygon = None
@@ -65,7 +66,8 @@ async def _ensure_polygon(self):
6566
if 'staging' in self._base_url:
6667
key_id += '-staging'
6768
self.polygon = polygon.StreamConn(key_id)
68-
self.polygon.register(r'.*', self._dispatch_polygon)
69+
self.polygon._handlers = self._handlers
70+
self.polygon._handler_symbols = self._handler_symbols
6971
await self.polygon.connect()
7072

7173
async def _ensure_ws(self):
@@ -121,20 +123,17 @@ def _cast(self, channel, msg):
121123
return Account(msg)
122124
return Entity(msg)
123125

124-
async def _dispatch_polygon(self, conn, subject, data):
125-
for pat, handler in self._handlers.items():
126-
if pat.match(subject):
127-
await handler(self, subject, data)
128-
129126
async def _dispatch(self, channel, msg):
130127
for pat, handler in self._handlers.items():
131128
if pat.match(channel):
132129
ent = self._cast(channel, msg['data'])
133130
await handler(self, channel, ent)
134131

135-
def on(self, channel_pat):
132+
def on(self, channel_pat, symbols=None):
136133
def decorator(func):
137134
self.register(channel_pat, func)
135+
if symbols:
136+
self._handler_symbols[func] = symbols
138137
return func
139138

140139
return decorator
@@ -145,8 +144,14 @@ def register(self, channel_pat, func):
145144
if isinstance(channel_pat, str):
146145
channel_pat = re.compile(channel_pat)
147146
self._handlers[channel_pat] = func
147+
if self.polygon:
148+
self.polygon.register(channel_pat, func)
148149

149150
def deregister(self, channel_pat):
150151
if isinstance(channel_pat, str):
151152
channel_pat = re.compile(channel_pat)
153+
handler = self._handlers[channel_pat]
154+
del self._handler_symbols[handler]
152155
del self._handlers[channel_pat]
156+
if self.polygon:
157+
self.polygon.deregister(channel_pat)

0 commit comments

Comments
 (0)