Skip to content

Commit 732e8e9

Browse files
TexasCodingclaude
andcommitted
fix(realtime): Add proper type annotations for SignalR hub connections
- Created HubConnection type alias in types.base for BaseHubConnection - Updated ProjectXRealtimeClient to use HubConnection instead of Any - Updated ProjectXRealtimeClientProtocol to match implementation types - Improved subscription handling with proper event waiting - Added circuit breaker pattern to BatchedWebSocketHandler - Fixed asyncio deprecation warnings (get_event_loop -> get_running_loop) This ensures IDEs properly recognize connection object methods (send, on, start, stop) and provides better type safety for real-time connection management. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 64eaad5 commit 732e8e9

File tree

7 files changed

+112
-116
lines changed

7 files changed

+112
-116
lines changed

src/project_x_py/realtime/batched_handler.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ def __init__(
6262
self.total_processing_time = 0.0
6363
self.last_batch_time = time.time()
6464

65+
# Circuit breaker state
66+
self.failed_batches = 0
67+
self.circuit_breaker_tripped_at: float | None = None
68+
self.circuit_breaker_timeout = 60.0 # 60 seconds
69+
6570
# Lock for thread safety
6671
self._lock = asyncio.Lock()
6772

@@ -89,6 +94,18 @@ async def _process_batch(self) -> None:
8994
async with self._lock:
9095
if self.processing:
9196
return
97+
98+
# Check circuit breaker
99+
if self.circuit_breaker_tripped_at:
100+
if (
101+
time.time() - self.circuit_breaker_tripped_at
102+
) > self.circuit_breaker_timeout:
103+
logger.warning("Resetting circuit breaker.")
104+
self.circuit_breaker_tripped_at = None
105+
self.failed_batches = 0
106+
else:
107+
return # Circuit breaker is tripped, do not process
108+
92109
self.processing = True
93110

94111
try:
@@ -136,6 +153,8 @@ async def _process_batch(self) -> None:
136153
if self.process_callback:
137154
try:
138155
await self.process_callback(batch)
156+
# Reset failure count on success
157+
self.failed_batches = 0
139158
except asyncio.CancelledError:
140159
# Re-raise cancellation for proper shutdown
141160
raise
@@ -145,12 +164,14 @@ async def _process_batch(self) -> None:
145164
exc_info=True,
146165
)
147166
# Track failures for circuit breaker
148-
self.failed_batches = getattr(self, "failed_batches", 0) + 1
167+
self.failed_batches += 1
149168
if self.failed_batches > 10:
150169
logger.critical(
151-
"Batch processing circuit breaker triggered"
170+
f"Batch processing circuit breaker triggered for {self.circuit_breaker_timeout}s."
152171
)
172+
self.circuit_breaker_tripped_at = time.time()
153173
self.processing = False
174+
return # Stop processing
154175

155176
# Update metrics
156177
processing_time = time.time() - start_time

src/project_x_py/realtime/connection_management.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,11 @@ async def connect(self: "ProjectXRealtimeClientProtocol") -> bool:
269269
await self.setup_connections()
270270

271271
# Store the event loop for cross-thread task scheduling
272-
self._loop = asyncio.get_event_loop()
272+
try:
273+
self._loop = asyncio.get_running_loop()
274+
except RuntimeError:
275+
logger.error("No running event loop found.")
276+
return False
273277

274278
logger.debug(LogMessages.WS_CONNECT)
275279

@@ -293,8 +297,22 @@ async def connect(self: "ProjectXRealtimeClientProtocol") -> bool:
293297
)
294298
return False
295299

296-
# Wait for connections to establish and stabilize
297-
await asyncio.sleep(2.0) # Increased wait time for connection stability
300+
# Wait for connections to establish
301+
try:
302+
await asyncio.wait_for(
303+
asyncio.gather(
304+
self.user_hub_ready.wait(), self.market_hub_ready.wait()
305+
),
306+
timeout=10.0,
307+
)
308+
except TimeoutError:
309+
logger.error(
310+
LogMessages.WS_ERROR,
311+
extra={
312+
"error": "Connection attempt timed out after 10 seconds."
313+
},
314+
)
315+
return False
298316

