Skip to content

Commit c57f1c4

Browse files
authored
Ignore cancellation exceptions (#117657) (#118169) (#118181)
Today, when an ES|QL task encounters an exception, we trigger a cancellation on the root task, causing child tasks to fail due to cancellation. We chose not to include cancellation exceptions in the output, as they are unhelpful and add noise during problem analysis. However, these exceptions are still slipping through via RefCountingListener. This change addresses the issue by introducing ESQLRefCountingListener, ensuring that no cancellation exceptions are returned.
1 parent 94581db commit c57f1c4

File tree

11 files changed

+126
-34
lines changed

11 files changed

+126
-34
lines changed

docs/changelog/117657.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117657
2+
summary: Ignore cancellation exceptions
3+
area: ES|QL
4+
type: bug
5+
issues: []
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.RefCountingRunnable;
12+
import org.elasticsearch.compute.operator.FailureCollector;
13+
import org.elasticsearch.core.Releasable;
14+
15+
/**
16+
* Similar to {@link org.elasticsearch.action.support.RefCountingListener},
17+
* but prefers non-task-cancelled exceptions over task-cancelled ones as they are more useful for diagnosing issues.
18+
* @see FailureCollector
19+
*/
20+
public final class EsqlRefCountingListener implements Releasable {
21+
private final FailureCollector failureCollector;
22+
private final RefCountingRunnable refs;
23+
24+
public EsqlRefCountingListener(ActionListener<Void> delegate) {
25+
this.failureCollector = new FailureCollector();
26+
this.refs = new RefCountingRunnable(() -> {
27+
Exception error = failureCollector.getFailure();
28+
if (error != null) {
29+
delegate.onFailure(error);
30+
} else {
31+
delegate.onResponse(null);
32+
}
33+
});
34+
}
35+
36+
public ActionListener<Void> acquire() {
37+
return refs.acquireListener().delegateResponse((l, e) -> {
38+
failureCollector.unwrapAndCollect(e);
39+
l.onFailure(e);
40+
});
41+
}
42+
43+
@Override
44+
public void close() {
45+
refs.close();
46+
}
47+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16-
import java.util.List;
1716
import java.util.Queue;
18-
import java.util.concurrent.atomic.AtomicInteger;
17+
import java.util.concurrent.Semaphore;
1918

2019
/**
2120
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
@@ -26,12 +25,11 @@
2625
*/
2726
public final class FailureCollector {
2827
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
29-
private final AtomicInteger cancelledExceptionsCount = new AtomicInteger();
28+
private final Semaphore cancelledExceptionsPermits;
3029

3130
private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
32-
private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger();
31+
private final Semaphore nonCancelledExceptionsPermits;
3332

34-
private final int maxExceptions;
3533
private volatile boolean hasFailure = false;
3634
private Exception finalFailure = null;
3735

@@ -43,7 +41,8 @@ public FailureCollector(int maxExceptions) {
4341
if (maxExceptions <= 0) {
4442
throw new IllegalArgumentException("maxExceptions must be at least one");
4543
}
46-
this.maxExceptions = maxExceptions;
44+
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
45+
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
4746
}
4847

4948
private static Exception unwrapTransportException(TransportException te) {
@@ -60,13 +59,12 @@ private static Exception unwrapTransportException(TransportException te) {
6059
public void unwrapAndCollect(Exception e) {
6160
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
6261
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
63-
if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
62+
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
6463
cancelledExceptions.add(e);
6564
}
66-
} else {
67-
if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
68-
nonCancelledExceptions.add(e);
69-
}
65+
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
66+
nonCancelledExceptions.add(e);
67+
cancelledExceptions.clear();
7068
}
7169
hasFailure = true;
7270
}
@@ -99,20 +97,22 @@ public Exception getFailure() {
9997
private Exception buildFailure() {
10098
assert hasFailure;
10199
assert Thread.holdsLock(this);
102-
int total = 0;
103100
Exception first = null;
104-
for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) {
105-
for (Exception e : exceptions) {
106-
if (first == null) {
107-
first = e;
108-
total++;
109-
} else if (first != e) {
110-
first.addSuppressed(e);
111-
total++;
112-
}
113-
if (total >= maxExceptions) {
114-
return first;
115-
}
101+
for (Exception e : nonCancelledExceptions) {
102+
if (first == null) {
103+
first = e;
104+
} else if (first != e) {
105+
first.addSuppressed(e);
106+
}
107+
}
108+
if (first != null) {
109+
return first;
110+
}
111+
for (Exception e : cancelledExceptions) {
112+
if (first == null) {
113+
first = e;
114+
} else if (first != e) {
115+
first.addSuppressed(e);
116116
}
117117
}
118118
assert first != null;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1112
import org.elasticsearch.common.Randomness;
1213
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -86,6 +87,14 @@ public void testCollect() throws Exception {
8687
assertNotNull(failure);
8788
assertThat(failure, Matchers.in(nonCancelledExceptions));
8889
assertThat(failure.getSuppressed().length, lessThan(maxExceptions));
90+
assertTrue(
91+
"cancellation exceptions must be ignored",
92+
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TaskCancelledException).isEmpty()
93+
);
94+
assertTrue(
95+
"remote transport exception must be unwrapped",
96+
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TransportException).isEmpty()
97+
);
8998
}
9099

91100
public void testEmpty() {

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.lucene.document.InetAddressPoint;
1111
import org.apache.lucene.sandbox.document.HalfFloatPoint;
1212
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.ExceptionsHelper;
1314
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.common.breaker.CircuitBreaker;
1516
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@@ -30,7 +31,9 @@
3031
import org.elasticsearch.geo.ShapeTestUtils;
3132
import org.elasticsearch.index.IndexMode;
3233
import org.elasticsearch.license.XPackLicenseState;
34+
import org.elasticsearch.tasks.TaskCancelledException;
3335
import org.elasticsearch.test.ESTestCase;
36+
import org.elasticsearch.transport.RemoteTransportException;
3437
import org.elasticsearch.xcontent.json.JsonXContent;
3538
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3639
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
@@ -129,6 +132,8 @@
129132
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.PATTERN;
130133
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE;
131134
import static org.hamcrest.Matchers.instanceOf;
135+
import static org.junit.Assert.assertNotNull;
136+
import static org.junit.Assert.assertNull;
132137

133138
public final class EsqlTestUtils {
134139

@@ -784,4 +789,17 @@ public static QueryParam paramAsIdentifier(String name, Object value) {
784789
public static QueryParam paramAsPattern(String name, Object value) {
785790
return new QueryParam(name, value, NULL, PATTERN);
786791
}
792+
793+
/**
794+
* Asserts that:
795+
* 1. Cancellation exceptions are ignored when more relevant exceptions exist.
796+
* 2. Transport exceptions are unwrapped, and the actual causes are reported to users.
797+
*/
798+
public static void assertEsqlFailure(Exception e) {
799+
assertNotNull(e);
800+
var cancellationFailure = ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof TaskCancelledException).orElse(null);
801+
assertNull("cancellation exceptions must be ignored", cancellationFailure);
802+
ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException)
803+
.ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
804+
}
787805
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
143143
return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
144144
} catch (Exception e) {
145145
logger.info("request failed", e);
146+
EsqlTestUtils.assertEsqlFailure(e);
146147
ensureBlocksReleased();
147148
} finally {
148149
setRequestCircuitBreakerLimit(null);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.plugins.Plugin;
2424
import org.elasticsearch.rest.RestStatus;
2525
import org.elasticsearch.test.junit.annotations.TestLogging;
26+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2627

2728
import java.util.ArrayList;
2829
import java.util.Collection;
@@ -85,6 +86,7 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
8586
} catch (Exception e) {
8687
logger.info("request failed", e);
8788
ensureBlocksReleased();
89+
EsqlTestUtils.assertEsqlFailure(e);
8890
throw e;
8991
} finally {
9092
setRequestCircuitBreakerLimit(null);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.test.transport.MockTransportService;
3737
import org.elasticsearch.threadpool.ThreadPool;
3838
import org.elasticsearch.transport.TransportService;
39+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
3940
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
4041
import org.junit.Before;
4142

@@ -326,7 +327,15 @@ private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Ex
326327
*/
327328
assertThat(
328329
cancelException.getMessage(),
329-
in(List.of("test cancel", "task cancelled", "request cancelled test cancel", "parent task was cancelled [test cancel]"))
330+
in(
331+
List.of(
332+
"test cancel",
333+
"task cancelled",
334+
"request cancelled test cancel",
335+
"parent task was cancelled [test cancel]",
336+
"cancelled on failure"
337+
)
338+
)
330339
);
331340
assertBusy(
332341
() -> assertThat(
@@ -422,6 +431,7 @@ protected void doRun() throws Exception {
422431
allowedFetching.countDown();
423432
}
424433
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
434+
EsqlTestUtils.assertEsqlFailure(failure);
425435
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
426436
// If we proceed without waiting for pages, we might cancel the main request before starting the data-node request.
427437
// As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
2424
import org.elasticsearch.test.transport.MockTransportService;
2525
import org.elasticsearch.transport.TransportSettings;
26+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2627

2728
import java.util.ArrayList;
2829
import java.util.Collection;
@@ -111,6 +112,7 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
111112
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
112113
ensureBlocksReleased();
113114
logger.info("--> failed to execute esql query with disruption; retrying...", e);
115+
EsqlTestUtils.assertEsqlFailure(e);
114116
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
115117
}
116118
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.RefCountingListener;
12+
import org.elasticsearch.compute.EsqlRefCountingListener;
1213
import org.elasticsearch.compute.operator.DriverProfile;
13-
import org.elasticsearch.compute.operator.FailureCollector;
1414
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
1515
import org.elasticsearch.core.Nullable;
1616
import org.elasticsearch.core.Releasable;
@@ -39,8 +39,7 @@
3939
final class ComputeListener implements Releasable {
4040
private static final Logger LOGGER = LogManager.getLogger(ComputeService.class);
4141

42-
private final RefCountingListener refs;
43-
private final FailureCollector failureCollector = new FailureCollector();
42+
private final EsqlRefCountingListener refs;
4443
private final AtomicBoolean cancelled = new AtomicBoolean();
4544
private final CancellableTask task;
4645
private final TransportService transportService;
@@ -105,7 +104,7 @@ private ComputeListener(
105104
: "clusterAlias and executionInfo must both be null or both non-null";
106105

107106
// listener that executes after all the sub-listeners refs (created via acquireCompute) have completed
108-
this.refs = new RefCountingListener(1, ActionListener.wrap(ignored -> {
107+
this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> {
109108
responseHeaders.finish();
110109
ComputeResponse result;
111110

@@ -136,7 +135,7 @@ private ComputeListener(
136135
}
137136
}
138137
delegate.onResponse(result);
139-
}, e -> delegate.onFailure(failureCollector.getFailure())));
138+
}));
140139
}
141140

142141
/**
@@ -176,7 +175,6 @@ private boolean isCCSListener(String computeClusterAlias) {
176175
*/
177176
ActionListener<Void> acquireAvoid() {
178177
return refs.acquire().delegateResponse((l, e) -> {
179-
failureCollector.unwrapAndCollect(e);
180178
try {
181179
if (cancelled.compareAndSet(false, true)) {
182180
LOGGER.debug("cancelling ESQL task {} on failure", task);

0 commit comments

Comments
 (0)