This comprehensive guide covers performance optimization, benchmarking, and monitoring strategies for SAFLA (Self-Aware Feedback Loop Algorithm). Our performance approach focuses on measurable improvements with specific targets and continuous monitoring.
- Performance Philosophy
- Performance Targets
- Benchmarking Framework
- Component-Specific Optimizations
- Memory Optimization
- Concurrency and Parallelization
- Caching Strategies
- Performance Monitoring
- Profiling and Analysis
- Optimization Techniques
- Performance Testing
- Troubleshooting Performance Issues
SAFLA's performance optimization is guided by several key principles:
All optimizations are based on concrete measurements and benchmarks:
- Baseline Establishment: Measure current performance before optimization
- Target-Driven: Set specific, measurable performance targets
- Continuous Monitoring: Track performance over time
- Regression Prevention: Ensure optimizations don't introduce regressions
Each component has specific performance requirements:
- Vector Search: <1ms latency for 10k vectors
- Delta Evaluation: 50% throughput improvement over baseline
- MCP Communication: <5ms round-trip latency
- Safety Validation: <10ms real-time performance
The system adapts its performance strategies based on load and conditions:
- Load-Based Optimization: Adjust strategies based on current load
- Resource-Aware: Consider available system resources
- Graceful Degradation: Maintain functionality under high load
Based on our performance test suite in test_performance_benchmarks.py:
VECTOR_SEARCH_BENCHMARKS = [
PerformanceBenchmark(
name="vector_search_latency_10k",
target_value=1.0, # <1ms
target_unit="ms",
description="Vector similarity search latency for 10k vectors"
),
PerformanceBenchmark(
name="vector_search_throughput",
target_value=1000.0, # 1000 searches/second
target_unit="ops/sec",
description="Vector similarity search throughput"
),
PerformanceBenchmark(
name="vector_batch_store_latency",
target_value=10.0, # <10ms for 1000 vectors
target_unit="ms",
description="Batch vector storage latency"
),
PerformanceBenchmark(
name="vector_memory_efficiency",
target_value=0.8, # 80% memory efficiency
target_unit="ratio",
description="Vector memory storage efficiency"
)
]DELTA_EVALUATION_BENCHMARKS = [
PerformanceBenchmark(
name="delta_evaluation_throughput",
target_value=500.0, # 50% improvement over baseline
target_unit="ops/sec",
description="Delta evaluation pipeline throughput"
),
PerformanceBenchmark(
name="delta_calculation_latency",
target_value=2.0, # <2ms per evaluation
target_unit="ms",
description="Individual delta calculation latency"
),
PerformanceBenchmark(
name="batch_delta_processing",
target_value=100.0, # 100 evaluations per batch
target_unit="ops/batch",
description="Batch delta evaluation processing"
)
]MCP_COMMUNICATION_BENCHMARKS = [
PerformanceBenchmark(
name="mcp_round_trip_latency",
target_value=5.0, # <5ms round-trip
target_unit="ms",
description="MCP server communication round-trip latency"
),
PerformanceBenchmark(
name="mcp_concurrent_requests",
target_value=100.0, # 100 concurrent requests
target_unit="requests",
description="MCP server concurrent request handling"
),
PerformanceBenchmark(
name="mcp_message_throughput",
target_value=1000.0, # 1000 messages/second
target_unit="msgs/sec",
description="MCP message processing throughput"
)
]SAFETY_VALIDATION_BENCHMARKS = [
PerformanceBenchmark(
name="safety_validation_latency",
target_value=10.0, # <10ms real-time performance
target_unit="ms",
description="Safety validation pipeline latency"
),
PerformanceBenchmark(
name="safety_validation_throughput",
target_value=100.0, # 100 validations/second
target_unit="ops/sec",
description="Safety validation throughput"
),
PerformanceBenchmark(
name="safety_rule_evaluation_speed",
target_value=1000.0, # 1000 rules/second
target_unit="rules/sec",
description="Safety rule evaluation speed"
)
]SAFLA includes a comprehensive benchmarking framework for measuring and tracking performance across all system components. The framework provides modular, extensible benchmarking capabilities with persistent storage and trend analysis.
The benchmarking framework consists of several key components:
- Core Framework (
benchmarks/core.py): Abstract base classes and execution engine - Database Layer (
benchmarks/database.py): SQLite-based persistent storage - CLI Benchmarks (
benchmarks/cli_benchmarks.py): SAFLA CLI-specific benchmarks - Utilities (
benchmarks/utils.py): Configuration, metrics, and analysis tools
class Benchmark(ABC):
"""Abstract base class for all benchmarks."""
def __init__(self, name: str, description: str = "", config: Optional[BenchmarkConfig] = None):
self.name = name
self.description = description
self.config = config or BenchmarkConfig()
@abstractmethod
async def run(self) -> BenchmarkResult:
"""Execute the benchmark and return results."""
pass
@abstractmethod
async def setup(self) -> None:
"""Setup benchmark environment."""
pass
@abstractmethod
async def teardown(self) -> None:
"""Cleanup benchmark environment."""
pass@dataclass
class BenchmarkResult:
"""Result of a benchmark execution."""
benchmark_name: str
execution_time: float
memory_usage: float
success: bool
error_message: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert result to dictionary for storage."""
return {
'benchmark_name': self.benchmark_name,
'execution_time': self.execution_time,
'memory_usage': self.memory_usage,
'success': self.success,
'error_message': self.error_message,
'metadata': json.dumps(self.metadata),
'timestamp': self.timestamp.isoformat()
}class BenchmarkRunner:
"""Executes benchmarks with monitoring and error handling."""
def __init__(self, database: Optional[BenchmarkDatabase] = None):
self.database = database or BenchmarkDatabase()
self.results: List[BenchmarkResult] = []
async def run_benchmark(self, benchmark: Benchmark) -> BenchmarkResult:
"""Run a single benchmark with full monitoring."""
# Memory monitoring setup
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
try:
await benchmark.setup()
start_time = time.perf_counter()
await benchmark.run()
end_time = time.perf_counter()
execution_time = end_time - start_time
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_usage = final_memory - initial_memory
result = BenchmarkResult(
benchmark_name=benchmark.name,
execution_time=execution_time,
memory_usage=memory_usage,
success=True
)
except Exception as e:
result = BenchmarkResult(
benchmark_name=benchmark.name,
execution_time=0.0,
memory_usage=0.0,
success=False,
error_message=str(e)
)
finally:
await benchmark.teardown()
# Store result
self.results.append(result)
if self.database:
await self.database.store_result(result)
return resultThe benchmarking framework is integrated into the SAFLA CLI with dedicated commands:
# Run all benchmarks
safla benchmark run
# Run specific benchmark
safla benchmark run --name "cli_help_performance"
# List available benchmarks
safla benchmark list
# View benchmark results
safla benchmark results
# Analyze performance trends
safla benchmark analyzeThe framework includes comprehensive CLI benchmarks:
- CLI Help Performance - Tests
safla --helpresponse time - CLI Version Performance - Tests
safla --versionresponse time - CLI List Commands Performance - Tests command listing speed
- CLI Invalid Command Performance - Tests error handling speed
- CLI Benchmark List Performance - Tests benchmark listing speed
- CLI Benchmark Results Performance - Tests results retrieval speed
- CLI Benchmark Run Performance - Tests benchmark execution speed
- CLI Benchmark Analyze Performance - Tests analysis performance
The framework uses SQLite for persistent storage with the following schema:
CREATE TABLE benchmark_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
benchmark_name TEXT NOT NULL,
execution_time REAL NOT NULL,
memory_usage REAL NOT NULL,
success BOOLEAN NOT NULL,
error_message TEXT,
metadata TEXT,
timestamp TEXT NOT NULL
);
CREATE INDEX idx_benchmark_name ON benchmark_results(benchmark_name);
CREATE INDEX idx_timestamp ON benchmark_results(timestamp);The framework provides comprehensive analysis capabilities:
class BenchmarkAnalyzer:
"""Analyzes benchmark results for trends and insights."""
async def analyze_trends(self, benchmark_name: str, days: int = 30) -> Dict[str, Any]:
"""Analyze performance trends over time."""
results = await self.database.get_results_by_name(benchmark_name, days)
if len(results) < 2:
return {"error": "Insufficient data for trend analysis"}
execution_times = [r.execution_time for r in results]
# Statistical analysis
mean_time = statistics.mean(execution_times)
median_time = statistics.median(execution_times)
std_dev = statistics.stdev(execution_times) if len(execution_times) > 1 else 0
# Trend detection using linear regression
x = list(range(len(execution_times)))
slope = self._calculate_slope(x, execution_times)
return {
"benchmark_name": benchmark_name,
"total_runs": len(results),
"mean_execution_time": mean_time,
"median_execution_time": median_time,
"std_deviation": std_dev,
"trend_slope": slope,
"trend_direction": "improving" if slope < 0 else "degrading" if slope > 0 else "stable",
"performance_stability": "stable" if std_dev < mean_time * 0.1 else "variable"
}class PerformanceMetrics:
"""Calculates and tracks performance metrics."""
@staticmethod
def calculate_percentiles(values: List[float]) -> Dict[str, float]:
"""Calculate performance percentiles."""
if not values:
return {}
sorted_values = sorted(values)
n = len(sorted_values)
return {
"p50": sorted_values[int(n * 0.5)],
"p90": sorted_values[int(n * 0.9)],
"p95": sorted_values[int(n * 0.95)],
"p99": sorted_values[int(n * 0.99)] if n >= 100 else sorted_values[-1]
}
@staticmethod
def detect_regressions(current_results: List[float], baseline_results: List[float], threshold: float = 0.2) -> bool:
"""Detect performance regressions."""
if not current_results or not baseline_results:
return False
current_mean = statistics.mean(current_results)
baseline_mean = statistics.mean(baseline_results)
regression_ratio = (current_mean - baseline_mean) / baseline_mean
return regression_ratio > thresholdBenchmarks are configured using the BenchmarkConfig class:
@dataclass
class BenchmarkConfig:
"""Configuration for benchmark execution."""
iterations: int = 10
warmup_iterations: int = 2
timeout_seconds: float = 30.0
memory_limit_mb: float = 1000.0
parallel_execution: bool = False
store_results: bool = True
@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> 'BenchmarkConfig':
"""Create config from dictionary."""
return cls(**{k: v for k, v in config_dict.items() if hasattr(cls, k)})from benchmarks import BenchmarkRunner, BenchmarkDatabase
from benchmarks.cli_benchmarks import CLIHelpBenchmark
# Initialize components
database = BenchmarkDatabase()
await database.initialize()
runner = BenchmarkRunner(database)
# Run specific benchmark
benchmark = CLIHelpBenchmark()
result = await runner.run_benchmark(benchmark)
print(f"Benchmark: {result.benchmark_name}")
print(f"Execution Time: {result.execution_time:.3f}s")
print(f"Memory Usage: {result.memory_usage:.2f}MB")
print(f"Success: {result.success}")from benchmarks.utils import BenchmarkAnalyzer
analyzer = BenchmarkAnalyzer(database)
# Analyze trends for specific benchmark
trends = await analyzer.analyze_trends("cli_help_performance", days=7)
print(f"Trend Direction: {trends['trend_direction']}")
print(f"Mean Execution Time: {trends['mean_execution_time']:.3f}s")
# Get performance summary
summary = await analyzer.get_performance_summary()
for benchmark_name, stats in summary.items():
print(f"{benchmark_name}: {stats['mean_time']:.3f}s ± {stats['std_dev']:.3f}s")The framework includes comprehensive unit tests in tests/test_benchmarks_simple.py:
- Core framework functionality tests
- Database operations tests
- CLI benchmark tests
- Configuration and utilities tests
- Error handling and edge case tests
Run tests with:
python -m pytest tests/test_benchmarks_simple.py -vCurrent benchmark performance targets:
| Benchmark | Target Time | Current Performance |
|---|---|---|
| CLI Help | < 1.0s | ~0.4s |
| CLI Version | < 0.5s | ~0.4s |
| CLI List Commands | < 1.0s | ~0.5s |
| CLI Error Handling | < 0.5s | ~0.4s |
| Benchmark Execution | < 5.0s | ~4.7s |
All benchmarks currently meet or exceed their performance targets with 100% success rate.
class OptimizedVectorMemoryManager:
def __init__(self, embedding_dim, similarity_metric, max_capacity):
self.embedding_dim = embedding_dim
self.similarity_metric = similarity_metric
self.max_capacity = max_capacity
# Use FAISS for fast similarity search
if similarity_metric == SimilarityMetric.COSINE:
self.index = faiss.IndexFlatIP(embedding_dim)
elif similarity_metric == SimilarityMetric.EUCLIDEAN:
self.index = faiss.IndexFlatL2(embedding_dim)
# Metadata storage
self.metadata_store = {}
self.id_to_index = {}
def store(self, vector, metadata):
"""Optimized vector storage with indexing."""
# Normalize vector for cosine similarity
if self.similarity_metric == SimilarityMetric.COSINE:
vector = vector / np.linalg.norm(vector)
# Add to index
vector_id = metadata.get('id', str(uuid.uuid4()))
index_position = self.index.ntotal
self.index.add(vector.reshape(1, -1))
self.metadata_store[vector_id] = metadata
self.id_to_index[vector_id] = index_position
return vector_id
def similarity_search(self, query_vector, k=10):
"""Optimized similarity search using FAISS."""
if self.similarity_metric == SimilarityMetric.COSINE:
query_vector = query_vector / np.linalg.norm(query_vector)
# Fast similarity search
distances, indices = self.index.search(query_vector.reshape(1, -1), k)
# Retrieve metadata
results = []
for i, (distance, index) in enumerate(zip(distances[0], indices[0])):
if index != -1: # Valid result
# Find vector_id by index
vector_id = next(
(vid for vid, idx in self.id_to_index.items() if idx == index),
None
)
if vector_id:
results.append({
'id': vector_id,
'similarity': float(distance),
'metadata': self.metadata_store[vector_id]
})
return results
def batch_store(self, vectors, metadata_list):
"""Optimized batch storage."""
vectors_array = np.array(vectors)
# Normalize for cosine similarity
if self.similarity_metric == SimilarityMetric.COSINE:
norms = np.linalg.norm(vectors_array, axis=1, keepdims=True)
vectors_array = vectors_array / norms
# Batch add to index
start_index = self.index.ntotal
self.index.add(vectors_array)
# Store metadata
vector_ids = []
for i, metadata in enumerate(metadata_list):
vector_id = metadata.get('id', str(uuid.uuid4()))
self.metadata_store[vector_id] = metadata
self.id_to_index[vector_id] = start_index + i
vector_ids.append(vector_id)
return vector_idsclass MemoryEfficientVectorStore:
def __init__(self, embedding_dim, compression_ratio=0.5):
self.embedding_dim = embedding_dim
self.compression_ratio = compression_ratio
# Use quantized index for memory efficiency
self.index = faiss.IndexIVFPQ(
faiss.IndexFlatL2(embedding_dim),
embedding_dim,
nlist=100, # Number of clusters
m=8, # Number of subquantizers
nbits=8 # Bits per subquantizer
)
# Compressed metadata storage
self.compressed_metadata = {}
def compress_metadata(self, metadata):
"""Compress metadata to save memory."""
import gzip
import pickle
serialized = pickle.dumps(metadata)
compressed = gzip.compress(serialized)
return compressed
def decompress_metadata(self, compressed_metadata):
"""Decompress metadata."""
import gzip
import pickle
decompressed = gzip.decompress(compressed_metadata)
metadata = pickle.loads(decompressed)
return metadataclass OptimizedDeltaEvaluator:
def __init__(self, weights=None):
self.weights = weights or {
'performance': 0.3,
'efficiency': 0.25,
'stability': 0.25,
'capability': 0.2
}
# Pre-compute weight vector for vectorized operations
self.weight_vector = np.array([
self.weights['performance'],
self.weights['efficiency'],
self.weights['stability'],
self.weights['capability']
])
def evaluate_batch(self, metrics_batch):
"""Optimized batch evaluation using vectorized operations."""
# Convert metrics to numpy array
metrics_array = np.array([
[m.performance, m.efficiency, m.stability, m.capability]
for m in metrics_batch
])
# Vectorized weighted sum
total_deltas = np.dot(metrics_array, self.weight_vector)
# Vectorized confidence calculation
confidences = np.mean(metrics_array, axis=1)
# Create results
results = []
for i, (total_delta, confidence) in enumerate(zip(total_deltas, confidences)):
results.append(DeltaResult(
success=True,
metrics=metrics_batch[i],
total_delta=float(total_delta),
confidence=float(confidence),
should_adapt=total_delta > 0.15
))
return results
def evaluate_streaming(self, metrics_stream):
"""Optimized streaming evaluation for real-time processing."""
# Use sliding window for efficiency
window_size = 100
metrics_buffer = []
for metrics in metrics_stream:
metrics_buffer.append(metrics)
if len(metrics_buffer) >= window_size:
# Process batch
results = self.evaluate_batch(metrics_buffer)
yield from results
# Clear buffer
metrics_buffer.clear()
# Process remaining metrics
if metrics_buffer:
results = self.evaluate_batch(metrics_buffer)
yield from resultsfrom functools import lru_cache
import hashlib
class CachedDeltaEvaluator:
def __init__(self, cache_size=1000):
self.cache_size = cache_size
self._evaluation_cache = {}
def _hash_metrics(self, metrics):
"""Create hash key for metrics."""
metrics_str = f"{metrics.performance}_{metrics.efficiency}_{metrics.stability}_{metrics.capability}"
return hashlib.md5(metrics_str.encode()).hexdigest()
@lru_cache(maxsize=1000)
def _cached_evaluate(self, metrics_hash, performance, efficiency, stability, capability):
"""Cached evaluation function."""
# Perform actual evaluation
total_delta = (
performance * self.weights['performance'] +
efficiency * self.weights['efficiency'] +
stability * self.weights['stability'] +
capability * self.weights['capability']
)
confidence = (performance + efficiency + stability + capability) / 4
return total_delta, confidence
def evaluate(self, metrics):
"""Evaluate with caching."""
metrics_hash = self._hash_metrics(metrics)
total_delta, confidence = self._cached_evaluate(
metrics_hash,
metrics.performance,
metrics.efficiency,
metrics.stability,
metrics.capability
)
return DeltaResult(
success=True,
metrics=metrics,
total_delta=total_delta,
confidence=confidence,
should_adapt=total_delta > 0.15
)class OptimizedMCPOrchestrator:
def __init__(self, max_connections=100, connection_timeout=30):
self.max_connections = max_connections
self.connection_timeout = connection_timeout
# Connection pool
self.connection_pool = asyncio.Queue(maxsize=max_connections)
self.active_connections = {}
# Task queue with priority
self.task_queue = asyncio.PriorityQueue()
# Worker pool
self.workers = []
self.worker_count = min(10, max_connections // 2)
async def initialize_connection_pool(self):
"""Initialize connection pool."""
for _ in range(self.max_connections):
connection = await self._create_connection()
await self.connection_pool.put(connection)
async def get_connection(self):
"""Get connection from pool."""
try:
connection = await asyncio.wait_for(
self.connection_pool.get(),
timeout=self.connection_timeout
)
return connection
except asyncio.TimeoutError:
raise ConnectionError("No available connections in pool")
async def return_connection(self, connection):
"""Return connection to pool."""
if connection.is_healthy():
await self.connection_pool.put(connection)
else:
# Create new connection to replace unhealthy one
new_connection = await self._create_connection()
await self.connection_pool.put(new_connection)
async def submit_task_optimized(self, task):
"""Submit task with optimized routing."""
# Add to priority queue
priority = task.priority if hasattr(task, 'priority') else 5
await self.task_queue.put((priority, time.time(), task))
# Return task ID immediately
return TaskResult(
task_id=task.id,
status='queued',
submitted_at=time.time()
)class BatchMCPProcessor:
def __init__(self, batch_size=50, batch_timeout=0.1):
self.batch_size = batch_size
self.batch_timeout = batch_timeout
self.pending_tasks = []
self.batch_timer = None
async def submit_task(self, task):
"""Submit task for batch processing."""
self.pending_tasks.append(task)
# Start batch timer if not already running
if self.batch_timer is None:
self.batch_timer = asyncio.create_task(self._batch_timeout_handler())
# Process batch if full
if len(self.pending_tasks) >= self.batch_size:
await self._process_batch()
return task.id
async def _process_batch(self):
"""Process current batch of tasks."""
if not self.pending_tasks:
return
# Cancel timer
if self.batch_timer:
self.batch_timer.cancel()
self.batch_timer = None
# Get current batch
batch = self.pending_tasks.copy()
self.pending_tasks.clear()
# Process batch concurrently
tasks = [self._process_single_task(task) for task in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def _batch_timeout_handler(self):
"""Handle batch timeout."""
try:
await asyncio.sleep(self.batch_timeout)
await self._process_batch()
except asyncio.CancelledError:
passimport psutil
import gc
from typing import Dict, Any
class MemoryMonitor:
def __init__(self):
self.baseline_memory = self._get_memory_usage()
self.peak_memory = self.baseline_memory
self.memory_history = []
def _get_memory_usage(self) -> Dict[str, float]:
"""Get current memory usage."""
process = psutil.Process()
memory_info = process.memory_info()
return {
'rss': memory_info.rss / 1024 / 1024, # MB
'vms': memory_info.vms / 1024 / 1024, # MB
'percent': process.memory_percent(),
'available': psutil.virtual_memory().available / 1024 / 1024 # MB
}
def check_memory(self) -> Dict[str, Any]:
"""Check current memory usage and detect issues."""
current_memory = self._get_memory_usage()
# Update peak memory
if current_memory['rss'] > self.peak_memory['rss']:
self.peak_memory = current_memory.copy()
# Add to history
self.memory_history.append({
'timestamp': time.time(),
'memory': current_memory.copy()
})
# Keep only recent history
if len(self.memory_history) > 1000:
self.memory_history = self.memory_history[-500:]
# Detect memory issues
memory_growth = current_memory['rss'] - self.baseline_memory['rss']
memory_growth_rate = self._calculate_memory_growth_rate()
return {
'current': current_memory,
'peak': self.peak_memory,
'growth': memory_growth,
'growth_rate': memory_growth_rate,
'needs_gc': memory_growth > 100, # >100MB growth
'critical': current_memory['percent'] > 80
}
def _calculate_memory_growth_rate(self) -> float:
"""Calculate memory growth rate over recent history."""
if len(self.memory_history) < 10:
return 0.0
recent_history = self.memory_history[-10:]
start_memory = recent_history[0]['memory']['rss']
end_memory = recent_history[-1]['memory']['rss']
time_diff = recent_history[-1]['timestamp'] - recent_history[0]['timestamp']
if time_diff > 0:
return (end_memory - start_memory) / time_diff # MB/second
return 0.0class MemoryPool:
def __init__(self, block_size=1024, pool_size=1000):
self.block_size = block_size
self.pool_size = pool_size
self.available_blocks = []
self.allocated_blocks = set()
# Pre-allocate blocks
self._initialize_pool()
def _initialize_pool(self):
"""Initialize memory pool with pre-allocated blocks."""
for _ in range(self.pool_size):
block = bytearray(self.block_size)
self.available_blocks.append(block)
def allocate(self, size=None):
"""Allocate memory block from pool."""
if size and size > self.block_size:
# Allocate directly for large requests
return bytearray(size)
if self.available_blocks:
block = self.available_blocks.pop()
self.allocated_blocks.add(id(block))
return block
else:
# Pool exhausted, allocate new block
return bytearray(self.block_size)
def deallocate(self, block):
"""Return memory block to pool."""
block_id = id(block)
if block_id in self.allocated_blocks:
self.allocated_blocks.remove(block_id)
# Clear block and return to pool
block[:] = b'\x00' * len(block)
if len(self.available_blocks) < self.pool_size:
self.available_blocks.append(block)
def get_stats(self):
"""Get memory pool statistics."""
return {
'total_blocks': self.pool_size,
'available_blocks': len(self.available_blocks),
'allocated_blocks': len(self.allocated_blocks),
'utilization': len(self.allocated_blocks) / self.pool_size
}class OptimizedGarbageCollector:
def __init__(self, memory_threshold=100, gc_interval=60):
self.memory_threshold = memory_threshold # MB
self.gc_interval = gc_interval # seconds
self.last_gc_time = time.time()
self.memory_monitor = MemoryMonitor()
def should_run_gc(self) -> bool:
"""Determine if garbage collection should run."""
memory_status = self.memory_monitor.check_memory()
# Run GC if memory growth exceeds threshold
if memory_status['growth'] > self.memory_threshold:
return True
# Run GC if memory growth rate is high
if memory_status['growth_rate'] > 10: # >10MB/second
return True
# Run GC at regular intervals
if time.time() - self.last_gc_time > self.gc_interval:
return True
return False
def run_optimized_gc(self):
"""Run optimized garbage collection."""
if not self.should_run_gc():
return
# Record pre-GC memory
pre_gc_memory = self.memory_monitor._get_memory_usage()
# Run garbage collection
collected = gc.collect()
# Record post-GC memory
post_gc_memory = self.memory_monitor._get_memory_usage()
memory_freed = pre_gc_memory['rss'] - post_gc_memory['rss']
self.last_gc_time = time.time()
return {
'objects_collected': collected,
'memory_freed': memory_freed,
'pre_gc_memory': pre_gc_memory,
'post_gc_memory': post_gc_memory
}class AsyncTaskManager:
def __init__(self, max_concurrent_tasks=100):
self.max_concurrent_tasks = max_concurrent_tasks
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
self.active_tasks = set()
self.completed_tasks = []
self.failed_tasks = []
async def submit_task(self, coro, task_id=None):
"""Submit async task with concurrency control."""
task_id = task_id or str(uuid.uuid4())
async def wrapped_task():
async with self.semaphore:
try:
result = await coro
self.completed_tasks.append({
'task_id': task_id,
'result': result,
'completed_at': time.time()
})
return result
except Exception as e:
self.failed_tasks.append({
'task_id': task_id,
'error': str(e),
'failed_at': time.time()
})
raise
finally:
self.active_tasks.discard(task_id)
task = asyncio.create_task(wrapped_task())
self.active_tasks.add(task_id)
return task
async def wait_for_completion(self, timeout=None):
"""Wait for all active tasks to complete."""
if not self.active_tasks:
return
# Get all active task objects
active_task_objects = [
task for task in asyncio.all_tasks()
if hasattr(task, 'get_name') and task.get_name() in self.active_tasks
]
if active_task_objects:
await asyncio.wait(active_task_objects, timeout=timeout)
def get_stats(self):
"""Get task execution statistics."""
return {
'active_tasks': len(self.active_tasks),
'completed_tasks': len(self.completed_tasks),
'failed_tasks': len(self.failed_tasks),
'success_rate': len(self.completed_tasks) / (len(self.completed_tasks) + len(self.failed_tasks)) if (len(self.completed_tasks) + len(self.failed_tasks)) > 0 else 0
}from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
class OptimizedThreadPool:
def __init__(self, max_workers=None, thread_name_prefix='SAFLA'):
self.max_workers = max_workers or min(32, (os.cpu_count() or 1) + 4)
self.thread_name_prefix = thread_name_prefix
# Create thread pool with optimized settings
self.executor = ThreadPoolExecutor(
max_workers=self.max_workers,
thread_name_prefix=thread_name_prefix
)
# Thread-local storage for performance optimization
self.thread_local = threading.local()
# Performance tracking
self.task_stats = {
'submitted': 0,
'completed': 0,
'failed': 0,
'total_execution_time': 0.0
}
def submit_cpu_bound_task(self, func, *args, **kwargs):
"""Submit CPU-bound task to thread pool."""
def wrapped_func():
start_time = time.time()
try:
# Initialize thread-local resources if needed
if not hasattr(self.thread_local, 'initialized'):
self._initialize_thread_local()
result = func(*args, **kwargs)
# Update stats
execution_time = time.time() - start_time
self.task_stats['completed'] += 1
self.task_stats['total_execution_time'] += execution_time
return result
except Exception as e:
self.task_stats['failed'] += 1
raise
self.task_stats['submitted'] += 1
return self.executor.submit(wrapped_func)
def _initialize_thread_local(self):
"""Initialize thread-local resources."""
self.thread_local.initialized = True
# Add any thread-specific initialization here
# e.g., database connections, caches, etc.
def batch_execute(self, func, args_list, max_workers=None):
"""Execute function with multiple argument sets in parallel."""
max_workers = max_workers or self.max_workers
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_args = {
executor.submit(func, *args): args
for args in args_list
}
# Collect results
results = []
for future in as_completed(future_to_args):
args = future_to_args[future]
try:
result = future.result()
results.append({'args': args, 'result': result, 'success': True})
except Exception as e:
results.append({'args': args, 'error': str(e), 'success': False})
return results
def get_performance_stats(self):
"""Get thread pool performance statistics."""
if self.task_stats['completed'] > 0:
avg_execution_time = self.task_stats['total_execution_time'] / self.task_stats['completed']
else:
avg_execution_time = 0.0
return {
'max_workers': self.max_workers,
'submitted_tasks': self.task_stats['submitted'],
'completed_tasks': self.task_stats['completed'],
'failed_tasks': self.task_stats['failed'],
'success_rate': self.task_stats['completed'] / self.task_stats['submitted'] if self.task_stats['submitted'] > 0 else 0,
'avg_execution_time': avg_execution_time
}from functools import lru_cache
import redis
import pickle
class MultiLevelCache:
def __init__(self, l1_size=1000, redis_host='localhost', redis_port=6379):
# L1 Cache: In-memory LRU cache
self.l1_cache = {}
self.l1_size = l1_size
self.l1_access_order = []
# L2 Cache: Redis cache
try:
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=False)
self.redis_available = True
except:
self.redis_client = None
self.redis_available = False
# Cache statistics
self.stats = {
'l1_hits': 0,
'l2_hits': 0,
'misses': 0,
'evictions': 0
}
def get(self, key):
"""Get value from cache (L1 -> L2 -> miss)."""
# Try L1 cache first
if key in self.l1_cache:
self._update_l1_access(key)
self.stats['l1_hits'] += 1
return self.l1_cache[key]
# Try L2 cache (Redis)
if self.redis_available:
try:
value = self.redis_client.get(key)
if value is not None:
# Deserialize and promote to L1
deserialized_value = pickle.loads(value)
self._set_l1(key, deserialized_value)
self.stats['l2_hits'] += 1
return deserialized_value
except Exception:
pass
# Cache miss
self.stats['misses'] += 1
return None
def set(self, key, value, ttl=3600):
"""Set value in cache (both L1 and L2)."""
# Set in L1 cache
self._set_l1(key, value)
# Set in L2 cache (Redis)
if self.redis_available:
try:
serialized_value = pickle.dumps(value)
self.redis_client.setex(key, ttl, serialized_value)
except Exception:
pass
def _set_l1(self, key, value):
"""Set value in L1 cache with LRU eviction."""
if key in self.l1_cache:
self._update_l1_access(key)
else:
# Check if eviction is needed
if len(self.l1_cache) >= self.l1_size:
self._evict_l1()
self.l1_access_order.append(key)
self.l1_cache[key] = value
def _update_l1_access(self, key):
"""Update access order for LRU."""
self.l1_access_order.remove(key)
self.l1_access_order.append(key)
def _evict_l1(self):
"""Evict least recently used item from L1."""
if self.l1_access_order:
lru_key = self.l1_access_order.pop(0)
del self.l1_cache[lru_key]
self.stats['evictions'] += 1
def get_stats(self):
"""Get cache performance statistics."""
total_requests = self.stats['l1_hits'] + self.stats['l2_hits'] + self.stats['misses']
if total_requests > 0:
l1_hit_rate = self.stats['l1_hits'] / total_requests
l2_hit_rate = self.stats['l2_hits'] / total_requests
overall_hit_rate = (self.stats['l1_hits'] + self.stats['l2_hits']) / total_requests
else:
l1_hit_rate = l2_hit_rate = overall_hit_rate = 0.0
return {
'l1_hits': self.stats['l1_hits'],
'l2_hits': self.stats['l2_hits'],
'misses': self.stats['misses'],
'evictions': self.stats['evictions'],
'l1_hit_rate': l1_hit_rate,
'l2_hit_rate': l2_hit_rate,
'overall_hit_rate': overall_hit_rate,
'l1_size': len(self.l1_cache),
'redis_available': self.redis_available
}class AdaptiveCache:
def __init__(self, initial_size=1000, max_size=10000):
self.cache = {}
self.access_counts = {}
self.access_times = {}
self.current_size = initial_size
self.max_size = max_size
# Adaptive parameters
self.hit_rate_threshold = 0.8
self.resize_factor = 1.5
self.monitoring_window = 1000 # requests
self.request_count = 0
self.hits = 0
def get(self, key):
"""Get value with adaptive behavior tracking."""
self.request_count += 1
if key in self.cache:
self.hits += 1
self.access_counts[key] = self.access_counts.get(key, 0) + 1
self.access_times[key] = time.time()
return self.cache[key]
# Check if cache should be resized
if self.request_count % self.monitoring_window == 0:
self._adapt_cache_size()
return None
def set(self, key, value):
"""Set value with adaptive eviction."""
if len(self.cache) >= self.current_size:
self._adaptive_evict()
self.cache[key] = value
self.access_counts[key] = 1
self.access_times[key] = time.time()
def _adapt_cache_size(self):
"""Adapt cache size based on hit rate."""
hit_rate = self.hits / self.request_count
if hit_rate < self.hit_rate_threshold and self.current_size < self.max_size:
# Increase cache size
new_size = min(int(self.current_size * self.resize_factor), self.max_size)
self.current_size = new_size
elif hit_rate > 0.95 and self.current_size > 100:
# Decrease cache size if hit rate is very high
new_size = max(int(self.current_size / self.resize_factor), 100)
self.current_size = new_size
# Reset counters
self.hits = 0
self.request_count = 0
def _adaptive_evict(self):
"""Evict items based on access patterns."""
if not self.cache:
return
# Calculate scores for eviction (lower score = more likely to evict)
current_time = time.time()
scores = {}
for key in self.cache:
access_count = self.access_counts.get(key, 1)
last_access = self.access_times.get(key, current_time)
time_since_access = current_time - last_access
# Score based on frequency and recency
score = access_count / (1 + time_since_access)
scores[key] = score
# Evict item with lowest score
evict_key = min(scores.keys(), key=lambda k: scores[k])
del self.cache[evict_key]
del self.access_counts[evict_key]
del self.access_times[evict_key]class PerformanceMonitor:
def __init__(self, window_size=1000):
self.window_size = window_size
self.metrics = {
'latency': [],
'throughput': [],
'error_rate': [],
'memory_usage': [],
'cpu_usage': []
}
self.start_time = time.time()
self.request_count = 0
self.error_count = 0
def record_request(self, latency, success=True):
"""Record request metrics."""
self.request_count += 1
if not success:
self.error_count += 1
# Record latency
self.metrics['latency'].append(latency)
if len(self.metrics['latency']) > self.window_size:
self.metrics['latency'].pop(0)
# Calculate throughput
current_time = time.time()
time_window = min(current_time - self.start_time, 60) # 1-minute window
if time_window > 0:
throughput = self.request_count / time_window
self.metrics['throughput'].append(throughput)
if len(self.metrics['throughput']) > self.window_size:
self.metrics['throughput'].pop(0)
# Calculate error rate
error_rate = self.error_count / self.request_count
self.metrics['error_rate'].append(error_rate)
if len(self.metrics['error_rate']) > self.window_size:
self.metrics['error_rate'].pop(0)
# Record system metrics
self._record_system_metrics()
def _record_system_metrics(self):
"""Record system-level metrics."""
# Memory usage
process = psutil.Process()
memory_percent = process.memory_percent()
self.metrics['memory_usage'].append(memory_percent)
if len(self.metrics['memory_usage']) > self.window_size:
self.metrics['memory_usage'].pop(0)
# CPU usage
cpu_percent = process.cpu_percent()
self.metrics['cpu_usage'].append(cpu_percent)
if len(self.metrics['cpu_usage']) > self.window_size:
self.metrics['cpu_usage'].pop(0)
def get_current_metrics(self):
"""Get current performance metrics."""
if not self.metrics['latency']:
return {}
return {
'avg_latency': statistics.mean(self.metrics['latency']),
'p95_latency': statistics.quantiles(self.metrics['latency'], n=20)[18] if len(self.metrics['latency']) > 20 else max(self.metrics['latency']),
'current_throughput': self.metrics['throughput'][-1] if self.metrics['throughput'] else 0,
'current_error_rate': self.metrics['error_rate'][-1] if self.metrics['error_rate'] else 0,
'memory_usage': self.metrics['memory_usage'][-1] if self.metrics['memory_usage'] else 0,
'cpu_usage': self.metrics['cpu_usage'][-1] if self.metrics['cpu_usage'] else 0,
'total_requests': self.request_count,
'total_errors': self.error_count
}
def get_performance_trends(self):
"""Get performance trends over time."""
if len(self.metrics['latency']) < 10:
return {}
# Calculate trends
recent_latency = self.metrics['latency'][-10:]
older_latency = self.metrics['latency'][-20:-10] if len(self.metrics['latency']) >= 20 else self.metrics['latency'][:-10]
latency_trend = statistics.mean(recent_latency) - statistics.mean(older_latency) if older_latency else 0
recent_throughput = self.metrics['throughput'][-10:] if len(self.metrics['throughput']) >= 10 else self.metrics['throughput']
older_throughput = self.metrics['throughput'][-20:-10] if len(self.metrics['throughput']) >= 20 else []
throughput_trend = statistics.mean(recent_throughput) - statistics.mean(older_throughput) if older_throughput else 0
return {
'latency_trend': latency_trend,
'throughput_trend': throughput_trend,
'performance_improving': latency_trend < 0 and throughput_trend > 0
}import cProfile
import pstats
from functools import wraps
class PerformanceProfiler:
def __init__(self, output_dir='profiles'):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.profiles = {}
def profile_function(self, func_name=None):
"""Decorator to profile function execution."""
def decorator(func):
name = func_name or func.__name__
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
try:
result = func(*args, **kwargs)
return result
finally:
profiler.disable()
# Save profile
profile_file = self.output_dir / f"{name}_{int(time.time())}.prof"
profiler.dump_stats(str(profile_file))
# Store in memory for analysis
stats = pstats.Stats(profiler)
self.profiles[name] = stats
return wrapper
return decorator
def profile_block(self, block_name):
"""Context manager to profile code blocks."""
return ProfileBlock(block_name, self)
def analyze_profile(self, func_name, top_n=20):
"""Analyze profile for specific function."""
if func_name not in self.profiles:
return None
stats = self.profiles[func_name]
# Get top functions by cumulative time
stats.sort_stats('cumulative')
# Capture output
import io
output = io.StringIO()
stats.print_stats(top_n, file=output)
return {
'total_calls': stats.total_calls,
'total_time': stats.total_tt,
'top_functions': output.getvalue()
}
def get_hotspots(self, func_name):
"""Identify performance hotspots."""
if func_name not in self.profiles:
return []
stats = self.profiles[func_name]
# Get functions with high cumulative time
hotspots = []
for func, (cc, nc, tt, ct, callers) in stats.stats.items():
if ct > 0.01: # Functions taking >10ms
hotspots.append({
'function': func,
'calls': nc,
'total_time': tt,
'cumulative_time': ct,
'time_per_call': ct / nc if nc > 0 else 0
})
# Sort by cumulative time
hotspots.sort(key=lambda x: x['cumulative_time'], reverse=True)
return hotspots[:10] # Top 10 hotspots
class ProfileBlock:
def __init__(self, name, profiler):
self.name = name
self.profiler = profiler
self.prof = None
def __enter__(self):
self.prof = cProfile.Profile()
self.prof.enable()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.prof.disable()
# Save profile
profile_file = self.profiler.output_dir / f"{self.name}_{int(time.time())}.prof"
self.prof.dump_stats(str(profile_file))
# Store in memory
stats = pstats.Stats(self.prof)
self.profiler.profiles[self.name] = statsimport tracemalloc
from typing import List, Dict
class MemoryProfiler:
def __init__(self):
self.snapshots = []
self.baseline_snapshot = None
def start_tracing(self):
"""Start memory tracing."""
tracemalloc.start()
self.baseline_snapshot = tracemalloc.take_snapshot()
def take_snapshot(self, name=None):
"""Take memory snapshot."""
if not tracemalloc.is_tracing():
raise RuntimeError("Memory tracing not started")
snapshot = tracemalloc.take_snapshot()
self.snapshots.append({
'name': name or f'snapshot_{len(self.snapshots)}',
'timestamp': time.time(),
'snapshot': snapshot
})
return snapshot
def analyze_memory_growth(self, snapshot1_name=None, snapshot2_name=None):
"""Analyze memory growth between snapshots."""
if len(self.snapshots) < 2:
return None
# Get snapshots
if snapshot1_name:
snapshot1 = next((s['snapshot'] for s in self.snapshots if s['name'] == snapshot1_name), None)
else:
snapshot1 = self.snapshots[-2]['snapshot']
if snapshot2_name:
snapshot2 = next((s['snapshot'] for s in self.snapshots if s['name'] == snapshot2_name), None)
else:
snapshot2 = self.snapshots[-1]['snapshot']
if not snapshot1 or not snapshot2:
return None
# Compare snapshots
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
# Analyze top memory consumers
memory_growth = []
for stat in top_stats[:20]: # Top 20
memory_growth.append({
'file': stat.traceback.format()[0] if stat.traceback.format() else 'unknown',
'size_diff': stat.size_diff,
'count_diff': stat.count_diff,
'size': stat.size,
'count': stat.count
})
return memory_growth
def get_top_memory_consumers(self, snapshot_name=None):
"""Get top memory consumers from snapshot."""
if not self.snapshots:
return []
if snapshot_name:
snapshot_data = next((s for s in self.snapshots if s['name'] == snapshot_name), None)
else:
snapshot_data = self.snapshots[-1]
if not snapshot_data:
return []
snapshot = snapshot_data['snapshot']
top_stats = snapshot.statistics('lineno')
consumers = []
for stat in top_stats[:20]: # Top 20
consumers.append({
'file': stat.traceback.format()[0] if stat.traceback.format() else 'unknown',
'size': stat.size,
'count': stat.count,
'average_size': stat.size / stat.count if stat.count > 0 else 0
})
return consumers
def stop_tracing(self):
"""Stop memory tracing."""
tracemalloc.stop()This comprehensive performance guide provides the foundation for optimizing SAFLA's performance across all components. Regular benchmarking and monitoring ensure that performance targets are met and maintained over time.