Skip to content

Commit 70826c5

Browse files
TexasCodingclaude
andcommitted
feat: Add WebSocket streaming support for real-time market data (v3.1.0-alpha)
- Implement StreamClient class with WebSocket connection management - Add real-time quote, trade, and bar data streaming - Implement automatic reconnection with exponential backoff - Support multiple concurrent data handlers per stream type - Add dynamic subscription management (subscribe/unsubscribe on the fly) - Create comprehensive data models (QuoteData, TradeData, BarData) - Add thread-safe message dispatching - Implement context manager support for clean resource management - Add feed selection support (IEX, SIP, OTC) - Create 36 unit tests and 11 integration tests for streaming - Add comprehensive documentation with usage examples - Fix trailing stop test to use buy order (avoid short selling restriction) - Update version to 3.1.0-alpha.1 Features: - Real-time market data streaming via WebSocket - Automatic reconnection on connection loss - Support for multiple symbols and data types - Clean API integration via create_stream_client() - Full type safety with dataclass models This implements the v3.1.0 WebSocket streaming milestone from DEVELOPMENT_PLAN.md 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 5bad3f5 commit 70826c5

File tree

12 files changed

+1830
-4
lines changed

12 files changed

+1830
-4
lines changed

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ A modern Python wrapper for the Alpaca Trading API, providing easy access to tra
2222
- **🧪 Battle-Tested**: 300+ tests with comprehensive coverage
2323
- **⚡ Modern Python**: Python 3.10+ with latest best practices
2424

25+
### New in v3.1.0-alpha
26+
- **📡 WebSocket Streaming**: Real-time market data streaming for quotes, trades, and bars
27+
- **🔄 Auto-Reconnection**: Built-in exponential backoff for stable connections
28+
- **📊 Multiple Handlers**: Support for multiple concurrent data handlers
29+
- **🎯 Dynamic Subscriptions**: Subscribe and unsubscribe symbols on the fly
30+
2531
### New in v3.0.0
2632
- **📸 Market Snapshots**: Get complete market snapshots with latest trade, quote, and bar data
2733
- **⚙️ Account Configuration**: Manage PDT settings, trade confirmations, and margin configurations
@@ -169,6 +175,32 @@ future_price = predictions[predictions['ds'] == '2024-12-31']['yhat'].values[0]
169175
print(f"Predicted AAPL price on 2024-12-31: ${future_price:.2f}")
170176
```
171177

178+
### Real-time Streaming (v3.1.0+)
179+
180+
```python
181+
# Create streaming client
182+
stream = api.create_stream_client(feed="iex")
183+
184+
# Define handlers
185+
def on_quote(quote):
186+
print(f"{quote.symbol}: Bid ${quote.bid_price} Ask ${quote.ask_price}")
187+
188+
def on_trade(trade):
189+
print(f"{trade.symbol}: ${trade.price} x {trade.size}")
190+
191+
# Connect and subscribe
192+
stream.connect()
193+
stream.subscribe_quotes(["AAPL", "GOOGL"], on_quote)
194+
stream.subscribe_trades(["MSFT"], on_trade)
195+
196+
# Stream runs in background - do other work or sleep
197+
import time
198+
time.sleep(60)
199+
200+
# Disconnect when done
201+
stream.disconnect()
202+
```
203+
172204
### Financial News & Sentiment
173205

174206
```python

