Skip to content

Commit ece7bb5

Browse files
committed
streaming - working
1 parent b43f945 commit ece7bb5

File tree

5 files changed

+143
-59
lines changed

5 files changed

+143
-59
lines changed

api_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def post_token_request(self, data):
6767
'Content-Type': 'application/x-www-form-urlencoded'
6868
}
6969
response = self.session.post(f"{self.config.API_BASE_URL}/v1/oauth/token", headers=headers, data=data)
70-
if response.ok:
70+
if response.status_code == 200:
7171
self.save_token(response.json())
7272
self.logger.info("Tokens successfully updated.")
7373
return True

main.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,38 @@
1+
import asyncio
12
from datetime import datetime, timedelta
23
from api_client import APIClient
34
from accounts import Accounts
45
from market_data import Quotes, Options, PriceHistory, Movers, MarketHours, Instruments
56
from orders import Orders
67
from stream_client import StreamClient
78
from asyncio import get_event_loop
9+
import stream_utilities
810

911

1012
async def main_stream():
1113
client = APIClient() # Initialize the API client
1214
stream_client = StreamClient(client)
13-
await stream_client.start()
14-
15+
await stream_client.start() # Start and connect
16+
17+
while stream_client.active:
18+
# Construct and send a subscription request
19+
request = stream_utilities.basic_request(
20+
"LEVELONE_EQUITIES",
21+
request_id=stream_client.request_id,
22+
command="SUBS",
23+
customer_id=stream_client.streamer_info.get("schwabClientCustomerId"),
24+
correl_id=stream_client.streamer_info.get("schwabClientCorrelId"),
25+
parameters={
26+
"keys": "TSLA,AMZN,AAPL,NFLX,BABA",
27+
"fields": "0,1,2,3,4,5,8,9,12,13,15,24,28,29,30,31,48"
28+
}
29+
)
30+
await stream_client.send(request)
31+
message = await stream_client.receive()
32+
print(f"Received: {message}")
33+
await asyncio.sleep(1) # Delay between messages
34+
35+
stream_client.stop()
1536

1637
def main():
1738
client = APIClient() # Initialize the API client

