33import traceback
44
55from .printer import cons
6+ from .state import ARG
67
78class WorkerThread (threading .Thread ):
89 def __init__ (self , * args , ** kwargs ):
@@ -44,65 +45,120 @@ def sched(tasks: typing.List[Task], nThreads: int, devices: typing.Set[int] = No
4445
4546 sched .LOAD = { id : 0.0 for id in devices or [] }
4647
48+ # Debug logging setup
49+ gpu_mode = devices is not None and len (devices ) > 0
50+ debug_enabled = ARG ("sched_debug" , False ) # Check for --sched-debug flag
51+
52+ def debug_log (msg ):
53+ if debug_enabled :
54+ cons .print (msg )
55+
56+ if debug_enabled :
57+ debug_log (f"[SCHED DEBUG] Starting scheduler: { len (tasks )} tasks, { nThreads } threads, GPU mode: { gpu_mode } " )
58+ if gpu_mode :
59+ debug_log (f"[SCHED DEBUG] GPU devices: { devices } " )
60+
4761 def join_first_dead_thread (progress , complete_tracker ) -> None :
4862 nonlocal threads , nAvailable
4963
64+ debug_log (f"[SCHED DEBUG] Checking { len (threads )} active threads for completion" )
65+
5066 for threadID , threadHolder in enumerate (threads ):
5167 # Check if thread is not alive OR if it's been running for too long
5268 thread_not_alive = not threadHolder .thread .is_alive ()
69+
70+ debug_log (f"[SCHED DEBUG] Thread { threadID } : alive={ threadHolder .thread .is_alive ()} , devices={ threadHolder .devices } " )
5371
5472 if thread_not_alive :
73+ debug_log (f"[SCHED DEBUG] Thread { threadID } detected as dead, attempting to join..." )
74+
5575 # Properly join the thread with timeout to prevent infinite hangs
76+ join_start_time = time .time ()
77+ timeout_duration = 120.0 if gpu_mode else 30.0 # Longer timeout for GPU
78+
79+ debug_log (f"[SCHED DEBUG] Joining thread { threadID } with { timeout_duration } s timeout..." )
80+
5681 try :
57- threadHolder .thread .join (timeout = 30.0 ) # 30 second timeout
82+ threadHolder .thread .join (timeout = timeout_duration )
83+ join_end_time = time .time ()
84+ join_duration = join_end_time - join_start_time
85+
86+ debug_log (f"[SCHED DEBUG] Thread { threadID } join completed in { join_duration :.2f} s" )
5887
5988 # Double-check that thread actually finished joining
6089 if threadHolder .thread .is_alive ():
6190 # Thread didn't finish within timeout - this is a serious issue
62- raise RuntimeError (f"Thread { threadID } failed to join within 30 seconds timeout. "
63- f"Thread may be hung or in an inconsistent state." )
91+ debug_log (f"[SCHED DEBUG] ERROR: Thread { threadID } still alive after { timeout_duration } s timeout!" )
92+ debug_log (f"[SCHED DEBUG] Thread { threadID } devices: { threadHolder .devices } " )
93+ debug_log (f"[SCHED DEBUG] Thread { threadID } exception: { threadHolder .thread .exc } " )
94+ raise RuntimeError (f"Thread { threadID } failed to join within { timeout_duration } seconds timeout. "
95+ f"Thread may be hung or in an inconsistent state. "
96+ f"GPU devices: { threadHolder .devices } " )
6497
6598 except Exception as join_exc :
6699 # Handle join-specific exceptions with more context
100+ debug_log (f"[SCHED DEBUG] Exception during thread { threadID } join: { join_exc } " )
67101 raise RuntimeError (f"Failed to join thread { threadID } : { join_exc } . "
68- f"This may indicate a system threading issue or hung test case." ) from join_exc
102+ f"This may indicate a system threading issue or hung test case. "
103+ f"GPU devices: { threadHolder .devices } " ) from join_exc
104+
105+ debug_log (f"[SCHED DEBUG] Thread { threadID } successfully joined" )
69106
70107 # Check for and propagate any exceptions that occurred in the worker thread
71108 # But only if the worker function didn't complete successfully
72109 # (This allows test failures to be handled gracefully by handle_case)
73110 if threadHolder .thread .exc is not None :
111+ debug_log (f"[SCHED DEBUG] Thread { threadID } had exception: { threadHolder .thread .exc } " )
112+ debug_log (f"[SCHED DEBUG] Thread { threadID } completed successfully: { threadHolder .thread .completed_successfully } " )
113+
74114 if threadHolder .thread .completed_successfully :
75115 # Test framework handled the exception gracefully (e.g., test failure)
76116 # Don't re-raise - this is expected behavior
117+ debug_log (f"[SCHED DEBUG] Thread { threadID } exception was handled gracefully by test framework" )
77118 pass
78119 # Unhandled exception - this indicates a real problem
79120 elif hasattr (threadHolder .thread , 'exc_info' ) and threadHolder .thread .exc_info :
80121 error_msg = f"Worker thread { threadID } failed with unhandled exception:\n { threadHolder .thread .exc_info } "
122+ debug_log (f"[SCHED DEBUG] Thread { threadID } had unhandled exception!" )
81123 raise RuntimeError (error_msg ) from threadHolder .thread .exc
82124 else :
125+ debug_log (f"[SCHED DEBUG] Thread { threadID } had unhandled exception without details" )
83126 raise threadHolder .thread .exc
84127
128+ # Update scheduler state
85129 nAvailable += threadHolder .ppn
86130 for device in threadHolder .devices or set ():
131+ old_load = sched .LOAD [device ]
87132 sched .LOAD [device ] -= threadHolder .load / threadHolder .ppn
133+ debug_log (f"[SCHED DEBUG] Device { device } load: { old_load :.3f} -> { sched .LOAD [device ]:.3f} " )
88134
89135 progress .advance (complete_tracker )
90136
137+ debug_log (f"[SCHED DEBUG] Thread { threadID } cleanup complete, removing from active threads" )
91138 del threads [threadID ]
92139
93140 break
141+
142+ debug_log (f"[SCHED DEBUG] join_first_dead_thread completed, { len (threads )} threads remaining" )
94143
95144 with rich .progress .Progress (console = cons .raw , transient = True ) as progress :
96145 queue_tracker = progress .add_task ("Queued " , total = len (tasks ))
97146 complete_tracker = progress .add_task ("Completed" , total = len (tasks ))
98147
148+ debug_log (f"[SCHED DEBUG] Starting task queue processing..." )
149+
99150 # Queue Tests
100- for task in tasks :
151+ for task_idx , task in enumerate (tasks ):
152+ debug_log (f"[SCHED DEBUG] Processing task { task_idx + 1 } /{ len (tasks )} : ppn={ task .ppn } , load={ task .load } " )
153+
101154 # Wait until there are threads available
102155 while nAvailable < task .ppn :
156+ debug_log (f"[SCHED DEBUG] Waiting for resources: need { task .ppn } , have { nAvailable } " )
157+
103158 # This is important if "-j 1" is used (the default) since there
104159 # are test cases that require test.ppn=2
105160 if task .ppn > nThreads and nAvailable > 0 :
161+ debug_log (f"[SCHED DEBUG] Task requires more threads ({ task .ppn } ) than available ({ nThreads } ), but some are free ({ nAvailable } )" )
106162 break
107163
108164 # Keep track of threads that are done
@@ -118,24 +174,34 @@ def join_first_dead_thread(progress, complete_tracker) -> None:
118174 # Use the least loaded devices
119175 if devices is not None :
120176 use_devices = set ()
121- for _ in range (task .ppn ):
177+ debug_log (f"[SCHED DEBUG] Assigning GPU devices for task { task_idx + 1 } " )
178+ for device_idx in range (task .ppn ):
122179 device = min (sched .LOAD .items (), key = lambda x : x [1 ])[0 ]
123180 sched .LOAD [device ] += task .load / task .ppn
124181 use_devices .add (device )
182+ debug_log (f"[SCHED DEBUG] Assigned device { device } (load now: { sched .LOAD [device ]:.3f} )" )
125183
126184 nAvailable -= task .ppn
127185
186+ debug_log (f"[SCHED DEBUG] Starting thread for task { task_idx + 1 } , devices: { use_devices } " )
128187 thread = WorkerThread (target = task .func , args = tuple (task .args ) + (use_devices ,))
129188 thread .start ()
130189
131190 threads .append (WorkerThreadHolder (thread , task .ppn , task .load , use_devices ))
191+ debug_log (f"[SCHED DEBUG] Thread started for task { task_idx + 1 } , { len (threads )} active threads" )
192+
193+ debug_log (f"[SCHED DEBUG] All tasks queued, waiting for completion..." )
132194
133195 # Wait for the last tests to complete (MOVED INSIDE CONTEXT)
134196 while len (threads ) != 0 :
197+ debug_log (f"[SCHED DEBUG] Waiting for { len (threads )} threads to complete..." )
198+
135199 # Keep track of threads that are done
136200 join_first_dead_thread (progress , complete_tracker )
137201
138202 # Do not overwhelm this core with this loop
139203 time .sleep (0.05 )
140204
205+ debug_log (f"[SCHED DEBUG] Scheduler completed successfully!" )
206+
# Module-level default: sched() repopulates this per call with one
# entry per requested device, so it is safe to read between runs.
sched.LOAD = {}