Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions src/praisonai/praisonai/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2140,25 +2140,21 @@ def _load_tools(self, tools_path: str) -> list:
except Exception as e:
print(f"[yellow]Warning: Failed to load tools from {tools_path}: {e}[/yellow]")
else:
# Treat as comma-separated tool names
try:
from praisonaiagents.tools import TOOL_MAPPINGS
import praisonaiagents.tools as tools_module

tool_names = [t.strip() for t in tools_path.split(',')]
for tool_name in tool_names:
if tool_name in TOOL_MAPPINGS:
try:
tool = getattr(tools_module, tool_name)
tools_list.append(tool)
except Exception as e:
print(f"[yellow]Warning: Failed to load tool '{tool_name}': {e}[/yellow]")
# Comma-separated names: use the unified resolver so CLI == YAML == Python
from ..tool_resolver import ToolResolver
resolver = ToolResolver()
tool_names = [t.strip() for t in tools_path.split(',') if t.strip()]
for tool_name in tool_names:
try:
tool = resolver.resolve(tool_name, instantiate=True)
if tool is not None:
tools_list.append(tool)
else:
print(f"[yellow]Warning: Unknown tool '{tool_name}'[/yellow]")
if tools_list:
print(f"[cyan]Loaded {len(tools_list)} built-in tools[/cyan]")
except ImportError:
print("[yellow]Warning: Could not import tools module[/yellow]")
except Exception as e:
print(f"[yellow]Warning: Failed to load tool '{tool_name}': {e}[/yellow]")
if tools_list:
print(f"[cyan]Loaded {len(tools_list)} tools[/cyan]")

return tools_list

