From 18eb40a49499789fe7915839591a0d5077a6f193 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Feb 2025 08:59:05 -0800 Subject: [PATCH 1/3] Fix leak listener in exchange service --- .../elasticsearch/compute/EsqlRefCountingListener.java | 3 ++- .../compute/operator/exchange/ExchangeService.java | 9 ++++++++- .../elasticsearch/xpack/esql/plugin/ComputeListener.java | 3 ++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java index 69df0fb8ceff1..2dfc60744be2e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java @@ -34,7 +34,8 @@ public EsqlRefCountingListener(ActionListener delegate) { } public ActionListener acquire() { - return refs.acquireListener().delegateResponse((l, e) -> { + var listener = ActionListener.assertAtLeastOnce(refs.acquireListener()); + return listener.delegateResponse((l, e) -> { failureCollector.unwrapAndCollect(e); l.onFailure(e); }); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index ac02273a48ee4..dd36a6f455e8b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -366,7 +367,13 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener 0) { // This doesn't fully protect ESQL from OOM, but reduces the likelihood. - blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page"); + try { + blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page"); + } catch (Exception e) { + assert e instanceof CircuitBreakingException : new AssertionError(e); + listener.onFailure(e); + return; + } listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes)); } transportService.sendChildRequest( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 3d358b8c7a8a2..c8b8e84fd2478 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -47,7 +47,8 @@ final class ComputeListener implements Releasable { * Acquires a new listener that doesn't collect result */ ActionListener acquireAvoid() { - return refs.acquire().delegateResponse((l, e) -> { + var listener = ActionListener.assertAtLeastOnce(refs.acquire()); + return listener.delegateResponse((l, e) -> { try { runOnFailure.run(); } finally { From 99412cee76215c0bc9e841437bfeb2eb9849b9a7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Feb 2025 15:59:27 -0800 Subject: [PATCH 2/3] Update docs/changelog/122417.yaml --- docs/changelog/122417.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/122417.yaml diff --git a/docs/changelog/122417.yaml b/docs/changelog/122417.yaml new file mode 100644 index 0000000000000..55bab3c405ed9 --- /dev/null +++ b/docs/changelog/122417.yaml @@ -0,0 +1,5 @@ +pr: 122417 +summary: Fix listener leak in exchange service +area: ES|QL +type: bug +issues: [] From 5626059fa42e76942236121a54e8674afb3ee932 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Feb 2025 16:00:22 -0800 Subject: [PATCH 3/3] Update docs/changelog/122417.yaml --- docs/changelog/122417.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog/122417.yaml b/docs/changelog/122417.yaml index 55bab3c405ed9..f9e33df2a523c 100644 --- a/docs/changelog/122417.yaml +++ b/docs/changelog/122417.yaml @@ -2,4 +2,5 @@ pr: 122417 summary: Fix listener leak in exchange service area: ES|QL type: bug -issues: [] +issues: + - 122271