diff --git a/src/praisonai/praisonai/cli/main.py b/src/praisonai/praisonai/cli/main.py index 1d235db65..5d9d62e76 100644 --- a/src/praisonai/praisonai/cli/main.py +++ b/src/praisonai/praisonai/cli/main.py @@ -2140,25 +2140,21 @@ def _load_tools(self, tools_path: str) -> list: except Exception as e: print(f"[yellow]Warning: Failed to load tools from {tools_path}: {e}[/yellow]") else: - # Treat as comma-separated tool names - try: - from praisonaiagents.tools import TOOL_MAPPINGS - import praisonaiagents.tools as tools_module - - tool_names = [t.strip() for t in tools_path.split(',')] - for tool_name in tool_names: - if tool_name in TOOL_MAPPINGS: - try: - tool = getattr(tools_module, tool_name) - tools_list.append(tool) - except Exception as e: - print(f"[yellow]Warning: Failed to load tool '{tool_name}': {e}[/yellow]") + # Comma-separated names: use the unified resolver so CLI == YAML == Python + from ..tool_resolver import ToolResolver + resolver = ToolResolver() + tool_names = [t.strip() for t in tools_path.split(',') if t.strip()] + for tool_name in tool_names: + try: + tool = resolver.resolve(tool_name, instantiate=True) + if tool is not None: + tools_list.append(tool) else: print(f"[yellow]Warning: Unknown tool '{tool_name}'[/yellow]") - if tools_list: - print(f"[cyan]Loaded {len(tools_list)} built-in tools[/cyan]") - except ImportError: - print("[yellow]Warning: Could not import tools module[/yellow]") + except Exception as e: + print(f"[yellow]Warning: Failed to load tool '{tool_name}': {e}[/yellow]") + if tools_list: + print(f"[cyan]Loaded {len(tools_list)} tools[/cyan]") return tools_list diff --git a/src/praisonai/praisonai/framework_adapters/base.py b/src/praisonai/praisonai/framework_adapters/base.py index 97fa7f4f9..c7f3a8773 100644 --- a/src/praisonai/praisonai/framework_adapters/base.py +++ b/src/praisonai/praisonai/framework_adapters/base.py @@ -92,33 +92,6 @@ async def arun( """ ... - async def arun( - self, - config: Dict[str, Any], - llm_config: List[Dict], - topic: str, - *, - tools_dict: Optional[Dict[str, Any]] = None, - agent_callback: Optional[Callable] = None, - task_callback: Optional[Callable] = None, - cli_config: Optional[Dict[str, Any]] = None, - ) -> str: - """ - Run the framework asynchronously with given configuration. - - Args: - config: Framework configuration - llm_config: LLM configuration list - topic: Topic for the tasks - tools_dict: Available tools dictionary - agent_callback: Callback for agent events - task_callback: Callback for task events - cli_config: CLI configuration - - Returns: - Execution result as string - """ - ... def cleanup(self) -> None: """Clean up any resources after execution.""" @@ -157,29 +130,6 @@ def _sub(m): # Only substitute simple variable names like {topic}, not JSON like {"level":2} return re.sub(r'\{([a-zA-Z_][a-zA-Z0-9_]*)\}', _sub, template) - async def arun( - self, - config: Dict[str, Any], - llm_config: List[Dict], - topic: str, - *, - tools_dict: Optional[Dict[str, Any]] = None, - agent_callback: Optional[Callable] = None, - task_callback: Optional[Callable] = None, - cli_config: Optional[Dict[str, Any]] = None, - ) -> str: - """ - Default async implementation that falls back to thread-offloaded sync. - - Framework adapters with native async support should override this method. - """ - import asyncio - return await asyncio.to_thread( - self.run, config, llm_config, topic, - tools_dict=tools_dict, agent_callback=agent_callback, - task_callback=task_callback, cli_config=cli_config - ) - def resolve(self) -> "FrameworkAdapter": """Default implementation returns self.""" return self @@ -200,8 +150,22 @@ async def arun( cli_config: Optional[Dict[str, Any]] = None, ) -> str: """ - Safe default for sync-only adapters (crewai, autogen v0.2): - run the sync implementation in a worker thread, freeing the loop. + Async execution. Default implementation offloads sync run() to a worker thread. + + Sync-only adapters (crewai, autogen v0.2) can use this default. + Native-async adapters should override this method. + + Args: + config: Framework configuration + llm_config: LLM configuration list + topic: Topic for the tasks + tools_dict: Available tools dictionary + agent_callback: Callback for agent events + task_callback: Callback for task events + cli_config: CLI configuration + + Returns: + Execution result as string """ import asyncio return await asyncio.to_thread( diff --git a/src/praisonai/praisonai/scheduler/_base_scheduler.py b/src/praisonai/praisonai/scheduler/_base_scheduler.py new file mode 100644 index 000000000..33606375f --- /dev/null +++ b/src/praisonai/praisonai/scheduler/_base_scheduler.py @@ -0,0 +1,81 @@ +""" +Shared, lock-agnostic scheduler logic for both sync and async variants. +""" + +import os +import json +import logging +from datetime import datetime +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class _BaseAgentScheduler: + """Shared, lock-agnostic scheduler logic — used by both sync and async variants.""" + + is_running: bool + max_cost: Optional[float] + _execution_count: int + _success_count: int + _failure_count: int + _total_cost: float + _start_time: Optional[datetime] + + def _build_stats( + self, + *, + execs: int, + success: int, + failed: int, + total_cost: float, + ) -> Dict[str, Any]: + """Build stats dictionary for both sync and async schedulers.""" + runtime = ( + (datetime.now() - self._start_time).total_seconds() + if self._start_time else 0 + ) + return { + "is_running": self.is_running, + "total_executions": execs, + "successful_executions": success, + "failed_executions": failed, + "success_rate": (success / execs * 100) if execs > 0 else 0, + "total_cost_usd": round(total_cost, 4), + "remaining_budget": ( + round(self.max_cost - total_cost, 4) if self.max_cost is not None else None + ), + "runtime_seconds": runtime, + "cost_per_execution": ( + round(total_cost / execs, 4) if execs > 0 else 0 + ), + } + + def _update_state_if_daemon(self) -> None: + """Update ~/.praisonai/schedulers/*.json for the current PID, if present. + + Safe for both sync and async callers — it's plain blocking file I/O that + runs once per execution and is wrapped in try/except. + """ + try: + state_dir = os.path.expanduser("~/.praisonai/schedulers") + if not os.path.exists(state_dir): + return + current_pid = os.getpid() + for fname in os.listdir(state_dir): + if not fname.endswith(".json"): + continue + path = os.path.join(state_dir, fname) + try: + with open(path, "r") as f: + state = json.load(f) + if state.get("pid") == current_pid: + state["executions"] = self._execution_count + state["cost"] = round(self._total_cost, 4) + with open(path, "w") as f: + json.dump(state, f, indent=2) + break + except Exception: + continue + except Exception as e: + logger.debug("Failed to update state: %s", e) \ No newline at end of file diff --git a/src/praisonai/praisonai/scheduler/agent_scheduler.py b/src/praisonai/praisonai/scheduler/agent_scheduler.py index ade4882bc..81318c8c0 100644 --- a/src/praisonai/praisonai/scheduler/agent_scheduler.py +++ b/src/praisonai/praisonai/scheduler/agent_scheduler.py @@ -13,11 +13,12 @@ from .base import ScheduleParser, PraisonAgentExecutor from .shared import backoff_delay +from ._base_scheduler import _BaseAgentScheduler logger = logging.getLogger(__name__) -class AgentScheduler: +class AgentScheduler(_BaseAgentScheduler): """ Scheduler for running PraisonAI agents periodically. @@ -163,54 +164,12 @@ def get_stats(self) -> Dict[str, Any]: Dictionary with execution stats including cost """ with self._stats_lock: - runtime = (datetime.now() - self._start_time).total_seconds() if self._start_time else 0 - return { - "is_running": self.is_running, - "total_executions": self._execution_count, - "successful_executions": self._success_count, - "failed_executions": self._failure_count, - "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0, - "total_cost_usd": round(self._total_cost, 4), - "runtime_seconds": round(runtime, 1), - "cost_per_execution": round(self._total_cost / self._execution_count, 4) if self._execution_count > 0 else 0 - } - - def _update_state_if_daemon(self): - """Update state file with execution stats if running as daemon.""" - try: - import os - # Check if we're running as a daemon by looking for state file - state_dir = os.path.expanduser("~/.praisonai/schedulers") - if not os.path.exists(state_dir): - return - - # Try to find our state file by checking all state files for matching PID - current_pid = os.getpid() - for state_file in os.listdir(state_dir): - if not state_file.endswith('.json'): - continue - - state_path = os.path.join(state_dir, state_file) - try: - import json - with open(state_path, 'r') as f: - state = json.load(f) - - # Check if this is our state file - if state.get('pid') == current_pid: - # Update execution stats - state['executions'] = self._execution_count - state['cost'] = round(self._total_cost, 4) - - # Write back - with open(state_path, 'w') as f: - json.dump(state, f, indent=2) - break - except Exception: - continue - except Exception as e: - # Silently fail - don't break scheduler if state update fails - logger.debug(f"Failed to update state: {e}") + return self._build_stats( + execs=self._execution_count, + success=self._success_count, + failed=self._failure_count, + total_cost=self._total_cost, + ) def _run_schedule(self, interval: int, max_retries: int): """Internal method to run scheduled agent executions.""" diff --git a/src/praisonai/praisonai/scheduler/async_agent_scheduler.py b/src/praisonai/praisonai/scheduler/async_agent_scheduler.py index b67ef0d75..b64072b09 100644 --- a/src/praisonai/praisonai/scheduler/async_agent_scheduler.py +++ b/src/praisonai/praisonai/scheduler/async_agent_scheduler.py @@ -12,6 +12,7 @@ from abc import ABC, abstractmethod from .shared import ScheduleParser, backoff_delay, safe_call +from ._base_scheduler import _BaseAgentScheduler logger = logging.getLogger(__name__) @@ -62,7 +63,7 @@ async def execute(self, task: str) -> Any: raise -class AsyncAgentScheduler: +class AsyncAgentScheduler(_BaseAgentScheduler): """ Async-native scheduler for running PraisonAI agents periodically. @@ -124,6 +125,7 @@ def __init__( self._execution_count = 0 self._success_count = 0 self._failure_count = 0 + self._start_time: Optional[datetime] = None # Sync lock for async primitives creation and bound loop tracking self._primitives_lock = threading.Lock() @@ -169,6 +171,7 @@ async def start( try: interval = ScheduleParser.parse(schedule_expr) self.is_running = True + self._start_time = datetime.now() self._ensure_async_primitives() # bind to the loop start() runs on self._stop_event.clear() @@ -296,15 +299,9 @@ async def get_stats_async(self) -> Dict[str, Any]: failed = self._failure_count total_cost = self._total_cost - return { - "is_running": self.is_running, - "total_executions": execs, - "successful_executions": success, - "failed_executions": failed, - "success_rate": (success / execs * 100) if execs > 0 else 0, - "total_cost_usd": round(total_cost, 4), - "remaining_budget": round(self.max_cost - total_cost, 4) if self.max_cost is not None else None, - } + return self._build_stats( + execs=execs, success=success, failed=failed, total_cost=total_cost + ) def get_stats_sync(self) -> Dict[str, Any]: """ @@ -390,8 +387,7 @@ async def _execute_with_retry(self, max_retries: int): logger.info(f"Estimated cost this run: ${estimated_cost:.4f}, Total: ${self._total_cost:.4f}") safe_call(self.on_success, result) - # TODO: Add daemon state update from sync version: - # self._update_state_if_daemon() + await asyncio.to_thread(self._update_state_if_daemon) return except asyncio.TimeoutError as e: @@ -418,8 +414,7 @@ async def _execute_with_retry(self, max_retries: int): last_exc if last_exc is not None else RuntimeError(f"Failed after {max_retries} attempts") ) - # TODO: Add daemon state update from sync version: - # self._update_state_if_daemon() + await asyncio.to_thread(self._update_state_if_daemon) async def execute_once(self) -> Any: """ diff --git a/src/praisonai/praisonai/scheduler/base.py b/src/praisonai/praisonai/scheduler/base.py index 4e3382923..22acb8f8c 100644 --- a/src/praisonai/praisonai/scheduler/base.py +++ b/src/praisonai/praisonai/scheduler/base.py @@ -10,67 +10,11 @@ import logging from abc import ABC, abstractmethod from typing import Any +from .shared import ScheduleParser # Single source of truth logger = logging.getLogger(__name__) -class ScheduleParser: - """Parse schedule expressions into intervals in seconds.""" - - @staticmethod - def parse(schedule_expr: str) -> int: - """ - Parse schedule expression and return interval in seconds. - - Supported formats: - - "daily" -> 86400 seconds - - "hourly" -> 3600 seconds - - "*/30m" -> 1800 seconds (every 30 minutes) - - "*/6h" -> 21600 seconds (every 6 hours) - - "*/30s" -> 30 seconds (every 30 seconds) - - "3600" -> 3600 seconds (plain number) - - Args: - schedule_expr: Schedule expression string - - Returns: - Interval in seconds - - Raises: - ValueError: If schedule format is not supported - - Examples: - >>> ScheduleParser.parse("hourly") - 3600 - >>> ScheduleParser.parse("*/30m") - 1800 - >>> ScheduleParser.parse("daily") - 86400 - """ - schedule_expr = schedule_expr.strip().lower() - - if schedule_expr == "daily": - return 86400 - elif schedule_expr == "hourly": - return 3600 - elif schedule_expr.isdigit(): - return int(schedule_expr) - elif schedule_expr.startswith("*/"): - interval_part = schedule_expr[2:] - if interval_part.endswith("m"): - minutes = int(interval_part[:-1]) - return minutes * 60 - elif interval_part.endswith("h"): - hours = int(interval_part[:-1]) - return hours * 3600 - elif interval_part.endswith("s"): - return int(interval_part[:-1]) - else: - return int(interval_part) - else: - raise ValueError(f"Unsupported schedule format: {schedule_expr}") - - class ExecutorInterface(ABC): """Abstract interface for executors.""" diff --git a/src/praisonai/tests/unit/cli/test_tool_resolver_integration.py b/src/praisonai/tests/unit/cli/test_tool_resolver_integration.py new file mode 100644 index 000000000..74f2e9238 --- /dev/null +++ b/src/praisonai/tests/unit/cli/test_tool_resolver_integration.py @@ -0,0 +1,261 @@ +""" +Smoke tests for CLI tool resolution via ToolResolver. + +Tests for PR #1857 ensuring CLI tool loading uses unified ToolResolver +instead of direct TOOL_MAPPINGS access. +""" + +import pytest +from unittest.mock import Mock, patch, MagicMock + +from praisonai.cli.main import PraisonCLI + + +class TestCLIToolResolverSmoke: + """Smoke tests for CLI tool resolution integration.""" + + def test_load_tools_imports_tool_resolver(self): + """Test that _load_tools imports ToolResolver from correct module.""" + cli = PraisonCLI() + + # Mock the ToolResolver import + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.return_value = None # No tools found + + # Call _load_tools - should import and use ToolResolver + result = cli._load_tools("nonexistent_tool") + + # Verify ToolResolver was imported and instantiated + MockResolver.assert_called_once() + assert result == [] + + def test_load_tools_resolver_instantiation_pattern(self): + """Test that ToolResolver is instantiated correctly.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.return_value = Mock() # Mock tool + + cli._load_tools("test_tool") + + # Should create instance with no arguments (default constructor) + MockResolver.assert_called_once_with() + + def test_load_tools_calls_resolve_with_instantiate_true(self): + """Test that resolve is called with instantiate=True.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_tool = Mock() + mock_instance.resolve.return_value = mock_tool + + result = cli._load_tools("test_tool") + + # Verify resolve was called with instantiate=True + mock_instance.resolve.assert_called_once_with("test_tool", instantiate=True) + assert mock_tool in result + + def test_load_tools_multiple_comma_separated(self): + """Test loading multiple comma-separated tools.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + + # Different return values for different tools + mock_tool1 = Mock() + mock_tool2 = Mock() + mock_instance.resolve.side_effect = [mock_tool1, mock_tool2] + + result = cli._load_tools("tool1,tool2") + + # Should call resolve twice + assert mock_instance.resolve.call_count == 2 + mock_instance.resolve.assert_any_call("tool1", instantiate=True) + mock_instance.resolve.assert_any_call("tool2", instantiate=True) + assert mock_tool1 in result + assert mock_tool2 in result + + def test_load_tools_whitespace_handling(self): + """Test that whitespace around tool names is handled correctly.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_tool = Mock() + mock_instance.resolve.return_value = mock_tool + + # Test with various whitespace combinations + result = cli._load_tools(" tool1 , tool2 , tool3 ") + + # Should strip whitespace from tool names + assert mock_instance.resolve.call_count == 3 + mock_instance.resolve.assert_any_call("tool1", instantiate=True) + mock_instance.resolve.assert_any_call("tool2", instantiate=True) + mock_instance.resolve.assert_any_call("tool3", instantiate=True) + + def test_load_tools_empty_string_filtering(self): + """Test that empty strings are filtered out.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_tool = Mock() + mock_instance.resolve.return_value = mock_tool + + # Include empty strings and whitespace-only strings + result = cli._load_tools("tool1,, , tool2,") + + # Should only resolve actual tool names + assert mock_instance.resolve.call_count == 2 + mock_instance.resolve.assert_any_call("tool1", instantiate=True) + mock_instance.resolve.assert_any_call("tool2", instantiate=True) + + def test_load_tools_none_return_handling(self): + """Test handling when resolver returns None (tool not found).""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.return_value = None # Tool not found + + result = cli._load_tools("unknown_tool") + + # Should handle None return gracefully + mock_instance.resolve.assert_called_once_with("unknown_tool", instantiate=True) + assert result == [] + + def test_load_tools_exception_handling(self): + """Test that exceptions during tool resolution are handled gracefully.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.side_effect = Exception("Tool resolution error") + + # Should not raise, should handle exception gracefully + result = cli._load_tools("problematic_tool") + + # Should attempt resolution but return empty list on error + mock_instance.resolve.assert_called_once_with("problematic_tool", instantiate=True) + assert result == [] + + def test_load_tools_file_path_bypass(self): + """Test that file paths bypass the ToolResolver and use file loading.""" + cli = PraisonCLI() + + # Mock os.path.isfile to return True for file path test + with patch('os.path.isfile', return_value=True): + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + # Also need to mock the file loading parts + with patch('praisonai.cli.main.load_user_module', return_value=None): + result = cli._load_tools("/path/to/tools.py") + + # ToolResolver should NOT be called for file paths + MockResolver.assert_not_called() + + def test_load_tools_empty_input(self): + """Test handling of empty or None input.""" + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + # Test empty string + result = cli._load_tools("") + assert result == [] + MockResolver.assert_not_called() + + # Test None (if passed somehow) + result = cli._load_tools(None) + assert result == [] + MockResolver.assert_not_called() + + def test_load_tools_preserves_tool_instances(self): + """Test that tool instances returned by resolver are preserved.""" + cli = PraisonCLI() + + # Create mock tools with specific attributes + mock_tool1 = Mock() + mock_tool1.name = "tool1" + mock_tool2 = Mock() + mock_tool2.name = "tool2" + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.side_effect = [mock_tool1, mock_tool2] + + result = cli._load_tools("tool1,tool2") + + # Should preserve exact tool instances + assert len(result) == 2 + assert mock_tool1 in result + assert mock_tool2 in result + assert result[0].name == "tool1" + assert result[1].name == "tool2" + + +class TestCLIToolResolverConsistency: + """Test that CLI tool loading is consistent with other interfaces.""" + + def test_resolver_import_path_consistency(self): + """Test that CLI imports ToolResolver from the correct module path.""" + # This test ensures the import path is correct and consistent + try: + from praisonai.tool_resolver import ToolResolver + from praisonai.cli.main import PraisonCLI + + # If both imports work, the path is consistent + assert ToolResolver is not None + assert PraisonCLI is not None + + except ImportError as e: + pytest.fail(f"Import path inconsistency detected: {e}") + + def test_resolver_instantiation_consistency(self): + """Test that CLI instantiates ToolResolver the same way as other components.""" + from praisonai.cli.main import PraisonCLI + + cli = PraisonCLI() + + # Mock to capture how ToolResolver is instantiated + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.return_value = Mock() + + cli._load_tools("test_tool") + + # Should use default constructor (no arguments) + MockResolver.assert_called_once_with() + + def test_resolve_method_signature_consistency(self): + """Test that resolve method is called with expected signature.""" + from praisonai.cli.main import PraisonCLI + + cli = PraisonCLI() + + with patch('praisonai.cli.main.ToolResolver') as MockResolver: + mock_instance = Mock() + MockResolver.return_value = mock_instance + mock_instance.resolve.return_value = Mock() + + cli._load_tools("test_tool") + + # Verify the method signature matches expectations + mock_instance.resolve.assert_called_once_with("test_tool", instantiate=True) + + # Ensure no unexpected keyword arguments are passed + call_args = mock_instance.resolve.call_args + assert call_args[0] == ("test_tool",) # positional args + assert call_args[1] == {"instantiate": True} # keyword args \ No newline at end of file diff --git a/src/praisonai/tests/unit/scheduler/test_scheduler_fixes.py b/src/praisonai/tests/unit/scheduler/test_scheduler_fixes.py new file mode 100644 index 000000000..2437ace0f --- /dev/null +++ b/src/praisonai/tests/unit/scheduler/test_scheduler_fixes.py @@ -0,0 +1,446 @@ +""" +Unit tests for scheduler fixes identified in PR #1857. + +Tests for: +1. AsyncAgentScheduler._start_time initialization fix +2. Async I/O handling with asyncio.to_thread for _update_state_if_daemon +3. Zero budget (max_cost=0.0) handling in _BaseAgentScheduler._build_stats +4. Integration tests for the fixes + +These tests ensure the critical bugs identified by reviewers are properly fixed. +""" + +import asyncio +import pytest +import tempfile +import os +import json +from unittest.mock import AsyncMock, Mock, patch +from datetime import datetime + +# Import the scheduler classes to test +from praisonai.scheduler.async_agent_scheduler import AsyncAgentScheduler +from praisonai.scheduler._base_scheduler import _BaseAgentScheduler + + +class TestAsyncAgentSchedulerStartTimeInit: + """Test _start_time initialization fix for AsyncAgentScheduler.""" + + def test_start_time_initialized_in_init(self): + """Test that _start_time is properly initialized to None in __init__.""" + mock_agent = Mock() + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # _start_time should be initialized to None + assert hasattr(scheduler, '_start_time') + assert scheduler._start_time is None + + @pytest.mark.asyncio + async def test_start_time_set_on_start(self): + """Test that _start_time is set when scheduler starts.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # Start the scheduler + await scheduler.start("*/1s", run_immediately=False) + + # _start_time should now be set + assert scheduler._start_time is not None + assert isinstance(scheduler._start_time, datetime) + + await scheduler.stop() + + @pytest.mark.asyncio + async def test_build_stats_with_start_time(self): + """Test that _build_stats works correctly when _start_time is set.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # Start scheduler to set _start_time + await scheduler.start("*/1s", run_immediately=False) + + # Get stats - should include runtime_seconds + stats = await scheduler.get_stats_async() + assert 'runtime_seconds' in stats + assert stats['runtime_seconds'] >= 0 + + await scheduler.stop() + + @pytest.mark.asyncio + async def test_build_stats_without_start_time(self): + """Test that _build_stats works when _start_time is None.""" + mock_agent = Mock() + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # Don't start scheduler - _start_time remains None + stats = await scheduler.get_stats_async() + assert 'runtime_seconds' in stats + assert stats['runtime_seconds'] == 0 + + +class TestAsyncIODaemonStateUpdates: + """Test asyncio.to_thread handling for _update_state_if_daemon.""" + + @pytest.mark.asyncio + async def test_update_state_daemon_uses_asyncio_to_thread(self): + """Test that _update_state_if_daemon is called via asyncio.to_thread.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # Mock the _update_state_if_daemon method + with patch.object(scheduler, '_update_state_if_daemon') as mock_update: + with patch('asyncio.to_thread') as mock_to_thread: + mock_to_thread.return_value = asyncio.create_future() + mock_to_thread.return_value.set_result(None) + + # Execute with retry - should call _update_state_if_daemon via asyncio.to_thread + await scheduler._execute_with_retry(max_retries=1) + + # Verify asyncio.to_thread was called with _update_state_if_daemon + mock_to_thread.assert_called_with(scheduler._update_state_if_daemon) + + @pytest.mark.asyncio + async def test_daemon_state_update_on_success_path(self): + """Test that daemon state is updated on successful execution.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + with patch('asyncio.to_thread') as mock_to_thread: + mock_to_thread.return_value = asyncio.create_future() + mock_to_thread.return_value.set_result(None) + + await scheduler._execute_with_retry(max_retries=1) + + # Should be called once on success path + assert mock_to_thread.call_count >= 1 + mock_to_thread.assert_called_with(scheduler._update_state_if_daemon) + + @pytest.mark.asyncio + async def test_daemon_state_update_on_failure_path(self): + """Test that daemon state is updated on failure execution.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(side_effect=Exception("test error")) + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + with patch('asyncio.to_thread') as mock_to_thread: + mock_to_thread.return_value = asyncio.create_future() + mock_to_thread.return_value.set_result(None) + + await scheduler._execute_with_retry(max_retries=1) + + # Should be called once on failure path + assert mock_to_thread.call_count >= 1 + mock_to_thread.assert_called_with(scheduler._update_state_if_daemon) + + @pytest.mark.asyncio + async def test_no_blocking_io_on_event_loop(self): + """Test that no blocking I/O operations are performed directly on event loop.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # Create a temporary state directory and file to ensure the daemon update logic runs + with tempfile.TemporaryDirectory() as temp_dir: + state_file = os.path.join(temp_dir, "test_scheduler.json") + state_data = { + "pid": os.getpid(), + "executions": 0, + "cost": 0.0 + } + with open(state_file, "w") as f: + json.dump(state_data, f) + + # Patch the state directory to use our temp directory + with patch('os.path.expanduser', return_value=temp_dir): + with patch('os.listdir', return_value=["test_scheduler.json"]): + # Execute - this should not block the event loop + await scheduler._execute_with_retry(max_retries=1) + + # Verify the scheduler completed without blocking + + +class TestZeroBudgetHandling: + """Test max_cost=0.0 handling in _BaseAgentScheduler._build_stats.""" + + def test_zero_budget_remaining_calculation(self): + """Test that max_cost=0.0 is handled correctly (not treated as None).""" + # Create a concrete scheduler class for testing + class TestScheduler(_BaseAgentScheduler): + def __init__(self): + self.is_running = False + self.max_cost = 0.0 # Zero budget + self._execution_count = 5 + self._success_count = 3 + self._failure_count = 2 + self._total_cost = 0.0 + self._start_time = datetime.now() + + scheduler = TestScheduler() + + # Build stats with zero budget + stats = scheduler._build_stats( + execs=5, + success=3, + failed=2, + total_cost=0.0 + ) + + # remaining_budget should be 0.0, not None + assert 'remaining_budget' in stats + assert stats['remaining_budget'] == 0.0 + assert stats['remaining_budget'] is not None + + def test_zero_budget_with_costs(self): + """Test zero budget with some costs incurred.""" + class TestScheduler(_BaseAgentScheduler): + def __init__(self): + self.is_running = False + self.max_cost = 0.0 # Zero budget + self._execution_count = 5 + self._success_count = 3 + self._failure_count = 2 + self._total_cost = 0.1 # Some cost incurred + self._start_time = datetime.now() + + scheduler = TestScheduler() + + stats = scheduler._build_stats( + execs=5, + success=3, + failed=2, + total_cost=0.1 + ) + + # remaining_budget should be negative (-0.1) + assert 'remaining_budget' in stats + assert stats['remaining_budget'] == -0.1 + + def test_none_budget_handling(self): + """Test that None budget (unlimited) returns None for remaining_budget.""" + class TestScheduler(_BaseAgentScheduler): + def __init__(self): + self.is_running = False + self.max_cost = None # Unlimited budget + self._execution_count = 5 + self._success_count = 3 + self._failure_count = 2 + self._total_cost = 0.1 + self._start_time = datetime.now() + + scheduler = TestScheduler() + + stats = scheduler._build_stats( + execs=5, + success=3, + failed=2, + total_cost=0.1 + ) + + # remaining_budget should be None for unlimited budget + assert 'remaining_budget' in stats + assert stats['remaining_budget'] is None + + @pytest.mark.asyncio + async def test_zero_budget_stops_execution(self): + """Test that zero budget prevents execution in AsyncAgentScheduler.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task", max_cost=0.0) + + # Set some initial cost to trigger budget limit + scheduler._total_cost = 0.0 + + # Start scheduler to initialize async primitives + scheduler.is_running = True + scheduler._ensure_async_primitives() + + # Execute with retry - should stop due to zero budget + await scheduler._execute_with_retry(max_retries=1) + + # Should have set stop event + assert scheduler._stop_event.is_set() + assert not scheduler.is_running + + +class TestCLIToolResolverIntegration: + """Test CLI tool resolution uses ToolResolver instead of direct TOOL_MAPPINGS access.""" + + def test_cli_load_tools_uses_resolver(self): + """Test that _load_tools uses ToolResolver.resolve instead of direct TOOL_MAPPINGS.""" + from praisonai.praisonai.cli.main import PraisonCLI + + cli = PraisonCLI() + + # Mock ToolResolver + with patch('praisonai.praisonai.cli.main.ToolResolver') as MockResolver: + mock_resolver_instance = Mock() + MockResolver.return_value = mock_resolver_instance + mock_resolver_instance.resolve.return_value = Mock() # Mock tool + + # Test loading comma-separated tools + tools = cli._load_tools("tool1,tool2") + + # Verify ToolResolver was instantiated + MockResolver.assert_called_once() + + # Verify resolve was called for each tool with instantiate=True + assert mock_resolver_instance.resolve.call_count == 2 + mock_resolver_instance.resolve.assert_any_call("tool1", instantiate=True) + mock_resolver_instance.resolve.assert_any_call("tool2", instantiate=True) + + def test_cli_load_tools_handles_empty_strings(self): + """Test that _load_tools filters out empty strings from tool names.""" + from praisonai.praisonai.cli.main import PraisonCLI + + cli = PraisonCLI() + + with patch('praisonai.praisonai.cli.main.ToolResolver') as MockResolver: + mock_resolver_instance = Mock() + MockResolver.return_value = mock_resolver_instance + mock_resolver_instance.resolve.return_value = Mock() + + # Test with empty strings and spaces + tools = cli._load_tools("tool1, , tool2, ") + + # Should only call resolve for actual tool names + assert mock_resolver_instance.resolve.call_count == 2 + mock_resolver_instance.resolve.assert_any_call("tool1", instantiate=True) + mock_resolver_instance.resolve.assert_any_call("tool2", instantiate=True) + + def test_cli_load_tools_error_handling(self): + """Test that _load_tools handles resolution errors gracefully.""" + from praisonai.praisonai.cli.main import PraisonCLI + + cli = PraisonCLI() + + with patch('praisonai.praisonai.cli.main.ToolResolver') as MockResolver: + mock_resolver_instance = Mock() + MockResolver.return_value = mock_resolver_instance + # Simulate tool resolution failure + mock_resolver_instance.resolve.side_effect = Exception("Tool not found") + + # Should not raise, should handle gracefully + tools = cli._load_tools("nonexistent_tool") + + # Should return empty list + assert tools == [] + + +class TestFrameworkAdapterDuplication: + """Test that duplicate arun() methods have been properly removed.""" + + def test_no_duplicate_arun_in_protocol(self): + """Test that FrameworkAdapter protocol has single arun method.""" + from praisonai.praisonai.framework_adapters.base import FrameworkAdapter + + # Get all method names from the protocol + import inspect + methods = [name for name, _ in inspect.getmembers(FrameworkAdapter, inspect.ismethod) + if not name.startswith('__')] + + # Count arun occurrences + arun_count = methods.count('arun') + assert arun_count <= 1, f"Found {arun_count} arun methods in protocol, expected 0 or 1" + + def test_framework_adapter_has_cleanup_method(self): + """Test that FrameworkAdapter protocol includes cleanup method.""" + from praisonai.praisonai.framework_adapters.base import FrameworkAdapter + + # Check that cleanup method is defined in the protocol + import inspect + methods = [name for name, _ in inspect.getmembers(FrameworkAdapter)] + assert 'cleanup' in methods, "FrameworkAdapter protocol should include cleanup method" + + +class TestIntegrationScenarios: + """Integration tests combining multiple fixes.""" + + @pytest.mark.asyncio + async def test_full_scheduler_lifecycle_with_fixes(self): + """Test complete scheduler lifecycle with all fixes applied.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + + # Create scheduler with small budget + scheduler = AsyncAgentScheduler(mock_agent, "test task", max_cost=0.01) + + # Verify initial state + assert scheduler._start_time is None + assert scheduler.max_cost == 0.01 + + # Start scheduler + await scheduler.start("*/1s", run_immediately=False) + + # Verify _start_time was set + assert scheduler._start_time is not None + + # Get stats - should include all fields with proper calculations + stats = await scheduler.get_stats_async() + assert 'runtime_seconds' in stats + assert 'remaining_budget' in stats + assert stats['remaining_budget'] is not None # Should be calculated, not None + + # Stop scheduler + await scheduler.stop() + assert not scheduler.is_running + + @pytest.mark.asyncio + async def test_daemon_state_persistence_integration(self): + """Test daemon state persistence works with async I/O fixes.""" + mock_agent = Mock() + mock_agent.astart = AsyncMock(return_value="result") + scheduler = AsyncAgentScheduler(mock_agent, "test task") + + # Create temporary state directory + with tempfile.TemporaryDirectory() as temp_dir: + state_file = os.path.join(temp_dir, "scheduler_state.json") + initial_state = { + "pid": os.getpid(), + "executions": 0, + "cost": 0.0 + } + with open(state_file, "w") as f: + json.dump(initial_state, f) + + # Mock the state directory + with patch('os.path.expanduser', return_value=temp_dir): + with patch('os.listdir', return_value=["scheduler_state.json"]): + # Execute should update state via asyncio.to_thread + await scheduler._execute_with_retry(max_retries=1) + + # Verify state file was updated (execution count incremented) + with open(state_file, "r") as f: + updated_state = json.load(f) + + assert updated_state["executions"] >= 1 + + def test_zero_cost_edge_cases(self): + """Test edge cases for zero cost handling.""" + class TestScheduler(_BaseAgentScheduler): + def __init__(self, max_cost, total_cost): + self.is_running = False + self.max_cost = max_cost + self._execution_count = 1 + self._success_count = 1 + self._failure_count = 0 + self._total_cost = total_cost + self._start_time = datetime.now() + + # Test exactly zero budget, zero cost + scheduler = TestScheduler(max_cost=0.0, total_cost=0.0) + stats = scheduler._build_stats(execs=1, success=1, failed=0, total_cost=0.0) + assert stats['remaining_budget'] == 0.0 + + # Test very small budget + scheduler = TestScheduler(max_cost=0.0001, total_cost=0.0) + stats = scheduler._build_stats(execs=1, success=1, failed=0, total_cost=0.0) + assert abs(stats['remaining_budget'] - 0.0001) < 1e-6 + + # Test floating point precision + scheduler = TestScheduler(max_cost=0.1, total_cost=0.05) + stats = scheduler._build_stats(execs=1, success=1, failed=0, total_cost=0.05) + assert abs(stats['remaining_budget'] - 0.05) < 1e-6 \ No newline at end of file