Expand Down
68 changes: 16 additions & 52 deletions src/praisonai/praisonai/framework_adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,33 +92,6 @@ async def arun(
"""
...

async def arun(
self,
config: Dict[str, Any],
llm_config: List[Dict],
topic: str,
*,
tools_dict: Optional[Dict[str, Any]] = None,
agent_callback: Optional[Callable] = None,
task_callback: Optional[Callable] = None,
cli_config: Optional[Dict[str, Any]] = None,
) -> str:
"""
Run the framework asynchronously with given configuration.

Args:
config: Framework configuration
llm_config: LLM configuration list
topic: Topic for the tasks
tools_dict: Available tools dictionary
agent_callback: Callback for agent events
task_callback: Callback for task events
cli_config: CLI configuration

Returns:
Execution result as string
"""
...

def cleanup(self) -> None:
"""Clean up any resources after execution."""
Expand Down Expand Up @@ -157,29 +130,6 @@ def _sub(m):
# Only substitute simple variable names like {topic}, not JSON like {"level":2}
return re.sub(r'\{([a-zA-Z_][a-zA-Z0-9_]*)\}', _sub, template)

async def arun(
self,
config: Dict[str, Any],
llm_config: List[Dict],
topic: str,
*,
tools_dict: Optional[Dict[str, Any]] = None,
agent_callback: Optional[Callable] = None,
task_callback: Optional[Callable] = None,
cli_config: Optional[Dict[str, Any]] = None,
) -> str:
"""
Default async implementation that falls back to thread-offloaded sync.

Framework adapters with native async support should override this method.
"""
import asyncio
return await asyncio.to_thread(
self.run, config, llm_config, topic,
tools_dict=tools_dict, agent_callback=agent_callback,
task_callback=task_callback, cli_config=cli_config
)

def resolve(self) -> "FrameworkAdapter":
"""Default implementation returns self."""
return self
Expand All @@ -200,8 +150,22 @@ async def arun(
cli_config: Optional[Dict[str, Any]] = None,
) -> str:
"""
Safe default for sync-only adapters (crewai, autogen v0.2):
run the sync implementation in a worker thread, freeing the loop.
Async execution. Default implementation offloads sync run() to a worker thread.

Sync-only adapters (crewai, autogen v0.2) can use this default.
Native-async adapters should override this method.

Args:
config: Framework configuration
llm_config: LLM configuration list
topic: Topic for the tasks
tools_dict: Available tools dictionary
agent_callback: Callback for agent events
task_callback: Callback for task events
cli_config: CLI configuration

Returns:
Execution result as string
"""
import asyncio
return await asyncio.to_thread(
Expand Down
81 changes: 81 additions & 0 deletions src/praisonai/praisonai/scheduler/_base_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Shared, lock-agnostic scheduler logic for both sync and async variants.
"""

import os
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class _BaseAgentScheduler:
"""Shared, lock-agnostic scheduler logic — used by both sync and async variants."""

is_running: bool
max_cost: Optional[float]
_execution_count: int
_success_count: int
_failure_count: int
_total_cost: float
_start_time: Optional[datetime]

def _build_stats(
self,
*,
execs: int,
success: int,
failed: int,
total_cost: float,
) -> Dict[str, Any]:
"""Build stats dictionary for both sync and async schedulers."""
runtime = (
(datetime.now() - self._start_time).total_seconds()
if self._start_time else 0
)
return {
"is_running": self.is_running,
"total_executions": execs,
"successful_executions": success,
"failed_executions": failed,
"success_rate": (success / execs * 100) if execs > 0 else 0,
"total_cost_usd": round(total_cost, 4),
"remaining_budget": (
round(self.max_cost - total_cost, 4) if self.max_cost is not None else None
),
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment on lines +45 to +47

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Handle zero-budget correctly in shared stats.

remaining_budget currently uses a truthiness check, so max_cost=0.0 is treated as “no budget limit” and returns None instead of 0.0/negative values.

💡 Suggested fix
             "remaining_budget": (
-                round(self.max_cost - total_cost, 4) if self.max_cost else None
+                round(self.max_cost - total_cost, 4) if self.max_cost is not None else None
             ),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"remaining_budget": (
round(self.max_cost - total_cost, 4) if self.max_cost else None
),
"remaining_budget": (
round(self.max_cost - total_cost, 4) if self.max_cost is not None else None
),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/scheduler/_base_scheduler.py` around lines 45 - 47,
The shared stats computation treats self.max_cost truthily so max_cost=0.0 is
considered "no limit" and returns None; change the conditional in the
remaining_budget expression in _base_scheduler.py to check explicitly for None
(e.g., if self.max_cost is None) and otherwise compute round(self.max_cost -
total_cost, 4) so zero or negative budgets are returned correctly.

"runtime_seconds": runtime,
"cost_per_execution": (
round(total_cost / execs, 4) if execs > 0 else 0
),
}

def _update_state_if_daemon(self) -> None:
"""Update ~/.praisonai/schedulers/*.json for the current PID, if present.

Safe for both sync and async callers — it's plain blocking file I/O that
runs once per execution and is wrapped in try/except.
"""
try:
state_dir = os.path.expanduser("~/.praisonai/schedulers")
if not os.path.exists(state_dir):
return
current_pid = os.getpid()
for fname in os.listdir(state_dir):
if not fname.endswith(".json"):
continue
path = os.path.join(state_dir, fname)
try:
with open(path, "r") as f:
state = json.load(f)
if state.get("pid") == current_pid:
state["executions"] = self._execution_count
state["cost"] = round(self._total_cost, 4)
with open(path, "w") as f:
json.dump(state, f, indent=2)
break
except Exception:
continue
except Exception as e:
logger.debug("Failed to update state: %s", e)
57 changes: 8 additions & 49 deletions src/praisonai/praisonai/scheduler/agent_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

from .base import ScheduleParser, PraisonAgentExecutor
from .shared import backoff_delay
from ._base_scheduler import _BaseAgentScheduler

logger = logging.getLogger(__name__)


class AgentScheduler:
class AgentScheduler(_BaseAgentScheduler):
"""
Scheduler for running PraisonAI agents periodically.

Expand Down Expand Up @@ -163,54 +164,12 @@ def get_stats(self) -> Dict[str, Any]:
Dictionary with execution stats including cost
"""
with self._stats_lock:
runtime = (datetime.now() - self._start_time).total_seconds() if self._start_time else 0
return {
"is_running": self.is_running,
"total_executions": self._execution_count,
"successful_executions": self._success_count,
"failed_executions": self._failure_count,
"success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0,
"total_cost_usd": round(self._total_cost, 4),
"runtime_seconds": round(runtime, 1),
"cost_per_execution": round(self._total_cost / self._execution_count, 4) if self._execution_count > 0 else 0
}

def _update_state_if_daemon(self):
"""Update state file with execution stats if running as daemon."""
try:
import os
# Check if we're running as a daemon by looking for state file
state_dir = os.path.expanduser("~/.praisonai/schedulers")
if not os.path.exists(state_dir):
return

# Try to find our state file by checking all state files for matching PID
current_pid = os.getpid()
for state_file in os.listdir(state_dir):
if not state_file.endswith('.json'):
continue

state_path = os.path.join(state_dir, state_file)
try:
import json
with open(state_path, 'r') as f:
state = json.load(f)

# Check if this is our state file
if state.get('pid') == current_pid:
# Update execution stats
state['executions'] = self._execution_count
state['cost'] = round(self._total_cost, 4)

# Write back
with open(state_path, 'w') as f:
json.dump(state, f, indent=2)
break
except Exception:
continue
except Exception as e:
# Silently fail - don't break scheduler if state update fails
logger.debug(f"Failed to update state: {e}")
return self._build_stats(
execs=self._execution_count,
success=self._success_count,
failed=self._failure_count,
total_cost=self._total_cost,
)

def _run_schedule(self, interval: int, max_retries: int):
"""Internal method to run scheduled agent executions."""
Expand Down
23 changes: 9 additions & 14 deletions src/praisonai/praisonai/scheduler/async_agent_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from abc import ABC, abstractmethod

from .shared import ScheduleParser, backoff_delay, safe_call
from ._base_scheduler import _BaseAgentScheduler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,7 +63,7 @@ async def execute(self, task: str) -> Any:
raise


class AsyncAgentScheduler:
class AsyncAgentScheduler(_BaseAgentScheduler):
"""
Async-native scheduler for running PraisonAI agents periodically.

Expand Down Expand Up @@ -124,6 +125,7 @@ def __init__(
self._execution_count = 0
self._success_count = 0
self._failure_count = 0
self._start_time: Optional[datetime] = None

# Sync lock for async primitives creation and bound loop tracking
self._primitives_lock = threading.Lock()
Expand Down Expand Up @@ -169,6 +171,7 @@ async def start(
try:
interval = ScheduleParser.parse(schedule_expr)
self.is_running = True
self._start_time = datetime.now()
self._ensure_async_primitives() # bind to the loop start() runs on
self._stop_event.clear()

Expand Down Expand Up @@ -296,15 +299,9 @@ async def get_stats_async(self) -> Dict[str, Any]:
failed = self._failure_count
total_cost = self._total_cost

return {
"is_running": self.is_running,
"total_executions": execs,
"successful_executions": success,
"failed_executions": failed,
"success_rate": (success / execs * 100) if execs > 0 else 0,
"total_cost_usd": round(total_cost, 4),
"remaining_budget": round(self.max_cost - total_cost, 4) if self.max_cost is not None else None,
}
return self._build_stats(
execs=execs, success=success, failed=failed, total_cost=total_cost
)

def get_stats_sync(self) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -390,8 +387,7 @@ async def _execute_with_retry(self, max_retries: int):
logger.info(f"Estimated cost this run: ${estimated_cost:.4f}, Total: ${self._total_cost:.4f}")

safe_call(self.on_success, result)
# TODO: Add daemon state update from sync version:
# self._update_state_if_daemon()
await asyncio.to_thread(self._update_state_if_daemon)
return

except asyncio.TimeoutError as e:
Expand All @@ -418,8 +414,7 @@ async def _execute_with_retry(self, max_retries: int):
last_exc if last_exc is not None
else RuntimeError(f"Failed after {max_retries} attempts")
)
# TODO: Add daemon state update from sync version:
# self._update_state_if_daemon()
await asyncio.to_thread(self._update_state_if_daemon)

async def execute_once(self) -> Any:
"""
Expand Down
Loading
Loading