Skip to content

Commit 86f7c53

Browse files
fix: preserve tool call metadata and compatibility in parallel executor
Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/67d22169-f2e6-4727-9804-065fd54f9f78 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
1 parent e11d456 commit 86f7c53

File tree

3 files changed

+39
-34
lines changed

3 files changed

+39
-34
lines changed

src/praisonai-agents/praisonaiagents/llm/llm.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import json
1717
import xml.etree.ElementTree as ET
1818
# Gap 2: Tool call execution imports
19-
from ..tools.call_executor import ToolCall, ToolResult, create_tool_call_executor
19+
from ..tools.call_executor import ToolCall, create_tool_call_executor
2020
# Display functions - lazy loaded to avoid importing rich at startup
2121
# These are only needed when output=verbose
2222
_display_module = None
@@ -1918,14 +1918,16 @@ def _prepare_return_value(text: str) -> Union[str, tuple]:
19181918

19191919
tool_results = []
19201920
for tool_result_obj in tool_results_batch:
1921+
if tool_result_obj.error is not None:
1922+
raise tool_result_obj.error
19211923
tool_result = tool_result_obj.result
19221924
tool_results.append(tool_result)
19231925
accumulated_tool_results.append(tool_result)
19241926

19251927
logging.debug(f"[RESPONSES_API] Executed tool {tool_result_obj.function_name} with result: {tool_result}")
19261928

19271929
if verbose:
1928-
display_message = f"Agent {agent_name} called function '{tool_result_obj.function_name}' with arguments: {tool_result_obj.arguments if hasattr(tool_result_obj, 'arguments') else 'N/A'}\n"
1930+
display_message = f"Agent {agent_name} called function '{tool_result_obj.function_name}' with arguments: {tool_result_obj.arguments}\n"
19291931
display_message += f"Function returned: {tool_result}" if tool_result else "Function returned no output"
19301932
_get_display_functions()['display_tool_call'](display_message, console=self.console)
19311933

@@ -1934,7 +1936,7 @@ def _prepare_return_value(text: str) -> Union[str, tuple]:
19341936
'tool_call',
19351937
message=f"Calling function: {tool_result_obj.function_name}",
19361938
tool_name=tool_result_obj.function_name,
1937-
tool_input=tool_result_obj.arguments if hasattr(tool_result_obj, 'arguments') else {},
1939+
tool_input=tool_result_obj.arguments,
19381940
tool_output=result_str[:200] if result_str else None,
19391941
)
19401942

@@ -1949,7 +1951,7 @@ def _prepare_return_value(text: str) -> Union[str, tuple]:
19491951
content = json.dumps(tool_result)
19501952
messages.append({
19511953
"role": "tool",
1952-
"tool_call_id": tool_call_id,
1954+
"tool_call_id": tool_result_obj.tool_call_id,
19531955
"content": content,
19541956
})
19551957

@@ -5504,4 +5506,4 @@ def _generate_tool_definition(self, function_or_name) -> Optional[Dict]:
55045506
}
55055507
}
55065508
logging.debug(f"Generated tool definition: {tool_def}")
5507-
return tool_def
5509+
return tool_def

src/praisonai-agents/praisonaiagents/tools/call_executor.py

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@
1111
- Thread-safe with bounded workers
1212
"""
1313

14-
import asyncio
1514
import concurrent.futures
1615
import logging
17-
from typing import Any, Callable, Dict, List, Optional, Protocol, Union
16+
from typing import Any, Callable, Dict, List, Optional, Protocol
1817
from dataclasses import dataclass
19-
from threading import BoundedSemaphore
18+
from ..trace.context_events import copy_context_to_callable
2019

2120
logger = logging.getLogger(__name__)
2221

@@ -34,6 +33,7 @@ class ToolCall:
3433
class ToolResult:
3534
"""Result of executing a single tool call."""
3635
function_name: str
36+
arguments: Dict[str, Any]
3737
result: Any
3838
tool_call_id: str
3939
is_ollama: bool
@@ -85,6 +85,7 @@ def execute_batch(
8585
)
8686
results.append(ToolResult(
8787
function_name=tool_call.function_name,
88+
arguments=tool_call.arguments,
8889
result=result,
8990
tool_call_id=tool_call.tool_call_id,
9091
is_ollama=tool_call.is_ollama
@@ -93,6 +94,7 @@ def execute_batch(
9394
logger.error(f"Tool execution error for {tool_call.function_name}: {e}")
9495
results.append(ToolResult(
9596
function_name=tool_call.function_name,
97+
arguments=tool_call.arguments,
9698
result=f"Error executing tool: {e}",
9799
tool_call_id=tool_call.tool_call_id,
98100
is_ollama=tool_call.is_ollama,
@@ -120,7 +122,6 @@ def __init__(self, max_workers: int = 5):
120122
max_workers: Maximum concurrent tool executions (default 5)
121123
"""
122124
self.max_workers = max_workers
123-
self._semaphore = BoundedSemaphore(max_workers)
124125

