Skip to content
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

METRICS_FREQUENCY_SEC = 1.0 # In seconds
BACKPRESSURE_THRESHOLD = 5.0 # In seconds
# Minimum interval between repeated consumer status log lines: the run-loop
# heartbeat, the "not paused but no message received" notice, and the
# "has been paused" notice are each emitted at most once per this interval.
LOGGING_FREQUENCY_SEC = 180.0 # In seconds

F = TypeVar("F", bound=Callable[[Any], Any])

Expand Down Expand Up @@ -169,6 +170,10 @@ def __init__(
DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None
)

self.__last_run_log_ts = time.time() # This is for throttling the logging of each run loop per-consumer
self.__last_pause_ts = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simplify your logic by initializing all of these to 0; then you don't have to check for None.

self.__last_empty_msg_ts = None

def _close_strategy() -> None:
start_close = time.time()

Expand Down Expand Up @@ -395,6 +400,10 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
def _run_once(self) -> None:
self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1)

if time.time() - self.__last_run_log_ts >= LOGGING_FREQUENCY_SEC:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please call time.time() only once here and reuse the value; it's already very expensive to call it on every _run_once.

We also already have a counter metric for this. What is the purpose of the log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the log is a per-consumer breakdown.

logger.info("Arroyo consumer _run_once loop started")
self.__last_run_log_ts = time.time()

message_carried_over = self.__message is not None

if not message_carried_over:
Expand All @@ -408,6 +417,26 @@ def _run_once(self) -> None:
self.__metrics_buffer.incr_timing(
"arroyo.consumer.poll.time", time.time() - start_poll
)

if self.__message is None:
if not self.__is_paused:
if self.__last_empty_msg_ts is None:
self.__last_empty_msg_ts = time.time()

# Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration
elif time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC:
logger.info(f"Consumer is not paused but did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems severe enough to log as a warning rather than info.

self.__last_empty_msg_ts = time.time()

else:
if self.__last_pause_ts is None:
self.__last_pause_ts = time.time()

# Records a log if the consumer has been paused for longer than a threshold duration
elif time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC:
logger.info(f"Consumer has been paused for {LOGGING_FREQUENCY_SEC} seconds")
self.__last_pause_ts = time.time()

except RecoverableError:
return

Expand All @@ -423,6 +452,9 @@ def _run_once(self) -> None:
"arroyo.consumer.processing.time", time.time() - start_poll
)
if self.__message is not None:

# Reset the timer
self.__last_empty_msg_ts = None
try:
start_submit = time.time()
message = (
Expand Down Expand Up @@ -467,6 +499,9 @@ def _run_once(self) -> None:
paused_partitions,
)
self.__is_paused = False

# Reset if we unpause
self.__last_pause_ts = None
# unpause paused partitions... just in case a subset is paused
self.__metrics_buffer.incr_counter(
"arroyo.consumer.resume", 1
Expand All @@ -489,6 +524,8 @@ def _run_once(self) -> None:
self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1)
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False
# Reset the timer since we unpaused
self.__last_pause_ts = None

# Clear backpressure timestamp if it is set
self._clear_backpressure()
Expand Down
Loading