Skip to content

Commit 08f1981

Browse files
committed
streaming
1 parent 31187e1 commit 08f1981

File tree

6 files changed

+131
-7
lines changed

6 files changed

+131
-7
lines changed

api_client.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ def post_token_request(self, data):
6969
response = self.session.post(f"{self.config.API_BASE_URL}/v1/oauth/token", headers=headers, data=data)
7070
if response.ok:
7171
self.save_token(response.json())
72-
self.load_token()
7372
self.logger.info("Tokens successfully updated.")
7473
return True
7574
else:
@@ -86,7 +85,7 @@ def refresh_access_token(self):
8685
if not self.post_token_request(data):
8786
self.logger.error("Failed to refresh access token.")
8887
return False
89-
88+
self.token_info = self.load_token()
9089
return self.validate_token()
9190

9291
def save_token(self, token_data):
@@ -106,11 +105,17 @@ def load_token(self):
106105
self.logger.warning(f"Loading token failed: {e}")
107106
return None
108107

109-
def validate_token(self):
108+
def validate_token(self, force=False):
110109
""" Validate the current token's validity. """
110+
print(self.token_info['expires_at'])
111+
print(datetime.now())
112+
print(datetime.fromisoformat(self.token_info['expires_at']))
113+
print(datetime.now() < datetime.fromisoformat(self.token_info['expires_at']))
111114
if self.token_info and datetime.now() < datetime.fromisoformat(self.token_info['expires_at']):
115+
print(f"Token expires in {datetime.fromisoformat(self.token_info['expires_at']) - datetime.now()} seconds")
112116
return True
113-
else:
117+
elif force:
118+
print("Token expired or invalid.")
114119
# get AAPL to validate token
115120
params = {'symbol': 'AAPL'}
116121
response = self.make_request(endpoint=f"{self.config.MARKET_DATA_BASE_URL}/chains", params=params, validating=True)
@@ -146,3 +151,11 @@ def make_request(self, endpoint, method="GET", **kwargs):
146151
response = self.session.request(method, url, headers=headers, **kwargs)
147152
response.raise_for_status()
148153
return response.json()
154+
155+
def get_user_preferences(self):
156+
"""Retrieve user preferences."""
157+
try:
158+
return self.make_request(f'{self.config.TRADER_BASE_URL}/userPreference')
159+
except Exception as e:
160+
self.logger.error(f"Failed to get user preferences: {e}")
161+
return None

color_print.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ def print(message_type, message, end="\n"):
1515
def input(message):
1616
return input(f"{ColorPrint.COLORS['input']}{message}")
1717

18+
@staticmethod
19+
def user_input(message):
20+
return input(f"{ColorPrint.COLORS['user']}{message}")
21+
22+
1823

1924
if __name__ == '__main__':
2025
ColorPrint.print('info', 'This is an informational message')

