Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 47 additions & 148 deletions src/praisonai/praisonai/agents_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,17 +512,26 @@ def generate_crew_and_kickoff(self):
# Route to YAMLWorkflowParser for advanced workflow patterns
return self._run_yaml_workflow(config)

config, adapter, tools_dict, topic = self._prepare(config)
return adapter.run(
config,
self.config_list,
topic,
tools_dict=tools_dict,
agent_callback=getattr(self, 'agent_callback', None),
task_callback=getattr(self, 'task_callback', None),
cli_config=getattr(self, 'cli_config', None),
)

def _prepare(self, config):
"""Shared preparation logic for both sync and async entry points."""
# Canonical format conversion: 'agents' -> 'roles', 'instructions' -> 'backstory'
# This ensures backward compatibility while supporting the new canonical format
if 'agents' in config and 'roles' not in config:
config['roles'] = {}
for agent_name, agent_config in config['agents'].items():
role_config = dict(agent_config) if agent_config else {}
# Convert 'instructions' to 'backstory' if present
# Note: preserve 'instructions' key for adapters that pass it to PraisonAgent
if 'instructions' in role_config and 'backstory' not in role_config:
role_config['backstory'] = role_config['instructions']
# Ensure required fields have defaults
if 'role' not in role_config:
role_config['role'] = agent_name.replace('_', ' ').title()
if 'goal' not in role_config:
Expand Down Expand Up @@ -555,24 +564,23 @@ def generate_crew_and_kickoff(self):
if isinstance(t, str) and t.strip():
needed_tools.add(t.strip())

# Resolve only the tools actually referenced in YAML using ToolResolver with instantiation
# Resolve only the tools actually referenced in YAML
for tool_name in needed_tools:
try:
resolved_tool = self.tool_resolver.resolve(tool_name, instantiate=True)
if resolved_tool is not None:
tools_dict[tool_name] = resolved_tool
except Exception as e:
self.logger.warning(f"Failed to resolve or instantiate tool '{tool_name}': {e}")
self.logger.warning(f"Failed to initialize tool '{tool_name}': {e}")
continue

except Exception as e:
self.logger.warning(f"Error collecting YAML tool references: {e}")

# Add tools from class names - use tool_resolver to check tool validity
for tool_class in self.tools:
if isinstance(tool_class, type):
try:
# Try to instantiate the tool to validate it
tool_instance = tool_class()
tool_name = tool_class.__name__
tools_dict[tool_name] = tool_instance
Expand All @@ -583,7 +591,7 @@ def generate_crew_and_kickoff(self):
root_directory = os.getcwd()
tools_py_path = os.path.join(root_directory, 'tools.py')
tools_dir_path = Path(root_directory) / 'tools'

# Use consolidated ToolResolver for tools.py loading
tools_dict.update(self.tool_resolver.get_local_tool_classes())
if os.path.isfile(tools_py_path):
Expand All @@ -595,9 +603,32 @@ def generate_crew_and_kickoff(self):

framework = self.framework or config.get('framework', 'crewai')

# Get initial adapter and resolve to concrete variant
initial_adapter = self._get_framework_adapter(framework)
adapter = initial_adapter.resolve()
# AutoGen version selection logic
if framework == "autogen":
autogen_v4_adapter = self._get_framework_adapter("autogen_v4")
autogen_v2_adapter = self._get_framework_adapter("autogen")

autogen_version = str(
config.get('autogen_version', os.environ.get("AUTOGEN_VERSION", "auto"))
).lower()
use_v4 = False

if autogen_version == "v0.4" and autogen_v4_adapter.is_available():
use_v4 = True
elif autogen_version == "v0.2" and autogen_v2_adapter.is_available():
use_v4 = False
elif autogen_version == "auto":
use_v4 = autogen_v4_adapter.is_available()
else:
use_v4 = autogen_v4_adapter.is_available() and not autogen_v2_adapter.is_available()

framework = "autogen_v4" if use_v4 else "autogen"

# Validate cli_backend compatibility
self._validate_cli_backend_compatibility(config, framework)

