diff --git a/docs/changelog/122417.yaml b/docs/changelog/122417.yaml new file mode 100644 index 0000000000000..f9e33df2a523c --- /dev/null +++ b/docs/changelog/122417.yaml @@ -0,0 +1,6 @@ +pr: 122417 +summary: Fix listener leak in exchange service +area: ES|QL +type: bug +issues: + - 122271 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java index 69df0fb8ceff1..2dfc60744be2e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java @@ -34,7 +34,8 @@ public EsqlRefCountingListener(ActionListener delegate) { } public ActionListener acquire() { - return refs.acquireListener().delegateResponse((l, e) -> { + var listener = ActionListener.assertAtLeastOnce(refs.acquireListener()); + return listener.delegateResponse((l, e) -> { failureCollector.unwrapAndCollect(e); l.onFailure(e); }); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index ac02273a48ee4..dd36a6f455e8b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -366,7 +367,13 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener 0) { // This doesn't fully protect ESQL from OOM, but reduces the likelihood. - blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page"); + try { + blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page"); + } catch (Exception e) { + assert e instanceof CircuitBreakingException : new AssertionError(e); + listener.onFailure(e); + return; + } listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes)); } transportService.sendChildRequest( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 3d358b8c7a8a2..c8b8e84fd2478 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -47,7 +47,8 @@ final class ComputeListener implements Releasable { * Acquires a new listener that doesn't collect result */ ActionListener acquireAvoid() { - return refs.acquire().delegateResponse((l, e) -> { + var listener = ActionListener.assertAtLeastOnce(refs.acquire()); + return listener.delegateResponse((l, e) -> { try { runOnFailure.run(); } finally {