config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class APIConfig:
1010
ACCOUNTS_BASE_URL = f"{TRADER_BASE_URL}/accounts"
1111
MARKET_DATA_BASE_URL = f"{API_BASE_URL}/marketdata/v1"
1212
ORDERS_BASE_URL = ACCOUNTS_BASE_URL
13+
STREAMER_INFO_URL = f"{API_BASE_URL}/streamer-info"
1314
REQUEST_TIMEOUT = 30 # Timeout for API requests in seconds
1415
RETRY_STRATEGY = {
1516
'total': 3, # Total number of retries to allow

main.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@
33
from accounts import Accounts
44
from market_data import Quotes, Options, PriceHistory, Movers, MarketHours, Instruments
55
from orders import Orders
6+
from stream_client import StreamClient
7+
from asyncio import get_event_loop
8+
9+
10+
async def main_stream():
11+
client = APIClient() # Initialize the API client
12+
stream_client = StreamClient(client)
13+
await stream_client.start()
614

715

816
def main():
@@ -86,6 +94,8 @@ def main():
8694

8795

8896
if __name__ == '__main__':
89-
print(
90-
"Welcome to the unofficial Schwab API interface!\nGitHub: https://github.com/Patch-Code-Prosperity/Pythonic-Schwab-API")
91-
main()
97+
print("Welcome to the unofficial Schwab API interface!\n"
98+
"GitHub: https://github.com/Patch-Code-Prosperity/Pythonic-Schwab-API")
99+
loop = get_event_loop()
100+
loop.run_until_complete(main_stream())
101+
# main()

stream_client.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import json
2+
import asyncio
3+
import threading
4+
import websockets
5+
from datetime import datetime, time
6+
from multi_terminal import MultiTerminal
7+
from api_client import APIClient
8+
from stream_utilities import basic_request # Importing utility functions
9+
from color_print import ColorPrint
10+
11+
12+
class StreamClient:
13+
def __init__(self, client: APIClient):
14+
self.websocket = None
15+
self.streamer_info = None
16+
self.start_timestamp = None
17+
self.terminal = MultiTerminal(title="Stream Output")
18+
self.color_print = ColorPrint()
19+
self.active = False
20+
self.request_id = 0
21+
self.client = client
22+
23+
async def start(self):
24+
response = self.client.get_user_preferences()
25+
if not response:
26+
self.color_print.print("error", f"Failed to get streamer info: {response.status_code}")
27+
exit(1)
28+
self.streamer_info = response['streamerInfo'][0]
29+
login = self._construct_login_message()
30+
31+
while True:
32+
try:
33+
await self._connect_and_stream(login)
34+
except websockets.exceptions.ConnectionClosedOK:
35+
self.color_print.print("info", "Stream has closed.")
36+
break
37+
except Exception as e:
38+
self.color_print.print("error", f"{e}")
39+
self._handle_stream_error(e)
40+
41+
def _construct_login_message(self):
42+
self.request_id += 1
43+
return basic_request("ADMIN", "LOGIN", self.request_id, {
44+
"Authorization": self.client.token_info.get("access_token"),
45+
"SchwabClientChannel": self.streamer_info.get("schwabClientChannel"),
46+
"SchwabClientFunctionId": self.streamer_info.get("schwabClientFunctionId")
47+
})
48+
49+
async def _connect_and_stream(self, login):
50+
self.start_timestamp = datetime.now()
51+
self.color_print.print("info", "Connecting to server...")
52+
self.color_print.print("info", f"Start timestamp: {self.start_timestamp}")
53+
self.color_print.print("info", f"Streamer socket URL: {self.streamer_info.get('streamerSocketUrl')}")
54+
async with websockets.connect(self.streamer_info.get('streamerSocketUrl'),
55+
ping_interval=None) as self.websocket:
56+
self.terminal.print("[INFO]: Connecting to server...")
57+
await self.websocket.send(json.dumps(login))
58+
self.terminal.print(f"[Login]: {await self.websocket.recv()}")
59+
self.active = True
60+
while True:
61+
received = await self.websocket.recv()
62+
self.terminal.print(received)
63+
64+
def _handle_stream_error(self, error):
65+
self.active = False
66+
if isinstance(error, RuntimeError) and str(error) == "Streaming window has been closed":
67+
self.color_print.print("warning", "Streaming window has been closed.")
68+
else:
69+
if (datetime.now() - self.start_timestamp).seconds < 70:
70+
self.color_print.print("error", "Stream not alive for more than 1 minute, exiting...")
71+
else:
72+
self.terminal.print("[WARNING]: Connection lost to server, reconnecting...")
73+
74+
def send(self, listOfRequests):
75+
async def _send(to_send):
76+
await self.websocket.send(to_send)
77+
78+
if not isinstance(listOfRequests, list):
79+
listOfRequests = [listOfRequests]
80+
if self.active:
81+
to_send = json.dumps({"requests": listOfRequests})
82+
asyncio.run(_send(to_send))
83+
else:
84+
self.color_print.print("warning", "Stream is not active, nothing sent.")
85+
86+
def stop(self):
87+
self.send(basic_request("ADMIN", "LOGOUT", self.request_id))
88+
self.active = False

stream_utilities.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
def basic_request(service, command, request_id, parameters=None):
2+
return {
3+
"service": service.upper(),
4+
"command": command.upper(),
5+
"requestid": request_id,
6+
"parameters": parameters if parameters else {}
7+
}

0 commit comments

Comments
 (0)