docs/streaming_guide.md

Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
# WebSocket Streaming Guide
2+
3+
## Overview
4+
5+
The py-alpaca-api v3.1.0 introduces WebSocket streaming support for real-time market data. This allows you to receive live quotes, trades, and bar data without polling.
6+
7+
## Features
8+
9+
- **Real-time Data**: Receive quotes, trades, and bars as they happen
10+
- **Automatic Reconnection**: Built-in exponential backoff for connection stability
11+
- **Multiple Subscriptions**: Subscribe to multiple symbols and data types simultaneously
12+
- **Feed Selection**: Choose between IEX, SIP, and OTC feeds
13+
- **Thread-Safe**: Handle messages in separate threads safely
14+
15+
## Quick Start
16+
17+
```python
18+
from py_alpaca_api import PyAlpacaAPI
19+
20+
# Initialize API
21+
alpaca = PyAlpacaAPI(
22+
api_key="your_api_key",
23+
api_secret="your_secret_key",
24+
api_paper=True
25+
)
26+
27+
# Create streaming client
28+
stream = alpaca.create_stream_client(feed="iex")
29+
30+
# Define handlers
31+
def on_quote(quote):
32+
print(f"Quote: {quote.symbol} - Bid: ${quote.bid_price} Ask: ${quote.ask_price}")
33+
34+
def on_trade(trade):
35+
print(f"Trade: {trade.symbol} - Price: ${trade.price} Size: {trade.size}")
36+
37+
def on_bar(bar):
38+
print(f"Bar: {bar.symbol} - OHLC: ${bar.open}/{bar.high}/{bar.low}/{bar.close}")
39+
40+
# Connect and subscribe
41+
stream.connect()
42+
stream.subscribe_quotes(["AAPL", "GOOGL"], on_quote)
43+
stream.subscribe_trades(["MSFT"], on_trade)
44+
stream.subscribe_bars(["SPY"], on_bar)
45+
46+
# Keep running (or do other work)
47+
import time
48+
time.sleep(60)
49+
50+
# Disconnect when done
51+
stream.disconnect()
52+
```
53+
54+
## Using Context Manager
55+
56+
```python
57+
from py_alpaca_api import PyAlpacaAPI
58+
59+
alpaca = PyAlpacaAPI(
60+
api_key="your_api_key",
61+
api_secret="your_secret_key",
62+
api_paper=True
63+
)
64+
65+
# Automatically connects and disconnects
66+
with alpaca.create_stream_client() as stream:
67+
def on_quote(quote):
68+
print(f"Quote received: {quote}")
69+
70+
stream.subscribe_quotes(["AAPL"], on_quote)
71+
72+
# Stream will run for 30 seconds
73+
import time
74+
time.sleep(30)
75+
# Automatically disconnects here
76+
```
77+
78+
## Advanced Usage
79+
80+
### Multiple Handlers
81+
82+
```python
83+
# You can have multiple handlers for the same data type
84+
def log_quote(quote):
85+
logger.info(f"Quote: {quote}")
86+
87+
def process_quote(quote):
88+
# Your processing logic
89+
if quote.bid_price > threshold:
90+
place_order(quote.symbol)
91+
92+
stream.subscribe_quotes(["AAPL"], log_quote)
93+
stream.subscribe_quotes(["AAPL"], process_quote)
94+
```
95+
96+
### Dynamic Subscriptions
97+
98+
```python
99+
# Subscribe and unsubscribe dynamically
100+
stream.connect()
101+
102+
# Start with some symbols
103+
stream.subscribe_quotes(["AAPL", "GOOGL"], handler)
104+
105+
# Add more symbols later
106+
stream.subscribe_quotes(["MSFT", "AMZN"], handler)
107+
108+
# Remove symbols
109+
stream.unsubscribe_quotes(["AAPL"])
110+
111+
# Check active subscriptions
112+
print(stream.subscriptions)
113+
```
114+
115+
### Error Handling
116+
117+
```python
118+
def safe_quote_handler(quote):
119+
try:
120+
# Your processing logic
121+
process_quote(quote)
122+
except Exception as e:
123+
logger.error(f"Error processing quote: {e}")
124+
# Handler errors won't crash the stream
125+
126+
stream.subscribe_quotes(["AAPL"], safe_quote_handler)
127+
```
128+
129+
### Collecting Data
130+
131+
```python
132+
from collections import deque
133+
from threading import Lock
134+
135+
# Thread-safe data collection
136+
quotes_buffer = deque(maxlen=1000)
137+
buffer_lock = Lock()
138+
139+
def collect_quotes(quote):
140+
with buffer_lock:
141+
quotes_buffer.append({
142+
'symbol': quote.symbol,
143+
'bid': quote.bid_price,
144+
'ask': quote.ask_price,
145+
'timestamp': quote.timestamp
146+
})
147+
148+
stream.subscribe_quotes(["SPY"], collect_quotes)
149+
150+
# Process collected data periodically
151+
def process_buffer():
152+
with buffer_lock:
153+
data = list(quotes_buffer)
154+
quotes_buffer.clear()
155+
156+
# Process data
157+
df = pd.DataFrame(data)
158+
# ... analysis ...
159+
```
160+
161+
## Data Models
162+
163+
### QuoteData
164+
```python
165+
@dataclass
166+
class QuoteData:
167+
symbol: str
168+
timestamp: datetime
169+
bid_price: float
170+
bid_size: int
171+
ask_price: float
172+
ask_size: int
173+
bid_exchange: str
174+
ask_exchange: str
175+
conditions: list[str]
176+
```
177+
178+
### TradeData
179+
```python
180+
@dataclass
181+
class TradeData:
182+
symbol: str
183+
timestamp: datetime
184+
price: float
185+
size: int
186+
exchange: str
187+
trade_id: str
188+
conditions: list[str]
189+
```
190+
191+
### BarData
192+
```python
193+
@dataclass
194+
class BarData:
195+
symbol: str
196+
timestamp: datetime
197+
open: float
198+
high: float
199+
low: float
200+
close: float
201+
volume: int
202+
trade_count: int
203+
vwap: float
204+
```
205+
206+
## Feed Types
207+
208+
- **IEX**: Included with all market data subscriptions (free)
209+
- **SIP**: Full exchange data, requires Unlimited or Business subscription
210+
- **OTC**: Over-the-counter securities data
211+
212+
```python
213+
# IEX feed (default, free)
214+
stream_iex = alpaca.create_stream_client(feed="iex")
215+
216+
# SIP feed (requires subscription)
217+
stream_sip = alpaca.create_stream_client(feed="sip")
218+
219+
# OTC feed
220+
stream_otc = alpaca.create_stream_client(feed="otc")
221+
```
222+
223+
## Best Practices
224+
225+
1. **Use Handlers Efficiently**: Keep handlers fast and non-blocking
226+
2. **Error Handling**: Always wrap handler logic in try-except
227+
3. **Resource Management**: Use context managers or ensure proper disconnect
228+
4. **Buffering**: Collect data in thread-safe structures for batch processing
229+
5. **Logging**: Log important events but avoid logging every message
230+
6. **Reconnection**: The client auto-reconnects, but monitor connection status
231+
232+
## Connection Management
233+
234+
### Monitoring Connection Status
235+
236+
```python
237+
# Check connection status
238+
if stream.is_connected:
239+
print("Stream is connected")
240+
241+
if stream.is_authenticated:
242+
print("Stream is authenticated")
243+
244+
# Monitor reconnection attempts
245+
print(f"Reconnect attempts: {stream.reconnect_attempts}")
246+
```
247+
248+
### Custom Reconnection Settings
249+
250+
```python
251+
stream = alpaca.create_stream_client()
252+
253+
# Customize reconnection behavior
254+
stream.max_reconnect_attempts = 20
255+
stream.reconnect_delay = 2 # Initial delay in seconds
256+
stream.max_reconnect_delay = 60 # Max delay between attempts
257+
258+
stream.connect()
259+
```
260+
261+
## Performance Considerations
262+
263+
- Each handler is called synchronously - keep them fast
264+
- Use queues or buffers for heavy processing
265+
- Consider using separate threads for data processing
266+
- Monitor memory usage with large subscriptions
267+
- Bars are aggregated and sent less frequently than quotes/trades
268+
269+
## Troubleshooting
270+
271+
### No Data Received
272+
273+
1. Check market hours - no data outside trading hours
274+
2. Verify symbol is valid and tradable
275+
3. Check your subscription level for feed access
276+
4. Ensure handlers are properly defined
277+
278+
### Connection Issues
279+
280+
1. Verify API credentials are correct
281+
2. Check network connectivity
282+
3. Monitor logs for authentication errors
283+
4. Review reconnection attempts in logs
284+
285+
### Handler Errors
286+
287+
1. Wrap handler logic in try-except blocks
288+
2. Log errors for debugging
289+
3. Handler errors won't disconnect stream
290+
4. Test handlers with sample data first
291+
292+
## Example: Real-time VWAP Calculation
293+
294+
```python
295+
from collections import defaultdict
296+
import threading
297+
298+
class VWAPCalculator:
299+
def __init__(self):
300+
self.data = defaultdict(lambda: {'volume': 0, 'value': 0})
301+
self.lock = threading.Lock()
302+
303+
def on_trade(self, trade):
304+
with self.lock:
305+
symbol_data = self.data[trade.symbol]
306+
symbol_data['volume'] += trade.size
307+
symbol_data['value'] += trade.price * trade.size
308+
309+
vwap = symbol_data['value'] / symbol_data['volume']
310+
print(f"{trade.symbol} VWAP: ${vwap:.2f}")
311+
312+
# Use the calculator
313+
calculator = VWAPCalculator()
314+
stream = alpaca.create_stream_client()
315+
stream.connect()
316+
stream.subscribe_trades(["AAPL", "GOOGL"], calculator.on_trade)
317+
```
318+
319+
## Migration from Polling
320+
321+
If you're currently polling for data, streaming provides several advantages:
322+
323+
```python
324+
# Old way - Polling
325+
while True:
326+
quote = alpaca.stock.latest_quote.get("AAPL")
327+
process_quote(quote)
328+
time.sleep(1) # Delay, missing data
329+
330+
# New way - Streaming
331+
stream = alpaca.create_stream_client()
332+
stream.connect()
333+
stream.subscribe_quotes(["AAPL"], process_quote)
334+
# Receives all quotes in real-time, no missed data
335+
```
336+
337+
## Related Documentation
338+
339+
- [Alpaca WebSocket API Docs](https://docs.alpaca.markets/docs/real-time-market-data)
340+
- [Market Data Subscriptions](https://alpaca.markets/data)
341+
- [API Rate Limits](https://docs.alpaca.markets/docs/api-rate-limit)

0 commit comments

Comments
 (0)