|
101 | 101 | if TYPE_CHECKING: |
102 | 102 | from project_x_py.types.stats_types import RealtimeDataManagerStats |
103 | 103 |
|
| 104 | +import polars as pl |
| 105 | + |
104 | 106 | if TYPE_CHECKING: |
105 | 107 | from asyncio import Lock |
106 | 108 |
|
107 | | - import polars as pl |
108 | | - |
109 | 109 | logger = logging.getLogger(__name__) |
110 | 110 |
|
111 | 111 |
|
@@ -392,58 +392,71 @@ async def _cleanup_old_data(self) -> None: |
392 | 392 | if current_time - self.last_cleanup < self.cleanup_interval: |
393 | 393 | return |
394 | 394 |
|
395 | | - async with self.data_lock: |
396 | | - total_bars_before = 0 |
397 | | - total_bars_after = 0 |
398 | | - |
399 | | - # Cleanup each timeframe's data |
400 | | - for tf_key in self.timeframes: |
401 | | - if tf_key in self.data and not self.data[tf_key].is_empty(): |
402 | | - initial_count = len(self.data[tf_key]) |
403 | | - total_bars_before += initial_count |
404 | | - |
405 | | - # Check for buffer overflow first |
406 | | - is_overflow, utilization = await self._check_buffer_overflow(tf_key) |
407 | | - if is_overflow: |
408 | | - await self._handle_buffer_overflow(tf_key, utilization) |
409 | | - total_bars_after += len(self.data[tf_key]) |
410 | | - continue |
411 | | - |
412 | | - # Check if overflow is needed (if mixin is available) |
413 | | - if hasattr( |
414 | | - self, "_check_overflow_needed" |
415 | | - ) and await self._check_overflow_needed(tf_key): |
416 | | - await self._overflow_to_disk(tf_key) |
417 | | - # Data has been overflowed, update count |
418 | | - total_bars_after += len(self.data[tf_key]) |
419 | | - continue |
420 | | - |
421 | | - # Keep only the most recent bars (sliding window) |
422 | | - if initial_count > self.max_bars_per_timeframe: |
423 | | - self.data[tf_key] = self.data[tf_key].tail( |
424 | | - self.max_bars_per_timeframe |
425 | | - ) |
| 395 | + # Import here to avoid circular dependency |
| 396 | + from project_x_py.utils.lock_optimization import AsyncRWLock |
| 397 | + |
| 398 | + # Use appropriate lock method based on lock type |
| 399 | + if isinstance(self.data_lock, AsyncRWLock): |
| 400 | + async with self.data_lock.write_lock(): |
| 401 | + await self._perform_cleanup() |
| 402 | + else: |
| 403 | + async with self.data_lock: |
| 404 | + await self._perform_cleanup() |
426 | 405 |
|
| 406 | + async def _perform_cleanup(self) -> None: |
| 407 | + """Perform the actual cleanup logic (extracted for lock handling).""" |
| 408 | + total_bars_before = 0 |
| 409 | + total_bars_after = 0 |
| 410 | + |
| 411 | + # Cleanup each timeframe's data |
| 412 | + for tf_key in self.timeframes: |
| 413 | + if tf_key in self.data and not self.data[tf_key].is_empty(): |
| 414 | + initial_count = len(self.data[tf_key]) |
| 415 | + total_bars_before += initial_count |
| 416 | + |
| 417 | + # Check for buffer overflow first |
| 418 | + is_overflow, utilization = await self._check_buffer_overflow(tf_key) |
| 419 | + if is_overflow: |
| 420 | + await self._handle_buffer_overflow(tf_key, utilization) |
| 421 | + total_bars_after += len(self.data[tf_key]) |
| 422 | + continue |
| 423 | + |
| 424 | + # Check if overflow is needed (if mixin is available) |
| 425 | + if hasattr( |
| 426 | + self, "_check_overflow_needed" |
| 427 | + ) and await self._check_overflow_needed(tf_key): |
| 428 | + await self._overflow_to_disk(tf_key) |
| 429 | + # Data has been overflowed, update count |
427 | 430 | total_bars_after += len(self.data[tf_key]) |
| 431 | + continue |
428 | 432 |
|
429 | | - # Cleanup tick buffer - deque handles its own cleanup with maxlen |
430 | | - # No manual cleanup needed for deque with maxlen |
| 433 | + # Keep only the most recent bars (sliding window) |
| 434 | + if initial_count > self.max_bars_per_timeframe: |
| 435 | + self.data[tf_key] = self.data[tf_key].tail( |
| 436 | + self.max_bars_per_timeframe |
| 437 | + ) |
431 | 438 |
|
432 | | - # Update stats |
433 | | - self.last_cleanup = current_time |
434 | | - self.memory_stats["bars_cleaned"] += total_bars_before - total_bars_after |
435 | | - self.memory_stats["total_bars"] = total_bars_after |
436 | | - self.memory_stats["last_cleanup"] = current_time |
| 439 | + total_bars_after += len(self.data[tf_key]) |
437 | 440 |
|
438 | | - # Log cleanup if significant |
439 | | - if total_bars_before != total_bars_after: |
440 | | - self.logger.debug( |
441 | | - f"DataManager cleanup - Bars: {total_bars_before}→{total_bars_after}, " |
442 | | - f"Ticks: {len(self.current_tick_data)}" |
443 | | - ) |
| 441 | + # Cleanup tick buffer - deque handles its own cleanup with maxlen |
| 442 | + # No manual cleanup needed for deque with maxlen |
444 | 443 |
|
445 | | - # Force garbage collection after cleanup |
446 | | - gc.collect() |
| 444 | + # Update stats |
| 445 | + current_time = time.time() |
| 446 | + self.last_cleanup = current_time |
| 447 | + self.memory_stats["bars_cleaned"] += total_bars_before - total_bars_after |
| 448 | + self.memory_stats["total_bars"] = total_bars_after |
| 449 | + self.memory_stats["last_cleanup"] = current_time |
| 450 | + |
| 451 | + # Log cleanup if significant |
| 452 | + if total_bars_before != total_bars_after: |
| 453 | + self.logger.debug( |
| 454 | + f"DataManager cleanup - Bars: {total_bars_before}→{total_bars_after}, " |
| 455 | + f"Ticks: {len(self.current_tick_data)}" |
| 456 | + ) |
| 457 | + |
| 458 | + # Force garbage collection after cleanup |
| 459 | + gc.collect() |
447 | 460 |
|
448 | 461 | async def _periodic_cleanup(self) -> None: |
449 | 462 | """Background task for periodic cleanup.""" |
|
0 commit comments