Skip to content

Commit 7fe48bf

Browse files
committed
fix(a2a): think content should treat as artifact
1 parent 6d63ab0 commit 7fe48bf

File tree

1 file changed

+175
-155
lines changed

1 file changed

+175
-155
lines changed

AgentCrew/modules/a2a/task_manager.py

Lines changed: 175 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -342,60 +342,72 @@ async def _process_agent_task(self, agent: LocalAgent, task: Task):
342342
output_tokens = 0
343343

344344
async def _process_task():
345-
current_response = ""
346-
response_message = ""
347-
thinking_content = ""
348-
thinking_signature = ""
349-
tool_uses = []
350-
351-
def process_result(_tool_uses, _input_tokens, _output_tokens):
352-
nonlocal tool_uses, input_tokens, output_tokens
353-
tool_uses = _tool_uses
354-
input_tokens += _input_tokens
355-
output_tokens += _output_tokens
345+
try:
346+
current_response = ""
347+
response_message = ""
348+
thinking_content = ""
349+
thinking_signature = ""
350+
tool_uses = []
351+
352+
def process_result(_tool_uses, _input_tokens, _output_tokens):
353+
nonlocal tool_uses, input_tokens, output_tokens
354+
tool_uses = _tool_uses
355+
input_tokens += _input_tokens
356+
output_tokens += _output_tokens
357+
358+
task_history = await self.store.get_task_history(task.context_id)
359+
async for (
360+
response_message,
361+
chunk_text,
362+
thinking_chunk,
363+
) in agent.process_messages(task_history, callback=process_result):
364+
if response_message:
365+
current_response = response_message
366+
367+
task.status.state = TaskState.working
368+
task.status.timestamp = datetime.now().isoformat()
369+
370+
if task.id in self.streaming_enabled_tasks:
371+
if thinking_chunk:
372+
think_text_chunk, signature = thinking_chunk
373+
if think_text_chunk:
374+
thinking_content += think_text_chunk
375+
376+
thinking_artifact = convert_agent_response_to_a2a_artifact(
377+
think_text_chunk,
378+
artifact_id=f"thinking_{task.id}_{datetime.now()}",
379+
)
380+
await self._record_and_emit_event(
381+
task.id,
382+
TaskArtifactUpdateEvent(
383+
task_id=task.id,
384+
context_id=task.context_id,
385+
artifact=thinking_artifact,
386+
),
387+
)
388+
if signature:
389+
thinking_signature += signature
356390

357-
task_history = await self.store.get_task_history(task.context_id)
358-
async for (
359-
response_message,
360-
chunk_text,
361-
thinking_chunk,
362-
) in agent.process_messages(task_history, callback=process_result):
363-
if response_message:
364-
current_response = response_message
365-
366-
task.status.state = TaskState.working
367-
task.status.timestamp = datetime.now().isoformat()
368-
369-
if task.id in self.streaming_enabled_tasks:
370-
if thinking_chunk:
371-
think_text_chunk, signature = thinking_chunk
372-
if think_text_chunk:
373-
thinking_content += think_text_chunk
391+
if chunk_text:
392+
artifact = convert_agent_response_to_a2a_artifact(
393+
chunk_text,
394+
artifact_id=f"artifact_{task.id}_{len(artifacts)}",
395+
)
374396
await self._record_and_emit_event(
375397
task.id,
376-
TaskStatusUpdateEvent(
398+
TaskArtifactUpdateEvent(
377399
task_id=task.id,
378400
context_id=task.context_id,
379-
status=TaskStatus(
380-
state=TaskState.working,
381-
message=convert_agent_message_to_a2a(
382-
{
383-
"role": "agent",
384-
"content": think_text_chunk,
385-
},
386-
f"msg_thinking_{hash(think_text_chunk)}",
387-
),
388-
),
389-
final=False,
401+
artifact=artifact,
390402
),
391403
)
392-
if signature:
393-
thinking_signature += signature
394404

395-
if chunk_text:
405+
if tool_uses and len(tool_uses) > 0:
406+
if task.id in self.streaming_enabled_tasks:
396407
artifact = convert_agent_response_to_a2a_artifact(
397-
chunk_text,
408+
"",
398409
artifact_id=f"artifact_{task.id}_{len(artifacts)}",
410+
tool_uses=tool_uses,
399411
)
400412
await self._record_and_emit_event(
401413
task.id,
@@ -405,119 +417,81 @@ def process_result(_tool_uses, _input_tokens, _output_tokens):
405417
artifact=artifact,
406418
),
407419
)
420+
await asyncio.sleep(0.7)
408421

