-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver2.py
More file actions
115 lines (98 loc) · 4.56 KB
/
server2.py
File metadata and controls
115 lines (98 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import asyncio
import json
import websockets
from websockets.server import serve
from market_data import get_market_data, format_market_data
from config import assets
import redis.asyncio as redis
# Initialize Redis client
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Function to get cached market data from Redis
async def get_cached_market_data():
data = await redis_client.get("market_data")
if data:
return json.loads(data) # Convert the JSON string back to a Python object
return None
# Function to set cached market data to Redis
async def set_cached_market_data(data):
await redis_client.set("market_data", json.dumps(data)) # Store as JSON string
# List to store active WebSocket connections
active_connections = []
async def market_websocket_handler(websocket):
try:
# Add new WebSocket connection to the list
active_connections.append(websocket)
print("New client connected.")
async for message in websocket:
data = json.loads(message)
# Handle subscription message
if data.get("event") == "subscribe" and data.get("channel") == "rates":
print("Client subscribed to rates channel.")
# Immediately send cached data to the new client
cached_data = await get_cached_market_data()
for asset, asset_data in cached_data.items():
response = {
"channel": "rates",
"event": "data",
"data": format_market_data(asset, asset_data)
}
await websocket.send(json.dumps(response))
except websockets.ConnectionClosed:
print("Client disconnected.")
except Exception as e:
print(f"Error handling WebSocket connection: {e}")
finally:
# Remove WebSocket from active connections if disconnected
if websocket in active_connections:
active_connections.remove(websocket)
# Function to fetch market data and broadcast to all connected clients
async def broadcast_market_data():
while True:
try:
# Fetch the market data once for all clients
market_data = get_market_data(assets)
# Handle empty market_data response (possible api is currently unavailable)
# Users still use the old data fetch or redis data (both are the same data)
if not market_data:
print("Market data is empty. Retrying after 60 seconds.")
await asyncio.sleep(60) # Wait before retrying
continue
print(active_connections)
await set_cached_market_data(market_data) # Update the cache
print("Broadcasting new market data to clients.")
# Loop through all active WebSocket connections
for websocket in active_connections:
try:
# Send the market data to each client
for asset, asset_data in market_data.items():
response = {
"channel": "rates",
"event": "data",
"data": format_market_data(asset, asset_data)
}
await websocket.send(json.dumps(response))
except websockets.ConnectionClosed:
# Handle disconnected client
print("A client has disconnected.")
active_connections.remove(websocket)
except Exception as e:
print(f"Error sending data to client: {e}")
except Exception as e:
print(f"Error fetching market data: {e}")
# Wait 60 seconds before fetching again
await asyncio.sleep(60)
# Start the WebSocket server
async def start_server():
# The serve function in start_server accepts new WebSocket connections.
# After the connection is established, control is handed over to market_websocket_handler
async with serve(market_websocket_handler, "localhost", 8765):
print("WebSocket server started on ws://localhost:8765/markets/ws")
# Run both the WebSocket server and the market data broadcaster concurrently
# Only happen once when the WebSocket server starts running.
await asyncio.gather(
asyncio.Future(), # Keep server running
broadcast_market_data() # Broadcast market data to all clients
)
# Run the server
if __name__ == "__main__":
asyncio.run(start_server())