66from collections .abc import Iterable
77from typing import Any
88
9+ import psutil
910from pathos .multiprocessing import ProcessingPool as Pool
1011from rich import box
1112from rich .console import Console
@@ -121,7 +122,7 @@ def run_generator(input_test_cases: Iterable[TestCase], args=None):
121122 def debug_print (msg ):
122123 """Only print if verbose is enabled."""
123124 if args .verbose :
124- print (msg )
125+ print (msg , flush = True )
125126
126127 console = Console ()
127128 dumper = Dumper ()
@@ -135,16 +136,12 @@ def debug_print(msg):
135136 total_found += 1
136137 # Check if the test case should be filtered out
137138 if len (args .runners ) != 0 and test_case .runner_name not in args .runners :
138- debug_print (f"Filtered: { test_case .get_identifier ()} " )
139139 continue
140140 if len (args .presets ) != 0 and test_case .preset_name not in args .presets :
141- debug_print (f"Filtered: { test_case .get_identifier ()} " )
142141 continue
143142 if len (args .forks ) != 0 and test_case .fork_name not in args .forks :
144- debug_print (f"Filtered: { test_case .get_identifier ()} " )
145143 continue
146144 if len (args .cases ) != 0 and not any (s in test_case .case_name for s in args .cases ):
147- debug_print (f"Filtered: { test_case .get_identifier ()} " )
148145 continue
149146
150147 # Set the output dir and add this to out list
@@ -166,17 +163,60 @@ def worker_function(data):
166163 """Execute a test case and update active tests."""
167164 test_case , active_tests = data
168165 key = (uuid .uuid4 (), test_case .get_identifier ())
169- active_tests [key ] = time .time ()
166+ test_start = time .time ()
167+ active_tests [key ] = test_start
168+
169+ debug_print (f"Starting: { test_case .get_identifier ()} " )
170+
170171 try :
171172 execute_test (test_case , dumper )
172- debug_print (f"Generated: { test_case .get_identifier ()} " )
173+ elapsed = time .time () - test_start
174+ debug_print (f"Generated: { test_case .get_identifier ()} (took { elapsed :.2f} s)" )
173175 return "generated"
174176 except SkippedTest :
175- debug_print (f"Skipped: { test_case .get_identifier ()} " )
177+ elapsed = time .time () - test_start
178+ debug_print (f"Skipped: { test_case .get_identifier ()} (took { elapsed :.2f} s)" )
176179 return "skipped"
177180 finally :
178181 del active_tests [key ]
179182
183+ def periodic_status_print (active_tests , total_tasks , completed , skipped , interval = 300 ):
184+ """Print status updates periodically in verbose mode."""
185+ process = psutil .Process ()
186+ while completed .value < total_tasks :
187+ time .sleep (interval )
188+ remaining = total_tasks - completed .value
189+ if remaining > 0 :
190+ active_count = len (active_tests )
191+ # Get system-wide and process memory stats
192+ vm = psutil .virtual_memory ()
193+ total_memory_mb = vm .total / 1024 / 1024
194+ system_used_mb = vm .used / 1024 / 1024
195+ # Include main process + all child processes (worker pool)
196+ process_rss_mb = process .memory_info ().rss / 1024 / 1024
197+ for child in process .children (recursive = True ):
198+ try :
199+ process_rss_mb += child .memory_info ().rss / 1024 / 1024
200+ except (psutil .NoSuchProcess , psutil .AccessDenied ):
201+ pass
202+
203+ debug_print (
204+ f"Progress: { completed .value } /{ total_tasks } completed, "
205+ f"{ skipped .value } skipped, { active_count } active, "
206+ f"{ remaining } remaining, elapsed { time_since (start_time )} "
207+ )
208+ debug_print (
209+ f"Memory: "
210+ f"this process { process_rss_mb :.0f} MB, "
211+ f"all processes { system_used_mb :.0f} MB, "
212+ f"total available { total_memory_mb :.0f} MB"
213+ )
214+ if active_tests :
215+ for key , start_time_test in list (active_tests .items ()):
216+ debug_print (
217+ f" - Active: { key [1 ]} (running for { time_since (start_time_test )} )"
218+ )
219+
180220 def display_active_tests (active_tests , total_tasks , completed , skipped , width ):
181221 """Display a table of active tests."""
182222 with Live (console = console ) as live :
@@ -211,47 +251,69 @@ def display_active_tests(active_tests, total_tasks, completed, skipped, width):
211251 time .sleep (0.25 )
212252
213253 # Generate all of the test cases
214- with multiprocessing .Manager () as manager :
215- active_tests = manager .dict ()
216- completed = manager .Value ("i" , 0 )
217- skipped = manager .Value ("i" , 0 )
218- width = max ([len (t .get_identifier ()) for t in selected_test_cases ])
219-
220- if not args .verbose :
221- display_thread = threading .Thread (
222- target = display_active_tests ,
223- args = (active_tests , len (selected_test_cases ), completed , skipped , width ),
224- daemon = True ,
225- )
226- display_thread .start ()
227-
228- # Map each test case to a thread worker
229- inputs = [(t , active_tests ) for t in selected_test_cases ]
230-
231- if args .threads == 1 :
232- for input in inputs :
233- result = worker_function (input )
234- if result == "skipped" :
235- skipped .value += 1
236- completed .value += 1
237- else :
238- for result in Pool (processes = args .threads ).uimap (worker_function , inputs ):
239- if result == "skipped" :
240- skipped .value += 1
241- completed .value += 1
242-
243- if not args .verbose :
244- display_thread .join ()
245-
246- elapsed = round (time .time () - start_time , 2 )
247-
248- # Display final summary using rich
249- total_selected = len (selected_test_cases )
250- total_completed = completed .value - skipped .value
251- total_skipped = skipped .value
254+ try :
255+ with multiprocessing .Manager () as manager :
256+ active_tests = manager .dict ()
257+ completed = manager .Value ("i" , 0 )
258+ skipped = manager .Value ("i" , 0 )
259+ width = max ([len (t .get_identifier ()) for t in selected_test_cases ])
260+
261+ if not args .verbose :
262+ display_thread = threading .Thread (
263+ target = display_active_tests ,
264+ args = (active_tests , len (selected_test_cases ), completed , skipped , width ),
265+ daemon = True ,
266+ )
267+ display_thread .start ()
268+ else :
269+ # Start periodic status printing in verbose mode
270+ status_thread = threading .Thread (
271+ target = periodic_status_print ,
272+ args = (active_tests , len (selected_test_cases ), completed , skipped ),
273+ daemon = True ,
274+ )
275+ status_thread .start ()
252276
253- display_test_summary (
254- console , total_found , total_selected , total_completed , total_skipped , elapsed
255- )
277+ # Map each test case to a thread worker
278+ inputs = [(t , active_tests ) for t in selected_test_cases ]
256279
257- debug_print (f"Completed generation of { tests_prefix } in { elapsed } seconds" )
280+ if args .threads == 1 :
281+ for input in inputs :
282+ result = worker_function (input )
283+ if result == "skipped" :
284+ skipped .value += 1
285+ completed .value += 1
286+ else :
287+ pool = Pool (processes = args .threads )
288+ try :
289+ for result in pool .uimap (worker_function , inputs ):
290+ if result == "skipped" :
291+ skipped .value += 1
292+ completed .value += 1
293+ except KeyboardInterrupt :
294+ # Terminate pool immediately on interrupt
295+ pool .terminate ()
296+ pool .join ()
297+ raise
298+ else :
299+ # Normal cleanup when completed
300+ pool .close ()
301+ pool .join ()
302+
303+ if not args .verbose :
304+ display_thread .join ()
305+
306+ elapsed = round (time .time () - start_time , 2 )
307+
308+ # Display final summary using rich
309+ total_selected = len (selected_test_cases )
310+ total_completed = completed .value - skipped .value
311+ total_skipped = skipped .value
312+
313+ display_test_summary (
314+ console , total_found , total_selected , total_completed , total_skipped , elapsed
315+ )
316+
317+ debug_print (f"Completed generation of { tests_prefix } in { elapsed } seconds" )
318+ except KeyboardInterrupt :
319+ return
0 commit comments