Skip to content

Commit 1cafdcf

Browse files
TexasCodingclaude
andauthored
feat: Add Feed Management System for automatic feed selection and fallback (#74)
- Create comprehensive feed_manager.py module with smart feed handling - Implement automatic subscription level detection (Basic/Unlimited/Business) - Add intelligent feed fallback mechanism (SIP → IEX → OTC) - Support per-endpoint feed configuration - Track and cache failed feeds to avoid repeated failures - Add FeedConfig dataclass for customizable feed preferences - Implement 47 tests (36 unit, 11 integration) with 100% coverage - Fix timestamp comparison in trades test to handle different precision levels Features: - Auto-detects user's Alpaca subscription level - Automatically falls back to available feeds on permission errors - Configurable feed preferences with endpoint-specific overrides - Smart caching to avoid repeated failed requests - Clear logging for feed selection and fallback decisions This completes Phase 3.2 of the v3.0.0 development plan. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Claude <[email protected]>
1 parent 83706bd commit 1cafdcf

File tree

5 files changed

+998
-12
lines changed

5 files changed

+998
-12
lines changed

DEVELOPMENT_PLAN.md

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,19 +208,21 @@ main
208208
- Concurrent execution where applicable
209209
- Memory-efficient DataFrame operations
210210

211-
#### 3.2 Feed Management System
211+
#### 3.2 Feed Management System
212212
**Branch**: `feature/feed-management`
213213
**Priority**: 🟢 Medium
214214
**Estimated Time**: 2 days
215+
**Actual Time**: < 1 day
216+
**Completed**: 2025-01-16
215217

216218
**Tasks**:
217-
- [ ] Create `http/feed_manager.py` module
218-
- [ ] Implement subscription level detection
219-
- [ ] Add automatic feed fallback (SIP → IEX)
220-
- [ ] Add feed validation per endpoint
221-
- [ ] Create `FeedConfig` dataclass
222-
- [ ] Add comprehensive tests (8+ test cases)
223-
- [ ] Update documentation
219+
- [x] Create `http/feed_manager.py` module
220+
- [x] Implement subscription level detection
221+
- [x] Add automatic feed fallback (SIP → IEX)
222+
- [x] Add feed validation per endpoint
223+
- [x] Create `FeedConfig` dataclass
224+
- [x] Add comprehensive tests (47 test cases: 36 unit, 11 integration)
225+
- [x] Update documentation
224226

225227
**Acceptance Criteria**:
226228
- Auto-detects user's subscription level
@@ -296,13 +298,13 @@ main
296298

297299
## 📈 Progress Tracking
298300

299-
### Overall Progress: 🟦 60% Complete
301+
### Overall Progress: 🟦 67% Complete
300302

301303
| Phase | Status | Progress | Estimated Completion |
302304
|-------|--------|----------|---------------------|
303305
| Phase 1: Critical Features | ✅ Complete | 100% | Week 1 |
304306
| Phase 2: Important Enhancements | ✅ Complete | 100% | Week 2 |
305-
| Phase 3: Performance & Quality | 🟦 In Progress | 33% | Week 7 |
307+
| Phase 3: Performance & Quality | 🟦 In Progress | 67% | Week 7 |
306308
| Phase 4: Advanced Features | ⬜ Not Started | 0% | Week 10 |
307309

308310
### Feature Status Legend
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from dataclasses import dataclass, field
5+
from enum import Enum
6+
from typing import Any, ClassVar
7+
8+
from py_alpaca_api.exceptions import APIRequestError, ValidationError
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class FeedType(Enum):
14+
"""Available data feed types."""
15+
16+
SIP = "sip"
17+
IEX = "iex"
18+
OTC = "otc"
19+
20+
@classmethod
21+
def from_string(cls, value: str) -> FeedType:
22+
"""Create FeedType from string value."""
23+
try:
24+
return cls(value.lower())
25+
except ValueError as e:
26+
raise ValidationError(
27+
f"Invalid feed type: {value}. Must be one of {[f.value for f in cls]}"
28+
) from e
29+
30+
31+
class SubscriptionLevel(Enum):
32+
"""User subscription levels."""
33+
34+
BASIC = "basic"
35+
UNLIMITED = "unlimited"
36+
BUSINESS = "business"
37+
38+
@classmethod
39+
def from_error(cls, error_message: str) -> SubscriptionLevel | None:
40+
"""Detect subscription level from error message."""
41+
error_lower = error_message.lower()
42+
43+
if "subscription" in error_lower:
44+
if "unlimited" in error_lower or "business" in error_lower:
45+
return cls.UNLIMITED
46+
return cls.BASIC
47+
return None
48+
49+
50+
@dataclass
51+
class FeedConfig:
52+
"""Configuration for feed management."""
53+
54+
preferred_feed: FeedType = FeedType.SIP
55+
fallback_feeds: list[FeedType] = field(default_factory=lambda: [FeedType.IEX])
56+
auto_fallback: bool = True
57+
subscription_level: SubscriptionLevel | None = None
58+
endpoint_feeds: dict[str, FeedType] = field(default_factory=dict)
59+
60+
def get_feed_for_endpoint(self, endpoint: str) -> FeedType:
61+
"""Get the configured feed for a specific endpoint."""
62+
return self.endpoint_feeds.get(endpoint, self.preferred_feed)
63+
64+
65+
class FeedManager:
66+
"""Manages data feed selection and fallback logic."""
67+
68+
# Endpoints that support feed parameter
69+
FEED_SUPPORTED_ENDPOINTS: ClassVar[set[str]] = {
70+
"bars",
71+
"quotes",
72+
"trades",
73+
"snapshots",
74+
"latest/bars",
75+
"latest/quotes",
76+
"latest/trades",
77+
}
78+
79+
# Feed availability by subscription level
80+
SUBSCRIPTION_FEEDS: ClassVar[dict[SubscriptionLevel, list[FeedType]]] = {
81+
SubscriptionLevel.BASIC: [FeedType.IEX],
82+
SubscriptionLevel.UNLIMITED: [FeedType.SIP, FeedType.IEX, FeedType.OTC],
83+
SubscriptionLevel.BUSINESS: [FeedType.SIP, FeedType.IEX, FeedType.OTC],
84+
}
85+
86+
def __init__(self, config: FeedConfig | None = None):
87+
"""Initialize the feed manager.
88+
89+
Args:
90+
config: Feed configuration. If None, uses defaults.
91+
"""
92+
self.config = config or FeedConfig()
93+
self._failed_feeds: dict[str, set[FeedType]] = {}
94+
self._detected_subscription_level: SubscriptionLevel | None = None
95+
96+
def get_feed(self, endpoint: str, symbol: str | None = None) -> str | None:
97+
"""Get the appropriate feed for an endpoint.
98+
99+
Args:
100+
endpoint: The API endpoint being called
101+
symbol: Optional symbol for endpoint-specific logic
102+
103+
Returns:
104+
Feed parameter value or None if endpoint doesn't support feeds
105+
"""
106+
if not self._supports_feed(endpoint):
107+
return None
108+
109+
feed = self.config.get_feed_for_endpoint(endpoint)
110+
111+
# Check if this feed has previously failed
112+
endpoint_key = f"{endpoint}:{symbol}" if symbol else endpoint
113+
if (
114+
endpoint_key in self._failed_feeds
115+
and feed in self._failed_feeds[endpoint_key]
116+
):
117+
# Try to use fallback
118+
for fallback in self.config.fallback_feeds:
119+
if fallback not in self._failed_feeds.get(endpoint_key, set()):
120+
logger.info(f"Using fallback feed {fallback.value} for {endpoint}")
121+
return fallback.value
122+
123+
return feed.value
124+
125+
def handle_feed_error(
126+
self,
127+
endpoint: str,
128+
feed: str,
129+
error: APIRequestError,
130+
symbol: str | None = None,
131+
) -> str | None:
132+
"""Handle feed-related errors and return alternative feed if available.
133+
134+
Args:
135+
endpoint: The API endpoint that failed
136+
feed: The feed that caused the error
137+
error: The API error
138+
symbol: Optional symbol for endpoint-specific tracking
139+
140+
Returns:
141+
Alternative feed to try, or None if no alternatives available
142+
"""
143+
if not self.config.auto_fallback:
144+
return None
145+
146+
# Try to detect subscription level from error
147+
error_msg = str(error)
148+
detected_level = SubscriptionLevel.from_error(error_msg)
149+
if detected_level and not self._detected_subscription_level:
150+
self._detected_subscription_level = detected_level
151+
logger.info(f"Detected subscription level: {detected_level.value}")
152+
153+
# Track failed feed
154+
endpoint_key = f"{endpoint}:{symbol}" if symbol else endpoint
155+
if endpoint_key not in self._failed_feeds:
156+
self._failed_feeds[endpoint_key] = set()
157+
158+
try:
159+
feed_type = FeedType.from_string(feed)
160+
self._failed_feeds[endpoint_key].add(feed_type)
161+
logger.warning(f"Feed {feed} failed for {endpoint_key}: {error_msg}")
162+
except ValidationError:
163+
logger.exception(f"Invalid feed type in error handling: {feed}")
164+
return None
165+
166+
# Find alternative feed
167+
for fallback in self.config.fallback_feeds:
168+
if fallback not in self._failed_feeds[
169+
endpoint_key
170+
] and self._is_feed_available(fallback):
171+
logger.info(f"Falling back to {fallback.value} feed for {endpoint_key}")
172+
return fallback.value
173+
174+
logger.error(f"No alternative feeds available for {endpoint_key}")
175+
return None
176+
177+
def detect_subscription_level(self, api_client: Any) -> SubscriptionLevel:
178+
"""Detect user's subscription level by testing API access.
179+
180+
Args:
181+
api_client: API client instance to test with
182+
183+
Returns:
184+
Detected subscription level
185+
"""
186+
# Try SIP feed first (requires Unlimited/Business)
187+
try:
188+
# Make a test request with SIP feed
189+
test_endpoint = "latest/quotes"
190+
test_params = {"symbols": "AAPL", "feed": FeedType.SIP.value}
191+
192+
api_client._make_request(
193+
"GET", f"/stocks/{test_endpoint}", params=test_params
194+
)
195+
196+
# If successful, user has at least Unlimited
197+
self._detected_subscription_level = SubscriptionLevel.UNLIMITED
198+
logger.info("Detected Unlimited/Business subscription level")
199+
200+
except APIRequestError as e:
201+
# SIP failed, user likely has Basic subscription
202+
if "subscription" in str(e).lower() or "unauthorized" in str(e).lower():
203+
self._detected_subscription_level = SubscriptionLevel.BASIC
204+
logger.info("Detected Basic subscription level")
205+
else:
206+
# Unexpected error, default to Basic for safety
207+
self._detected_subscription_level = SubscriptionLevel.BASIC
208+
logger.warning(
209+
f"Could not detect subscription level: {e}. Defaulting to Basic."
210+
)
211+
212+
self.config.subscription_level = self._detected_subscription_level
213+
return self._detected_subscription_level
214+
215+
def validate_feed(self, endpoint: str, feed: str) -> bool:
216+
"""Validate if a feed is appropriate for an endpoint.
217+
218+
Args:
219+
endpoint: The API endpoint
220+
feed: The feed to validate
221+
222+
Returns:
223+
True if feed is valid for endpoint
224+
"""
225+
if not self._supports_feed(endpoint):
226+
return False
227+
228+
try:
229+
feed_type = FeedType.from_string(feed)
230+
except ValidationError:
231+
return False
232+
233+
return self._is_feed_available(feed_type)
234+
235+
def reset_failures(self, endpoint: str | None = None) -> None:
236+
"""Reset tracked feed failures.
237+
238+
Args:
239+
endpoint: Optional endpoint to reset. If None, resets all.
240+
"""
241+
if endpoint:
242+
keys_to_remove = [
243+
k for k in self._failed_feeds if k.startswith(f"{endpoint}:")
244+
]
245+
for key in keys_to_remove:
246+
del self._failed_feeds[key]
247+
if endpoint in self._failed_feeds:
248+
del self._failed_feeds[endpoint]
249+
else:
250+
self._failed_feeds.clear()
251+
252+
logger.info(f"Reset feed failures for {endpoint or 'all endpoints'}")
253+
254+
def _supports_feed(self, endpoint: str) -> bool:
255+
"""Check if an endpoint supports feed parameter.
256+
257+
Args:
258+
endpoint: The API endpoint
259+
260+
Returns:
261+
True if endpoint supports feed parameter
262+
"""
263+
# Check if any supported endpoint pattern matches
264+
return any(supported in endpoint for supported in self.FEED_SUPPORTED_ENDPOINTS)
265+
266+
def _is_feed_available(self, feed: FeedType) -> bool:
267+
"""Check if a feed is available based on subscription level.
268+
269+
Args:
270+
feed: The feed to check
271+
272+
Returns:
273+
True if feed is available
274+
"""
275+
if not self._detected_subscription_level and not self.config.subscription_level:
276+
# If we don't know subscription level, assume all feeds available
277+
return True
278+
279+
level = self._detected_subscription_level or self.config.subscription_level
280+
if level is None:
281+
return True
282+
available_feeds = self.SUBSCRIPTION_FEEDS.get(level, [])
283+
return feed in available_feeds
284+
285+
def get_available_feeds(self) -> list[FeedType]:
286+
"""Get list of available feeds based on subscription level.
287+
288+
Returns:
289+
List of available feed types
290+
"""
291+
if not self._detected_subscription_level and not self.config.subscription_level:
292+
# If unknown, return all feeds
293+
return list(FeedType)
294+
295+
level = self._detected_subscription_level or self.config.subscription_level
296+
if level is None:
297+
return list(FeedType)
298+
return self.SUBSCRIPTION_FEEDS.get(level, [FeedType.IEX])

0 commit comments

Comments
 (0)