
Commit 109dd86

feat: Add batch operations for multi-symbol data fetching
- Update history.py to support multi-symbol bars with automatic batching for 200+ symbols
- Update latest_quote.py to support batch quotes with automatic batching
- Implement concurrent request handling using ThreadPoolExecutor
- Add comprehensive tests (20 test cases: 11 unit, 9 integration)
- Update README with batch operation examples and features
- Optimize DataFrame operations for better performance
- Maintain backward compatibility for single-symbol requests
- Update DEVELOPMENT_PLAN.md to mark batch operations as complete
1 parent b7aab1f commit 109dd86

File tree

6 files changed (+913 -44 lines)

DEVELOPMENT_PLAN.md

Lines changed: 12 additions & 10 deletions
@@ -186,19 +186,21 @@ main
 
 ### Phase 3: Performance & Quality (Weeks 6-7)
 
-#### 3.1 Batch Operations
+#### 3.1 Batch Operations
 **Branch**: `feature/batch-operations`
 **Priority**: 🟢 Medium
 **Estimated Time**: 3 days
+**Actual Time**: < 1 day
+**Completed**: 2025-01-16
 
 **Tasks**:
-- [ ] Update `stock/history.py` for multi-symbol bars
-- [ ] Update `stock/latest_quote.py` for batch quotes
-- [ ] Implement concurrent request handling
-- [ ] Add request batching logic (max 200 symbols)
-- [ ] Optimize DataFrame operations
-- [ ] Add comprehensive tests (10+ test cases)
-- [ ] Update documentation
+- [x] Update `stock/history.py` for multi-symbol bars
+- [x] Update `stock/latest_quote.py` for batch quotes
+- [x] Implement concurrent request handling
+- [x] Add request batching logic (max 200 symbols)
+- [x] Optimize DataFrame operations
+- [x] Add comprehensive tests (20 test cases)
+- [x] Update documentation
 
 **Acceptance Criteria**:
 - Handles 200+ symbols efficiently
@@ -294,13 +296,13 @@ main
 
 ## 📈 Progress Tracking
 
-### Overall Progress: 🟦 50% Complete
+### Overall Progress: 🟦 60% Complete
 
 | Phase | Status | Progress | Estimated Completion |
 |-------|--------|----------|---------------------|
 | Phase 1: Critical Features | ✅ Complete | 100% | Week 1 |
 | Phase 2: Important Enhancements | ✅ Complete | 100% | Week 2 |
-| Phase 3: Performance & Quality | ⬜ Not Started | 0% | Week 7 |
+| Phase 3: Performance & Quality | 🟦 In Progress | 33% | Week 7 |
 | Phase 4: Advanced Features | ⬜ Not Started | 0% | Week 10 |
 
 ### Feature Status Legend
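
The batching and testing items checked off in the diff above are implemented in the `src/py_alpaca_api/stock/history.py` diff further down. As a self-contained illustration of the "max 200 symbols" rule only (not one of the commit's actual 20 tests), a pytest-style sketch of the chunking behaviour could look like this; the `split_into_batches` helper is hypothetical and only mirrors the list-comprehension used in `history.py`:

```python
# Illustrative sketch only: restates the chunking rule used by
# History._get_batched_stock_data (symbols are split into batches of at
# most BATCH_SIZE = 200). The helper below is hypothetical, not repo code.
BATCH_SIZE = 200


def split_into_batches(symbols: list[str], batch_size: int = BATCH_SIZE) -> list[list[str]]:
    # Same list-comprehension pattern as in history.py
    return [symbols[i : i + batch_size] for i in range(0, len(symbols), batch_size)]


def test_split_into_batches_respects_limit():
    symbols = [f"SYM{i}" for i in range(450)]
    batches = split_into_batches(symbols)
    assert len(batches) == 3  # 200 + 200 + 50
    assert all(len(batch) <= BATCH_SIZE for batch in batches)
    assert sum(len(batch) for batch in batches) == 450  # no symbols dropped
```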

README.md

Lines changed: 18 additions & 3 deletions
@@ -13,6 +13,7 @@ A modern Python wrapper for the Alpaca Trading API, providing easy access to tra
 
 - **🔐 Complete Alpaca API Coverage**: Trading, market data, account management, and more
 - **📊 Stock Market Analysis**: Built-in screeners for gainers/losers, historical data analysis
+- **🚀 Batch Operations**: Efficient multi-symbol data fetching with automatic batching (200+ symbols)
 - **🤖 ML-Powered Predictions**: Stock price predictions using Facebook Prophet
 - **📰 Financial News Integration**: Real-time news from Yahoo Finance and Benzinga
 - **📈 Technical Analysis**: Stock recommendations and sentiment analysis
@@ -99,16 +100,30 @@ api.trading.orders.cancel_all()
 ### Market Data & Analysis
 
 ```python
-# Get historical stock data
+# Get historical stock data for a single symbol
 history = api.stock.history.get(
     symbol="TSLA",
     start="2024-01-01",
     end="2024-12-31"
 )
 
-# Get real-time quote
+# NEW: Get historical data for multiple symbols (batch operation)
+symbols = ["AAPL", "GOOGL", "MSFT", "TSLA", "AMZN"]
+multi_history = api.stock.history.get(
+    symbol=symbols,  # Pass a list for batch operation
+    start="2024-01-01",
+    end="2024-12-31"
+)
+# Returns DataFrame with all symbols' data, automatically handles batching for 200+ symbols
+
+# Get real-time quote for a single symbol
 quote = api.stock.latest_quote.get("MSFT")
-print(f"MSFT Price: ${quote.ask_price}")
+print(f"MSFT Price: ${quote.ask}")
+
+# NEW: Get real-time quotes for multiple symbols (batch operation)
+quotes = api.stock.latest_quote.get(["AAPL", "GOOGL", "MSFT"])
+for quote in quotes:
+    print(f"{quote.symbol}: ${quote.ask}")
 
 # Screen for top gainers
 gainers = api.stock.screener.gainers(
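
As a hypothetical follow-on to the multi-symbol example added above (not part of the commit), the combined DataFrame can be split back out per symbol. The `symbol` and `date` columns are taken from the batch path in `history.py`, which sorts by them; the `close` column is an assumption, since `preprocess_data` is not shown in this diff. The sketch reuses the `api` object and the `multi_history` result from the README example:

```python
# Hypothetical usage sketch: iterate the multi-symbol result per ticker.
# "symbol" and "date" come from the implementation below; "close" is assumed.
for sym, bars in multi_history.groupby("symbol"):
    latest = bars.sort_values("date").iloc[-1]
    print(f"{sym}: {len(bars)} bars, last close {latest['close']}")
```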

src/py_alpaca_api/stock/history.py

Lines changed: 171 additions & 24 deletions
@@ -1,5 +1,6 @@
 import json
 from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 import pandas as pd
 
@@ -9,6 +10,8 @@
 
 
 class History:
+    BATCH_SIZE = 200  # Alpaca API limit for multi-symbol requests
+
     def __init__(self, data_url: str, headers: dict[str, str], asset: Assets) -> None:
         """Initializes an instance of the History class.
 
@@ -51,7 +54,7 @@ def check_if_stock(self, symbol: str) -> AssetModel:
     ###########################################
     def get_stock_data(
         self,
-        symbol: str,
+        symbol: str | list[str],
         start: str,
         end: str,
         timeframe: str = "1d",
@@ -61,28 +64,59 @@ def get_stock_data(
         sort: str = "asc",
         adjustment: str = "raw",
     ) -> pd.DataFrame:
-        """Retrieves historical stock data for a given symbol within a specified date range and timeframe.
+        """Retrieves historical stock data for one or more symbols within a specified date range and timeframe.
 
         Args:
-            symbol: The stock symbol to fetch data for.
+            symbol: The stock symbol(s) to fetch data for. Can be a single symbol string or list of symbols.
             start: The start date for historical data in the format "YYYY-MM-DD".
             end: The end date for historical data in the format "YYYY-MM-DD".
             timeframe: The timeframe for the historical data. Default is "1d".
             feed: The data feed source. Default is "sip".
             currency: The currency for historical data. Default is "USD".
-            limit: The number of data points to fetch. Default is 1000.
+            limit: The number of data points to fetch per symbol. Default is 1000.
             sort: The sort order for the data. Default is "asc".
             adjustment: The adjustment for historical data. Default is "raw".
 
         Returns:
-            A pandas DataFrame containing the historical stock data for the given symbol and time range.
+            A pandas DataFrame containing the historical stock data for the given symbol(s) and time range.
 
         Raises:
            ValueError: If the given timeframe is not one of the allowed values.
        """
-        self.check_if_stock(symbol)
+        # Handle single symbol or list of symbols
+        is_single = isinstance(symbol, str)
+        if is_single:
+            assert isinstance(symbol, str)  # Type guard for mypy
+            symbols_list: list[str] = [symbol]
+            single_symbol: str = symbol
+        else:
+            assert isinstance(symbol, list)  # Type guard for mypy
+            symbols_list = symbol
+            single_symbol = ""  # Won't be used in multi-symbol case
+
+        # Validate symbols are stocks
+        for sym in symbols_list:
+            self.check_if_stock(sym)
 
-        url = f"{self.data_url}/stocks/{symbol}/bars"
+        # If more than BATCH_SIZE symbols, need to batch the requests
+        if not is_single and len(symbols_list) > self.BATCH_SIZE:
+            return self._get_batched_stock_data(
+                symbols_list,
+                start,
+                end,
+                timeframe,
+                feed,
+                currency,
+                limit,
+                sort,
+                adjustment,
+            )
+
+        # Determine if using single or multi-symbol endpoint
+        if is_single:
+            url = f"{self.data_url}/stocks/{single_symbol}/bars"
+        else:
+            url = f"{self.data_url}/stocks/bars"
 
         timeframe_mapping: dict = {
             "1m": "1Min",
@@ -111,8 +145,105 @@ def get_stock_data(
             "feed": feed,
             "sort": sort,
         }
-        symbol_data = self.get_historical_data(symbol, url, params)
-        return self.preprocess_data(symbol_data, symbol)
+
+        # Add symbols parameter for multi-symbol request
+        if not is_single:
+            params["symbols"] = ",".join(symbols_list)
+
+        symbol_data = self.get_historical_data(symbols_list, url, params, is_single)
+
+        # Process data based on single or multi-symbol
+        if is_single:
+            return self.preprocess_data(symbol_data[single_symbol], single_symbol)
+        return self.preprocess_multi_data(symbol_data)
+
+    def _get_batched_stock_data(
+        self,
+        symbols: list[str],
+        start: str,
+        end: str,
+        timeframe: str,
+        feed: str,
+        currency: str,
+        limit: int,
+        sort: str,
+        adjustment: str,
+    ) -> pd.DataFrame:
+        """Handle large symbol lists by batching requests.
+
+        Args:
+            symbols: List of symbols to fetch data for.
+            start: The start date for historical data.
+            end: The end date for historical data.
+            timeframe: The timeframe for the historical data.
+            feed: The data feed source.
+            currency: The currency for historical data.
+            limit: The number of data points to fetch per symbol.
+            sort: The sort order for the data.
+            adjustment: The adjustment for historical data.
+
+        Returns:
+            A pandas DataFrame containing the historical stock data for all symbols.
+        """
+        # Split symbols into batches
+        batches = [
+            symbols[i : i + self.BATCH_SIZE]
+            for i in range(0, len(symbols), self.BATCH_SIZE)
+        ]
+
+        # Use ThreadPoolExecutor for concurrent batch requests
+        all_dfs = []
+        with ThreadPoolExecutor(max_workers=5) as executor:
+            futures = []
+            for batch in batches:
+                future = executor.submit(
+                    self.get_stock_data,
+                    batch,
+                    start,
+                    end,
+                    timeframe,
+                    feed,
+                    currency,
+                    limit,
+                    sort,
+                    adjustment,
+                )
+                futures.append(future)
+
+            for future in as_completed(futures):
+                try:
+                    df = future.result()
+                    if not df.empty:
+                        all_dfs.append(df)
+                except Exception as e:
+                    # Log error but continue with other batches
+                    print(f"Error fetching batch: {e}")
+
+        if all_dfs:
+            return pd.concat(all_dfs, ignore_index=True).sort_values(["symbol", "date"])
+        return pd.DataFrame()
+
+    @staticmethod
+    def preprocess_multi_data(
+        symbols_data: dict[str, list[defaultdict]],
+    ) -> pd.DataFrame:
+        """Preprocess data for multiple symbols.
+
+        Args:
+            symbols_data: A dictionary mapping symbols to their bar data.
+
+        Returns:
+            A pandas DataFrame containing the preprocessed historical stock data for all symbols.
+        """
+        all_dfs = []
+        for symbol, data in symbols_data.items():
+            if data:  # Only process if data exists
+                df = History.preprocess_data(data, symbol)
+                all_dfs.append(df)
+
+        if all_dfs:
+            return pd.concat(all_dfs, ignore_index=True).sort_values(["symbol", "date"])
+        return pd.DataFrame()
 
     ###########################################
     # /////////// PreProcess Data \\\\\\\\\\\ #
@@ -169,35 +300,51 @@ def preprocess_data(symbol_data: list[defaultdict], symbol: str) -> pd.DataFrame
     # ///////// Get Historical Data \\\\\\\\\ #
     ###########################################
     def get_historical_data(
-        self, symbol: str, url: str, params: dict
-    ) -> list[defaultdict]:
-        """Retrieves historical data for a given symbol.
+        self, symbols: list[str], url: str, params: dict, is_single: bool
+    ) -> dict[str, list[defaultdict]]:
+        """Retrieves historical data for given symbol(s).
 
         Args:
-            symbol (str): The symbol for which to retrieve historical data.
-            url (str): The URL to send the request to.
-            params (dict): Additional parameters to include in the request.
+            symbols: List of symbols for which to retrieve historical data.
+            url: The URL to send the request to.
+            params: Additional parameters to include in the request.
+            is_single: Whether this is a single-symbol request.
 
         Returns:
-            list[defaultdict]: A list of historical data for the given symbol.
+            dict[str, list[defaultdict]]: A dictionary mapping symbols to their historical data.
         """
         page_token = None
         symbols_data = defaultdict(list)
+
         while True:
-            params["page_token"] = page_token
+            if page_token:
+                params["page_token"] = page_token
+
            response = json.loads(
                Requests()
                .request(method="GET", url=url, headers=self.headers, params=params)
                .text
            )
 
-            if not response.get("bars"):
-                raise Exception(
-                    f"No historical data found for {symbol}, with the given parameters."
-                )
+            # Handle single vs multi-symbol response format
+            if is_single:
+                if not response.get("bars"):
+                    raise Exception(
+                        f"No historical data found for {symbols[0]}, with the given parameters."
+                    )
+                symbols_data[symbols[0]].extend(response.get("bars", []))
+            else:
+                # Multi-symbol response has bars nested under symbol keys
+                bars = response.get("bars", {})
+                if not bars:
+                    raise Exception(
+                        f"No historical data found for symbols: {', '.join(symbols)}, with the given parameters."
+                    )
+                for symbol, symbol_bars in bars.items():
+                    symbols_data[symbol].extend(symbol_bars)
 
-            symbols_data[symbol].extend(response.get("bars", []))
-            page_token = response.get("next_page_token", "")
+            page_token = response.get("next_page_token")
             if not page_token:
                 break
-        return symbols_data[symbol]
+
+        return symbols_data
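
To make the batching threshold concrete, here is a hedged usage sketch (not from the repository) of the path above: any multi-symbol request with more than `BATCH_SIZE = 200` symbols is routed through `_get_batched_stock_data`, split into chunks of at most 200, fetched by up to five worker threads, and concatenated and sorted by `["symbol", "date"]`. The `load_universe()` helper and the `api` object are assumptions for illustration only.

```python
# Illustrative sketch, assuming `api` from the README examples and a
# hypothetical load_universe() returning real, tradable stock symbols
# (each symbol is still validated via check_if_stock before fetching).
symbols = load_universe()  # e.g. ~500 tickers -> 3 concurrent batches

df = api.stock.history.get(
    symbol=symbols,
    start="2024-01-01",
    end="2024-12-31",
)

# A batch that raises is only printed and skipped, so the combined frame may
# silently miss symbols; it is worth checking coverage afterwards.
fetched = set(df["symbol"].unique()) if not df.empty else set()
missing = set(symbols) - fetched
print(f"Fetched {len(fetched)} of {len(symbols)} symbols; missing: {sorted(missing)[:5]}")
```

One design consequence worth noting: because failed batches are reported with `print` rather than re-raised, callers that need strict completeness should verify the returned symbols as sketched above.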
