Skip to content

Commit 319f28b

Browse files
committed
Fix exception handling, timestamp precisions
1 parent a735f24 commit 319f28b

File tree

3 files changed

+19
-7
lines changed

3 files changed

+19
-7
lines changed

alpaca_trade_api/entity.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ def __getattr__(self, key):
134134
if key in self._raw:
135135
val = self._raw[key]
136136
if key == 'timestamp':
137+
if val > 1000000000000000000:
138+
return pd.Timestamp(val, tz=NY)
137139
return pd.Timestamp(val, tz=NY, unit='ms')
138140
return val
139141
return getattr(super(), key)

alpaca_trade_api/polygon/streamconn.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ async def _recv(self):
9898
await self.close()
9999
asyncio.ensure_future(self._ensure_ws())
100100

101+
async def consume(self):
102+
if self._consume_task:
103+
await self._consume_task
104+
101105
async def _consume_msg(self):
102106
async for data in self._stream:
103107
stream = data.get('ev')

alpaca_trade_api/stream2.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from .common import get_base_url, get_data_url, get_credentials
77
from .entity import Account, Entity, trade_mapping, agg_mapping, quote_mapping
88
from . import polygon
9-
from .polygon.entity import Trade, Quote, Agg
9+
from .entity import Trade, Quote, Agg
1010
import logging
1111

1212

@@ -52,6 +52,10 @@ async def _connect(self):
5252

5353
self._consume_task = asyncio.ensure_future(self._consume_msg())
5454

55+
async def consume(self):
56+
if self._consume_task:
57+
await self._consume_task
58+
5559
async def _consume_msg(self):
5660
ws = self._ws
5761
try:
@@ -239,26 +243,28 @@ async def subscribe(self, channels):
239243
async def unsubscribe(self, channels):
240244
'''Handle unsubscribing from channels.'''
241245

242-
data_prefixes = ('Q.', 'T.', 'AM.')
243-
if self._data_stream == 'polygon':
244-
data_prefixes = ('Q.', 'T.', 'A.', 'AM.')
245-
246246
data_channels = [
247247
c for c in channels
248-
if c.startswith(data_prefixes)
248+
if c.startswith(self._data_prefixes)
249249
]
250250

251251
if data_channels:
252252
await self.data_ws.unsubscribe(data_channels)
253253

254+
async def consume(self):
255+
await asyncio.gather(
256+
self.trading_ws.consume(),
257+
self.data_ws.consume(),
258+
)
259+
254260
def run(self, initial_channels=[]):
255261
'''Run forever and block until exception is raised.
256262
initial_channels is the channels to start with.
257263
'''
258264
loop = self.loop
259265
try:
260266
loop.run_until_complete(self.subscribe(initial_channels))
261-
loop.run_forever()
267+
loop.run_until_complete(self.consume())
262268
except KeyboardInterrupt:
263269
logging.info("Exiting on Interrupt")
264270
finally:

0 commit comments

Comments
 (0)