Skip to content

Commit a2b7d9c

Browse files
TexasCoding and claude
committed
fix: address PR review feedback for v3.1.0
## Security & Stability Improvements

- Add path validation to prevent directory traversal attacks
- Set secure file permissions (0700) for mmap storage directories
- Improve error handling with circuit breaker for batch processing
- Add proper resource cleanup with __del__ for mmap files

## Configuration Enhancements

- Make overflow threshold configurable (default 80%)
- Make compression threshold configurable (default 1KB)
- Add mmap_storage_path configuration option
- Add mmap_cleanup_days for automatic old file removal

## Error Handling Improvements

- Better exception handling in batch processing
- Add circuit breaker after 10 consecutive failures
- Properly re-raise asyncio.CancelledError for clean shutdown
- Add detailed logging with exc_info for debugging

## Resource Management

- Implement proper cleanup in __del__ for mmap storage
- Add automatic cleanup of old overflow files (>7 days)
- Close all storage instances properly on cleanup
- Use contextlib.suppress for cleaner exception handling

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent cd42def commit a2b7d9c

File tree

4 files changed

+76
-16
lines changed

4 files changed

+76
-16
lines changed

src/project_x_py/client/cache.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,13 @@ def __init__(self) -> None:
5757
)
5858
self._opt_market_data_cache_time: dict[str, float] = {}
5959

60-
# Compression settings
61-
self.compression_threshold = 1024 # Compress data > 1KB
62-
self.compression_level = 3 # lz4 compression level (0-16)
60+
# Compression settings (configurable)
61+
self.compression_threshold = getattr(self, "config", {}).get(
62+
"compression_threshold", 1024
63+
) # Compress data > 1KB
64+
self.compression_level = getattr(self, "config", {}).get(
65+
"compression_level", 3
66+
) # lz4 compression level (0-16)
6367

6468
def _serialize_dataframe(self, df: pl.DataFrame) -> bytes:
6569
"""

src/project_x_py/realtime/batched_handler.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,21 @@ async def _process_batch(self) -> None:
109109
if self.process_callback:
110110
try:
111111
await self.process_callback(batch)
112+
except asyncio.CancelledError:
113+
# Re-raise cancellation for proper shutdown
114+
raise
112115
except Exception as e:
113116
logger.error(
114-
f"Error processing batch of {len(batch)} messages: {e}"
117+
f"Error processing batch of {len(batch)} messages: {e}",
118+
exc_info=True,
115119
)
120+
# Track failures for circuit breaker
121+
self.failed_batches = getattr(self, "failed_batches", 0) + 1
122+
if self.failed_batches > 10:
123+
logger.critical(
124+
"Batch processing circuit breaker triggered"
125+
)
126+
self.processing = False
116127

117128
# Update metrics
118129
processing_time = time.time() - start_time

src/project_x_py/realtime_data_manager/mmap_overflow.py

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
maintaining fast access to recent data.
77
"""
88

9-
from datetime import datetime
9+
from contextlib import suppress
10+
from datetime import datetime, timedelta
1011
from pathlib import Path
1112
from typing import TYPE_CHECKING, Any
1213

@@ -41,11 +42,31 @@ def __init__(self) -> None:
4142
"""Initialize memory-mapped overflow storage."""
4243
super().__init__()
4344

44-
# Storage configuration
45-
self.enable_mmap_overflow = True
46-
self.overflow_threshold = 0.8 # Start overflow at 80% of max bars
47-
self.mmap_storage_path = Path.home() / ".projectx" / "data_overflow"
48-
self.mmap_storage_path.mkdir(parents=True, exist_ok=True)
45+
# Storage configuration (can be overridden via config)
46+
self.enable_mmap_overflow = getattr(self, "config", {}).get(
47+
"enable_mmap_overflow", True
48+
)
49+
self.overflow_threshold = getattr(self, "config", {}).get(
50+
"overflow_threshold", 0.8
51+
) # Start overflow at 80% of max bars
52+
53+
# Validate and create storage path
54+
base_path = getattr(self, "config", {}).get(
55+
"mmap_storage_path", Path.home() / ".projectx" / "data_overflow"
56+
)
57+
self.mmap_storage_path = Path(base_path)
58+
59+
# Validate path to prevent directory traversal
60+
try:
61+
self.mmap_storage_path = self.mmap_storage_path.resolve()
62+
if not str(self.mmap_storage_path).startswith(str(Path.home())):
63+
raise ValueError(f"Invalid storage path: {self.mmap_storage_path}")
64+
self.mmap_storage_path.mkdir(
65+
parents=True, exist_ok=True, mode=0o700
66+
) # Secure permissions
67+
except Exception as e:
68+
logger.error(f"Failed to create overflow storage directory: {e}")
69+
self.enable_mmap_overflow = False
4970

5071
# Storage instances per timeframe
5172
self._mmap_storages: dict[str, MemoryMappedStorage] = {}
@@ -260,19 +281,43 @@ async def get_historical_data(
260281
async def cleanup_overflow_storage(self) -> None:
261282
"""Clean up old overflow files and close storage instances."""
262283
try:
263-
# Close all storage instances
264-
for storage in self._mmap_storages.values():
265-
storage.close()
284+
# Close all storage instances properly
285+
for timeframe, storage in list(self._mmap_storages.items()):
286+
try:
287+
storage.close()
288+
logger.debug(f"Closed mmap storage for {timeframe}")
289+
except Exception as e:
290+
logger.warning(f"Error closing storage for {timeframe}: {e}")
266291
self._mmap_storages.clear()
267292

268-
# Clean up old files (optional - keep last N days)
269-
# This could be configured based on requirements
293+
# Clean up old files based on config
294+
cleanup_days = getattr(self, "config", {}).get("mmap_cleanup_days", 7)
295+
if cleanup_days > 0:
296+
cutoff_time = datetime.now() - timedelta(days=cleanup_days)
297+
try:
298+
for file_path in self.mmap_storage_path.glob("*.mmap"):
299+
if file_path.stat().st_mtime < cutoff_time.timestamp():
300+
file_path.unlink()
301+
# Also remove metadata file
302+
meta_path = file_path.with_suffix(".meta")
303+
if meta_path.exists():
304+
meta_path.unlink()
305+
logger.info(f"Removed old overflow file: {file_path.name}")
306+
except Exception as e:
307+
logger.warning(f"Error cleaning old files: {e}")
270308

271309
logger.info("Cleaned up overflow storage")
272310

273311
except Exception as e:
274312
logger.error(f"Error cleaning up overflow storage: {e}")
275313

314+
def __del__(self) -> None:
315+
"""Ensure cleanup on deletion."""
316+
# Synchronous cleanup for destructor
317+
for storage in self._mmap_storages.values():
318+
with suppress(Exception):
319+
storage.close()
320+
276321
def get_overflow_stats(self) -> dict[str, Any]:
277322
"""
278323
Get statistics about overflow storage.

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)