# Patch summary (commentary only; text before the first `diff --git` header is not part of any hunk).
#
# Target file: arroyo/processing/processor.py
#
# What the added (`+`) lines do:
#   * Define the module constant LOGGING_FREQUENCY_SEC = 180.0 (seconds), used as the
#     minimum interval between the informational logs introduced below.
#   * In the `__init__` hunk, initialise three private timestamps:
#       - __last_run_log_ts   = time.time()  (throttles the "_run_once loop started" log)
#       - __last_pause_ts     = None         (Optional[float])
#       - __last_empty_msg_ts = None         (Optional[float])
#   * At the top of `_run_once`, log "Arroyo consumer _run_once loop started" once
#     LOGGING_FREQUENCY_SEC has elapsed since the last such log, then restart that timer.
#   * After the "arroyo.consumer.poll.time" metric is recorded, when `self.__message is None`:
#       - not paused: start __last_empty_msg_ts on first occurrence; once the threshold has
#         elapsed, log that no message was received and restart the timer.
#       - paused: the same start/compare/log/restart sequence using __last_pause_ts.
#   * Reset __last_empty_msg_ts to None in the `if self.__message is not None:` branch.
#   * Reset __last_pause_ts to None at both places in these hunks where
#     `self.__is_paused = False` is assigned.
#
# NOTE(review): as stored here the patch has its line breaks collapsed; the original
#   line structure (including blank context lines and indentation) must be restored
#   before it can be applied with `git apply` / `patch`.
# NOTE(review): both new threshold logs use f-strings and interpolate the constant
#   LOGGING_FREQUENCY_SEC rather than the measured elapsed time; consider lazy
#   `logger.info("... %s seconds", value)` formatting and reporting the real elapsed value.
# NOTE(review): within this patch __last_empty_msg_ts is only cleared when a message
#   arrives, not when the consumer becomes paused - confirm that a pause in between cannot
#   make the "not paused" log fire earlier than 180 s after un-pausing.
# NOTE(review): the paused-branch log appears to sit in the poll section that follows
#   `if not message_carried_over:` - presumably it is only reached when no message is
#   carried over; verify against the full `_run_once` that this branch is reachable while
#   the consumer is paused.
# NOTE(review): intervals are measured with time.time() (wall clock), matching the
#   surrounding context lines; time.monotonic() would be immune to clock adjustments -
#   confirm whether that matters here.
diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index cfa0082a..5a7fbbef 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -35,6 +35,7 @@ METRICS_FREQUENCY_SEC = 1.0 # In seconds BACKPRESSURE_THRESHOLD = 5.0 # In seconds +LOGGING_FREQUENCY_SEC = 180.0 # In seconds F = TypeVar("F", bound=Callable[[Any], Any]) @@ -169,6 +170,10 @@ def __init__( DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None ) + self.__last_run_log_ts = time.time() # This is for throttling the logging of each run loop per-consumer + self.__last_pause_ts: Optional[float] = None + self.__last_empty_msg_ts: Optional[float] = None + def _close_strategy() -> None: start_close = time.time() @@ -395,6 +400,10 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None: def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) + if time.time() - self.__last_run_log_ts >= LOGGING_FREQUENCY_SEC: + logger.info("Arroyo consumer _run_once loop started") + self.__last_run_log_ts = time.time() + message_carried_over = self.__message is not None if not message_carried_over: @@ -408,6 +417,26 @@ def _run_once(self) -> None: self.__metrics_buffer.incr_timing( "arroyo.consumer.poll.time", time.time() - start_poll ) + + if self.__message is None: + if not self.__is_paused: + if self.__last_empty_msg_ts is None: + self.__last_empty_msg_ts = time.time() + + # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration + elif time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: + logger.info(f"Consumer is not paused but did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds") + self.__last_empty_msg_ts = time.time() + + else: + if self.__last_pause_ts is None: + self.__last_pause_ts = time.time() + + # Records a log if the consumer has been paused for longer than a threshold duration + elif 
time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: + logger.info(f"Consumer has been paused for {LOGGING_FREQUENCY_SEC} seconds") + self.__last_pause_ts = time.time() + except RecoverableError: return @@ -423,6 +452,9 @@ def _run_once(self) -> None: "arroyo.consumer.processing.time", time.time() - start_poll ) if self.__message is not None: + + # Reset the timer + self.__last_empty_msg_ts = None try: start_submit = time.time() message = ( @@ -467,6 +499,9 @@ def _run_once(self) -> None: paused_partitions, ) self.__is_paused = False + + # Reset if we unpause + self.__last_pause_ts = None # unpause paused partitions... just in case a subset is paused self.__metrics_buffer.incr_counter( "arroyo.consumer.resume", 1 @@ -489,6 +524,8 @@ def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1) self.__consumer.resume([*self.__consumer.tell().keys()]) self.__is_paused = False + # Reset the timer since we unpaused + self.__last_pause_ts = None # Clear backpressure timestamp if it is set self._clear_backpressure()