MINOR: Keep pendingTask as WakeupFuture if currentTask is completed already. #21586
MINOR: Keep pendingTask as WakeupFuture if currentTask is completed already. #21586Nikita-Shupletsov wants to merge 3 commits intoapache:trunkfrom
Conversation
Added additional logging when a shutdown is requested.
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the PR @Nikita-Shupletsov LGTM, but lets get another look
| private final int maxMessages; | ||
| private final CountDownLatch shutdownLatch = new CountDownLatch(1); | ||
| private int consumedMessages = 0; | ||
| private boolean shutdownRequested = false; |
There was a problem hiding this comment.
might be overkill but maybe this should be marked volitile not sure if that's necessary if this is running on the main thread
There was a problem hiding this comment.
I was thinking about it as well as AtomicBoolean. but I don't think they would make that much of a difference: we change the value only once. there is ultimately no harm if we run the loop one more time before we exit.
but I can mark it volatile to be 100% by the book
There was a problem hiding this comment.
Either volatile or AtomicBoolean should work fine.
kirktrue
left a comment
There was a problem hiding this comment.
Thanks for the PR @Nikita-Shupletsov!
Does this PR fix a bug in the system tests or is it a workaround for a bug in the new consumer? My read of the PR description is the latter.
|
|
||
| @Override | ||
| public String name() { | ||
| return "shutdown_requested"; |
There was a problem hiding this comment.
I'm not seeing anything in the verifiable_consumer.py (or elsewhere) that's looking for this event. Am I missing something?
There was a problem hiding this comment.
it's for debugging. right now when a test fails because it couldn't stop a VerifiableConsumer it's impossible to tell what exactly happened: did we even receive a shutdown event? when did we receive it?
adding another log message should help
| private final int maxMessages; | ||
| private final CountDownLatch shutdownLatch = new CountDownLatch(1); | ||
| private int consumedMessages = 0; | ||
| private boolean shutdownRequested = false; |
There was a problem hiding this comment.
Either volatile or AtomicBoolean should work fine.
you are right. sorry, I overlooked. clearTask does not clear WakeupFuture. so it shouldn't cleaned up. |
|
@kirktrue |
System tests that use VerifiableConsumer are flaky because VerifiableConsumer isn't shutting down on request in certain situations.
There can be a race condition in the commitSync method, as the future that we set as the active task to the wakeupTrigger can be already completed by the time we are setting it. Which leads to the wakeup request never being fulfilled.
Added a check if the task we are receiving in setActiveTask was triggered when we complete it exceptionally.
Also added additional logging when a shutdown is requested to make debugging easier.