Skip to content

Commit 947fe66

Browse files
Merge pull request #1401 from MervinPraison/claude/issue-1392-20260416-0916
feat: implement parallel tool execution (Gap 2) with backward compatibility
2 parents 47d3e42 + 11bbf9a commit 947fe66

File tree

6 files changed

+498
-29
lines changed

6 files changed

+498
-29
lines changed

src/praisonai-agents/praisonaiagents/agent/agent.py

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -768,6 +768,14 @@ def __init__(
768768
alternative="use 'execution=ExecutionConfig(rate_limiter=obj)' instead",
769769
stacklevel=3
770770
)
771+
if parallel_tool_calls is not None:
772+
warn_deprecated_param(
773+
"parallel_tool_calls",
774+
since="1.0.0",
775+
removal="2.0.0",
776+
alternative="use 'execution=ExecutionConfig(parallel_tool_calls=True)' instead",
777+
stacklevel=3
778+
)
771779
if verification_hooks is not None:
772780
warn_deprecated_param(
773781
"verification_hooks",
@@ -943,13 +951,17 @@ def __init__(
943951
allow_code_execution = True
944952
if _exec_config.code_mode != "safe":
945953
code_execution_mode = _exec_config.code_mode
954+
# Get parallel_tool_calls from ExecutionConfig
955+
parallel_tool_calls = _exec_config.parallel_tool_calls
946956
# Budget guard extraction
947957
_max_budget = getattr(_exec_config, 'max_budget', None)
948958
_on_budget_exceeded = getattr(_exec_config, 'on_budget_exceeded', 'stop') or 'stop'
949959
else:
950960
max_iter, max_rpm, max_execution_time, max_retry_limit = 20, None, None, 2
951961
_max_budget = None
952962
_on_budget_exceeded = 'stop'
963+
# Default parallel_tool_calls when no ExecutionConfig provided
964+
parallel_tool_calls = False
953965

954966
# ─────────────────────────────────────────────────────────────────────
955967
# Resolve TEMPLATES param - FAST PATH
@@ -1440,6 +1452,8 @@ def __init__(
14401452
self.self_reflect = True if self_reflect is None else self_reflect
14411453

14421454
self.instructions = instructions
1455+
# Gap 2: Store parallel tool calls setting for ToolCallExecutor selection
1456+
self.parallel_tool_calls = parallel_tool_calls
14431457
# Check for model name in environment variable if not provided
14441458
self._using_custom_llm = False
14451459
# Flag to track if final result has been displayed to prevent duplicates

src/praisonai-agents/praisonaiagents/agent/chat_mixin.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1250,6 +1250,7 @@ def _chat_impl(self, prompt, temperature, tools, output_json, output_pydantic, r
12501250
task_description=task_description,
12511251
task_id=task_id,
12521252
execute_tool_fn=self.execute_tool,
1253+
parallel_tool_calls=getattr(self.execution, "parallel_tool_calls", False),
12531254
reasoning_steps=reasoning_steps,
12541255
stream=stream
12551256
)
@@ -1719,6 +1720,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda
17191720
task_description=task_description,
17201721
task_id=task_id,
17211722
execute_tool_fn=self.execute_tool_async,
1723+
parallel_tool_calls=getattr(self.execution, "parallel_tool_calls", False),
17221724
reasoning_steps=reasoning_steps,
17231725
stream=stream
17241726
)
@@ -2248,7 +2250,8 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]:
22482250
task_name=kwargs.get('task_name'),
22492251
task_description=kwargs.get('task_description'),
22502252
task_id=kwargs.get('task_id'),
2251-
execute_tool_fn=self.execute_tool
2253+
execute_tool_fn=self.execute_tool,
2254+
parallel_tool_calls=getattr(self.execution, "parallel_tool_calls", False)
22522255
):
22532256
response_content += chunk
22542257
yield chunk

src/praisonai-agents/praisonaiagents/config/feature_configs.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -735,6 +735,11 @@ class ExecutionConfig:
735735
# Action when budget exceeded: "stop" (default) raises BudgetExceededError,
736736
# "warn" logs warning but continues, or callable(total_cost, max_budget).
737737
on_budget_exceeded: Any = "stop"
738+
739+
# Parallel tool execution (Gap 2): Enable parallel execution of batched LLM tool calls
740+
# When True, multiple tool calls from LLM are executed concurrently instead of sequentially
741+
# Default False preserves existing behavior for backward compatibility
742+
parallel_tool_calls: bool = False
738743

