-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_websocket.py
More file actions
137 lines (107 loc) · 4.88 KB
/
test_websocket.py
File metadata and controls
137 lines (107 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
"""
Test script for WebSocket market data client
"""
import asyncio
import logging
from websocket_client import MarketDataManager, MarketData, OrderBookData, TradeData
async def test_websocket_client():
    """Exercise the WebSocket market data client end to end and print a report.

    Steps, in order:

    1. Start a ``MarketDataManager`` in testnet mode and register a trade
       callback that counts trades (echoing the first five).
    2. Subscribe to BTCUSDT and ETHUSDT with trades enabled.
    3. Poll for 30 seconds, printing manager statistics and current prices
       every 5 seconds.
    4. Print final statistics and check that ticker and order book data are
       present for each symbol.
    5. Disconnect the underlying client and report whether the manager shows
       a ``'connected'`` state again a few seconds later.

    Any exception raised along the way is printed with its traceback rather
    than propagated, and the manager is always stopped before returning.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )

    print("π Testing WebSocket Market Data Client")
    print("=" * 50)

    # Knobs for this run, named once so the loop and the messages stay in sync.
    test_symbols = ['BTCUSDT', 'ETHUSDT']
    collect_seconds = 30
    status_interval = 5
    max_trades_shown = 5

    manager = MarketDataManager(testnet=True)

    # Incremented from the trade callback below.
    trade_updates = 0

    def on_trade(trade_data: TradeData):
        """Count every trade and echo only the first few to keep output short."""
        nonlocal trade_updates
        trade_updates += 1
        if trade_updates <= max_trades_shown:
            print(f"π Trade: {trade_data.symbol} - ${trade_data.price:.4f} x {trade_data.quantity:.4f}")

    manager.add_trade_callback(on_trade)

    try:
        print("π Starting WebSocket connection...")
        await manager.start()

        # Wait a moment for the connection before subscribing.
        await asyncio.sleep(2)

        print(f"π‘ Subscribing to {test_symbols}...")
        for symbol in test_symbols:
            await manager.subscribe_symbol(symbol, include_trades=True)

        print(f"β³ Collecting data for {collect_seconds} seconds...")

        # Count elapsed seconds from 1 so the printed label matches the time
        # actually slept (the sleep happens before the status line).
        for elapsed in range(1, collect_seconds + 1):
            await asyncio.sleep(1)
            if elapsed % status_interval:
                continue

            stats = manager.get_stats()
            # These are the current sizes of the manager's caches; they are
            # only needed for the status line, so read them here.
            cached_tickers = len(manager.market_data_cache)
            cached_orderbooks = len(manager.orderbook_cache)
            print(f"β±οΈ {elapsed}s - Messages: {stats['messages_received']}, "
                  f"Tickers: {cached_tickers}, Orderbooks: {cached_orderbooks}, "
                  f"Trades: {trade_updates}")

            # Show current prices for symbols that already have data.
            for symbol in test_symbols:
                data = manager.get_market_data(symbol)
                if data:
                    print(f" π° {symbol}: ${data.price:.4f} ({data.change_percent:+.2f}%)")

        # Final statistics
        print("\nπ Final Test Results:")
        print("=" * 30)

        final_stats = manager.get_stats()
        print(f"Connection State: {final_stats['connection_state']}")
        print(f"Messages Received: {final_stats['messages_received']}")
        print(f"Successful Connections: {final_stats['successful_connections']}")
        print(f"Reconnections: {final_stats['reconnections']}")
        print(f"Errors: {final_stats['errors']}")
        print(f"Active Subscriptions: {final_stats['active_subscriptions']}")
        print(f"Cached Symbols: {final_stats['cached_symbols']}")
        print(f"Trade Updates: {trade_updates}")

        # Data quality: each subscribed symbol should have ticker and book data.
        print("\nπ Data Quality Check:")
        for symbol in test_symbols:
            market_data = manager.get_market_data(symbol)
            orderbook_data = manager.get_orderbook(symbol)

            if market_data:
                print(f"β {symbol} ticker data: Price=${market_data.price:.4f}, Volume={market_data.volume:.0f}")
            else:
                print(f"β {symbol} ticker data: Missing")

            if orderbook_data:
                # Fall back to 0 when one side of the book is empty so the
                # formatted print below cannot fail on a missing level.
                best_bid = orderbook_data.bids[0][0] if orderbook_data.bids else 0
                best_ask = orderbook_data.asks[0][0] if orderbook_data.asks else 0
                print(f"β {symbol} orderbook: Bid=${best_bid:.4f}, Ask=${best_ask:.4f}")
            else:
                print(f"β {symbol} orderbook: Missing")

        # Connection resilience
        print("\nπ§ Testing Connection Resilience...")
        print("Simulating brief disconnection...")

        # NOTE(review): this reaches into manager.ws_client directly; whether
        # disconnect() leaves automatic reconnection enabled is not visible
        # from this file - confirm against the client implementation.
        await manager.ws_client.disconnect()
        await asyncio.sleep(2)

        # Give the client time to re-establish the connection.
        await asyncio.sleep(5)

        reconnect_stats = manager.get_stats()
        if reconnect_stats['connection_state'] == 'connected':
            print("β Auto-reconnection successful!")
        else:
            print(f"β οΈ Reconnection status: {reconnect_stats['connection_state']}")

        print(f"Total reconnections: {reconnect_stats['reconnections']}")

    except Exception as e:
        # Report the failure but still fall through to the cleanup below.
        print(f"β Test failed with error: {e}")
        import traceback
        traceback.print_exc()

    finally:
        print("\nπ Stopping WebSocket client...")
        await manager.stop()
        print("β Test completed!")
if __name__ == "__main__":
    # Run the async test to completion when this file is executed as a script.
    asyncio.run(test_websocket_client())