# Get framework adapter and resolve to concrete variant
adapter = self._get_framework_adapter(framework).resolve()

# Validate framework availability early
from .framework_adapters.validators import assert_framework_available
Expand All @@ -615,15 +646,7 @@ def generate_crew_and_kickoff(self):
self.framework_adapter = adapter

self.logger.info(f"Using framework: {adapter.name}")
return adapter.run(
config,
self.config_list,
topic,
tools_dict=tools_dict,
agent_callback=getattr(self, 'agent_callback', None),
task_callback=getattr(self, 'task_callback', None),
cli_config=getattr(self, 'cli_config', None),
)
return config, adapter, tools_dict, topic
Comment thread
greptile-apps[bot] marked this conversation as resolved.

async def agenerate_crew_and_kickoff(self):
"""
Expand Down Expand Up @@ -659,132 +682,8 @@ async def agenerate_crew_and_kickoff(self):

async def _arun_framework(self, config):
"""Async version of _run_framework with shared preparation logic."""
# Canonical format conversion: 'agents' -> 'roles', 'instructions' -> 'backstory'
if 'agents' in config and 'roles' not in config:
config['roles'] = {}
for agent_name, agent_config in config['agents'].items():
role_config = dict(agent_config) if agent_config else {}
if 'instructions' in role_config and 'backstory' not in role_config:
role_config['backstory'] = role_config['instructions']
if 'role' not in role_config:
role_config['role'] = agent_name.replace('_', ' ').title()
if 'goal' not in role_config:
role_config['goal'] = role_config.get('backstory', 'Complete the assigned task')
if 'backstory' not in role_config:
role_config['backstory'] = f'You are a {role_config["role"]}'
config['roles'][agent_name] = role_config

# Get workflow input: 'input' is canonical, 'topic' is alias for backward compatibility
topic = config.get('input', config.get('topic', ''))

# Validate agents configuration for typos in field names
self._validate_agents_config(config)

tools_dict = {}

# Demand-driven tool resolution - only resolve tools actually used in YAML
if is_available("crewai") or is_available("autogen") or is_available("praisonaiagents") or is_available("ag2"):
try:
# Collect all tool names mentioned in the YAML config
needed_tools: set[str] = set()
for role_cfg in config.get('roles', {}).values():
for t in role_cfg.get('tools') or []:
if isinstance(t, str) and t.strip():
needed_tools.add(t.strip())
for task_cfg in (role_cfg.get('tasks') or {}).values():
if not isinstance(task_cfg, dict):
continue
for t in task_cfg.get('tools') or []:
if isinstance(t, str) and t.strip():
needed_tools.add(t.strip())

# Resolve only the tools actually referenced in YAML
for tool_name in needed_tools:
try:
resolved_tool = self.tool_resolver.resolve(tool_name)
if resolved_tool is None:
self.logger.warning(f"Tool '{tool_name}' not found")
continue
tools_dict[tool_name] = (
resolved_tool() if inspect.isclass(resolved_tool) else resolved_tool
)
except Exception as e:
self.logger.warning(f"Failed to initialize tool '{tool_name}': {e}")
continue

except Exception as e:
self.logger.warning(f"Error collecting YAML tool references: {e}")

# Add tools from class names - use tool_resolver to check tool validity
for tool_class in self.tools:
if isinstance(tool_class, type):
try:
tool_instance = tool_class()
tool_name = tool_class.__name__
tools_dict[tool_name] = tool_instance
self.logger.debug(f"Added tool: {tool_name}")
except Exception as e:
self.logger.warning(f"Failed to instantiate tool class {tool_class.__name__}: {e}")

root_directory = os.getcwd()
tools_py_path = os.path.join(root_directory, 'tools.py')
tools_dir_path = Path(root_directory) / 'tools'

# Use consolidated ToolResolver for tools.py loading
tools_dict.update(self.tool_resolver.get_local_tool_classes())
if os.path.isfile(tools_py_path):
self.logger.debug("tools.py exists in the root directory. Loading tools.py and skipping tools folder.")
elif tools_dir_path.is_dir():
tools_dict.update(self.tool_resolver.get_local_tool_classes_from_dir(tools_dir_path))
if tools_dict:
self.logger.debug("tools folder exists in the root directory")

framework = self.framework or config.get('framework', 'crewai')

# AutoGen version selection logic
if framework == "autogen":
autogen_v4_adapter = self._get_framework_adapter("autogen_v4")
autogen_v2_adapter = self._get_framework_adapter("autogen")

autogen_version = str(
config.get('autogen_version', os.environ.get("AUTOGEN_VERSION", "auto"))
).lower()
use_v4 = False

if autogen_version == "v0.4" and autogen_v4_adapter.is_available():
use_v4 = True
elif autogen_version == "v0.2" and autogen_v2_adapter.is_available():
use_v4 = False
elif autogen_version == "auto":
use_v4 = autogen_v4_adapter.is_available()
else:
use_v4 = autogen_v4_adapter.is_available() and not autogen_v2_adapter.is_available()

framework = "autogen_v4" if use_v4 else "autogen"

# Initialize AgentOps if configured
agentops_api_key = os.getenv("AGENTOPS_API_KEY")
if agentops_api_key:
try:
import agentops
agentops.init(agentops_api_key, default_tags=[framework])
except ImportError:
pass

# Update framework adapter if framework changed
if framework != self.framework:
self.framework = framework
self.framework_adapter = self._get_framework_adapter(framework)

# Validate framework availability
from .framework_adapters.validators import assert_framework_available
assert_framework_available(framework)

# Validate cli_backend compatibility
self._validate_cli_backend_compatibility(config, framework)

self.logger.info(f"Using framework: {framework}")
return await self.framework_adapter.arun(
config, adapter, tools_dict, topic = self._prepare(config)
return await adapter.arun(
config,
self.config_list,
topic,
Expand Down
41 changes: 25 additions & 16 deletions src/praisonai/praisonai/bots/_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Dict, Optional
import logging
Expand Down Expand Up @@ -66,7 +67,7 @@ def __init__(self, config: Optional[RateLimitConfig] = None):
self._config = config or RateLimitConfig()
self._tokens = float(self._config.burst_size)
self._last_refill = time.monotonic()
self._channel_last_send: Dict[str, float] = {}
self._channel_last_send: "OrderedDict[str, float]" = OrderedDict()
self._lock = asyncio.Lock()

@classmethod
Expand All @@ -88,6 +89,7 @@ async def acquire(self, channel_id: Optional[str] = None) -> None:
Args:
channel_id: Optional channel ID for per-channel limiting
"""
# Phase 1: under lock, compute waits + reserve token + update last_send.
async with self._lock:
now = time.monotonic()

