-
-
Notifications
You must be signed in to change notification settings - Fork 5
inc: Add logs to Arroyo to track behavior during paused and empty message states #439
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
arroyo/processing/processor.py
Outdated
|
|
||
| METRICS_FREQUENCY_SEC = 1.0 # In seconds | ||
| BACKPRESSURE_THRESHOLD = 5.0 # In seconds | ||
| LOGGING_FREQUENCY_SEC = 60.0 # In seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make it longer now that we know what the likely issue is.
64 consumers would log this every second.
What about once every 2-3 minutes ?
| def _run_once(self) -> None: | ||
| self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) | ||
|
|
||
| if time.time() - self.__last_run_log_ts >= LOGGING_FREQUENCY_SEC: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please only calculate time.time() once, it's already very expensive to run it per _run_once
we also already have a counter metric for this, what is the purpose of the log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per-consumer breakdown
arroyo/processing/processor.py
Outdated
| ) | ||
|
|
||
| self.__last_run_log_ts = time.time() # This is for throttling the logging of each run loop per-consumer | ||
| self.__last_pause_ts = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can simplify your logic by setting all of these to 0, then you don't have to check for None
|
|
||
| # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration | ||
| elif time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: | ||
| logger.info(f"Consumer is not paused but did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems severe enough to warn
We currently don't have these logs, which will also be helpful for breaking down per-consumer