Close data node consumer on listener completion #137698
Conversation
Pinging @elastic/es-search-foundations (Team:Search Foundations)
```java
this.task = task;
this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards());
this.channel = channel;
this.listener = ActionListener.releaseBefore(queryPhaseResultConsumer, new ChannelActionListener<>(channel));
```
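For context, an illustrative sketch of the wrapper semantics relied on here (not the actual `ActionListener.releaseBefore` implementation, just a minimal stand-in assuming a `Releasable` consumer and a delegate channel listener):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

// Illustrative only: a listener that closes a Releasable (here, the query phase result
// consumer) before delegating success or failure, which is the behavior the
// ActionListener.releaseBefore(...) wrapper is used for in the diff above.
final class ReleaseBeforeListener<T> implements ActionListener<T> {
    private final Releasable releasable;
    private final ActionListener<T> delegate;

    ReleaseBeforeListener(Releasable releasable, ActionListener<T> delegate) {
        this.releasable = releasable;
        this.delegate = delegate;
    }

    @Override
    public void onResponse(T response) {
        Releasables.close(releasable);  // close the consumer first...
        delegate.onResponse(response);  // ...then send the response over the channel
    }

    @Override
    public void onFailure(Exception e) {
        Releasables.close(releasable);  // close the consumer on failure too...
        delegate.onFailure(e);          // ...then send the failure
    }
}
```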
This approach structurally looks better, as it centralizes the consumer’s lifecycle, ensuring that every success/failure path passes through the same wrapper.
Both approaches behave the same:
- Serialize with an open consumer
- Close the consumer
- Send the response (or failure)
- Let `respondAndRelease` free the bytes
The code in `main`:
- Uses a local `ChannelActionListener` and a `try (queryPhaseResultConsumer)` block. On success, the consumer is closed at the end of the try-with-resources (consumer) block (i.e., after serialization finishes, before building/sending the transport response). On failure, the consumer is also closed by the try-with-resources and the failure is sent via `channelListener`.
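A minimal sketch of that flow, with a hypothetical `buildResponse` helper (placeholder names, not the actual code in `main`):

```java
// Sketch of the try-with-resources flow described above; buildResponse() is hypothetical.
void respondFromMain(TransportChannel channel, QueryPhaseResultConsumer queryPhaseResultConsumer) {
    final ChannelActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel);
    final TransportResponse response;
    try (queryPhaseResultConsumer) {
        // serialization happens while the consumer (and its shard results) are still open
        response = buildResponse(queryPhaseResultConsumer);
    } catch (Exception e) {
        // the consumer has already been closed by try-with-resources; send the failure
        channelListener.onFailure(e);
        return;
    }
    // the consumer is closed at this point; now send the response over the channel
    channelListener.onResponse(response);
}
```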
The PR's code:
- Wraps the channel listener with `releaseBefore(consumer, …)` so the consumer is always closed before sending success/failure. On success, the consumer is closed right before delegating to the channel (via the wrapper). Serialization happens with the consumer open; then the wrapper closes it and writes.
- On failure, `listener.onFailure(e)` closes the consumer first (via the wrapper) and then writes the failure.
I prefer the updated code, as releasing the consumer uniformly on both success and failure is cleaner; however, I’m not convinced it addresses the underlying issue.
I ran SearchWithRandomDisconnectsIT#testSearchWithRandomDisconnects locally multiple times with @Repeat and was unable to reproduce the failure. Not sure if something changed or if it's just my machine.
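For reference, a sketch of how those repeated runs can be driven with the randomized-testing `@Repeat` annotation; the base class and test body are elided placeholders, not taken from this thread:

```java
import com.carrotsearch.randomizedtesting.annotations.Repeat;

// Temporarily annotate the test method to loop it locally, then run the suite as usual.
public class SearchWithRandomDisconnectsIT /* extends its existing base class */ {

    @Repeat(iterations = 100) // loop the test to try to hit the rare disconnect timing
    public void testSearchWithRandomDisconnects() throws Exception {
        // existing test body unchanged
    }
}
```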
Hm, you're right, I'm struggling to reproduce on main myself now. I could have sworn I continued to see failures after my #136889 fix, but I might be mistaken - perhaps that solved it.
What do you think? I'm thinking of tabling this change, which might still be a worthy improvement, and just unmuting SearchWithRandomDisconnectsIT for now. We can see if it's still failing in CI.
Closing this for now as I've opened #137763 to simply unmute. This may be a worthy improvement for the future but not a priority. Shout out @drempapis for double checking me here.
Resolves #122707.
Currently, we close the consumer (therefore decRef-ing all consumed shard results) once all shards in the batched query request are complete. I found that SearchWithRandomDisconnectsIT often causes the batched request to complete before all shards are done, leading to the leak with QuerySearchResults sitting in an un-closed consumer (to reproduce this locally, just run the suite with `@Repeat(iterations = 100)`). I think we should mirror what is done on the coord node side - tie the consumer to the request listener (see AbstractSearchAsyncAction).
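To make the leak concrete, a hypothetical sketch of the current countdown-based lifecycle (field and helper names are placeholders, not the actual handler code):

```java
// Hypothetical sketch only; sendResponse() is a placeholder helper.
// The consumer is closed only when the per-shard CountDown completes, so if the batched
// request finishes early (e.g. after a disconnect) some shards never report, the countdown
// never reaches zero, and the QuerySearchResults held by the consumer leak.
void onShardDone() {
    if (countDown.countDown()) {                                     // true once every shard has reported
        try (QueryPhaseResultConsumer consumer = queryPhaseResultConsumer) {
            sendResponse(consumer);                                  // hypothetical helper
        }                                                            // consumer closed here, but only on this path
    }
}
```

Tying the consumer to the request listener instead, as in the diff discussed above with `ActionListener.releaseBefore(queryPhaseResultConsumer, new ChannelActionListener<>(channel))`, releases it on every success/failure path rather than relying on the countdown path being reached.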