Skip to content

Commit 1736075

Browse files
committed
Merge branch 'master' of https://github.com/alpacahq/alpaca-trade-api-python into feature/ws-handler-registry
2 parents 32d231d + 213bf8d commit 1736075

File tree

4 files changed

+157
-3
lines changed

4 files changed

+157
-3
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ account = api.get_account()
3030
api.list_positions()
3131
```
3232

33+
## Example Scripts
34+
35+
Please see the `examples/` folder for some example scripts that make use of this API
36+
3337
## API Document
3438

3539
The HTTP API document is located at https://docs.alpaca.markets/

alpaca_trade_api/polygon/stream2.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
from .entity import (
88
Quote, Trade, Agg, Entity,
99
)
10+
import logging
1011

1112

1213
class StreamConn(object):
1314
def __init__(self, key_id=None):
14-
self._key_id = key_id
15+
self._key_id = key_id or os.environ.get('APCA_API_KEY_ID')
1516
self._endpoint = os.environ.get(
1617
'POLYGON_WS_URL',
1718
'wss://alpaca.socket.polygon.io/stocks'
@@ -23,6 +24,7 @@ def __init__(self, key_id=None):
2324
self._retry = int(os.environ.get('APCA_RETRY_MAX', 3))
2425
self._retry_wait = int(os.environ.get('APCA_RETRY_WAIT', 3))
2526
self._retries = 0
27+
self.loop = asyncio.get_event_loop()
2628

2729
async def connect(self):
2830
await self._dispatch({'ev': 'status',
@@ -85,6 +87,9 @@ async def _recv(self):
8587
msg = json.loads(r)
8688
for update in msg:
8789
yield update
90+
except websockets.exceptions.ConnectionClosed:
91+
# Ignore, occurs on self.close() such as after KeyboardInterrupt
92+
pass
8893
except websockets.exceptions.ConnectionClosedError as e:
8994
await self._dispatch({'ev': 'status',
9095
'status': 'disconnected',
@@ -165,12 +170,15 @@ def run(self, initial_channels=[]):
165170
'''Run forever and block until exception is raised.
166171
initial_channels is the channels to start with.
167172
'''
168-
loop = asyncio.get_event_loop()
173+
loop = self.loop
169174
try:
170175
loop.run_until_complete(self.subscribe(initial_channels))
171176
loop.run_forever()
177+
except KeyboardInterrupt:
178+
logging.info("Exiting on Interrupt")
172179
finally:
173180
loop.run_until_complete(self.close())
181+
loop.close()
174182

175183
async def close(self):
176184
'''Close any open connections'''

alpaca_trade_api/stream2.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .common import get_base_url, get_credentials
66
from .entity import Account, Entity
77
from . import polygon
8+
import logging
89

910

1011
class StreamConn(object):
@@ -17,6 +18,7 @@ def __init__(self, key_id=None, secret_key=None, base_url=None):
1718
self._base_url = base_url
1819
self._ws = None
1920
self.polygon = None
21+
self.loop = asyncio.get_event_loop()
2022

2123
async def _connect(self):
2224
ws = await websockets.connect(self._endpoint)
@@ -126,12 +128,15 @@ def run(self, initial_channels=[]):
126128
'''Run forever and block until exception is rasised.
127129
initial_channels is the channels to start with.
128130
'''
129-
loop = asyncio.get_event_loop()
131+
loop = self.loop
130132
try:
131133
loop.run_until_complete(self.subscribe(initial_channels))
132134
loop.run_forever()
135+
except KeyboardInterrupt:
136+
logging.info("Exiting on Interrupt")
133137
finally:
134138
loop.run_until_complete(self.close())
139+
loop.close()
135140

136141
async def close(self):
137142
'''Close any of open connections'''

examples/simpleStream.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#!/usr/bin/env python
2+
3+
import os
4+
import sys
5+
import pprint
6+
import asyncio
7+
import logging
8+
import argparse
9+
10+
import pandas as pd
11+
12+
from datetime import datetime
13+
14+
# For some fun colors...
15+
from colorama import Fore, Back, Style, init as ColoramaInit
16+
ColoramaInit(autoreset=True)
17+
18+
# Make this global
19+
opt = None
20+
21+
def ts():
22+
return pd.Timestamp.now()
23+
24+
25+
def log(*args, **kwargs):
26+
print(ts(), " ", *args, **kwargs)
27+
28+
29+
def debug(*args, **kwargs):
30+
print(ts(), " ", *args, file=sys.stderr, **kwargs)
31+
32+
33+
def ms2date(ms, fmt='%Y-%m-%d'):
34+
if isinstance(ms, pd.Timestamp):
35+
return ms.strftime(fmt)
36+
else:
37+
return datetime.fromtimestamp(ms/1000).strftime(fmt)
38+
39+
40+
async def on_minute(conn, channel, bar):
41+
symbol = bar.symbol
42+
close = bar.close
43+
44+
try:
45+
percent = (bar.dailyopen - close)/close
46+
up = 1 if bar.open > bar.dailyopen else -1
47+
except: # noqa
48+
percent = 0
49+
up = 0
50+
51+
if up > 0:
52+
bar_color = f'{Style.BRIGHT}{Fore.CYAN}'
53+
elif up < 0:
54+
bar_color = f'{Style.BRIGHT}{Fore.RED}'
55+
else:
56+
bar_color = f'{Style.BRIGHT}{Fore.WHITE}'
57+
58+
print(f'{channel:<6} {ms2date(bar.end)} {bar_color}{symbol:<10s} {percent:>8.2f} {bar.open:>8.2f} {bar.close:>8.2f} {bar.volume:<10d}'
59+
f' {(Fore.GREEN+"above VWAP") if close > bar.vwap else (Fore.RED+"below VWAP")}')
60+
61+
62+
async def on_tick(conn, channel, bar):
63+
try:
64+
percent = (bar.dailyopen - bar.close)/bar.close
65+
except: # noqa
66+
percent = 0
67+
68+
print(f'{channel:<6s} {ms2date(bar.end)} {bar.symbol:<10s} {percent:>8.2f}% {bar.open:>8.2f} {bar.close:>8.2f} {bar.volume:<10d}')
69+
70+
71+
async def on_data(conn, channel, data):
72+
if opt.debug or not (channel in ('AM', 'Q', 'A', 'T')):
73+
debug("debug: ", pprint.pformat(data))
74+
75+
76+
77+
def reloadWatch(prog, cmd):
78+
async def watch_command():
79+
startingmodtime = os.path.getmtime(prog)
80+
81+
while True:
82+
modtime = os.path.getmtime(prog)
83+
if modtime != startingmodtime:
84+
debug(f'Reloading {" ".join(cmd)} ...')
85+
86+
os.execv(prog, cmd)
87+
88+
await asyncio.sleep(5)
89+
90+
return watch_command
91+
92+
93+
if __name__ == '__main__':
94+
logging.basicConfig(level=logging.INFO)
95+
96+
parser = argparse.ArgumentParser()
97+
98+
parser.add_argument(
99+
"--all", "-a",
100+
help="Watch the A.* feed as well, which can overwelm and backup during active times",
101+
action='store_true')
102+
103+
parser.add_argument(
104+
"--debug",
105+
help="Prints debug messages",
106+
action='store_true')
107+
108+
parser.add_argument(
109+
"--polygon",
110+
help="Only import Polygon.StreamConn instead of Alpaca Wrapper (mainly for testing)",
111+
action='store_true')
112+
113+
opt = parser.parse_args()
114+
115+
if opt.polygon:
116+
from alpaca_trade_api.polygon import StreamConn
117+
else:
118+
from alpaca_trade_api import StreamConn
119+
120+
conn = StreamConn()
121+
122+
# This is another way to setup wrappers for websocket callbacks, handy if conn is not global.
123+
on_minute = conn.on(r'AM$')(on_minute)
124+
on_tick = conn.on(r'A$')(on_tick)
125+
on_data = conn.on(r'.*')(on_data)
126+
127+
# This is an example of how you can add your own async functions into the loop
128+
# This one just watches this program for edits and tries to restart it
129+
asyncio.ensure_future(reloadWatch(__file__, sys.argv)())
130+
131+
try:
132+
if opt.all:
133+
conn.run(['Q.*', 'T.*', 'AM.*', 'A.*'])
134+
else:
135+
conn.run(['AM.*'])
136+
except Exception as e:
137+
print(e)

0 commit comments

Comments
 (0)