125126
def execute_batch(
126127
self,
@@ -138,34 +139,35 @@ def execute_batch(
138139

139140
def _execute_single_tool(tool_call: ToolCall) -> ToolResult:
140141
"""Execute a single tool call with error handling."""
141-
with self._semaphore: # Respect max_workers bound
142-
try:
143-
result = execute_tool_fn(
144-
tool_call.function_name,
145-
tool_call.arguments,
146-
tool_call.tool_call_id
147-
)
148-
return ToolResult(
149-
function_name=tool_call.function_name,
150-
result=result,
151-
tool_call_id=tool_call.tool_call_id,
152-
is_ollama=tool_call.is_ollama
153-
)
154-
except Exception as e:
155-
logger.error(f"Tool execution error for {tool_call.function_name}: {e}")
156-
return ToolResult(
157-
function_name=tool_call.function_name,
158-
result=f"Error executing tool: {e}",
159-
tool_call_id=tool_call.tool_call_id,
160-
is_ollama=tool_call.is_ollama,
161-
error=e
162-
)
142+
try:
143+
result = execute_tool_fn(
144+
tool_call.function_name,
145+
tool_call.arguments,
146+
tool_call.tool_call_id
147+
)
148+
return ToolResult(
149+
function_name=tool_call.function_name,
150+
arguments=tool_call.arguments,
151+
result=result,
152+
tool_call_id=tool_call.tool_call_id,
153+
is_ollama=tool_call.is_ollama
154+
)
155+
except Exception as e:
156+
logger.error(f"Tool execution error for {tool_call.function_name}: {e}")
157+
return ToolResult(
158+
function_name=tool_call.function_name,
159+
arguments=tool_call.arguments,
160+
result=f"Error executing tool: {e}",
161+
tool_call_id=tool_call.tool_call_id,
162+
is_ollama=tool_call.is_ollama,
163+
error=e
164+
)
163165

164166
# Use ThreadPoolExecutor for sync tools
165167
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
166168
# Submit all tool calls
167169
future_to_index = {
168-
executor.submit(_execute_single_tool, tool_call): i
170+
executor.submit(copy_context_to_callable(_execute_single_tool), tool_call): i
169171
for i, tool_call in enumerate(tool_calls)
170172
}
171173

@@ -192,4 +194,4 @@ def create_tool_call_executor(parallel: bool = False, max_workers: int = 5) -> T
192194
if parallel:
193195
return ParallelToolCallExecutor(max_workers=max_workers)
194196
else:
195-
return SequentialToolCallExecutor()
197+
return SequentialToolCallExecutor()

src/praisonai-agents/test_parallel_tools.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def mock_execute_tool(name: str, args: dict, tool_call_id: str = None) -> str:
9191
assert len(seq_results) == len(par_results)
9292
for i, (seq_result, par_result) in enumerate(zip(seq_results, par_results)):
9393
assert seq_result.function_name == par_result.function_name
94+
assert seq_result.arguments == par_result.arguments
9495
assert seq_result.tool_call_id == par_result.tool_call_id
9596
print(f" Result {i+1}: {seq_result.function_name} -> {seq_result.result}")
9697

@@ -194,4 +195,4 @@ def main():
194195
print("reducing latency for I/O-bound workflows while maintaining backward compatibility.")
195196

196197
if __name__ == "__main__":
197-
main()
198+
main()

0 commit comments

Comments
 (0)