@@ -104,37 +104,47 @@ async def execute_graph(self, source_code: str) -> Dict[str, Any]:
104104 results = {}
105105 failed_nodes = set ()
106106
107- # Execute in topological order
108- for node_id in nx .topological_sort (self .current_dag ):
109- if node_id in failed_nodes :
110- continue
111-
112- node = self .current_dag .nodes [node_id ]['data' ]
113-
114- # Validate state before execution
115- if not self ._validate_state_pre_execution ():
116- # State is invalid, attempt rollback
117- await self ._rollback_to_last_valid_state (self .step_index )
118- continue
107+ # Execute in topological generations for parallel DAG execution
108+ for generation in nx .topological_generations (self .current_dag ):
109+ tasks = []
110+ valid_nodes = []
119111
120- # Record step start
121- self .bus .record_step (self .trace_id , self .step_index , "node_start" , {"node_id" : node_id }, self .state_manager )
122-
123- try :
124- result = await self ._execute_node_with_validation (node )
125- results [node_id ] = result
112+ for node_id in generation :
113+ if node_id in failed_nodes :
114+ continue
115+
116+ node = self .current_dag .nodes [node_id ]['data' ]
126117
127- # Validate state after execution
128- if not self ._validate_state_post_execution ():
129- raise ValueError (f"State validation failed after executing node { node_id } " )
118+ # Validate state before execution
119+ if not self ._validate_state_pre_execution ():
120+ # State is invalid, attempt rollback
121+ await self ._rollback_to_last_valid_state (self .step_index )
122+ break
130123
131- # Record successful step
132- self .bus .record_step (self .trace_id , self .step_index + 1 , "node_success" , {"node_id" : node_id , "result" : str (result )}, self .state_manager )
133- self .step_index += 1
124+ # Record step start
125+ self .bus .record_step (self .trace_id , self .step_index , "node_start" , {"node_id" : node_id }, self .state_manager )
134126
135- except Exception as e :
136- # Failure detected - implement MVCC rollback and AST patching
137- await self ._handle_node_failure (node_id , e , failed_nodes )
127+ tasks .append (self ._execute_node_with_validation (node ))
128+ valid_nodes .append (node_id )
129+
130+ if tasks :
131+ results_list = await asyncio .gather (* tasks , return_exceptions = True )
132+ for idx , res in enumerate (results_list ):
133+ node_id = valid_nodes [idx ]
134+ if isinstance (res , Exception ):
135+ # Failure detected - implement MVCC rollback and AST patching
136+ await self ._handle_node_failure (node_id , res , failed_nodes )
137+ else :
138+ results [node_id ] = res
139+
140+ # Validate state after execution
141+ if not self ._validate_state_post_execution ():
142+ raise ValueError (f"State validation failed after executing node { node_id } " )
143+
144+ # Record successful step
145+ self .bus .record_step (self .trace_id , self .step_index + 1 , "node_success" , {"node_id" : node_id , "result" : str (res )}, self .state_manager )
146+
147+ self .step_index += 1
138148
139149 return results
140150
0 commit comments