From 8d78ea0be6b6e27cb5093d2178732771afdec72f Mon Sep 17 00:00:00 2001
From: Rakshitha Ireddi
Date: Fri, 13 Feb 2026 23:43:37 +0530
Subject: [PATCH 1/2] feat: Add adaptive tool execution analytics and intelligent retry system

- Implement comprehensive execution analytics tracking (success rate, latency, error patterns)
- Add intelligent error classification system (timeout, network, validation, etc.)
- Implement adaptive retry strategies with exponential/linear backoff
- Add result caching system to avoid redundant tool calls
- Integrate analytics into A1 agent with public API methods
- Provide analytics summary and error analysis DataFrames
- Support cache management and analytics reset functionality

This feature enables:
- Performance monitoring and optimization of tool usage
- Automatic retry with context-aware strategies
- Reduced redundant API calls through intelligent caching
- Comprehensive error analysis for debugging and improvement
---
 biomni/agent/a1.py                 |  62 +++-
 biomni/tool/execution_analytics.py | 449 +++++++++++++++++++++++++++++
 biomni/tool/tool_registry.py       |   6 +-
 3 files changed, 515 insertions(+), 2 deletions(-)
 create mode 100644 biomni/tool/execution_analytics.py

diff --git a/biomni/agent/a1.py b/biomni/agent/a1.py
index 7a62e59f1..4faba71cb 100644
--- a/biomni/agent/a1.py
+++ b/biomni/agent/a1.py
@@ -18,6 +18,7 @@
 from biomni.know_how import KnowHowLoader
 from biomni.llm import SourceType, get_llm
 from biomni.model.retriever import ToolRetriever
+from biomni.tool.execution_analytics import ExecutionAnalytics
 from biomni.tool.support_tools import run_python_repl
 from biomni.tool.tool_registry import ToolRegistry
 from biomni.utils import (
@@ -205,8 +206,16 @@ def __init__(
         self.module2api = module2api
         self.use_tool_retriever = use_tool_retriever
 
+        # Initialize execution analytics system early (before tool registry)
+        self.execution_analytics = ExecutionAnalytics(
+            enable_caching=True,
+            cache_ttl=3600,
+            max_retries=3,
+            enable_analytics=True,
+        )
+
         if self.use_tool_retriever:
-            self.tool_registry = ToolRegistry(module2api)
+            self.tool_registry = ToolRegistry(module2api, execution_analytics=self.execution_analytics)
             self.retriever = ToolRetriever()
 
             # Initialize know-how loader
@@ -220,6 +229,8 @@ def __init__(
 
         # Add timeout parameter
         self.timeout_seconds = timeout_seconds  # 10 minutes default timeout
+
+
         self.configure()
 
     def add_tool(self, api):
@@ -2058,6 +2069,55 @@ def create_mcp_server(self, tool_modules=None):
         print(f"Created MCP server with {registered_tools} tools")
         return mcp
 
+    def get_execution_analytics(self, tool_name: str = None):
+        """Get execution analytics for tools.
+
+        Args:
+            tool_name: Optional tool name to get analytics for a specific tool.
+                If None, returns analytics for all tools.
+
+        Returns:
+            Dictionary mapping tool names to ToolAnalytics objects; when
+            tool_name is given, the dictionary contains only that tool.
+        """
+        return self.execution_analytics.get_tool_analytics(tool_name)
+
+    def get_analytics_summary(self):
+        """Get a summary of all tool execution analytics as a DataFrame.
+
+        Returns:
+            pandas DataFrame with columns: tool_name, total_executions,
+            success_rate, failure_rate, avg_execution_time, cache_hit_rate,
+            most_common_error, last_execution
+        """
+        return self.execution_analytics.get_analytics_summary()
+
+    def get_error_analysis(self):
+        """Get error analysis across all tools.
+
+        Returns:
+            pandas DataFrame with error statistics by tool and error type
+        """
+        return self.execution_analytics.get_error_analysis()
+
+    def clear_execution_cache(self, tool_name: str = None):
+        """Clear cached execution results.
+
+        Args:
+            tool_name: Optional tool name to clear cache for a specific tool.
+                If None, clears cache for all tools.
+        """
+        self.execution_analytics.clear_cache(tool_name)
+        if tool_name:
+            print(f"Cleared execution cache for tool: {tool_name}")
+        else:
+            print("Cleared execution cache for all tools")
+
+    def reset_execution_analytics(self):
+        """Reset all execution analytics data."""
+        self.execution_analytics.reset_analytics()
+        print("Reset all execution analytics")
+
     def save_conversation_history(self, filepath: str, include_images: bool = True, save_pdf: bool = True) -> None:
         """Save the complete conversation history as PDF only.
 
diff --git a/biomni/tool/execution_analytics.py b/biomni/tool/execution_analytics.py
new file mode 100644
index 000000000..fead5bd20
--- /dev/null
+++ b/biomni/tool/execution_analytics.py
@@ -0,0 +1,449 @@
+"""
+Tool Execution Analytics and Adaptive Retry System
+
+This module provides comprehensive analytics tracking, performance monitoring,
+and intelligent retry mechanisms for tool execution in Biomni agents.
+
+Features:
+- Execution performance tracking (success rate, latency, error patterns)
+- Adaptive retry strategies based on error classification
+- Result caching to avoid redundant tool calls
+- Analytics reporting for tool optimization
+"""
+
+import hashlib
+import json
+import time
+from collections import defaultdict
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+import pandas as pd
+
+
+class ErrorType(Enum):
+    """Classification of error types for adaptive retry strategies."""
+
+    TIMEOUT = "timeout"
+    NETWORK = "network"
+    VALIDATION = "validation"
+    RESOURCE = "resource"
+    PERMISSION = "permission"
+    NOT_FOUND = "not_found"
+    RATE_LIMIT = "rate_limit"
+    UNKNOWN = "unknown"
+
+
+class RetryStrategy(Enum):
+    """Retry strategies based on error type."""
+
+    IMMEDIATE = "immediate"  # Retry immediately (for transient errors)
+    EXPONENTIAL_BACKOFF = "exponential_backoff"  # Exponential backoff (for rate limits)
+    LINEAR_BACKOFF = "linear_backoff"  # Linear backoff (for network issues)
+    NO_RETRY = "no_retry"  # Don't retry (for permanent errors)
+
+
+@dataclass
+class ExecutionRecord:
+    """Record of a single tool execution attempt."""
+
+    tool_name: str
+    timestamp: float
+    execution_time: float
+    success: bool
+    error_type: Optional[ErrorType] = None
+    error_message: Optional[str] = None
+    parameters_hash: str = ""
+    result_hash: Optional[str] = None
+    retry_count: int = 0
+
+
+@dataclass
+class ToolAnalytics:
+    """Analytics for a specific tool."""
+
+    tool_name: str
+    total_executions: int = 0
+    successful_executions: int = 0
+    failed_executions: int = 0
+    total_execution_time: float = 0.0
+    average_execution_time: float = 0.0
+    error_counts: Dict[ErrorType, int] = field(default_factory=lambda: defaultdict(int))
+    last_execution: Optional[float] = None
+    cache_hits: int = 0
+    cache_misses: int = 0
+
+    @property
+    def success_rate(self) -> float:
+        """Calculate success rate."""
+        if self.total_executions == 0:
+            return 0.0
+        return self.successful_executions / self.total_executions
+
+    @property
+    def failure_rate(self) -> float:
+        """Calculate failure rate."""
+        if self.total_executions == 0:
+            return 0.0
+        return self.failed_executions / self.total_executions
+
+
+class ExecutionAnalytics:
+    """Comprehensive analytics and retry system for tool execution."""
+
+    def __init__(
+        self,
+        enable_caching: bool = True,
+        cache_ttl: int = 3600,
+        max_retries: int = 3,
+        enable_analytics: bool = True,
+    ):
+        """Initialize the execution analytics system.
+
+        Args:
+            enable_caching: Whether to cache successful tool results
+            cache_ttl: Time-to-live for cached results in seconds
+            max_retries: Maximum number of retry attempts
+            enable_analytics: Whether to track execution analytics
+        """
+        self.enable_caching = enable_caching
+        self.cache_ttl = cache_ttl
+        self.max_retries = max_retries
+        self.enable_analytics = enable_analytics
+
+        # Analytics storage
+        self.tool_analytics: Dict[str, ToolAnalytics] = {}
+        self.execution_history: List[ExecutionRecord] = []
+
+        # Result cache: {tool_name: {param_hash: (result, timestamp)}}
+        self.result_cache: Dict[str, Dict[str, Tuple[Any, float]]] = defaultdict(dict)
+
+        # Error classification patterns
+        self.error_patterns = {
+            ErrorType.TIMEOUT: ["timeout", "timed out", "exceeded", "deadline"],
+            ErrorType.NETWORK: ["connection", "network", "dns", "unreachable", "refused"],
+            ErrorType.VALIDATION: ["validation", "invalid", "malformed", "format", "type error"],
+            ErrorType.RESOURCE: ["memory", "out of memory", "resource", "quota", "limit exceeded"],
+            ErrorType.PERMISSION: ["permission", "forbidden", "unauthorized", "access denied"],
+            ErrorType.NOT_FOUND: ["not found", "404", "does not exist", "missing"],
+            ErrorType.RATE_LIMIT: ["rate limit", "429", "too many requests", "throttle"],
+        }
+
+        # Retry strategy mapping
+        self.retry_strategies = {
+            ErrorType.TIMEOUT: RetryStrategy.EXPONENTIAL_BACKOFF,
+            ErrorType.NETWORK: RetryStrategy.LINEAR_BACKOFF,
+            ErrorType.RATE_LIMIT: RetryStrategy.EXPONENTIAL_BACKOFF,
+            ErrorType.RESOURCE: RetryStrategy.NO_RETRY,
+            ErrorType.PERMISSION: RetryStrategy.NO_RETRY,
+            ErrorType.NOT_FOUND: RetryStrategy.NO_RETRY,
+            ErrorType.VALIDATION: RetryStrategy.NO_RETRY,
+            ErrorType.UNKNOWN: RetryStrategy.IMMEDIATE,
+        }
+
+    def _classify_error(self, error: Exception) -> ErrorType:
+        """Classify an error into an ErrorType."""
+        error_str = str(error).lower()
+        error_type_str = type(error).__name__.lower()
+
+        # Check error message patterns
+        for error_type, patterns in self.error_patterns.items():
+            for pattern in patterns:
+                if pattern in error_str or pattern in error_type_str:
+                    return error_type
+
+        return ErrorType.UNKNOWN
+
+    def _get_parameters_hash(self, tool_name: str, args: tuple, kwargs: dict) -> str:
+        """Generate a hash for tool parameters."""
+        param_str = json.dumps(
+            {"tool": tool_name, "args": args, "kwargs": kwargs}, sort_keys=True, default=str
+        )
+        return hashlib.md5(param_str.encode()).hexdigest()
+
+    def _get_result_hash(self, result: Any) -> str:
+        """Generate a hash for tool result."""
+        result_str = json.dumps(result, sort_keys=True, default=str)
+        return hashlib.md5(result_str.encode()).hexdigest()
+
+    def _get_cached_result(self, tool_name: str, param_hash: str) -> Optional[Any]:
+        """Retrieve cached result if available and not expired."""
+        if not self.enable_caching:
+            return None
+
+        if tool_name not in self.result_cache:
+            return None
+
+        if param_hash not in self.result_cache[tool_name]:
+            return None
+
+        result, timestamp = self.result_cache[tool_name][param_hash]
+        current_time = time.time()
+
+        # Check if cache entry is still valid
+        if current_time - timestamp > self.cache_ttl:
+            # Cache expired, remove it
+            del self.result_cache[tool_name][param_hash]
+            return None
+
+        return result
+
+    def _cache_result(self, tool_name: str, param_hash: str, result: Any):
+        """Cache a successful tool result."""
+        if not self.enable_caching:
+            return
+
+        current_time = time.time()
+        self.result_cache[tool_name][param_hash] = (result, current_time)
+
+    def _calculate_backoff_delay(
+        self, retry_count: int, strategy: RetryStrategy, base_delay: float = 1.0
+    ) -> float:
+        """Calculate delay before retry based on strategy."""
+        if strategy == RetryStrategy.IMMEDIATE:
+            return 0.0
+        elif strategy == RetryStrategy.EXPONENTIAL_BACKOFF:
+            return base_delay * (2 ** retry_count)
+        elif strategy == RetryStrategy.LINEAR_BACKOFF:
+            return base_delay * (retry_count + 1)
+        elif strategy == RetryStrategy.NO_RETRY:
+            return float("inf")
+        else:
+            return 0.0
+
+    def _update_analytics(
+        self,
+        tool_name: str,
+        execution_time: float,
+        success: bool,
+        error_type: Optional[ErrorType] = None,
+        from_cache: bool = False,
+    ):
+        """Update analytics for a tool execution."""
+        if not self.enable_analytics:
+            return
+
+        if tool_name not in self.tool_analytics:
+            self.tool_analytics[tool_name] = ToolAnalytics(tool_name=tool_name)
+
+        analytics = self.tool_analytics[tool_name]
+        analytics.total_executions += 1
+        analytics.total_execution_time += execution_time
+        analytics.last_execution = time.time()
+
+        if from_cache:
+            analytics.cache_hits += 1
+        else:
+            analytics.cache_misses += 1
+
+        if success:
+            analytics.successful_executions += 1
+        else:
+            analytics.failed_executions += 1
+            if error_type:
+                analytics.error_counts[error_type] += 1
+
+        # Update average execution time
+        if analytics.total_executions > 0:
+            analytics.average_execution_time = (
+                analytics.total_execution_time / analytics.total_executions
+            )
+
+    def execute_with_retry(
+        self,
+        tool_func: Callable,
+        tool_name: str,
+        args: tuple = (),
+        kwargs: dict = None,
+        retry_on_error: bool = True,
+    ) -> Tuple[Any, ExecutionRecord]:
+        """Execute a tool with retry logic and analytics tracking.
+
+        Args:
+            tool_func: The tool function to execute
+            tool_name: Name of the tool for tracking
+            args: Positional arguments for the tool
+            kwargs: Keyword arguments for the tool
+            retry_on_error: Whether to retry on errors
+
+        Returns:
+            Tuple of (result, execution_record)
+        """
+        if kwargs is None:
+            kwargs = {}
+
+        param_hash = self._get_parameters_hash(tool_name, args, kwargs)
+
+        # Check cache first
+        cached_result = self._get_cached_result(tool_name, param_hash)
+        if cached_result is not None:
+            record = ExecutionRecord(
+                tool_name=tool_name,
+                timestamp=time.time(),
+                execution_time=0.0,
+                success=True,
+                parameters_hash=param_hash,
+                result_hash=self._get_result_hash(cached_result),
+            )
+            self._update_analytics(tool_name, 0.0, True, from_cache=True)
+            return cached_result, record
+
+        # Execute with retry logic
+        last_error = None
+        last_error_type = None
+        retry_count = 0
+
+        while retry_count <= self.max_retries:
+            start_time = time.time()
+            try:
+                result = tool_func(*args, **kwargs)
+                execution_time = time.time() - start_time
+
+                # Cache successful result
+                self._cache_result(tool_name, param_hash, result)
+
+                # Record successful execution
+                record = ExecutionRecord(
+                    tool_name=tool_name,
+                    timestamp=start_time,
+                    execution_time=execution_time,
+                    success=True,
+                    parameters_hash=param_hash,
+                    result_hash=self._get_result_hash(result),
+                    retry_count=retry_count,
+                )
+
+                self.execution_history.append(record)
+                self._update_analytics(tool_name, execution_time, True)
+
+                return result, record
+
+            except Exception as e:
+                execution_time = time.time() - start_time
+                error_type = self._classify_error(e)
+                last_error = e
+                last_error_type = error_type
+
+                # Record failed execution
+                record = ExecutionRecord(
+                    tool_name=tool_name,
+                    timestamp=start_time,
+                    execution_time=execution_time,
+                    success=False,
+                    error_type=error_type,
+                    error_message=str(e),
+                    parameters_hash=param_hash,
+                    retry_count=retry_count,
+                )
+
+                self.execution_history.append(record)
+                self._update_analytics(tool_name, execution_time, False, error_type)
+
+                # Determine retry strategy
+                if not retry_on_error or retry_count >= self.max_retries:
+                    break
+
+                strategy = self.retry_strategies.get(error_type, RetryStrategy.NO_RETRY)
+                if strategy == RetryStrategy.NO_RETRY:
+                    break
+
+                retry_count += 1
+                delay = self._calculate_backoff_delay(retry_count, strategy)
+                if delay > 0:
+                    time.sleep(min(delay, 60.0))  # Cap delay at 60 seconds
+
+        # All retries exhausted or no retry strategy
+        raise last_error
+
+    def get_tool_analytics(self, tool_name: Optional[str] = None) -> Dict[str, ToolAnalytics]:
+        """Get analytics for a specific tool or all tools.
+
+        Args:
+            tool_name: Name of tool to get analytics for, or None for all tools
+
+        Returns:
+            Dictionary mapping tool names to their analytics
+        """
+        if tool_name:
+            return {tool_name: self.tool_analytics.get(tool_name)} if tool_name in self.tool_analytics else {}
+        return dict(self.tool_analytics)
+
+    def get_analytics_summary(self) -> pd.DataFrame:
+        """Get a summary of all tool analytics as a DataFrame.
+
+        Returns:
+            DataFrame with columns: tool_name, total_executions, success_rate,
+            failure_rate, avg_execution_time, cache_hit_rate, most_common_error
+        """
+        rows = []
+        for tool_name, analytics in self.tool_analytics.items():
+            most_common_error = (
+                max(analytics.error_counts.items(), key=lambda x: x[1])[0].value
+                if analytics.error_counts
+                else None
+            )
+
+            cache_total = analytics.cache_hits + analytics.cache_misses
+            cache_hit_rate = (
+                analytics.cache_hits / cache_total if cache_total > 0 else 0.0
+            )
+
+            rows.append(
+                {
+                    "tool_name": tool_name,
+                    "total_executions": analytics.total_executions,
+                    "success_rate": analytics.success_rate,
+                    "failure_rate": analytics.failure_rate,
+                    "avg_execution_time": analytics.average_execution_time,
+                    "cache_hit_rate": cache_hit_rate,
+                    "most_common_error": most_common_error,
+                    "last_execution": (
+                        datetime.fromtimestamp(analytics.last_execution).isoformat()
+                        if analytics.last_execution
+                        else None
+                    ),
+                }
+            )
+
+        return pd.DataFrame(rows)
+
+    def get_error_analysis(self) -> pd.DataFrame:
+        """Get error analysis across all tools.
+
+        Returns:
+            DataFrame with error statistics by tool and error type
+        """
+        rows = []
+        for tool_name, analytics in self.tool_analytics.items():
+            for error_type, count in analytics.error_counts.items():
+                rows.append(
+                    {
+                        "tool_name": tool_name,
+                        "error_type": error_type.value,
+                        "error_count": count,
+                        "error_rate": count / analytics.total_executions if analytics.total_executions > 0 else 0.0,
+                    }
+                )
+
+        return pd.DataFrame(rows)
+
+    def clear_cache(self, tool_name: Optional[str] = None):
+        """Clear cached results for a specific tool or all tools.
+
+        Args:
+            tool_name: Name of tool to clear cache for, or None for all tools
+        """
+        if tool_name:
+            if tool_name in self.result_cache:
+                self.result_cache[tool_name].clear()
+        else:
+            self.result_cache.clear()
+
+    def reset_analytics(self):
+        """Reset all analytics data."""
+        self.tool_analytics.clear()
+        self.execution_history.clear()
+        self.result_cache.clear()
+
diff --git a/biomni/tool/tool_registry.py b/biomni/tool/tool_registry.py
index 246fc023e..2a61002e3 100644
--- a/biomni/tool/tool_registry.py
+++ b/biomni/tool/tool_registry.py
@@ -1,12 +1,16 @@
 import pickle
+from typing import Optional
 
 import pandas as pd
 
+from biomni.tool.execution_analytics import ExecutionAnalytics
+
 
 class ToolRegistry:
-    def __init__(self, tools):
+    def __init__(self, tools, execution_analytics: Optional[ExecutionAnalytics] = None):
         self.tools = []
         self.next_id = 0
+        self.execution_analytics = execution_analytics
         for j in tools.values():
             for tool in j:

From 5607841205943a8eb9b8904d08db00d549355c30 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Fri, 13 Feb 2026 18:21:48 +0000
Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 biomni/agent/a1.py                 |  1 -
 biomni/tool/execution_analytics.py | 54 ++++++++++++------------------
 biomni/tool/tool_registry.py       |  3 +-
 3 files changed, 22 insertions(+), 36 deletions(-)

diff --git a/biomni/agent/a1.py b/biomni/agent/a1.py
index 4faba71cb..301b5c292 100644
--- a/biomni/agent/a1.py
+++ b/biomni/agent/a1.py
@@ -230,7 +230,6 @@ def __init__(
         # Add timeout parameter
         self.timeout_seconds = timeout_seconds  # 10 minutes default timeout
-
 
         self.configure()
 
     def add_tool(self, api):

diff --git a/biomni/tool/execution_analytics.py b/biomni/tool/execution_analytics.py
index fead5bd20..4375f8d0c 100644
--- a/biomni/tool/execution_analytics.py
+++ b/biomni/tool/execution_analytics.py
@@ -15,10 +15,11 @@
 import json
 import time
 from collections import defaultdict
+from collections.abc import Callable
 from dataclasses import dataclass, field
 from datetime import datetime
 from enum import Enum
-from typing import Any, Callable, Dict, List, Optional, Tuple
+from typing import Any
 
 import pandas as pd
 
@@ -53,10 +54,10 @@ class ExecutionRecord:
     timestamp: float
     execution_time: float
     success: bool
-    error_type: Optional[ErrorType] = None
-    error_message: Optional[str] = None
+    error_type: ErrorType | None = None
+    error_message: str | None = None
     parameters_hash: str = ""
-    result_hash: Optional[str] = None
+    result_hash: str | None = None
     retry_count: int = 0
 
 
@@ -70,8 +71,8 @@ class ToolAnalytics:
     failed_executions: int = 0
     total_execution_time: float = 0.0
    average_execution_time: float = 0.0
-    error_counts: Dict[ErrorType, int] = field(default_factory=lambda: defaultdict(int))
-    last_execution: Optional[float] = None
+    error_counts: dict[ErrorType, int] = field(default_factory=lambda: defaultdict(int))
+    last_execution: float | None = None
     cache_hits: int = 0
     cache_misses: int = 0
 
@@ -114,11 +115,11 @@ def __init__(
         self.enable_analytics = enable_analytics
 
         # Analytics storage
-        self.tool_analytics: Dict[str, ToolAnalytics] = {}
-        self.execution_history: List[ExecutionRecord] = []
+        self.tool_analytics: dict[str, ToolAnalytics] = {}
+        self.execution_history: list[ExecutionRecord] = []
 
         # Result cache: {tool_name: {param_hash: (result, timestamp)}}
-        self.result_cache: Dict[str, Dict[str, Tuple[Any, float]]] = defaultdict(dict)
+        self.result_cache: dict[str, dict[str, tuple[Any, float]]] = defaultdict(dict)
 
         # Error classification patterns
         self.error_patterns = {
@@ -158,9 +159,7 @@ def _classify_error(self, error: Exception) -> ErrorType:
 
     def _get_parameters_hash(self, tool_name: str, args: tuple, kwargs: dict) -> str:
         """Generate a hash for tool parameters."""
-        param_str = json.dumps(
-            {"tool": tool_name, "args": args, "kwargs": kwargs}, sort_keys=True, default=str
-        )
+        param_str = json.dumps({"tool": tool_name, "args": args, "kwargs": kwargs}, sort_keys=True, default=str)
         return hashlib.md5(param_str.encode()).hexdigest()
 
     def _get_result_hash(self, result: Any) -> str:
@@ -168,7 +167,7 @@ def _get_result_hash(self, result: Any) -> str:
         result_str = json.dumps(result, sort_keys=True, default=str)
         return hashlib.md5(result_str.encode()).hexdigest()
 
-    def _get_cached_result(self, tool_name: str, param_hash: str) -> Optional[Any]:
+    def _get_cached_result(self, tool_name: str, param_hash: str) -> Any | None:
         """Retrieve cached result if available and not expired."""
         if not self.enable_caching:
             return None
@@ -198,14 +197,12 @@ def _cache_result(self, tool_name: str, param_hash: str, result: Any):
         current_time = time.time()
         self.result_cache[tool_name][param_hash] = (result, current_time)
 
-    def _calculate_backoff_delay(
-        self, retry_count: int, strategy: RetryStrategy, base_delay: float = 1.0
-    ) -> float:
+    def _calculate_backoff_delay(self, retry_count: int, strategy: RetryStrategy, base_delay: float = 1.0) -> float:
         """Calculate delay before retry based on strategy."""
         if strategy == RetryStrategy.IMMEDIATE:
             return 0.0
         elif strategy == RetryStrategy.EXPONENTIAL_BACKOFF:
-            return base_delay * (2 ** retry_count)
+            return base_delay * (2**retry_count)
         elif strategy == RetryStrategy.LINEAR_BACKOFF:
             return base_delay * (retry_count + 1)
         elif strategy == RetryStrategy.NO_RETRY:
@@ -218,7 +215,7 @@ def _update_analytics(
         tool_name: str,
         execution_time: float,
         success: bool,
-        error_type: Optional[ErrorType] = None,
+        error_type: ErrorType | None = None,
         from_cache: bool = False,
     ):
         """Update analytics for a tool execution."""
@@ -247,9 +244,7 @@ def _update_analytics(
 
         # Update average execution time
         if analytics.total_executions > 0:
-            analytics.average_execution_time = (
-                analytics.total_execution_time / analytics.total_executions
-            )
+            analytics.average_execution_time = analytics.total_execution_time / analytics.total_executions
 
     def execute_with_retry(
         self,
@@ -258,7 +253,7 @@ def execute_with_retry(
         args: tuple = (),
         kwargs: dict = None,
         retry_on_error: bool = True,
-    ) -> Tuple[Any, ExecutionRecord]:
+    ) -> tuple[Any, ExecutionRecord]:
         """Execute a tool with retry logic and analytics tracking.
 
         Args:
@@ -292,7 +287,6 @@ def execute_with_retry(
 
         # Execute with retry logic
         last_error = None
-        last_error_type = None
         retry_count = 0
 
         while retry_count <= self.max_retries:
@@ -324,7 +318,6 @@ def execute_with_retry(
                 execution_time = time.time() - start_time
                 error_type = self._classify_error(e)
                 last_error = e
-                last_error_type = error_type
 
                 # Record failed execution
                 record = ExecutionRecord(
@@ -357,7 +350,7 @@ def execute_with_retry(
         # All retries exhausted or no retry strategy
         raise last_error
 
-    def get_tool_analytics(self, tool_name: Optional[str] = None) -> Dict[str, ToolAnalytics]:
+    def get_tool_analytics(self, tool_name: str | None = None) -> dict[str, ToolAnalytics]:
         """Get analytics for a specific tool or all tools.
 
         Args:
@@ -380,15 +373,11 @@ def get_analytics_summary(self) -> pd.DataFrame:
         rows = []
         for tool_name, analytics in self.tool_analytics.items():
             most_common_error = (
-                max(analytics.error_counts.items(), key=lambda x: x[1])[0].value
-                if analytics.error_counts
-                else None
+                max(analytics.error_counts.items(), key=lambda x: x[1])[0].value if analytics.error_counts else None
             )
 
             cache_total = analytics.cache_hits + analytics.cache_misses
-            cache_hit_rate = (
-                analytics.cache_hits / cache_total if cache_total > 0 else 0.0
-            )
+            cache_hit_rate = analytics.cache_hits / cache_total if cache_total > 0 else 0.0
 
             rows.append(
                 {
@@ -429,7 +418,7 @@ def get_error_analysis(self) -> pd.DataFrame:
 
         return pd.DataFrame(rows)
 
-    def clear_cache(self, tool_name: Optional[str] = None):
+    def clear_cache(self, tool_name: str | None = None):
         """Clear cached results for a specific tool or all tools.
 
         Args:
@@ -446,4 +435,3 @@ def reset_analytics(self):
         self.tool_analytics.clear()
         self.execution_history.clear()
         self.result_cache.clear()
-

diff --git a/biomni/tool/tool_registry.py b/biomni/tool/tool_registry.py
index 2a61002e3..3a85705cd 100644
--- a/biomni/tool/tool_registry.py
+++ b/biomni/tool/tool_registry.py
@@ -1,5 +1,4 @@
 import pickle
-from typing import Optional
 
 import pandas as pd
 
@@ -7,7 +6,7 @@
 
 
 class ToolRegistry:
-    def __init__(self, tools, execution_analytics: Optional[ExecutionAnalytics] = None):
+    def __init__(self, tools, execution_analytics: ExecutionAnalytics | None = None):
        self.tools = []
         self.next_id = 0
         self.execution_analytics = execution_analytics
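
Usage sketch (illustrative only, not part of either commit): the snippet below exercises the public API this series introduces. `flaky_api_call` is a hypothetical tool function invented for the example, and `execute_with_retry` is called directly just to show the retry/cache path; inside the agent the same object is reachable as `agent.execution_analytics` or via the A1 wrapper methods (`get_analytics_summary`, `get_error_analysis`, `clear_execution_cache`, `reset_execution_analytics`).

    from biomni.tool.execution_analytics import ExecutionAnalytics

    analytics = ExecutionAnalytics(enable_caching=True, cache_ttl=3600, max_retries=3)

    def flaky_api_call(query: str) -> dict:
        # Hypothetical tool function; a real tool from module2api would go here.
        return {"query": query, "hits": 3}

    # First call executes the tool, retrying on retryable error types;
    # an identical call within cache_ttl is then served from the result cache.
    result, record = analytics.execute_with_retry(flaky_api_call, "flaky_api_call", kwargs={"query": "BRCA1"})
    result, record = analytics.execute_with_retry(flaky_api_call, "flaky_api_call", kwargs={"query": "BRCA1"})

    print(analytics.get_analytics_summary())  # one row per tool, incl. cache_hit_rate
    print(analytics.get_error_analysis())     # error counts and rates by error type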
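Worked example of the backoff schedule (also illustrative; it pokes the private helper `_calculate_backoff_delay` purely to show the numbers): `execute_with_retry` increments `retry_count` before computing the delay and caps each sleep at 60 seconds, so with the default `base_delay` of 1.0 the exponential strategy (timeouts, rate limits) sleeps 2 s, 4 s, 8 s across the three default retries, while the linear strategy (network errors) sleeps 2 s, 3 s, 4 s.

    from biomni.tool.execution_analytics import ExecutionAnalytics, RetryStrategy

    ea = ExecutionAnalytics()
    for retry in (1, 2, 3):
        exp = ea._calculate_backoff_delay(retry, RetryStrategy.EXPONENTIAL_BACKOFF)  # 2.0, 4.0, 8.0
        lin = ea._calculate_backoff_delay(retry, RetryStrategy.LINEAR_BACKOFF)       # 2.0, 3.0, 4.0
        print(retry, exp, lin)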