Skip to content

Commit f444a81

Browse files
committed
using callback to make agent stateless
1 parent ffe8fac commit f444a81

File tree

9 files changed

+64
-37
lines changed

9 files changed

+64
-37
lines changed

AgentCrew/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.6.6-3"
1+
__version__ = "0.6.6-4"

AgentCrew/main_docker.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ def setup_agents(services, config_path, remoting_provider=None, model_id=None):
284284
if remoting_provider:
285285
agent.set_custom_system_prompt(agent_manager.get_remote_system_prompt())
286286
agent.is_remoting_mode = True
287+
agent.activate()
287288
agent_manager.register_agent(agent)
288289

289290
from AgentCrew.modules.mcpclient.tool import register as mcp_register
@@ -468,13 +469,13 @@ def chat(provider, agent_config, mcp_config, memory_llm):
468469

469470
ui = ConsoleUI(message_handler)
470471
ui.start()
471-
except SystemExit:
472-
from AgentCrew.modules.mcpclient import MCPSessionManager
473-
474-
MCPSessionManager.get_instance().cleanup()
475472
except Exception as e:
476473
print(traceback.format_exc())
477474
click.echo(f"❌ Error: {str(e)}", err=True)
475+
finally:
476+
from AgentCrew.modules.mcpclient import MCPSessionManager
477+
478+
MCPSessionManager.get_instance().cleanup()
478479

479480

480481
@cli.command()
@@ -562,13 +563,13 @@ def a2a_server(
562563
click.echo(f"Starting A2A server on {host}:{port}")
563564
click.echo(f"Available agents: {', '.join(agent_manager.agents.keys())}")
564565
server.start()
565-
except SystemExit:
566-
from AgentCrew.modules.mcpclient import MCPSessionManager
567-
568-
MCPSessionManager.get_instance().cleanup()
569566
except Exception as e:
570567
print(traceback.format_exc())
571568
click.echo(f"❌ Error: {str(e)}", err=True)
569+
finally:
570+
from AgentCrew.modules.mcpclient import MCPSessionManager
571+
572+
MCPSessionManager.get_instance().cleanup()
572573

573574

574575
@cli.command()

AgentCrew/modules/a2a/task_manager.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ async def on_send_message(
8282
)
8383
)
8484

85-
agent.activate()
86-
8785
# Generate task ID from message
8886
task_id = (
8987
request.params.message.taskId or f"task_{request.params.message.messageId}"
@@ -195,6 +193,9 @@ async def _process_agent_task(self, agent: LocalAgent, task: Task):
195193
if task.id not in self.task_history:
196194
raise ValueError("Task history is not existed")
197195

196+
input_tokens = 0
197+
output_tokens = 0
198+
198199
async def _process_task():
199200
# Process with agent
200201

@@ -203,12 +204,21 @@ async def _process_task():
203204
response_message = ""
204205
thinking_content = ""
205206
thinking_signature = ""
207+
tool_uses = []
208+
209+
def process_result(_tool_uses, _input_tokens, _output_tokens):
210+
nonlocal tool_uses, input_tokens, output_tokens
211+
tool_uses = _tool_uses
212+
input_tokens += _input_tokens
213+
output_tokens += _output_tokens
206214

207215
async for (
208216
response_message,
209217
chunk_text,
210218
thinking_chunk,
211-
) in agent.process_messages(self.task_history[task.id]):
219+
) in agent.process_messages(
220+
self.task_history[task.id], callback=process_result
221+
):
212222
# Update current response
213223
if response_message:
214224
current_response = response_message
@@ -260,8 +270,6 @@ async def _process_task():
260270
)
261271
)
262272

