@@ -777,7 +777,9 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
777777 - Invoking user callback with batch
778778 - Updating connector state with last processed message ID
779779 - Handling state reset scenarios
780- - ALWAYS updating state, even if callback fails
780+
781+ Note: If callback fails, state will NOT be updated and exception will propagate.
782+ This matches ListenStream behavior where failed messages are retried on restart.
781783
782784 Args:
783785 batch: List of SSE message objects to process
@@ -821,28 +823,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
821823 }
822824 }
823825
824- # Invoke user's callback function with exception handling
825- callback_failed = False
826- callback_error = None
827- try :
828- self .callback (batch_data )
829- except Exception as ex :
830- callback_failed = True
831- callback_error = ex
832- self .helper .connector_logger .error (
833- "Batch callback failed - continuing with next batch" ,
834- {
835- "error" : str (ex ),
836- "error_type" : type (ex ).__name__ ,
837- "batch_size" : len (batch ),
838- "trigger" : trigger_reason ,
839- "last_msg_id" : last_msg_id ,
840- },
841- )
842- self .helper .metric .inc ("error_count" )
843-
844- # Update state with last processed message EVEN IF CALLBACK FAILED
845- # This ensures the connector progresses through the stream
826+ self .callback (batch_data )
846827 state = self .helper .get_state ()
847828 if state is None :
848829 # State was reset from UI during processing
@@ -851,7 +832,6 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
851832 {
852833 "batch_size" : len (batch ),
853834 "trigger" : trigger_reason ,
854- "callback_failed" : callback_failed ,
855835 }
856836 )
857837 self .exit_event .set ()
@@ -861,16 +841,6 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
861841 state ["start_from" ] = str (last_msg_id )
862842 self .helper .set_state (state )
863843
864- if callback_failed :
865- self .helper .connector_logger .warning (
866- "State updated despite callback failure - batch will NOT be retried" ,
867- {
868- "batch_size" : len (batch ),
869- "last_msg_id" : last_msg_id ,
870- "error" : str (callback_error ),
871- }
872- )
873-
874844 return True
875845
876846 def _wait_for_rate_limit (self , q ):
@@ -1038,6 +1008,12 @@ def run(self) -> None:
10381008 state = self .helper .get_state ()
10391009 if state is None :
10401010 self .exit_event .set ()
1011+ else :
1012+ # Only update state if batch is empty to prevent message loss
1013+ # If batch has unprocessed messages, state will be updated after batch processing
1014+ if len (batch ) == 0 :
1015+ state ["start_from" ] = str (msg .id )
1016+ self .helper .set_state (state )
10411017 last_msg_id = msg .id
10421018 else :
10431019 batch .append (msg )
0 commit comments