3737from .adapters import (
3838 convert_a2a_message_to_agent ,
3939 convert_agent_response_to_a2a_artifact ,
40- convert_agent_message_to_a2a ,
4140)
4241from .common .server .task_manager import TaskManager
4342from .errors import A2AError
@@ -342,60 +341,72 @@ async def _process_agent_task(self, agent: LocalAgent, task: Task):
342341 output_tokens = 0
343342
344343 async def _process_task ():
345- current_response = ""
346- response_message = ""
347- thinking_content = ""
348- thinking_signature = ""
349- tool_uses = []
350-
351- def process_result (_tool_uses , _input_tokens , _output_tokens ):
352- nonlocal tool_uses , input_tokens , output_tokens
353- tool_uses = _tool_uses
354- input_tokens += _input_tokens
355- output_tokens += _output_tokens
344+ try :
345+ current_response = ""
346+ response_message = ""
347+ thinking_content = ""
348+ thinking_signature = ""
349+ tool_uses = []
350+
351+ def process_result (_tool_uses , _input_tokens , _output_tokens ):
352+ nonlocal tool_uses , input_tokens , output_tokens
353+ tool_uses = _tool_uses
354+ input_tokens += _input_tokens
355+ output_tokens += _output_tokens
356+
357+ task_history = await self .store .get_task_history (task .context_id )
358+ async for (
359+ response_message ,
360+ chunk_text ,
361+ thinking_chunk ,
362+ ) in agent .process_messages (task_history , callback = process_result ):
363+ if response_message :
364+ current_response = response_message
365+
366+ task .status .state = TaskState .working
367+ task .status .timestamp = datetime .now ().isoformat ()
368+
369+ if task .id in self .streaming_enabled_tasks :
370+ if thinking_chunk :
371+ think_text_chunk , signature = thinking_chunk
372+ if think_text_chunk :
373+ thinking_content += think_text_chunk
374+
375+ thinking_artifact = convert_agent_response_to_a2a_artifact (
376+ think_text_chunk ,
377+ artifact_id = f"thinking_{ task .id } _{ datetime .now ()} " ,
378+ )
379+ await self ._record_and_emit_event (
380+ task .id ,
381+ TaskArtifactUpdateEvent (
382+ task_id = task .id ,
383+ context_id = task .context_id ,
384+ artifact = thinking_artifact ,
385+ ),
386+ )
387+ if signature :
388+ thinking_signature += signature
356389
357- task_history = await self .store .get_task_history (task .context_id )
358- async for (
359- response_message ,
360- chunk_text ,
361- thinking_chunk ,
362- ) in agent .process_messages (task_history , callback = process_result ):
363- if response_message :
364- current_response = response_message
365-
366- task .status .state = TaskState .working
367- task .status .timestamp = datetime .now ().isoformat ()
368-
369- if task .id in self .streaming_enabled_tasks :
370- if thinking_chunk :
371- think_text_chunk , signature = thinking_chunk
372- if think_text_chunk :
373- thinking_content += think_text_chunk
390+ if chunk_text :
391+ artifact = convert_agent_response_to_a2a_artifact (
392+ chunk_text ,
393+ artifact_id = f"artifact_{ task .id } _{ len (artifacts )} " ,
394+ )
374395 await self ._record_and_emit_event (
375396 task .id ,
376- TaskStatusUpdateEvent (
397+ TaskArtifactUpdateEvent (
377398 task_id = task .id ,
378399 context_id = task .context_id ,
379- status = TaskStatus (
380- state = TaskState .working ,
381- message = convert_agent_message_to_a2a (
382- {
383- "role" : "agent" ,
384- "content" : think_text_chunk ,
385- },
386- f"msg_thinking_{ hash (think_text_chunk )} " ,
387- ),
388- ),
389- final = False ,
400+ artifact = artifact ,
390401 ),
391402 )
392- if signature :
393- thinking_signature += signature
394403
395- if chunk_text :
404+ if tool_uses and len (tool_uses ) > 0 :
405+ if task .id in self .streaming_enabled_tasks :
396406 artifact = convert_agent_response_to_a2a_artifact (
397- chunk_text ,
407+ "" ,
398408 artifact_id = f"artifact_{ task .id } _{ len (artifacts )} " ,
409+ tool_uses = tool_uses ,
399410 )
400411 await self ._record_and_emit_event (
401412 task .id ,
@@ -405,119 +416,81 @@ def process_result(_tool_uses, _input_tokens, _output_tokens):
405416 artifact = artifact ,
406417 ),
407418 )
419+ await asyncio .sleep (0.7 )
408420
409- if tool_uses and len (tool_uses ) > 0 :
410- if task .id in self .streaming_enabled_tasks :
411- artifact = convert_agent_response_to_a2a_artifact (
412- "" ,
413- artifact_id = f"artifact_{ task .id } _{ len (artifacts )} " ,
414- tool_uses = tool_uses ,
415- )
416- await self ._record_and_emit_event (
417- task .id ,
418- TaskArtifactUpdateEvent (
419- task_id = task .id ,
420- context_id = task .context_id ,
421- artifact = artifact ,
422- ),
421+ thinking_data = (
422+ (thinking_content , thinking_signature )
423+ if thinking_content
424+ else None
423425 )
424- await asyncio .sleep (0.7 )
425-
426- thinking_data = (
427- (thinking_content , thinking_signature )
428- if thinking_content
429- else None
430- )
431- thinking_message = agent .format_message (
432- MessageType .Thinking , {"thinking" : thinking_data }
433- )
434- if thinking_message :
435- await self .store .append_task_history_message (
436- task .context_id , thinking_message
426+ thinking_message = agent .format_message (
427+ MessageType .Thinking , {"thinking" : thinking_data }
437428 )
429+ if thinking_message :
430+ await self .store .append_task_history_message (
431+ task .context_id , thinking_message
432+ )
438433
439- assistant_message = agent .format_message (
440- MessageType .Assistant ,
441- {
442- "message" : response_message ,
443- "tool_uses" : [
444- t for t in tool_uses if t ["name" ] != "transfer"
445- ],
446- },
447- )
448- if assistant_message :
449- await self .store .append_task_history_message (
450- task .context_id , assistant_message
434+ assistant_message = agent .format_message (
435+ MessageType .Assistant ,
436+ {
437+ "message" : response_message ,
438+ "tool_uses" : [
439+ t for t in tool_uses if t ["name" ] != "transfer"
440+ ],
441+ },
451442 )
452-
453- for tool_use in tool_uses :
454- tool_name = tool_use ["name" ]
455-
456- if tool_name == "ask" :
457- question = tool_use ["input" ].get ("question" , "" )
458- guided_answers = tool_use ["input" ].get ("guided_answers" , [])
459-
460- task .status .state = TaskState .input_required
461- task .status .timestamp = datetime .now ().isoformat ()
462- task .status .message = self ._create_ask_tool_message (
463- question , guided_answers
443+ if assistant_message :
444+ await self .store .append_task_history_message (
445+ task .context_id , assistant_message
464446 )
465447
466- await self ._record_and_emit_event (
467- task .id ,
468- TaskStatusUpdateEvent (
469- task_id = task .id ,
470- context_id = task .context_id ,
471- status = task .status ,
472- final = False ,
473- ),
474- )
448+ for tool_use in tool_uses :
449+ tool_name = tool_use ["name" ]
475450
476- wait_event = asyncio .Event ()
477- self .pending_ask_responses [task .id ] = wait_event
451+ if tool_name == "ask" :
452+ question = tool_use ["input" ].get ("question" , "" )
453+ guided_answers = tool_use ["input" ].get (
454+ "guided_answers" , []
455+ )
478456
479- try :
480- await asyncio . wait_for ( wait_event . wait (), timeout = 300 )
481- user_answer = self .ask_responses . get (
482- task . id , "No response received"
457+ task . status . state = TaskState . input_required
458+ task . status . timestamp = datetime . now (). isoformat ( )
459+ task . status . message = self ._create_ask_tool_message (
460+ question , guided_answers
483461 )
484- except asyncio .TimeoutError :
485- user_answer = "User did not respond in time."
486- finally :
487- self .pending_ask_responses .pop (task .id , None )
488- self .ask_responses .pop (task .id , None )
489462
490- tool_result = f"User's answer: { user_answer } "
463+ await self ._record_and_emit_event (
464+ task .id ,
465+ TaskStatusUpdateEvent (
466+ task_id = task .id ,
467+ context_id = task .context_id ,
468+ status = task .status ,
469+ final = False ,
470+ ),
471+ )
491472
492- task .status .state = TaskState .working
493- task .status .timestamp = datetime .now ().isoformat ()
494- task .status .message = None
473+ wait_event = asyncio .Event ()
474+ self .pending_ask_responses [task .id ] = wait_event
495475
496- tool_result_message = agent .format_message (
497- MessageType .ToolResult ,
498- {"tool_use" : tool_use , "tool_result" : tool_result },
499- )
500- if tool_result_message :
501- await self .store .append_task_history_message (
502- task .context_id , tool_result_message
503- )
476+ try :
477+ await asyncio .wait_for (
478+ wait_event .wait (), timeout = 300
479+ )
480+ user_answer = self .ask_responses .get (
481+ task .id , "No response received"
482+ )
483+ except asyncio .TimeoutError :
484+ user_answer = "User did not respond in time."
485+ finally :
486+ self .pending_ask_responses .pop (task .id , None )
487+ self .ask_responses .pop (task .id , None )
504488
505- await self ._record_and_emit_event (
506- task .id ,
507- TaskStatusUpdateEvent (
508- task_id = task .id ,
509- context_id = task .context_id ,
510- status = task .status ,
511- final = False ,
512- ),
513- )
489+ tool_result = f"User's answer: { user_answer } "
514490
515- else :
516- try :
517- tool_result = await agent .execute_tool_call (
518- tool_name ,
519- tool_use ["input" ],
520- )
491+ task .status .state = TaskState .working
492+ task .status .timestamp = datetime .now ().isoformat ()
493+ task .status .message = None
521494
522495 tool_result_message = agent .format_message (
523496 MessageType .ToolResult ,
@@ -528,22 +501,68 @@ def process_result(_tool_uses, _input_tokens, _output_tokens):
528501 task .context_id , tool_result_message
529502 )
530503
531- except Exception as e :
532- error_message = agent . format_message (
533- MessageType . ToolResult ,
534- {
535- "tool_use" : tool_use ,
536- "tool_result" : str ( e ) ,
537- "is_error" : True ,
538- } ,
504+ await self . _record_and_emit_event (
505+ task . id ,
506+ TaskStatusUpdateEvent (
507+ task_id = task . id ,
508+ context_id = task . context_id ,
509+ status = task . status ,
510+ final = False ,
511+ ) ,
539512 )
540- if error_message :
541- await self .store .append_task_history_message (
542- task .context_id , error_message
513+
514+ else :
515+ try :
516+ tool_result = await agent .execute_tool_call (
517+ tool_name ,
518+ tool_use ["input" ],
543519 )
544520
545- return await _process_task ()
546- return current_response
521+ tool_result_message = agent .format_message (
522+ MessageType .ToolResult ,
523+ {
524+ "tool_use" : tool_use ,
525+ "tool_result" : tool_result ,
526+ },
527+ )
528+ if tool_result_message :
529+ await self .store .append_task_history_message (
530+ task .context_id , tool_result_message
531+ )
532+
533+ except Exception as e :
534+ error_message = agent .format_message (
535+ MessageType .ToolResult ,
536+ {
537+ "tool_use" : tool_use ,
538+ "tool_result" : str (e ),
539+ "is_error" : True ,
540+ },
541+ )
542+ if error_message :
543+ await self .store .append_task_history_message (
544+ task .context_id , error_message
545+ )
546+
547+ return await _process_task ()
548+ return current_response
549+ except Exception as e :
550+ from openai import BadRequestError
551+
552+ if isinstance (e , BadRequestError ):
553+ if e .code == "model_max_prompt_tokens_exceeded" :
554+ from AgentCrew .modules .agents import LocalAgent
555+ from AgentCrew .modules .llm .model_registry import (
556+ ModelRegistry ,
557+ )
558+
559+ if isinstance (agent , LocalAgent ):
560+ max_token = ModelRegistry .get_model_limit (
561+ agent .get_model ()
562+ )
563+ agent .input_tokens_usage = max_token
564+ return await _process_task ()
565+ raise e
547566
548567 current_response = await _process_task ()
549568 if current_response .strip ():
0 commit comments