739744
def to_dict(self) -> Dict[str, Any]:
740745
"""Convert to dictionary."""
@@ -749,6 +754,7 @@ def to_dict(self) -> Dict[str, Any]:
749754
"context_compaction": self.context_compaction,
750755
"max_context_tokens": self.max_context_tokens,
751756
"max_budget": self.max_budget,
757+
"parallel_tool_calls": self.parallel_tool_calls,
752758
}
753759

754760

src/praisonai-agents/praisonaiagents/llm/llm.py

Lines changed: 72 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@
1515
import time
1616
import json
1717
import xml.etree.ElementTree as ET
18+
# Gap 2: Tool call execution imports
19+
from ..tools.call_executor import ToolCall, create_tool_call_executor
1820
# Display functions - lazy loaded to avoid importing rich at startup
1921
# These are only needed when output=verbose
2022
_display_module = None
@@ -1649,6 +1651,7 @@ def get_response(
16491651
task_description: Optional[str] = None,
16501652
task_id: Optional[str] = None,
16511653
execute_tool_fn: Optional[Callable] = None,
1654+
parallel_tool_calls: bool = False, # Gap 2: Enable parallel tool execution
16521655
stream: bool = True,
16531656
stream_callback: Optional[Callable] = None,
16541657
emit_events: bool = False,
@@ -1893,26 +1896,47 @@ def _prepare_return_value(text: str) -> Union[str, tuple]:
18931896
"tool_calls": serializable_tool_calls,
18941897
})
18951898

1896-
tool_results = []
1899+
# Execute tool calls using ToolCallExecutor (Gap 2: parallel or sequential)
1900+
is_ollama = self._is_ollama_provider()
1901+
tool_calls_batch = []
1902+
1903+
# Prepare batch of ToolCall objects
18971904
for tool_call in tool_calls:
1898-
function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call)
1899-
1900-
logging.debug(f"[RESPONSES_API] Executing tool {function_name} with args: {arguments}")
1901-
tool_result = execute_tool_fn(function_name, arguments, tool_call_id=tool_call_id)
1905+
function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama=is_ollama)
1906+
tool_calls_batch.append(ToolCall(
1907+
function_name=function_name,
1908+
arguments=arguments,
1909+
tool_call_id=tool_call_id,
1910+
is_ollama=is_ollama
1911+
))
1912+
1913+
# Create appropriate executor based on parallel_tool_calls setting
1914+
executor = create_tool_call_executor(parallel=parallel_tool_calls)
1915+
1916+
# Execute batch
1917+
tool_results_batch = executor.execute_batch(tool_calls_batch, execute_tool_fn)
1918+
1919+
tool_results = []
1920+
for tool_call_obj, tool_result_obj in zip(tool_calls_batch, tool_results_batch):
1921+
if tool_result_obj.error is not None:
1922+
raise tool_result_obj.error
1923+
tool_result = tool_result_obj.result
19021924
tool_results.append(tool_result)
19031925
accumulated_tool_results.append(tool_result)
19041926

1927+
logging.debug(f"[RESPONSES_API] Executed tool {tool_result_obj.function_name} with result: {tool_result}")
1928+
19051929
if verbose:
1906-
display_message = f"Agent {agent_name} called function '{function_name}' with arguments: {arguments}\n"
1930+
display_message = f"Agent {agent_name} called function '{tool_call_obj.function_name}' with arguments: {tool_call_obj.arguments}\n"
19071931
display_message += f"Function returned: {tool_result}" if tool_result else "Function returned no output"
19081932
_get_display_functions()['display_tool_call'](display_message, console=self.console)
19091933

19101934
result_str = json.dumps(tool_result) if tool_result else "empty"
19111935
_get_display_functions()['execute_sync_callback'](
19121936
'tool_call',
1913-
message=f"Calling function: {function_name}",
1914-
tool_name=function_name,
1915-
tool_input=arguments,
1937+
message=f"Calling function: {tool_call_obj.function_name}",
1938+
tool_name=tool_call_obj.function_name,
1939+
tool_input=tool_call_obj.arguments,
19161940
tool_output=result_str[:200] if result_str else None,
19171941
)
19181942