multi_terminal.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def __init__(self, title="Terminal", height=20, width=200, font=("Courier New",
1515
self.textColor = textColor
1616
self.allowClosing = allowClosing
1717
self.ignoreClosedPrints = ignoreClosedPrints
18-
self.isOpen = False
18+
self.is_open = False
1919
self.start()
2020

2121
def run(self):
@@ -25,16 +25,16 @@ def run(self):
2525
self.text_box = tk.Text(self.root, height=self.height, width=self.width, font=self.font,
2626
bg=self.backgroundColor, fg=self.textColor, state='disabled')
2727
self.text_box.pack(side="left", fill="both", expand=True)
28-
self.isOpen = True
28+
self.is_open = True
2929
self.root.mainloop()
3030

3131
def close(self):
32-
if self.isOpen:
33-
self.isOpen = False
32+
if self.is_open:
33+
self.is_open = False
3434
self.root.destroy()
3535

3636
def print(self, text, end="\n"):
37-
if not self.isOpen:
37+
if not self.is_open:
3838
if not self.ignoreClosedPrints:
3939
raise Exception(f"Terminal '{self.title}' is closed.")
4040
return

stream_client.py

Lines changed: 92 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -11,59 +11,112 @@
1111

1212
class StreamClient:
1313
def __init__(self, client: APIClient):
14+
self.client = client
1415
self.websocket = None
1516
self.streamer_info = None
1617
self.start_timestamp = None
1718
self.terminal = MultiTerminal(title="Stream Output")
1819
self.color_print = ColorPrint()
1920
self.active = False
20-
self.request_id = 0
21-
self.client = client
21+
self.login_successful = False
22+
self.request_id = -1
2223

2324
async def start(self):
2425
response = self.client.get_user_preferences()
25-
if not response:
26-
self.color_print.print("error", f"Failed to get streamer info: {response.text}")
26+
if 'error' in response: # Assuming error handling is done inside get_user_preferences
27+
self.color_print.print("error", f"Failed to get streamer info: {response['error']}")
2728
exit(1)
2829
self.streamer_info = response['streamerInfo'][0]
2930
login = self._construct_login_message()
30-
self.color_print.print("info", "Starting stream...")
31-
self.color_print.print("info", f"Streamer info: {self.streamer_info}")
32-
self.color_print.print("info", f"Login message: {login}")
33-
while True:
34-
try:
35-
await self._connect_and_stream(login)
36-
except websockets.exceptions.ConnectionClosedOK:
37-
self.color_print.print("info", "Stream has closed.")
38-
break
39-
except Exception as e:
40-
self.color_print.print("error", f"{e}")
41-
self._handle_stream_error(e)
42-
31+
await self.connect()
32+
await self.send(login)
33+
34+
async def connect(self):
35+
try:
36+
self.websocket = await websockets.connect(self.streamer_info.get('streamerSocketUrl'))
37+
self.active = True
38+
self.color_print.print("info", "Connection established.")
39+
except Exception as e:
40+
self.color_print.print("error", f"Failed to connect: {e}")
41+
42+
async def send(self, message):
43+
if not self.active:
44+
await self.connect()
45+
try:
46+
await self.websocket.send(json.dumps(message))
47+
self.color_print.print("info", f"Message sent: {json.dumps(message)}")
48+
response = await self.websocket.recv()
49+
await self.handle_response(response)
50+
except Exception as e:
51+
self.color_print.print("error", f"Failed to send message: {e}")
52+
53+
async def handle_response(self, message):
54+
message = json.loads(message)
55+
self.color_print.print("info", f"Received: {message}")
56+
if "Login" in message.get('command', '') and message.get('content', {}).get('code') == 0:
57+
self.login_successful = True
58+
self.color_print.print("info", "Login successful.")
59+
60+
async def receive(self):
61+
try:
62+
return await self.websocket.recv()
63+
except Exception as e:
64+
self.color_print.print("error", f"Error receiving message: {e}")
65+
return None
66+
4367
def _construct_login_message(self):
68+
# Increment request ID for each new request
4469
self.request_id += 1
45-
return basic_request("ADMIN", "LOGIN", self.request_id, {
70+
71+
# Prepare the parameters dictionary specifically for the parameters that need to be nested under 'parameters'
72+
parameters = {
4673
"Authorization": self.client.token_info.get("access_token"),
4774
"SchwabClientChannel": self.streamer_info.get("schwabClientChannel"),
48-
"SchwabClientFunctionId": self.streamer_info.get("schwabClientFunctionId"),
49-
"SchwabClientCustomerId": self.streamer_info.get("schwabClientCustomerId"),
50-
"SchwabClientCorrelId": self.streamer_info.get("schwabClientCorrelId")
51-
})
75+
"SchwabClientFunctionId": self.streamer_info.get("schwabClientFunctionId")
76+
}
77+
78+
# Call the basic_request function with customer ID and correlation ID at the top level of the request
79+
return basic_request(
80+
service="ADMIN",
81+
request_id=self.request_id,
82+
command="LOGIN",
83+
customer_id=self.streamer_info.get("schwabClientCustomerId"),
84+
correl_id=self.streamer_info.get("schwabClientCorrelId"),
85+
parameters=parameters
86+
)
5287

5388
async def _connect_and_stream(self, login):
54-
self.start_timestamp = datetime.now()
55-
self.color_print.print("info", "Connecting to server...")
56-
self.color_print.print("info", f"Start timestamp: {self.start_timestamp}")
57-
self.color_print.print("info", f"Streamer socket URL: {self.streamer_info.get('streamerSocketUrl')}")
58-
async with websockets.connect(self.streamer_info.get('streamerSocketUrl'),
59-
ping_interval=None) as self.websocket:
60-
self.terminal.print("[INFO]: Connecting to server...")
61-
await self.websocket.send(json.dumps(login))
62-
self.terminal.print(f"[Login]: {await self.websocket.recv()}")
63-
self.active = True
64-
while True:
65-
received = await self.websocket.recv()
66-
self.terminal.print(received)
89+
try:
90+
async with websockets.connect(self.streamer_info.get('streamerSocketUrl')) as websocket:
91+
self.websocket = websocket
92+
await websocket.send(json.dumps(login))
93+
while True:
94+
message = await websocket.recv()
95+
await self.handle_message(json.loads(message))
96+
except websockets.exceptions.ConnectionClosedOK:
97+
self.color_print.print("info", "Stream has closed.")
98+
except Exception as e:
99+
self.color_print.print("error", f"{e}")
100+
self._handle_stream_error(e)
101+
102+
async def handle_message(self, message):
103+
if "response" in message and any(
104+
resp.get("code") == "0" for resp in message["response"]): # Check if login is successful
105+
self.color_print.print("info", "Logged in successfully, sending subscription requests...")
106+
await self.send_subscription_requests()
107+
else:
108+
self.color_print.print("info", f"Received: {message}")
109+
110+
async def reconnect(self):
111+
self.terminal.print("[INFO]: Attempting to reconnect...")
112+
try:
113+
await asyncio.sleep(10) # Wait before attempting to reconnect
114+
login = self._construct_login_message() # Reconstruct login info
115+
await self._connect_and_stream(login) # Attempt to reconnect
116+
return True
117+
except Exception as e:
118+
self.terminal.print(f"Reconnect failed: {e}")
119+
return False
67120

68121
def _handle_stream_error(self, error):
69122
self.active = False
@@ -75,16 +128,8 @@ def _handle_stream_error(self, error):
75128
else:
76129
self.terminal.print("[WARNING]: Connection lost to server, reconnecting...")
77130

78-
async def send(self, listOfRequests):
79-
80-
if not isinstance(listOfRequests, list):
81-
listOfRequests = [listOfRequests]
82-
if self.active:
83-
to_send = json.dumps({"requests": listOfRequests})
84-
await self.websocket.send(to_send)
85-
else:
86-
self.color_print.print("warning", "Stream is not active, nothing sent.")
87-
88131
def stop(self):
89-
self.send(basic_request("ADMIN", "LOGOUT", self.request_id))
90-
self.active = False
132+
if self.active:
133+
self.active = False
134+
asyncio.create_task(self.websocket.close())
135+
self.color_print.print("info", "Connection closed.")

stream_utilities.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
1-
def basic_request(service, command, request_id, parameters=None):
2-
return {
1+
def basic_request(service, request_id, command, customer_id, correl_id, parameters=None):
2+
"""Constructs a basic request dictionary for streaming commands.
3+
4+
Args:
5+
service (str): The service name, e.g., 'ADMIN'.
6+
request_id (int): The identifier for this request.
7+
command (str): The command to be executed, e.g., 'LOGIN'.
8+
customer_id (str): The Schwab client customer ID.
9+
correl_id (str): The Schwab client correlation ID.
10+
parameters (dict, optional): Additional parameters for the command.
11+
12+
Returns:
13+
dict: The request dictionary.
14+
"""
15+
request = {
316
"service": service.upper(),
17+
"requestid": str(request_id),
418
"command": command.upper(),
5-
"requestid": request_id,
6-
"parameters": parameters if parameters else {}
19+
"SchwabClientCustomerId": customer_id,
20+
"SchwabClientCorrelId": correl_id
721
}
22+
# Include parameters if provided
23+
if parameters:
24+
request["parameters"] = parameters
25+
return request

0 commit comments

Comments
 (0)