@@ -209,12 +209,10 @@ def _exec_in_kernel(self, fn):
209209 Note: The caller (begin_next while loop) will continue processing
210210 after this returns. We don't need to call begin_next() ourselves.
211211 """
212- import traceback
213-
214212 try :
215213 fn ()
216214 except Exception as e :
217- traceback . print_exc ( )
215+ self . log . error ( "Callable raised: %s" , e , exc_info = True )
218216 # Set error state so queue stops
219217 self ._last_error = QueueError (
220218 message = f"Callable raised: { e } " ,
@@ -224,6 +222,15 @@ def _exec_in_kernel(self, fn):
224222 # Clear current_action so status() reports correctly
225223 self .current_action = None
226224
225+ def _record_action_error (self , action_type : str , exc : Exception ) -> None :
226+ """Record a queue-level error and log it."""
227+ self .log .error ("%s error: %s" , action_type , exc , exc_info = True )
228+ self ._last_error = QueueError (
229+ message = f"{ action_type } error: { exc } " ,
230+ sweep_type = action_type ,
231+ exception_type = type (exc ).__name__ ,
232+ )
233+
227234 def _attach_queue_metadata_provider (self , sweep : BaseSweep ):
228235 """Attach a metadata provider wrapper so datasets record they were launched by SweepQueue.
229236
@@ -540,7 +547,12 @@ def start(self, rts=True):
540547 self .current_action = None
541548 elif isinstance (self .current_action , DatabaseEntry ):
542549 # DatabaseEntry changes the database and continues to next item
543- self .current_action .start ()
550+ try :
551+ self .current_action .start ()
552+ except Exception as e :
553+ self ._record_action_error ("DatabaseEntry" , e )
554+ self .current_action = None
555+ return
544556 # Continue with the next item in the queue
545557 self .begin_next ()
546558 elif callable (self .current_action ):
@@ -718,6 +730,19 @@ def status(self):
718730 else :
719731 effective_state = "idle"
720732
733+ # Provide error details even while a sweep is actively in ERROR
734+ if self ._last_error is not None :
735+ last_error_payload = self ._last_error .to_dict ()
736+ elif current_sweep_state == SweepState .ERROR and self .current_sweep is not None :
737+ error_msg = getattr (self .current_sweep .progressState , "error_message" , None )
738+ last_error_payload = QueueError (
739+ message = error_msg or "Unknown error" ,
740+ sweep_type = self .current_sweep .__class__ .__name__ ,
741+ exception_type = None ,
742+ ).to_dict ()
743+ else :
744+ last_error_payload = None
745+
721746 return {
722747 "effective_state" : effective_state ,
723748 # Return state name as string for JSON serialization
@@ -726,7 +751,7 @@ def status(self):
726751 "current_sweep_type" : current_sweep_type ,
727752 "current_action_type" : current_action_type ,
728753 # Convert QueueError to dict for JSON serialization
729- "last_error" : self . _last_error . to_dict () if self . _last_error else None ,
754+ "last_error" : last_error_payload ,
730755 }
731756
732757 def clear_error (self ):
@@ -944,7 +969,12 @@ def begin_next(self):
944969 # Process DatabaseEntry synchronously
945970 self .log .info (str (self .current_action ))
946971 time .sleep (0.5 ) # Small delay before database operation
947- self .current_action .start ()
972+ try :
973+ self .current_action .start ()
974+ except Exception as e :
975+ self ._record_action_error ("DatabaseEntry" , e )
976+ self .current_action = None
977+ break # Exit loop - queue stopped due to error
948978 # Continue loop to process next item immediately
949979
950980 elif callable (self .current_action ):
0 commit comments