Skip to content

Commit 44a054f

Browse files
authored
Fix listener leak in exchange service (elastic#122417) (elastic#122454)
If we hit the circuit breaker exception before fetching pages, we fail to notify the listener. Closes elastic#122271
1 parent 6124b48 commit 44a054f

File tree

4 files changed

+18
-3
lines changed

4 files changed

+18
-3
lines changed

docs/changelog/122417.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 122417
2+
summary: Fix listener leak in exchange service
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 122271

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public EsqlRefCountingListener(ActionListener<Void> delegate) {
3434
}
3535

3636
public ActionListener<Void> acquire() {
37-
return refs.acquireListener().delegateResponse((l, e) -> {
37+
var listener = ActionListener.assertAtLeastOnce(refs.acquireListener());
38+
return listener.delegateResponse((l, e) -> {
3839
failureCollector.unwrapAndCollect(e);
3940
l.onFailure(e);
4041
});

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.ActionListenerResponseHandler;
1616
import org.elasticsearch.action.support.ChannelActionListener;
1717
import org.elasticsearch.action.support.SubscribableListener;
18+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1819
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1920
import org.elasticsearch.common.io.stream.StreamInput;
2021
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -366,7 +367,13 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang
366367
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
367368
if (reservedBytes > 0) {
368369
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
369-
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
370+
try {
371+
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
372+
} catch (Exception e) {
373+
assert e instanceof CircuitBreakingException : new AssertionError(e);
374+
listener.onFailure(e);
375+
return;
376+
}
370377
listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes));
371378
}
372379
transportService.sendChildRequest(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ final class ComputeListener implements Releasable {
4747
* Acquires a new listener that doesn't collect result
4848
*/
4949
ActionListener<Void> acquireAvoid() {
50-
return refs.acquire().delegateResponse((l, e) -> {
50+
var listener = ActionListener.assertAtLeastOnce(refs.acquire());
51+
return listener.delegateResponse((l, e) -> {
5152
try {
5253
runOnFailure.run();
5354
} finally {

0 commit comments

Comments
 (0)