Skip to content

Commit a59785f

Browse files
authored
Fix listener leak in exchange service (elastic#122417) (elastic#122454) (elastic#122467)
If we hit the circuit breaker exception before fetching pages, we fail to notify the listener. Closes elastic#122271
1 parent 1a42688 commit a59785f

File tree

4 files changed

+18
-3
lines changed

4 files changed

+18
-3
lines changed

docs/changelog/122417.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 122417
2+
summary: Fix listener leak in exchange service
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 122271

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public EsqlRefCountingListener(ActionListener<Void> delegate) {
3434
}
3535

3636
public ActionListener<Void> acquire() {
37-
return refs.acquireListener().delegateResponse((l, e) -> {
37+
var listener = ActionListener.assertAtLeastOnce(refs.acquireListener());
38+
return listener.delegateResponse((l, e) -> {
3839
failureCollector.unwrapAndCollect(e);
3940
l.onFailure(e);
4041
});

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.ActionListenerResponseHandler;
1616
import org.elasticsearch.action.support.ChannelActionListener;
17+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1718
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1819
import org.elasticsearch.common.io.stream.StreamInput;
1920
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -334,7 +335,13 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang
334335
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
335336
if (reservedBytes > 0) {
336337
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
337-
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
338+
try {
339+
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
340+
} catch (Exception e) {
341+
assert e instanceof CircuitBreakingException : new AssertionError(e);
342+
listener.onFailure(e);
343+
return;
344+
}
338345
listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes));
339346
}
340347
transportService.sendChildRequest(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ private boolean isCCSListener(String computeClusterAlias) {
174174
* Acquires a new listener that doesn't collect result
175175
*/
176176
ActionListener<Void> acquireAvoid() {
177-
return refs.acquire().delegateResponse((l, e) -> {
177+
var listener = ActionListener.assertAtLeastOnce(refs.acquire());
178+
return listener.delegateResponse((l, e) -> {
178179
try {
179180
if (cancelled.compareAndSet(false, true)) {
180181
LOGGER.debug("cancelling ESQL task {} on failure", task);

0 commit comments

Comments
 (0)