File tree Expand file tree Collapse file tree 1 file changed +15
-17
lines changed
src/praisonai-agents/praisonaiagents/agents Expand file tree Collapse file tree 1 file changed +15
-17
lines changed Original file line number Diff line number Diff line change @@ -480,24 +480,22 @@ async def arun_all_tasks(self):
480480 )
481481
482482 if self .process == "workflow" :
483- # Collect all tasks that should run in parallel
484- parallel_tasks = []
483+ tasks_to_run = []
485484 async for task_id in process .aworkflow ():
486- if self .tasks [task_id ].async_execution and self .tasks [task_id ].is_start :
487- parallel_tasks .append (task_id )
488- elif parallel_tasks :
489- # Execute collected parallel tasks
490- await asyncio .gather (* [self .arun_task (t ) for t in parallel_tasks ])
491- parallel_tasks = []
492- # Run the current non-parallel task
493- if self .tasks [task_id ].async_execution :
494- await self .arun_task (task_id )
495- else :
496- self .run_task (task_id )
497-
498- # Execute any remaining parallel tasks
499- if parallel_tasks :
500- await asyncio .gather (* [self .arun_task (t ) for t in parallel_tasks ])
485+ if self .tasks [task_id ].async_execution :
486+ tasks_to_run .append (self .arun_task (task_id ))
487+ else :
488+ # If we encounter a sync task, we must wait for the previous async tasks to finish.
489+ if tasks_to_run :
490+ await asyncio .gather (* tasks_to_run )
491+ tasks_to_run = []
492+
493+ # Run sync task in an executor to avoid blocking the event loop
494+ loop = asyncio .get_event_loop ()
495+ await loop .run_in_executor (None , self .run_task , task_id )
496+
497+ if tasks_to_run :
498+ await asyncio .gather (* tasks_to_run )
501499
502500 elif self .process == "sequential" :
503501 async for task_id in process .asequential ():
You can’t perform that action at this time.
0 commit comments