@@ -77,14 +77,13 @@ def _update_cursor(self, conn, event_id: int) -> None:
7777 )
7878 conn .commit ()
7979
80- def _run_once (self , on_event : Callable , batch_size : int = 100 ) -> int :
80+ def _run_once (self , on_event : Callable , batch_size : int = 100 ) -> bool :
81+ """Returns True on success, False if a processing error occurred."""
8182 with self ._engine .connect () as conn :
8283 self ._ensure_reader_state (conn )
8384 last_id = self ._get_last_event_id (conn )
8485
8586 rows = self ._event_log .fetch_batch (last_id , batch_size )
86- if not rows :
87- return 0
8887
8988 with self ._engine .connect () as conn :
9089 for event_id , event in rows :
@@ -96,10 +95,10 @@ def _run_once(self, on_event: Callable, batch_size: int = 100) -> int:
9695 event_id ,
9796 type (event ).event_type ,
9897 )
99- return 1
98+ return False
10099 self ._update_cursor (conn , event_id )
101100
102- return 0
101+ return True
103102
104103 def run (
105104 self ,
@@ -108,7 +107,7 @@ def run(
108107 batch_size : int = 100 ,
109108 ) -> int :
110109 while True :
111- exit_code = self ._run_once (on_event , batch_size )
112- if exit_code != 0 or poll_interval is None :
113- return exit_code
110+ success = self ._run_once (on_event , batch_size )
111+ if not success or poll_interval is None :
112+ return 0 if success else 1
114113 time .sleep (poll_interval )
0 commit comments