This comprehensive guide covers advanced usage patterns, customization, and extension points for SAFLA (Self-Aware Feedback Loop Algorithm). These topics are designed for experienced users who want to extend SAFLA's capabilities or integrate it into complex systems.
- Advanced Architecture Patterns
- Custom Component Development
- Extension Points
- Advanced Configuration
- Custom Algorithms
- Integration Patterns
- Performance Optimization
- Security Hardening
- Multi-Tenant Deployments
- Distributed SAFLA
- Machine Learning Integration
- Custom MCP Servers
- Advanced Monitoring
- Research and Experimentation
SAFLA can be deployed in hierarchical configurations where multiple SAFLA instances operate at different levels of abstraction:
# hierarchical/coordinator.py
from typing import Dict, List, Optional
import asyncio
import logging
class HierarchicalSAFLACoordinator:
"""Coordinates multiple SAFLA instances in a hierarchical structure."""
def __init__(self, config: Dict):
self.config = config
self.child_instances = {}
self.parent_instance = None
self.coordination_strategy = config.get('coordination_strategy', 'consensus')
async def initialize_hierarchy(self):
"""Initialize hierarchical SAFLA structure."""
# Initialize child instances
for child_config in self.config.get('children', []):
child_id = child_config['id']
child_instance = await self._create_child_instance(child_config)
self.child_instances[child_id] = child_instance
# Connect to parent if specified
if 'parent' in self.config:
self.parent_instance = await self._connect_to_parent(self.config['parent'])
async def coordinate_decision_making(self, decision_context: Dict) -> Dict:
"""Coordinate decision making across hierarchy levels."""
if self.coordination_strategy == 'consensus':
return await self._consensus_coordination(decision_context)
elif self.coordination_strategy == 'hierarchical':
return await self._hierarchical_coordination(decision_context)
elif self.coordination_strategy == 'federated':
return await self._federated_coordination(decision_context)
else:
raise ValueError(f"Unknown coordination strategy: {self.coordination_strategy}")
async def _consensus_coordination(self, context: Dict) -> Dict:
"""Consensus-based coordination among peer instances."""
# Collect decisions from all child instances
child_decisions = await asyncio.gather(*[
child.make_decision(context)
for child in self.child_instances.values()
])
# Apply consensus algorithm
consensus_decision = self._apply_consensus_algorithm(child_decisions)
# Validate with parent if available
if self.parent_instance:
validation = await self.parent_instance.validate_decision(consensus_decision)
if not validation['approved']:
# Retry with parent guidance
guided_context = {**context, 'parent_guidance': validation['guidance']}
return await self.coordinate_decision_making(guided_context)
return consensus_decision
async def _hierarchical_coordination(self, context: Dict) -> Dict:
"""Top-down hierarchical coordination."""
# Get high-level decision from parent
if self.parent_instance:
parent_decision = await self.parent_instance.make_high_level_decision(context)
context['parent_decision'] = parent_decision
# Delegate to appropriate child based on decision scope
target_child = self._select_target_child(context)
if target_child:
return await target_child.make_decision(context)
# Handle at current level if no suitable child
return await self._make_local_decision(context)
async def _federated_coordination(self, context: Dict) -> Dict:
"""Federated coordination with specialized instances."""
# Route to specialized child instances based on context
routing_decisions = await self._route_to_specialists(context)
# Aggregate specialist decisions
aggregated_decision = await self._aggregate_specialist_decisions(routing_decisions)
return aggregated_decision
def _apply_consensus_algorithm(self, decisions: List[Dict]) -> Dict:
"""Apply consensus algorithm to multiple decisions."""
# Implement voting-based consensus
decision_votes = {}
confidence_weights = {}
for decision in decisions:
decision_key = self._decision_to_key(decision)
decision_votes[decision_key] = decision_votes.get(decision_key, 0) + 1
confidence_weights[decision_key] = confidence_weights.get(decision_key, 0) + decision.get('confidence', 0.5)
# Select decision with highest weighted vote
best_decision_key = max(decision_votes.keys(),
key=lambda k: decision_votes[k] * confidence_weights[k])
# Find original decision object
for decision in decisions:
if self._decision_to_key(decision) == best_decision_key:
decision['consensus_confidence'] = confidence_weights[best_decision_key] / decision_votes[best_decision_key]
return decision
return decisions[0] # FallbackImplement event-driven patterns for reactive SAFLA systems:
# event_driven/event_system.py
import asyncio
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum
class EventType(Enum):
OBSERVATION_RECEIVED = "observation_received"
DELTA_CALCULATED = "delta_calculated"
DECISION_MADE = "decision_made"
MODIFICATION_APPLIED = "modification_applied"
VALIDATION_COMPLETED = "validation_completed"
ERROR_OCCURRED = "error_occurred"
PERFORMANCE_THRESHOLD_EXCEEDED = "performance_threshold_exceeded"
@dataclass
class SAFLAEvent:
event_type: EventType
source_component: str
timestamp: float
data: Dict[str, Any]
correlation_id: str
priority: int = 5 # 1-10, 1 being highest priority
class EventDrivenSAFLA:
"""Event-driven SAFLA implementation with reactive patterns."""
def __init__(self):
self.event_handlers = {}
self.event_queue = asyncio.PriorityQueue()
self.event_history = []
self.running = False
def register_handler(self, event_type: EventType, handler: Callable):
"""Register event handler for specific event type."""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
async def emit_event(self, event: SAFLAEvent):
"""Emit event to the system."""
# Add to queue with priority
await self.event_queue.put((event.priority, event.timestamp, event))
# Store in history
self.event_history.append(event)
# Keep history bounded
if len(self.event_history) > 10000:
self.event_history = self.event_history[-5000:]
async def start_event_loop(self):
"""Start the main event processing loop."""
self.running = True
while self.running:
try:
# Get next event from queue
priority, timestamp, event = await asyncio.wait_for(
self.event_queue.get(), timeout=1.0
)
# Process event
await self._process_event(event)
except asyncio.TimeoutError:
# No events to process, continue
continue
except Exception as e:
logger.error(f"Error processing event: {e}")
await asyncio.sleep(0.1)
async def _process_event(self, event: SAFLAEvent):
"""Process individual event."""
handlers = self.event_handlers.get(event.event_type, [])
if not handlers:
logger.warning(f"No handlers registered for event type: {event.event_type}")
return
# Execute all handlers concurrently
handler_tasks = [handler(event) for handler in handlers]
results = await asyncio.gather(*handler_tasks, return_exceptions=True)
# Log any handler errors
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Handler {i} failed for event {event.event_type}: {result}")
def create_reactive_pipeline(self) -> 'ReactiveObservationPipeline':
"""Create reactive observation processing pipeline."""
return ReactiveObservationPipeline(self)
class ReactiveObservationPipeline:
"""Reactive pipeline for processing observations."""
def __init__(self, event_system: EventDrivenSAFLA):
self.event_system = event_system
self.setup_pipeline()
def setup_pipeline(self):
"""Setup reactive pipeline handlers."""
# Observation -> Delta Calculation
self.event_system.register_handler(
EventType.OBSERVATION_RECEIVED,
self._handle_observation_received
)
# Delta -> Decision Making
self.event_system.register_handler(
EventType.DELTA_CALCULATED,
self._handle_delta_calculated
)
# Decision -> Modification
self.event_system.register_handler(
EventType.DECISION_MADE,
self._handle_decision_made
)
# Modification -> Validation
self.event_system.register_handler(
EventType.MODIFICATION_APPLIED,
self._handle_modification_applied
)
async def _handle_observation_received(self, event: SAFLAEvent):
"""Handle new observation and trigger delta calculation."""
observation = event.data['observation']
# Calculate delta
delta_result = await self._calculate_delta(observation)
# Emit delta calculated event
delta_event = SAFLAEvent(
event_type=EventType.DELTA_CALCULATED,
source_component="delta_evaluator",
timestamp=time.time(),
data={'delta_result': delta_result, 'observation': observation},
correlation_id=event.correlation_id,
priority=event.priority
)
await self.event_system.emit_event(delta_event)
async def _handle_delta_calculated(self, event: SAFLAEvent):
"""Handle delta calculation and trigger decision making."""
delta_result = event.data['delta_result']
if delta_result.should_adapt:
# Make decision
decision = await self._make_decision(delta_result)
# Emit decision made event
decision_event = SAFLAEvent(
event_type=EventType.DECISION_MADE,
source_component="decision_maker",
timestamp=time.time(),
data={'decision': decision, 'delta_result': delta_result},
correlation_id=event.correlation_id,
priority=event.priority
)
await self.event_system.emit_event(decision_event)Create specialized delta evaluators for domain-specific requirements:
# custom/delta_evaluator.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import numpy as np
class CustomDeltaEvaluator(ABC):
"""Base class for custom delta evaluators."""
@abstractmethod
async def evaluate(self, metrics: Dict[str, float]) -> 'DeltaResult':
"""Evaluate delta based on custom logic."""
pass
@abstractmethod
def get_evaluation_criteria(self) -> Dict[str, Any]:
"""Get evaluation criteria for this evaluator."""
pass
class DomainSpecificDeltaEvaluator(CustomDeltaEvaluator):
"""Delta evaluator for specific domain requirements."""
def __init__(self, domain_config: Dict[str, Any]):
self.domain_config = domain_config
self.weights = domain_config.get('weights', {})
self.thresholds = domain_config.get('thresholds', {})
self.custom_metrics = domain_config.get('custom_metrics', [])
async def evaluate(self, metrics: Dict[str, float]) -> 'DeltaResult':
"""Evaluate delta using domain-specific logic."""
# Apply domain-specific transformations
transformed_metrics = await self._transform_metrics(metrics)
# Calculate weighted delta
weighted_delta = self._calculate_weighted_delta(transformed_metrics)
# Apply domain-specific rules
rule_adjustments = await self._apply_domain_rules(transformed_metrics)
# Combine results
final_delta = weighted_delta + rule_adjustments
# Determine confidence based on domain factors
confidence = await self._calculate_domain_confidence(transformed_metrics)
return DeltaResult(
success=True,
metrics=transformed_metrics,
total_delta=final_delta,
confidence=confidence,
should_adapt=final_delta > self.thresholds.get('adaptation_threshold', 0.15),
domain_specific_data={
'rule_adjustments': rule_adjustments,
'domain_factors': await self._get_domain_factors(metrics)
}
)
async def _transform_metrics(self, metrics: Dict[str, float]) -> Dict[str, float]:
"""Apply domain-specific metric transformations."""
transformed = metrics.copy()
# Apply custom metric calculations
for custom_metric in self.custom_metrics:
if custom_metric['type'] == 'composite':
transformed[custom_metric['name']] = await self._calculate_composite_metric(
metrics, custom_metric['formula']
)
elif custom_metric['type'] == 'normalized':
transformed[custom_metric['name']] = await self._normalize_metric(
metrics[custom_metric['source']], custom_metric['normalization']
)
return transformed
async def _apply_domain_rules(self, metrics: Dict[str, float]) -> float:
"""Apply domain-specific business rules."""
adjustments = 0.0
rules = self.domain_config.get('rules', [])
for rule in rules:
if await self._evaluate_rule_condition(metrics, rule['condition']):
adjustments += rule['adjustment']
return adjustments
async def _calculate_domain_confidence(self, metrics: Dict[str, float]) -> float:
"""Calculate confidence based on domain-specific factors."""
base_confidence = np.mean(list(metrics.values()))
# Apply domain-specific confidence modifiers
confidence_factors = self.domain_config.get('confidence_factors', {})
for factor_name, factor_config in confidence_factors.items():
if factor_name in metrics:
factor_value = metrics[factor_name]
modifier = self._calculate_confidence_modifier(factor_value, factor_config)
base_confidence *= modifier
return min(max(base_confidence, 0.0), 1.0)
class MultiCriteriaDeltaEvaluator(CustomDeltaEvaluator):
"""Delta evaluator using multi-criteria decision analysis."""
def __init__(self, criteria_config: Dict[str, Any]):
self.criteria = criteria_config['criteria']
self.method = criteria_config.get('method', 'topsis')
self.weights = criteria_config.get('weights', {})
async def evaluate(self, metrics: Dict[str, float]) -> 'DeltaResult':
"""Evaluate using multi-criteria decision analysis."""
if self.method == 'topsis':
return await self._topsis_evaluation(metrics)
elif self.method == 'ahp':
return await self._ahp_evaluation(metrics)
elif self.method == 'electre':
return await self._electre_evaluation(metrics)
else:
raise ValueError(f"Unknown MCDA method: {self.method}")
async def _topsis_evaluation(self, metrics: Dict[str, float]) -> 'DeltaResult':
"""TOPSIS (Technique for Order Preference by Similarity to Ideal Solution)."""
# Normalize decision matrix
normalized_metrics = self._normalize_decision_matrix(metrics)
# Apply weights
weighted_metrics = {
k: v * self.weights.get(k, 1.0)
for k, v in normalized_metrics.items()
}
# Determine ideal and negative-ideal solutions
ideal_solution = {k: max(weighted_metrics.values()) for k in weighted_metrics}
negative_ideal = {k: min(weighted_metrics.values()) for k in weighted_metrics}
# Calculate distances
distance_to_ideal = self._calculate_euclidean_distance(weighted_metrics, ideal_solution)
distance_to_negative = self._calculate_euclidean_distance(weighted_metrics, negative_ideal)
# Calculate TOPSIS score
topsis_score = distance_to_negative / (distance_to_ideal + distance_to_negative)
return DeltaResult(
success=True,
metrics=metrics,
total_delta=topsis_score,
confidence=self._calculate_topsis_confidence(weighted_metrics),
should_adapt=topsis_score > 0.6, # TOPSIS threshold
domain_specific_data={
'topsis_score': topsis_score,
'distance_to_ideal': distance_to_ideal,
'distance_to_negative': distance_to_negative
}
)Implement custom vector storage backends for specialized requirements:
# custom/vector_backend.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import numpy as np
class CustomVectorBackend(ABC):
"""Base class for custom vector storage backends."""
@abstractmethod
async def store_vector(self, vector: np.ndarray, metadata: Dict[str, Any]) -> str:
"""Store vector with metadata."""
pass
@abstractmethod
async def search_similar(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
"""Search for similar vectors."""
pass
@abstractmethod
async def delete_vector(self, vector_id: str) -> bool:
"""Delete vector by ID."""
pass
class DistributedVectorBackend(CustomVectorBackend):
"""Distributed vector storage backend using consistent hashing."""
def __init__(self, nodes: List[str], replication_factor: int = 3):
self.nodes = nodes
self.replication_factor = replication_factor
self.hash_ring = self._build_hash_ring()
self.node_clients = self._initialize_node_clients()
async def store_vector(self, vector: np.ndarray, metadata: Dict[str, Any]) -> str:
"""Store vector across multiple nodes."""
vector_id = metadata.get('id', str(uuid.uuid4()))
# Determine target nodes using consistent hashing
target_nodes = self._get_target_nodes(vector_id)
# Store on multiple nodes for replication
store_tasks = []
for node in target_nodes[:self.replication_factor]:
task = self._store_on_node(node, vector_id, vector, metadata)
store_tasks.append(task)
# Wait for majority of stores to succeed
results = await asyncio.gather(*store_tasks, return_exceptions=True)
successful_stores = sum(1 for r in results if not isinstance(r, Exception))
if successful_stores >= (self.replication_factor // 2) + 1:
return vector_id
else:
raise Exception(f"Failed to store vector on majority of nodes")
async def search_similar(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
"""Search across distributed nodes and merge results."""
# Query all nodes
search_tasks = [
self._search_on_node(node, query_vector, k)
for node in self.nodes
]
# Gather results from all nodes
node_results = await asyncio.gather(*search_tasks, return_exceptions=True)
# Merge and rank results
all_results = []
for results in node_results:
if not isinstance(results, Exception):
all_results.extend(results)
# Sort by similarity and return top k
all_results.sort(key=lambda x: x['similarity'], reverse=True)
return all_results[:k]
def _build_hash_ring(self) -> Dict[int, str]:
"""Build consistent hash ring for node selection."""
hash_ring = {}
virtual_nodes = 150 # Virtual nodes per physical node
for node in self.nodes:
for i in range(virtual_nodes):
virtual_node_key = f"{node}:{i}"
hash_value = hash(virtual_node_key) % (2**32)
hash_ring[hash_value] = node
return dict(sorted(hash_ring.items()))
def _get_target_nodes(self, vector_id: str) -> List[str]:
"""Get target nodes for vector using consistent hashing."""
hash_value = hash(vector_id) % (2**32)
# Find the first node in the ring >= hash_value
target_nodes = []
ring_keys = list(self.hash_ring.keys())
start_index = 0
for i, key in enumerate(ring_keys):
if key >= hash_value:
start_index = i
break
# Get nodes in order around the ring
seen_nodes = set()
for i in range(len(ring_keys)):
index = (start_index + i) % len(ring_keys)
node = self.hash_ring[ring_keys[index]]
if node not in seen_nodes:
target_nodes.append(node)
seen_nodes.add(node)
if len(target_nodes) >= self.replication_factor:
break
return target_nodes
class HybridVectorBackend(CustomVectorBackend):
"""Hybrid backend combining multiple storage strategies."""
def __init__(self, config: Dict[str, Any]):
self.hot_storage = self._initialize_hot_storage(config['hot_storage'])
self.warm_storage = self._initialize_warm_storage(config['warm_storage'])
self.cold_storage = self._initialize_cold_storage(config['cold_storage'])
self.hot_threshold = config.get('hot_threshold', 1000) # Recent vectors
self.warm_threshold = config.get('warm_threshold', 10000) # Frequently accessed
async def store_vector(self, vector: np.ndarray, metadata: Dict[str, Any]) -> str:
"""Store vector in appropriate tier based on access patterns."""
vector_id = metadata.get('id', str(uuid.uuid4()))
# Always store in hot storage initially
await self.hot_storage.store_vector(vector, metadata)
# Update access tracking
await self._update_access_tracking(vector_id)
return vector_id
async def search_similar(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
"""Search across tiers with intelligent routing."""
# Try hot storage first
hot_results = await self.hot_storage.search_similar(query_vector, k)
if len(hot_results) >= k:
return hot_results[:k]
# Search warm storage for additional results
remaining_k = k - len(hot_results)
warm_results = await self.warm_storage.search_similar(query_vector, remaining_k)
combined_results = hot_results + warm_results
if len(combined_results) >= k:
return self._merge_and_rank_results(combined_results)[:k]
# Search cold storage if needed
remaining_k = k - len(combined_results)
cold_results = await self.cold_storage.search_similar(query_vector, remaining_k)
all_results = combined_results + cold_results
return self._merge_and_rank_results(all_results)[:k]
async def _tier_management_loop(self):
"""Background process to manage vector tiers."""
while True:
try:
# Move vectors between tiers based on access patterns
await self._promote_vectors()
await self._demote_vectors()
# Sleep for tier management interval
await asyncio.sleep(3600) # 1 hour
except Exception as e:
logger.error(f"Error in tier management: {e}")
await asyncio.sleep(300) # 5 minutes on error
async def _promote_vectors(self):
"""Promote frequently accessed vectors to higher tiers."""
# Get access statistics
access_stats = await self._get_access_statistics()
# Identify vectors for promotion
for vector_id, stats in access_stats.items():
current_tier = await self._get_vector_tier(vector_id)
if current_tier == 'cold' and stats['access_frequency'] > self.warm_threshold:
await self._move_vector_to_tier(vector_id, 'warm')
elif current_tier == 'warm' and stats['recent_accesses'] > self.hot_threshold:
await self._move_vector_to_tier(vector_id, 'hot')
async def _demote_vectors(self):
"""Demote infrequently accessed vectors to lower tiers."""
# Check hot storage capacity
hot_count = await self.hot_storage.get_vector_count()
if hot_count > self.hot_threshold:
# Get least recently used vectors from hot storage
lru_vectors = await self.hot_storage.get_lru_vectors(hot_count - self.hot_threshold)
for vector_id in lru_vectors:
await self._move_vector_to_tier(vector_id, 'warm')SAFLA supports a plugin architecture for extending functionality:
# plugins/plugin_system.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
import importlib
import inspect
class SAFLAPlugin(ABC):
"""Base class for SAFLA plugins."""
@property
@abstractmethod
def name(self) -> str:
"""Plugin name."""
pass
@property
@abstractmethod
def version(self) -> str:
"""Plugin version."""
pass
@abstractmethod
async def initialize(self, config: Dict[str, Any]) -> bool:
"""Initialize plugin with configuration."""
pass
@abstractmethod
async def cleanup(self) -> bool:
"""Cleanup plugin resources."""
pass
def get_hooks(self) -> Dict[str, callable]:
"""Get plugin hooks."""
hooks = {}
# Automatically discover hook methods
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
if name.startswith('hook_'):
hook_name = name[5:] # Remove 'hook_' prefix
hooks[hook_name] = method
return hooks
class PluginManager:
"""Manages SAFLA plugins."""
def __init__(self):
self.plugins = {}
self.hooks = {}
self.plugin_configs = {}
async def load_plugin(self, plugin_path: str, config: Dict[str, Any] = None) -> bool:
"""Load plugin from path."""
try:
# Import plugin module
module = importlib.import_module(plugin_path)
# Find plugin class
plugin_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, SAFLAPlugin) and
obj != SAFLAPlugin):
plugin_class = obj
break
if not plugin_class:
raise ValueError(f"No plugin class found in {plugin_path}")
# Create plugin instance
plugin = plugin_class()
# Initialize plugin
success = await plugin.initialize(config or {})
if not success:
raise Exception(f"Plugin {plugin.name} initialization failed")
# Register plugin
self.plugins[plugin.name] = plugin
self.plugin_configs[plugin.name] = config
# Register hooks
plugin_hooks = plugin.get_hooks()
for hook_name, hook_method in plugin_hooks.items():
if hook_name not in self.hooks:
self.hooks[hook_name] = []
self.hooks[hook_name].append(hook_method)
logger.info(f"Loaded plugin: {plugin.name} v{plugin.version}")
return True
except Exception as e:
logger.error(f"Failed to load plugin {plugin_path}: {e}")
return False
async def unload_plugin(self, plugin_name: str) -> bool:
"""Unload plugin by name."""
if plugin_name not in self.plugins:
return False
try:
plugin = self.plugins[plugin_name]
# Cleanup plugin
await plugin.cleanup()
# Remove hooks
plugin_hooks = plugin.get_hooks()
for hook_name, hook_method in plugin_hooks.items():
if hook_name in self.hooks:
self.hooks[hook_name] = [
h for h in self.hooks[hook_name] if h != hook_method
]
# Remove plugin
del self.plugins[plugin_name]
del self.plugin_configs[plugin_name]
logger.info(f"Unloaded plugin: {plugin_name}")
return True
except Exception as e:
logger.error(f"Failed to unload plugin {plugin_name}: {e}")
return False
async def execute_hook(self, hook_name: str, *args, **kwargs) -> List[Any]:
"""Execute all hooks for given hook name."""
if hook_name not in self.hooks:
return []
results = []
for hook in self.hooks[hook_name]:
try:
result = await hook(*args, **kwargs)
results.append(result)
except Exception as e:
logger.error(f"Hook {hook_name} failed: {e}")
results.append(None)
return results
# Example plugin implementation
class MetricsCollectionPlugin(SAFLAPlugin):
"""Plugin for collecting custom metrics."""
@property
def name(self) -> str:
return "metrics_collection"
@property
def version(self) -> str:
return "1.0.0"
async def initialize(self, config: Dict[str, Any]) -> bool:
"""Initialize metrics collection."""
self.metrics_store = config.get('metrics_store', 'memory')
self.collection_interval = config.get('collection_interval', 60)
self.custom_metrics = config.get('custom_metrics', [])
# Start metrics collection loop
asyncio.create_task(self._metrics_collection_loop())
return True
async def cleanup(self) -> bool:
"""Cleanup metrics collection."""
# Stop collection loop and cleanup resources
return True
async def hook_observation_processed(self, observation: Dict[str, Any]) -> Dict[str, Any]:
"""Hook called when observation is processed."""
# Collect custom metrics from observation
custom_metrics = {}
for metric_config in self.custom_metrics:
metric_name = metric_config['name']
metric_value = self._extract_metric_value(observation, metric_config)
custom_metrics[metric_name] = metric_value
# Store metrics
await self._store_metrics(custom_metrics)
return custom_metrics
async def hook_delta_calculated(self, delta_result: 'DeltaResult') -> None:
"""Hook called when delta is calculated."""
# Record delta calculation metrics
await self._record_delta_metrics(delta_result)
async def _metrics_collection_loop(self):
"""Background metrics collection loop."""
while True:
try:
# Collect system metrics
system_metrics = await self._collect_system_metrics()
await self._store_metrics(system_metrics)
await asyncio.sleep(self.collection_interval)
except Exception as e:
logger.error(f"Metrics collection error: {e}")
await asyncio.sleep(10)Create specialized MCP servers for domain-specific integrations:
# mcp/custom_server.py
from typing import Dict, Any, List, Optional
import asyncio
import json
class CustomMCPServer:
"""Base class for custom MCP servers."""
def __init__(self, server_config: Dict[str, Any]):
self.config = server_config
self.tools = {}
self.resources = {}
self.running = False
def register_tool(self, name: str, tool_func: callable, schema: Dict[str, Any]):
"""Register a tool with the MCP server."""
self.tools[name] = {
'function': tool_func,
'schema': schema
}
def register_resource(self, uri: str, resource_func: callable):
"""Register a resource with the MCP server."""
self.resources[uri] = resource_func
async def start_server(self):
"""Start the MCP server."""
self.running = True
# Start server loop
await self._server_loop()
async def stop_server(self):
"""Stop the MCP server."""
self.running = False
async def _server_loop(self):
"""Main server loop."""
while self.running:
try:
# Handle incoming requests
await self._handle_requests()
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"MCP server error: {e}")
await asyncio.sleep(1)
class DatabaseMCPServer(CustomMCPServer):
"""MCP server for database operations."""
def __init__(self, server_config: Dict[str, Any]):
super().__init__(server_config)
self.db_connection = None
self._register_database_tools()
def _register_database_tools(self):
"""Register database-specific tools."""
# Query execution tool
self.register_tool(
"execute_query",
self._execute_query,
{
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL query to execute"},
"parameters": {"type": "array", "description": "Query parameters"}
},
"required": ["query"]
}
)
# Schema inspection tool
self.register_tool(
"inspect_schema",
self._inspect_schema,
{
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Table name to inspect"}
}
}
)
# Performance monitoring tool
self.register_tool(
"monitor_performance",
self._monitor_performance,
{
"type": "object",
"properties": {
"duration": {"type": "integer", "description": "Monitoring duration in seconds"}
}
}
)
async def _execute_query(self, query: str, parameters: List[Any] = None) -> Dict[str, Any]:
"""Execute database query."""
try:
# Validate query safety
if not self._is_safe_query(query):
raise ValueError("Unsafe query detected")
# Execute query
result = await self._run_query(query, parameters)
return {
"success": True,
"result": result,
"row_count": len(result) if isinstance(result, list) else 1
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _inspect_schema(self, table_name: str = None) -> Dict[str, Any]:
"""Inspect database schema."""
try:
if table_name:
# Get specific table schema
schema = await self._get_table_schema(table_name)
else:
# Get all tables
schema = await self._get_all_tables()
return {
"success": True,
"schema": schema
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _monitor_performance(self, duration: int = 60) -> Dict[str, Any]:
"""Monitor database performance."""
try:
# Start performance monitoring
start_time = time.time()
performance_data = []
while time.time() - start_time < duration:
metrics = await self._collect_performance_metrics()
performance_data.append({
"timestamp": time.time(),
"metrics": metrics
})
await asyncio.sleep(5) # Collect every 5 seconds
# Analyze performance data
analysis = self._analyze_performance_data(performance_data)
return {
"success": True,
"performance_data": performance_data,
"analysis": analysis
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
class MLModelMCPServer(CustomMCPServer):
"""MCP server for machine learning model operations."""
def __init__(self, server_config: Dict[str, Any]):
super().__init__(server_config)
self.models = {}
self._register_ml_tools()
def _register_ml_tools(self):
"""Register ML-specific tools."""
# Model training tool
self.register_tool(
"train_model",
self._train_model,
{
"type": "object",
"properties": {
"model_type": {"type": "string", "description": "Type of model to train"},
"training_data": {"type": "string", "description": "Path to training data"},
"hyperparameters": {"type": "object", "description": "Model hyperparameters"}
},
"required": ["model_type", "training_data"]
}
)
# Model prediction tool
self.register_tool(
"predict",
self._predict,
{
"type": "object",
"properties": {
"model_id": {"type": "string", "description": "Model identifier"},
"input_data": {"type": "array", "description": "Input data for prediction"}
},
"required": ["model_id", "input_data"]
}
)
# Model evaluation tool
self.register_tool(
"evaluate_model",
self._evaluate_model,
{
"type": "object",
"properties": {
"model_id": {"type": "string", "description": "Model identifier"},
"test_data": {"type": "string", "description": "Path to test data"}
},
"required": ["model_id", "test_data"]
}
)
async def _train_model(self, model_type: str, training_data: str,
hyperparameters: Dict[str, Any] = None) -> Dict[str, Any]:
"""Train machine learning model."""
try:
# Load training data
data = await self._load_training_data(training_data)
# Initialize model
model = self._create_model(model_type, hyperparameters)
# Train model
training_results = await self._train_model_async(model, data)
# Store trained model
model_id = str(uuid.uuid4())
self.models[model_id] = model
return {
"success": True,
"model_id": model_id,
"training_results": training_results
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _predict(self, model_id: str, input_data: List[Any]) -> Dict[str, Any]:
"""Make predictions using trained model."""
try:
if model_id not in self.models:
raise ValueError(f"Model {model_id} not found")
model = self.models[model_id]
# Make predictions
predictions = await self._make_predictions(model, input_data)
return {
"success": True,
"predictions": predictions
}
except Exception as e:
return {
"success": False,
"error": str(e)
}This advanced topics guide provides comprehensive coverage of SAFLA's extensibility and advanced usage patterns, enabling sophisticated customizations and integrations for complex use cases.