Expand All @@ -99,25 +101,32 @@ async def acquire(self, channel_id: Optional[str] = None) -> None:
)
self._last_refill = now

# Wait for global token
global_wait = 0.0
if self._tokens < 1.0:
wait_time = (1.0 - self._tokens) / self._config.messages_per_second
logger.debug(f"Rate limit: waiting {wait_time:.3f}s for global token")
await asyncio.sleep(wait_time)
self._tokens = 1.0

# Consume token
global_wait = (1.0 - self._tokens) / self._config.messages_per_second
self._tokens = 1.0 # reserve one future token
# Move refill anchor forward to the reservation time so
# concurrent callers cannot reuse the same future interval.
self._last_refill = now + global_wait
self._tokens -= 1.0
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment on lines 102 to 111

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Token reservation can over-admit concurrent sends

Line 102/Line 107 reserve a future token without advancing the refill timeline. A second concurrent caller can refill from the old self._last_refill and reserve the same future interval, so multiple sends can wake together and exceed messages_per_second.

Proposed fix
-            self._last_refill = now
+            self._last_refill = now
@@
             global_wait = 0.0
             if self._tokens < 1.0:
                 global_wait = (1.0 - self._tokens) / self._config.messages_per_second
-                self._tokens = 1.0  # reserved; consumed below
+                self._tokens = 1.0  # reserve one future token
+                # Move refill anchor forward to the reservation time so
+                # concurrent callers cannot reuse the same future interval.
+                self._last_refill = now + global_wait
             self._tokens -= 1.0
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self._last_refill = now
# Wait for global token
global_wait = 0.0
if self._tokens < 1.0:
wait_time = (1.0 - self._tokens) / self._config.messages_per_second
logger.debug(f"Rate limit: waiting {wait_time:.3f}s for global token")
await asyncio.sleep(wait_time)
self._tokens = 1.0
# Consume token
global_wait = (1.0 - self._tokens) / self._config.messages_per_second
self._tokens = 1.0 # reserved; consumed below
self._tokens -= 1.0
self._last_refill = now
global_wait = 0.0
if self._tokens < 1.0:
global_wait = (1.0 - self._tokens) / self._config.messages_per_second
self._tokens = 1.0 # reserve one future token
# Move refill anchor forward to the reservation time so
# concurrent callers cannot reuse the same future interval.
self._last_refill = now + global_wait
self._tokens -= 1.0
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/bots/_rate_limit.py` around lines 102 - 108, The bug
is that reserving a future token (setting self._tokens = 1.0) does not advance
the refill timeline, allowing concurrent callers to reserve the same future
slot; fix it by computing the required reservation wait (global_wait = (1.0 -
self._tokens) / self._config.messages_per_second when self._tokens < 1.0), then
advance the refill timeline by moving self._last_refill to now + global_wait (or
add global_wait to self._last_refill) before setting self._tokens = 1.0 and
decrementing it, ensuring subsequent callers see the updated timeline; update
the logic around _last_refill, _tokens and _config.messages_per_second
accordingly.


# Per-channel delay
channel_wait = 0.0
if channel_id:
last_send = self._channel_last_send.get(channel_id, 0.0)
channel_elapsed = now - last_send
if channel_elapsed < self._config.per_channel_delay:
wait_time = self._config.per_channel_delay - channel_elapsed
logger.debug(f"Rate limit: waiting {wait_time:.3f}s for channel {channel_id}")
await asyncio.sleep(wait_time)
self._channel_last_send[channel_id] = time.monotonic()
last = self._channel_last_send.pop(channel_id, 0.0)
projected_now = now + global_wait
elapsed = projected_now - last
if elapsed < self._config.per_channel_delay:
channel_wait = self._config.per_channel_delay - elapsed
# LRU touch + bounded insertion
self._channel_last_send[channel_id] = projected_now + channel_wait
while len(self._channel_last_send) > 4096:
self._channel_last_send.popitem(last=False)

# Phase 2: sleep OUTSIDE the lock so other channels proceed concurrently.
total_wait = global_wait + channel_wait
if total_wait > 0:
logger.debug(f"Rate limit: waiting {total_wait:.3f}s for channel {channel_id}")
await asyncio.sleep(total_wait)

def reset(self) -> None:
"""Reset rate limiter state."""
Expand Down
24 changes: 24 additions & 0 deletions src/praisonai/praisonai/sandbox/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import logging
import os
from typing import Any, Dict, Optional

from praisonaiagents.sandbox import (
Expand All @@ -16,6 +17,29 @@
logger = logging.getLogger(__name__)


def safe_sandbox_path(temp_dir: str | None, path: str) -> str | None:
"""Resolve a caller-supplied path to an absolute path inside temp_dir.
Returns None if the resolved path would escape the sandbox root,
preventing path-traversal attacks via sequences like `../../../etc/passwd`.
Args:
temp_dir: The sandbox root directory
path: User-supplied path to resolve
Returns:
Safe absolute path within sandbox, or None if path escapes sandbox
"""
if not temp_dir:
return None
candidate = os.path.realpath(os.path.join(temp_dir, path.lstrip("/")))
sandbox_root = os.path.realpath(temp_dir)
if not (candidate == sandbox_root or candidate.startswith(sandbox_root + os.sep)):
logger.warning("Path traversal attempt blocked: %s", path)
return None
return candidate


class SandboxToComputeAdapter:
"""Expose a ComputeProvider as the legacy SandboxConfig/SandboxResult API."""

Expand Down
Loading
Loading