409-
if tool_uses and len(tool_uses) > 0:
410-
if task.id in self.streaming_enabled_tasks:
411-
artifact = convert_agent_response_to_a2a_artifact(
412-
"",
413-
artifact_id=f"artifact_{task.id}_{len(artifacts)}",
414-
tool_uses=tool_uses,
415-
)
416-
await self._record_and_emit_event(
417-
task.id,
418-
TaskArtifactUpdateEvent(
419-
task_id=task.id,
420-
context_id=task.context_id,
421-
artifact=artifact,
422-
),
422+
thinking_data = (
423+
(thinking_content, thinking_signature)
424+
if thinking_content
425+
else None
423426
)
424-
await asyncio.sleep(0.7)
425-
426-
thinking_data = (
427-
(thinking_content, thinking_signature)
428-
if thinking_content
429-
else None
430-
)
431-
thinking_message = agent.format_message(
432-
MessageType.Thinking, {"thinking": thinking_data}
433-
)
434-
if thinking_message:
435-
await self.store.append_task_history_message(
436-
task.context_id, thinking_message
427+
thinking_message = agent.format_message(
428+
MessageType.Thinking, {"thinking": thinking_data}
437429
)
430+
if thinking_message:
431+
await self.store.append_task_history_message(
432+
task.context_id, thinking_message
433+
)
438434

439-
assistant_message = agent.format_message(
440-
MessageType.Assistant,
441-
{
442-
"message": response_message,
443-
"tool_uses": [
444-
t for t in tool_uses if t["name"] != "transfer"
445-
],
446-
},
447-
)
448-
if assistant_message:
449-
await self.store.append_task_history_message(
450-
task.context_id, assistant_message
435+
assistant_message = agent.format_message(
436+
MessageType.Assistant,
437+
{
438+
"message": response_message,
439+
"tool_uses": [
440+
t for t in tool_uses if t["name"] != "transfer"
441+
],
442+
},
451443
)
452-
453-
for tool_use in tool_uses:
454-
tool_name = tool_use["name"]
455-
456-
if tool_name == "ask":
457-
question = tool_use["input"].get("question", "")
458-
guided_answers = tool_use["input"].get("guided_answers", [])
459-
460-
task.status.state = TaskState.input_required
461-
task.status.timestamp = datetime.now().isoformat()
462-
task.status.message = self._create_ask_tool_message(
463-
question, guided_answers
444+
if assistant_message:
445+
await self.store.append_task_history_message(
446+
task.context_id, assistant_message
464447
)
465448

466-
await self._record_and_emit_event(
467-
task.id,
468-
TaskStatusUpdateEvent(
469-
task_id=task.id,
470-
context_id=task.context_id,
471-
status=task.status,
472-
final=False,
473-
),
474-
)
449+
for tool_use in tool_uses:
450+
tool_name = tool_use["name"]
475451

476-
wait_event = asyncio.Event()
477-
self.pending_ask_responses[task.id] = wait_event
452+
if tool_name == "ask":
453+
question = tool_use["input"].get("question", "")
454+
guided_answers = tool_use["input"].get(
455+
"guided_answers", []
456+
)
478457

479-
try:
480-
await asyncio.wait_for(wait_event.wait(), timeout=300)
481-
user_answer = self.ask_responses.get(
482-
task.id, "No response received"
458+
task.status.state = TaskState.input_required
459+
task.status.timestamp = datetime.now().isoformat()
460+
task.status.message = self._create_ask_tool_message(
461+
question, guided_answers
483462
)
484-
except asyncio.TimeoutError:
485-
user_answer = "User did not respond in time."
486-
finally:
487-
self.pending_ask_responses.pop(task.id, None)
488-
self.ask_responses.pop(task.id, None)
489463

490-
tool_result = f"User's answer: {user_answer}"
464+
await self._record_and_emit_event(
465+
task.id,
466+
TaskStatusUpdateEvent(
467+
task_id=task.id,
468+
context_id=task.context_id,
469+
status=task.status,
470+
final=False,
471+
),
472+
)
491473

492-
task.status.state = TaskState.working
493-
task.status.timestamp = datetime.now().isoformat()
494-
task.status.message = None
474+
wait_event = asyncio.Event()
475+
self.pending_ask_responses[task.id] = wait_event
495476

496-
tool_result_message = agent.format_message(
497-
MessageType.ToolResult,
498-
{"tool_use": tool_use, "tool_result": tool_result},
499-
)
500-
if tool_result_message:
501-
await self.store.append_task_history_message(
502-
task.context_id, tool_result_message
503-
)
477+
try:
478+
await asyncio.wait_for(
479+
wait_event.wait(), timeout=300
480+
)
481+
user_answer = self.ask_responses.get(
482+
task.id, "No response received"
483+
)
484+
except asyncio.TimeoutError:
485+
user_answer = "User did not respond in time."
486+
finally:
487+
self.pending_ask_responses.pop(task.id, None)
488+
self.ask_responses.pop(task.id, None)
504489

505-
await self._record_and_emit_event(
506-
task.id,
507-
TaskStatusUpdateEvent(
508-
task_id=task.id,
509-
context_id=task.context_id,
510-
status=task.status,
511-
final=False,
512-
),
513-
)
490+
tool_result = f"User's answer: {user_answer}"
514491

515-
else:
516-
try:
517-
tool_result = await agent.execute_tool_call(
518-
tool_name,
519-
tool_use["input"],
520-
)
492+
task.status.state = TaskState.working
493+
task.status.timestamp = datetime.now().isoformat()
494+
task.status.message = None
521495

522496
tool_result_message = agent.format_message(
523497
MessageType.ToolResult,
@@ -528,22 +502,68 @@ def process_result(_tool_uses, _input_tokens, _output_tokens):
528502
task.context_id, tool_result_message
529503
)
530504

531-
except Exception as e:
532-
error_message = agent.format_message(
533-
MessageType.ToolResult,
534-
{
535-
"tool_use": tool_use,
536-
"tool_result": str(e),
537-
"is_error": True,
538-
},
505+
await self._record_and_emit_event(
506+
task.id,
507+
TaskStatusUpdateEvent(
508+
task_id=task.id,
509+
context_id=task.context_id,
510+
status=task.status,
511+
final=False,
512+
),
539513
)
540-
if error_message:
541-
await self.store.append_task_history_message(
542-
task.context_id, error_message
514+
515+
else:
516+
try:
517+
tool_result = await agent.execute_tool_call(
518+
tool_name,
519+
tool_use["input"],
543520
)
544521

545-
return await _process_task()
546-
return current_response
522+
tool_result_message = agent.format_message(
523+
MessageType.ToolResult,
524+
{
525+
"tool_use": tool_use,
526+
"tool_result": tool_result,
527+
},
528+
)
529+
if tool_result_message:
530+
await self.store.append_task_history_message(
531+
task.context_id, tool_result_message
532+
)
533+
534+
except Exception as e:
535+
error_message = agent.format_message(
536+
MessageType.ToolResult,
537+
{
538+
"tool_use": tool_use,
539+
"tool_result": str(e),
540+
"is_error": True,
541+
},
542+
)
543+
if error_message:
544+
await self.store.append_task_history_message(
545+
task.context_id, error_message
546+
)
547+
548+
return await _process_task()
549+
return current_response
550+
except Exception as e:
551+
from openai import BadRequestError
552+
553+
if isinstance(e, BadRequestError):
554+
if e.code == "model_max_prompt_tokens_exceeded":
555+
from AgentCrew.modules.agents import LocalAgent
556+
from AgentCrew.modules.llm.model_registry import (
557+
ModelRegistry,
558+
)
559+
560+
if isinstance(agent, LocalAgent):
561+
max_token = ModelRegistry.get_model_limit(
562+
agent.get_model()
563+
)
564+
agent.input_tokens_usage = max_token
565+
return await _process_task()
566+
raise e
547567

548568
current_response = await _process_task()
549569
if current_response.strip():

0 commit comments

Comments
 (0)