@@ -765,11 +765,14 @@ def __init__(
765765 self .max_batches_per_minute = max_batches_per_minute
766766 if max_batches_per_minute is not None :
767767 from collections import deque
768+
768769 self .batch_timestamps = deque ()
769770 else :
770771 self .batch_timestamps = None
771772
772- def _process_batch_and_update_state (self , batch , last_msg_id , trigger_reason = "unknown" ):
773+ def _process_batch_and_update_state (
774+ self , batch , last_msg_id , trigger_reason = "unknown"
775+ ):
773776 """Process a batch of messages and update connector state.
774777
775778 This method handles:
@@ -820,7 +823,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
820823 "trigger_reason" : trigger_reason ,
821824 "elapsed_time" : elapsed ,
822825 "timestamp" : time .time (),
823- }
826+ },
824827 }
825828
826829 self .callback (batch_data )
@@ -832,7 +835,7 @@ def _process_batch_and_update_state(self, batch, last_msg_id, trigger_reason="un
832835 {
833836 "batch_size" : len (batch ),
834837 "trigger" : trigger_reason ,
835- }
838+ },
836839 )
837840 self .exit_event .set ()
838841 return False
@@ -884,10 +887,9 @@ def _wait_for_rate_limit(self, q):
884887 "max_batches_per_minute" : self .max_batches_per_minute ,
885888 "current_batch_count" : len (self .batch_timestamps ),
886889 "wait_seconds" : round (wait_time , 2 ),
887- }
890+ },
888891 )
889892
890-
891893 chunk_size = 30.0
892894 total_slept = 0.0
893895
@@ -2316,7 +2318,7 @@ def listen_stream_batch(
23162318 if max_batches_per_minute > 10000 :
23172319 self .connector_logger .warning (
23182320 "Very high max_batches_per_minute configured" ,
2319- {"max_batches_per_minute" : max_batches_per_minute }
2321+ {"max_batches_per_minute" : max_batches_per_minute },
23202322 )
23212323
23222324 params = self ._resolve_stream_parameters (
@@ -2334,9 +2336,7 @@ def listen_stream_batch(
23342336 if max_batches_per_minute is not None :
23352337 self .connector_logger .info (
23362338 "Batch rate limiting enabled" ,
2337- {
2338- "max_batches_per_minute" : max_batches_per_minute
2339- }
2339+ {"max_batches_per_minute" : max_batches_per_minute },
23402340 )
23412341
23422342 self ._listen_stream_batch_thread = ListenStreamBatch (
0 commit comments