@@ -1927,7 +1951,7 @@ def _prepare_return_value(text: str) -> Union[str, tuple]:
19271951
content = json.dumps(tool_result)
19281952
messages.append({
19291953
"role": "tool",
1930-
"tool_call_id": tool_call_id,
1954+
"tool_call_id": tool_result_obj.tool_call_id,
19311955
"content": content,
19321956
})
19331957

@@ -3142,6 +3166,7 @@ def get_response_stream(
31423166
task_description: Optional[str] = None,
31433167
task_id: Optional[str] = None,
31443168
execute_tool_fn: Optional[Callable] = None,
3169+
parallel_tool_calls: bool = False, # Gap 2: Enable parallel tool execution
31453170
**kwargs
31463171
):
31473172
"""Generator that yields real-time response chunks from the LLM.
@@ -3167,6 +3192,7 @@ def get_response_stream(
31673192
task_description: Optional task description for logging
31683193
task_id: Optional task ID for logging
31693194
execute_tool_fn: Optional function for executing tools
3195+
parallel_tool_calls: If True, execute batched LLM tool calls in parallel (default False)
31703196
**kwargs: Additional parameters
31713197
31723198
Yields:
@@ -3301,26 +3327,44 @@ def get_response_stream(
33013327
"tool_calls": serializable_tool_calls
33023328
})
33033329

3304-
# Execute tool calls and add results to conversation
3330+
# Execute tool calls using ToolCallExecutor (Gap 2: parallel or sequential)
3331+
is_ollama = self._is_ollama_provider()
3332+
tool_calls_batch = []
3333+
3334+
# Prepare batch of ToolCall objects
33053335
for tool_call in tool_calls:
3306-
is_ollama = self._is_ollama_provider()
33073336
function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama)
3308-
3309-
try:
3310-
# Execute the tool (pass tool_call_id for event correlation)
3311-
tool_result = execute_tool_fn(function_name, arguments, tool_call_id=tool_call_id)
3312-
3313-
# Add tool result to messages
3314-
tool_message = self._create_tool_message(function_name, tool_result, tool_call_id, is_ollama)
3315-
messages.append(tool_message)
3316-
3317-
except Exception as e:
3318-
logging.error(f"Tool execution error for {function_name}: {e}")
3319-
# Add error message to conversation
3320-
error_message = self._create_tool_message(
3321-
function_name, f"Error executing tool: {e}", tool_call_id, is_ollama
3337+
tool_calls_batch.append(ToolCall(
3338+
function_name=function_name,
3339+
arguments=arguments,
3340+
tool_call_id=tool_call_id,
3341+
is_ollama=is_ollama
3342+
))
3343+
3344+
# Create appropriate executor based on parallel_tool_calls setting
3345+
executor = create_tool_call_executor(parallel=parallel_tool_calls)
3346+
3347+
# Execute batch and add results to conversation
3348+
tool_results = executor.execute_batch(tool_calls_batch, execute_tool_fn)
3349+
3350+
for tool_result in tool_results:
3351+
if tool_result.error is None:
3352+
# Successful execution
3353+
tool_message = self._create_tool_message(
3354+
tool_result.function_name,
3355+
tool_result.result,
3356+
tool_result.tool_call_id,
3357+
tool_result.is_ollama
3358+
)
3359+
else:
3360+
# Error during execution (already logged by executor)
3361+
tool_message = self._create_tool_message(
3362+
tool_result.function_name,
3363+
tool_result.result, # Contains error message
3364+
tool_result.tool_call_id,
3365+
tool_result.is_ollama
33223366
)
3323-
messages.append(error_message)
3367+
messages.append(tool_message)
33243368

33253369
# Continue conversation after tool execution - get follow-up response
33263370
try:
@@ -5462,4 +5506,4 @@ def _generate_tool_definition(self, function_or_name) -> Optional[Dict]:
54625506
}
54635507
}
54645508
logging.debug(f"Generated tool definition: {tool_def}")
5465-
return tool_def
5509+
return tool_def

0 commit comments

Comments (0)