299317
if self.user_connected and self.market_connected:
300318
self.stats["connected_time"] = datetime.now()
@@ -325,7 +343,7 @@ async def _start_connection_async(
325343
This is an internal method that bridges sync SignalR with async code.
326344
"""
327345
# SignalR connections are synchronous, so we run them in executor
328-
loop = asyncio.get_event_loop()
346+
loop = asyncio.get_running_loop()
329347
await loop.run_in_executor(None, connection.start)
330348
logger.debug(LogMessages.WS_CONNECTED, extra={"hub": name})
331349

@@ -365,13 +383,12 @@ async def disconnect(self: "ProjectXRealtimeClientProtocol") -> None:
365383
logger.debug(LogMessages.WS_DISCONNECT)
366384

367385
async with self._connection_lock:
386+
loop = asyncio.get_running_loop()
368387
if self.user_connection:
369-
loop = asyncio.get_event_loop()
370388
await loop.run_in_executor(None, self.user_connection.stop)
371389
self.user_connected = False
372390

373391
if self.market_connection:
374-
loop = asyncio.get_event_loop()
375392
await loop.run_in_executor(None, self.market_connection.stop)
376393
self.market_connected = False
377394

@@ -390,6 +407,7 @@ def _on_user_hub_open(self: "ProjectXRealtimeClientProtocol") -> None:
390407
- Logs connection success
391408
"""
392409
self.user_connected = True
410+
self.user_hub_ready.set()
393411
self.logger.info("✅ User hub connected")
394412

395413
def _on_user_hub_close(self: "ProjectXRealtimeClientProtocol") -> None:
@@ -407,6 +425,7 @@ def _on_user_hub_close(self: "ProjectXRealtimeClientProtocol") -> None:
407425
Automatic reconnection will attempt based on configuration.
408426
"""
409427
self.user_connected = False
428+
self.user_hub_ready.clear()
410429
self.logger.warning("❌ User hub disconnected")
411430

412431
def _on_market_hub_open(self: "ProjectXRealtimeClientProtocol") -> None:
@@ -421,6 +440,7 @@ def _on_market_hub_open(self: "ProjectXRealtimeClientProtocol") -> None:
421440
- Logs connection success
422441
"""
423442
self.market_connected = True
443+
self.market_hub_ready.set()
424444
self.logger.info("✅ Market hub connected")
425445

426446
def _on_market_hub_close(self: "ProjectXRealtimeClientProtocol") -> None:
@@ -438,6 +458,7 @@ def _on_market_hub_close(self: "ProjectXRealtimeClientProtocol") -> None:
438458
Automatic reconnection will attempt based on configuration.
439459
"""
440460
self.market_connected = False
461+
self.market_hub_ready.clear()
441462
self.logger.warning("❌ Market hub disconnected")
442463

443464
def _on_connection_error(

src/project_x_py/realtime/core.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ async def main():
100100
from project_x_py.realtime.connection_management import ConnectionManagementMixin
101101
from project_x_py.realtime.event_handling import EventHandlingMixin
102102
from project_x_py.realtime.subscriptions import SubscriptionsMixin
103+
from project_x_py.types.base import HubConnection
103104

104105
if TYPE_CHECKING:
105106
from project_x_py.models import ProjectXConfig
@@ -285,17 +286,16 @@ def __init__(
285286
self.base_market_url = "https://rtc.topstepx.com/hubs/market"
286287

287288
# SignalR connection objects
288-
self.user_connection: Any | None = None
289-
self.market_connection: Any | None = None
289+
self.user_connection: HubConnection | None = None
290+
self.market_connection: HubConnection | None = None
290291

291292
# Connection state tracking
292293
self.user_connected = False
293294
self.market_connected = False
294295
self.setup_complete = False
295296

296-
# Event callbacks (pure forwarding, no caching) - already initialized in mixin
297-
if not hasattr(self, "callbacks"):
298-
self.callbacks: defaultdict[str, list[Any]] = defaultdict(list)
297+
# Event callbacks (pure forwarding, no caching)
298+
self.callbacks: defaultdict[str, list[Any]] = defaultdict(list)
299299

300300
# Basic statistics (no business logic)
301301
self.stats = {
@@ -315,8 +315,10 @@ def __init__(
315315
self.logger.info(f"User Hub: {final_user_url}")
316316
self.logger.info(f"Market Hub: {final_market_url}")
317317

318-
# Async locks for thread-safe operations - check if not already initialized
319-
if not hasattr(self, "_callback_lock"):
320-
self._callback_lock = asyncio.Lock()
321-
if not hasattr(self, "_connection_lock"):
322-
self._connection_lock = asyncio.Lock()
318+
# Async locks for thread-safe operations
319+
self._callback_lock = asyncio.Lock()
320+
self._connection_lock = asyncio.Lock()
321+
322+
# Async events for connection readiness
323+
self.user_hub_ready = asyncio.Event()
324+
self.market_hub_ready = asyncio.Event()

src/project_x_py/realtime/event_handling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ def _schedule_async_task(
430430
)
431431
except Exception as e:
432432
# Fallback for logging - avoid recursion
433-
print(f"Error scheduling async task: {e}")
433+
self.logger.error(f"Error scheduling async task: {e}")
434434
else:
435435
# Fallback - try to create task in current loop context
436436
try:
@@ -439,7 +439,7 @@ def _schedule_async_task(
439439
task.add_done_callback(lambda t: None)
440440
except RuntimeError:
441441
# No event loop available, log and continue
442-
print(f"No event loop available for {event_type} event")
442+
self.logger.error(f"No event loop available for {event_type} event")
443443

444444
async def _forward_event_async(
445445
self: "ProjectXRealtimeClientProtocol", event_type: str, args: Any

0 commit comments

Comments
 (0)