@@ -80,14 +80,14 @@ def read_json(file_name):
8080 return structure_dic
8181
8282
83- def define_args (participant ):
83+ def define_args (participant , idx ):
8484 """Use dictionary to get command args for each participant."""
8585 args = []
8686 args .extend ([f"kind={ participant .get ('kind' )} " ])
8787 args .extend ([f"samples={ participant .get ('samples' , 10 )} " ])
8888 args .extend ([f"timeout={ participant .get ('timeout' , 10 )} " ])
8989 args .extend ([f"expected_matches={ participant .get ('expected_matches' , 1 )} " ])
90- args .extend ([f"seed={ str (os .getpid ())} " ])
90+ args .extend ([f"seed={ str (os .getpid () + idx )} " ])
9191 args .extend ([f"builtin_flow_controller_bytes={ participant .get ('builtin_flow_controller_bytes' , 0 )} " ])
9292
9393 # Check if 'known_types' key exists and is a list
@@ -99,12 +99,12 @@ def define_args(participant):
9999 return args
100100
101101
102- def define_commands (executable_path , test_cases ):
102+ def define_commands (executable_path , test_cases , idx ):
103103 """Create commands for each participant adding executable to args."""
104104 all_commands = []
105105 for test_case in test_cases :
106106 # For each test case, create commands for all participants
107- commands = [[executable_path ] + define_args (participant ) for participant in test_case ['participants' ]]
107+ commands = [[executable_path ] + define_args (participant , idx ) for participant in test_case ['participants' ]]
108108 all_commands .extend (commands )
109109
110110 return all_commands
@@ -172,6 +172,14 @@ async def run_command(test_case, process_args, timeout):
172172
173173
174174async def execute_commands (test_case , commands ):
175+ """
176+ Execute a list of commands asynchronously.
177+
178+ :param[in] test_case Name of the test case.
179+ :param[in] commands List of commands to be executed.
180+
181+ :return Sum of the return codes of each command.
182+ """
175183 tasks = []
176184 async with asyncio .TaskGroup () as tg :
177185 for command in commands :
@@ -180,6 +188,75 @@ async def execute_commands(test_case, commands):
180188
181189 return sum ([proc .result () for proc in tasks ])
182190
191+ async def execute_commands_with_sem (test_case , commands , sem ):
192+ """
193+ Execute commands using a semaphore to limit parallel executions.
194+
195+ :param[in] test_case Name of the test case.
196+ :param[in] commands List of commands to be executed.
197+ :param[in] sem Semaphore to limit parallel executions.
198+ """
199+ async with sem :
200+ return await execute_commands (test_case , commands )
201+
202+ async def execute_test_cases (test_cases ):
203+ """
204+ Execute all test cases, retrying failed ones up to 3 times.
205+
206+ :param[in] test_cases List of test cases to be executed.
207+
208+ :return Tuple (total test value, list of successful cases, list of failling cases)
209+ """
210+ total_test_value = 0
211+ successful_cases = []
212+ failling_cases = []
213+
214+ # Limit parallel executions to half of CPU cores
215+ sem = asyncio .Semaphore (os .cpu_count () // 2 or 6 )
216+
217+ pending_cases = test_cases .copy ()
218+ max_test_runs = 3
219+ while pending_cases and max_test_runs > 0 :
220+ tasks = []
221+ idx = 0
222+ async with asyncio .TaskGroup () as tg :
223+ for test_case in pending_cases :
224+ # Define commands for each test case
225+ commands = define_commands (args .app , [test_case ], idx )
226+ # Execute the commands in parallel
227+ tasks .append (tg .create_task (execute_commands_with_sem (test_case ['TestCase' ], commands , sem )))
228+ idx += 1
229+
230+ next_pending_cases = []
231+ max_test_runs -= 1
232+
233+ # Collect results
234+ for test_case , task in zip (pending_cases , tasks ):
235+ test_value = task .result ()
236+ total_test_value += test_value
237+ if test_value == 0 :
238+ successful_cases .append (f"Test { test_case .get ('TestCase' )} " )
239+ else :
240+ next_pending_cases .append (test_case )
241+
242+ # If there are pending cases, they will be retried
243+ pending_cases = next_pending_cases
244+ if pending_cases .__len__ () > 0 and max_test_runs > 0 :
245+ # Inform about cases to be retried
246+ logger .info ("----------- RETRYING CASES -----------" )
247+ for failed_test in pending_cases :
248+ logger .info (f"Test { failed_test .get ('TestCase' )} " )
249+
250+ # Wait a bit before retrying
251+ await asyncio .sleep (2 )
252+ total_test_value = 0
253+
254+ # If there are still pending cases, they are considered failling
255+ for test_case in pending_cases :
256+ failling_cases .append (f"Test { test_case .get ('TestCase' )} " )
257+
258+ return total_test_value , successful_cases , failling_cases
259+
183260
184261if __name__ == '__main__' :
185262
@@ -205,20 +282,7 @@ async def execute_commands(test_case, commands):
205282 # Read test cases from the provided JSON file
206283 test_cases = read_json (args .file [0 ])['test_cases' ]
207284
208- total_test_value = 0
209- successful_cases = []
210- failling_cases = []
211-
212- for test_case in test_cases :
213- # Define commands for each test case
214- commands = define_commands (args .app , [test_case ])
215- # Execute the commands and get the return value
216- test_value = asyncio .run (execute_commands (test_case ['TestCase' ], commands ))
217- total_test_value += test_value
218- if test_value == 0 :
219- successful_cases .append (f"Test { test_case .get ('TestCase' )} " )
220- else :
221- failling_cases .append (f"Test { test_case .get ('TestCase' )} " )
285+ total_test_value , successful_cases , failling_cases = asyncio .run (execute_test_cases (test_cases ))
222286
223287 # Print the results
224288 if successful_cases .__len__ () > 0 :
0 commit comments