@@ -266,14 +266,12 @@ async def async_execute_task_plan(task_id: str, user_id: str, run_id: str):
266266
267267 plan_description = task .get ("name" , "Unnamed plan" )
268268 original_context_str = json .dumps (original_context_data , indent = 2 , default = str ) if original_context_data else "No original context provided."
269- found_context_str = task .get ("found_context" , "No additional context was found by the research agent." )
270269 block_id_prompt = f"The block_id for this task is '{ block_id } '. You MUST pass this ID to the 'update_progress' tool in the 'block_id' parameter." if block_id else "This task did not originate from a tasks block."
271270
272271 full_plan_prompt = (
273272 f"You are Sentient, a resourceful and autonomous executor agent. Your goal is to complete the user's request by intelligently following the provided plan.\n \n " # noqa
274273 f"**User Context:**\n - **User's Name:** { user_name } \n - **User's Location:** { user_location } \n - **Current Date & Time:** { current_user_time } \n \n "
275274 f"{ trigger_event_prompt_section } "
276- f"**Retrieved Context (from research agent):**\n { found_context_str } \n \n "
277275 f"Your task ID is '{ task_id } ' and the current run ID is '{ run_id } '.\n \n "
278276 f"The original context that triggered this plan is:\n ---BEGIN CONTEXT---\n { original_context_str } \n ---END CONTEXT---\n \n "
279277 f"**Primary Objective:** '{ plan_description } '\n \n "
@@ -292,41 +290,91 @@ async def async_execute_task_plan(task_id: str, user_id: str, run_id: str):
292290 )
293291
294292 try :
295- initial_messages = [{'role' : 'user' , 'content' : "Begin executing the plan. Follow your instructions meticulously." }]
296-
297293 logger .info (f"Task { task_id } : Starting agent run." )
294+ initial_messages = [{'role' : 'user' , 'content' : "Begin executing the plan. Follow your instructions meticulously." }]
298295
299- final_assistant_content = ""
300- final_history = []
301- for current_history in run_main_agent (
296+ # Step 1: Fully exhaust the generator to let the agent run to completion.
297+ # This prevents async UI updates from interfering with the agent's internal loop.
298+ agent_generator = run_main_agent (
302299 system_message = full_plan_prompt ,
303300 function_list = tools_config ,
304301 messages = initial_messages
305- ):
306- final_history = current_history
307-
308- # After the loop, process the complete response
309- if final_history and final_history [- 1 ].get ("role" ) == "assistant" :
310- final_assistant_content = final_history [- 1 ].get ("content" , "" )
311-
312- final_answer_content = ""
313- if final_assistant_content :
314- parsed_updates = parse_agent_string_to_updates (final_assistant_content )
315- for update in parsed_updates :
316- # Per user request, do not push thoughts to the log, but parse everything else.
317- if update .get ("type" ) == "thought" :
318- continue
319- await add_progress_update (db , task_id , run_id , user_id , update , block_id )
320- if update .get ("type" ) == "final_answer" :
321- final_answer_content = update .get ("content" )
302+ )
303+ all_history_steps = list (agent_generator )
304+
305+ if not all_history_steps :
306+ raise Exception ("Agent run produced no history, indicating an immediate failure." )
307+
308+ # Step 2: After completion, process all history steps to send UI updates.
309+ last_history_len = len (initial_messages )
310+ for current_history in all_history_steps :
311+ new_messages = current_history [last_history_len :]
312+ for msg in new_messages :
313+ updates_to_push = []
314+ if msg .get ("role" ) == "assistant" :
315+ if msg .get ("content" ):
316+ updates_to_push .extend (parse_agent_string_to_updates (msg ["content" ]))
317+ if msg .get ("function_call" ):
318+ fc = msg ["function_call" ]
319+ params_str = fc .get ("arguments" , "{}" )
320+ params = JsonExtractor .extract_valid_json (params_str ) or {"raw_parameters" : params_str }
321+ updates_to_push .append ({
322+ "type" : "tool_call" ,
323+ "tool_name" : fc .get ("name" ),
324+ "parameters" : params
325+ })
326+ elif msg .get ("role" ) == "function" :
327+ result_str = msg .get ("content" , "{}" )
328+ result_content = JsonExtractor .extract_valid_json (result_str ) or {"raw_result" : result_str }
329+ is_error = isinstance (result_content , dict ) and result_content .get ("status" ) == "failure"
330+ updates_to_push .append ({
331+ "type" : "tool_result" ,
332+ "tool_name" : msg .get ("name" ),
333+ "result" : result_content .get ("result" , result_content .get ("error" , result_content )),
334+ "is_error" : is_error
335+ })
336+
337+ for update in updates_to_push :
338+ if update .get ("type" ) != "thought" :
339+ asyncio .create_task (add_progress_update (db , task_id , run_id , user_id , update , block_id ))
340+ last_history_len = len (current_history )
341+
342+ # Step 3: Check the final state for a valid answer.
343+ final_history = all_history_steps [- 1 ]
344+ final_assistant_message = next ((msg for msg in reversed (final_history ) if msg .get ("role" ) == "assistant" ), None )
345+
346+ has_final_answer = False
347+ if final_assistant_message and final_assistant_message .get ("content" ):
348+ parsed_updates = parse_agent_string_to_updates (final_assistant_message ["content" ])
349+ if any (upd .get ("type" ) == "final_answer" for upd in parsed_updates ):
350+ has_final_answer = True
351+
352+ if not has_final_answer :
353+ error_message = "Agent finished execution without providing a final answer as required by its instructions. The task may be incomplete."
354+ logger .error (f"Task { task_id } : { error_message } . Final history: { final_history } " )
355+ await add_progress_update (db , task_id , run_id , user_id , {"type" : "error" , "content" : error_message }, block_id = block_id )
356+ await update_task_run_status (db , task_id , run_id , "error" , user_id , details = {"error" : error_message }, block_id = block_id )
357+
358+ from workers .tasks import calculate_next_run
359+ schedule_type = task .get ('schedule' , {}).get ('type' )
360+ if schedule_type == 'recurring' :
361+ next_run_time , _ = calculate_next_run (task ['schedule' ], last_run = datetime .datetime .now (datetime .timezone .utc ))
362+ await db .tasks .update_one ({"_id" : task ["_id" ]}, {"$set" : {"status" : "active" , "next_execution_at" : next_run_time }})
363+ elif schedule_type == 'triggered' :
364+ await db .tasks .update_one ({"_id" : task ["_id" ]}, {"$set" : {"status" : "active" , "next_execution_at" : None }})
365+ else :
366+ await db .tasks .update_one ({"_id" : task ["_id" ]}, {"$set" : {"status" : "error" , "next_execution_at" : None }})
367+ return {"status" : "error" , "message" : error_message }
322368
369+ # If we have a final answer, the execution was successful.
323370 logger .info (f"Task { task_id } execution phase completed. Dispatching to result generator." )
324371 await add_progress_update (db , task_id , run_id , user_id , {"type" : "info" , "content" : "Execution finished. Generating final report..." }, block_id = block_id )
325372 await update_task_run_status (db , task_id , run_id , "completed" , user_id , block_id = block_id )
326373 capture_event (user_id , "task_execution_succeeded" , {"task_id" : task_id , "run_id" : run_id })
327374
328375 # Call the new result generator task
329376 generate_task_result .delay (task_id , run_id , user_id )
377+
330378 from workers .tasks import calculate_next_run
331379 schedule_type = task .get ('schedule' , {}).get ('type' )
332380 if schedule_type == 'recurring' :
0 commit comments