263-
# Get final result
264-
tool_uses, input_tokens, output_tokens = agent.get_process_result()
265273
if tool_uses and len(tool_uses) > 0:
266274
# Add thinking content as a separate message if available
267275
thinking_data = (

AgentCrew/modules/agents/base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import AsyncGenerator, Tuple, Dict, List, Optional, Any
2+
from typing import AsyncGenerator, Tuple, Dict, List, Optional, Any, Callable
33
from enum import Enum
44

55

@@ -77,7 +77,9 @@ def calculate_usage_cost(self, input_tokens, output_tokens) -> float:
7777

7878
@abstractmethod
7979
async def process_messages(
80-
self, messages: Optional[List[Dict[str, Any]]] = None
80+
self,
81+
messages: Optional[List[Dict[str, Any]]] = None,
82+
callback: Optional[Callable] = None,
8183
) -> AsyncGenerator:
8284
"""
8385
Process messages using this agent.

AgentCrew/modules/agents/local_agent.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from datetime import datetime
22
import os
3-
from typing import Dict, Any, List, Optional
3+
from typing import Dict, Any, List, Optional, Callable
44
from AgentCrew.modules.llm.base import BaseLLMService
55
from AgentCrew.modules.llm.message import MessageTransformer
66
from AgentCrew.modules.agents.base import BaseAgent, MessageType
@@ -385,7 +385,11 @@ def update_llm_service(self, new_llm_service: BaseLLMService) -> bool:
385385

386386
return True
387387

388-
async def process_messages(self, messages: Optional[List[Dict[str, Any]]] = None):
388+
async def process_messages(
389+
self,
390+
messages: Optional[List[Dict[str, Any]]] = None,
391+
callback: Optional[Callable] = None,
392+
):
389393
"""
390394
Process messages using this agent.
391395
@@ -400,9 +404,9 @@ async def process_messages(self, messages: Optional[List[Dict[str, Any]]] = None
400404
)
401405

402406
assistant_response = ""
403-
self.tool_uses = []
404-
self.input_tokens_usage = 0
405-
self.output_tokens_usage = 0
407+
_tool_uses = []
408+
_input_tokens_usage = 0
409+
_output_tokens_usage = 0
406410
# Ensure the first message is a system message with the agent's prompt
407411
if not messages:
408412
final_messages = list(self.history)
@@ -452,15 +456,23 @@ async def process_messages(self, messages: Optional[List[Dict[str, Any]]] = None
452456
chunk_text,
453457
thinking_chunk,
454458
) = self.llm.process_stream_chunk(
455-
chunk, assistant_response, self.tool_uses
459+
chunk, assistant_response, _tool_uses
456460
)
461+
yield (assistant_response, chunk_text, thinking_chunk)
462+
457463
if tool_uses:
458-
self.tool_uses = tool_uses
464+
_tool_uses = tool_uses
459465
if chunk_input_tokens > 0:
460-
self.input_tokens_usage = chunk_input_tokens
466+
_input_tokens_usage = chunk_input_tokens
461467
if chunk_output_tokens > 0:
462-
self.output_tokens_usage = chunk_output_tokens
463-
yield (assistant_response, chunk_text, thinking_chunk)
468+
_output_tokens_usage = chunk_output_tokens
469+
if callback:
470+
callback(_tool_uses, _input_tokens_usage, _output_tokens_usage)
471+
else:
472+
self.tool_uses = _tool_uses
473+
self.input_tokens_usage = _input_tokens_usage
474+
self.output_tokens_usage = _output_tokens_usage
475+
464476
except GeneratorExit as e:
465477
logger.warning(f"Stream processing interrupted: {e}")
466478
finally:

AgentCrew/modules/agents/remote_agent.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, Any, List, Optional, Tuple
1+
from typing import Callable, Dict, Any, List, Optional, Tuple
22
from uuid import uuid4
33

44
from pydantic import ValidationError
@@ -95,7 +95,11 @@ def configure_think(self, think_setting):
9595
def calculate_usage_cost(self, input_tokens, output_tokens) -> float:
9696
return 0.0
9797

98-
async def process_messages(self, messages: Optional[List[Dict[str, Any]]] = None):
98+
async def process_messages(
99+
self,
100+
messages: Optional[List[Dict[str, Any]]] = None,
101+
callback: Optional[Callable] = None,
102+
):
99103
if not self.client or not self.agent_card:
100104
raise ValidationError(
101105
f"RemoteAgent '{self.name}' not properly initialized."

AgentCrew/modules/chat/message/handler.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,15 @@ async def get_assistant_response(
195195
# Create a reference to the streaming generator
196196
self.stream_generator = None
197197

198+
def process_result(_tool_uses, _input_tokens, _output_tokens):
199+
nonlocal tool_uses, input_tokens, output_tokens
200+
tool_uses = _tool_uses
201+
input_tokens += _input_tokens
202+
output_tokens += _output_tokens
203+
198204
try:
199205
# Store the generator in a variable so we can properly close it if needed
200-
self.stream_generator = self.agent.process_messages()
206+
self.stream_generator = self.agent.process_messages(callback=process_result)
201207

202208
async for (
203209
assistant_response,
@@ -239,12 +245,6 @@ async def get_assistant_response(
239245
time.sleep(0.5)
240246
self._notify("response_chunk", (chunk_text, assistant_response))
241247

242-
tool_uses, input_tokens_in_turn, output_tokens_in_turn = (
243-
self.agent.get_process_result()
244-
)
245-
input_tokens += input_tokens_in_turn
246-
output_tokens += output_tokens_in_turn
247-
248248
# Handle tool use if needed
249249
if not has_stop_interupted and tool_uses and len(tool_uses) > 0:
250250
# Add thinking content as a separate message if available

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "agentcrew-ai"
3-
version = "0.6.6-3"
3+
version = "0.6.6-4"
44
requires-python = ">=3.12"
55
classifiers = [
66
"Programming Language :: Python :: 3",

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)