Skip to content

Commit 25fd1be

Browse files
authored
Ignore cancellation exceptions (#117657) (#118169)
Today, when an ES|QL task encounters an exception, we trigger a cancellation on the root task, causing child tasks to fail due to cancellation. We chose not to include cancellation exceptions in the output, as they are unhelpful and add noise during problem analysis. However, these exceptions are still slipping through via RefCountingListener. This change addresses the issue by introducing ESQLRefCountingListener, ensuring that no cancellation exceptions are returned.
1 parent e4b0f8a commit 25fd1be

File tree

12 files changed

+135
-42
lines changed

12 files changed

+135
-42
lines changed

docs/changelog/117657.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117657
2+
summary: Ignore cancellation exceptions
3+
area: ES|QL
4+
type: bug
5+
issues: []
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.RefCountingRunnable;
12+
import org.elasticsearch.compute.operator.FailureCollector;
13+
import org.elasticsearch.core.Releasable;
14+
15+
/**
16+
* Similar to {@link org.elasticsearch.action.support.RefCountingListener},
17+
* but prefers non-task-cancelled exceptions over task-cancelled ones as they are more useful for diagnosing issues.
18+
* @see FailureCollector
19+
*/
20+
public final class EsqlRefCountingListener implements Releasable {
21+
private final FailureCollector failureCollector;
22+
private final RefCountingRunnable refs;
23+
24+
public EsqlRefCountingListener(ActionListener<Void> delegate) {
25+
this.failureCollector = new FailureCollector();
26+
this.refs = new RefCountingRunnable(() -> {
27+
Exception error = failureCollector.getFailure();
28+
if (error != null) {
29+
delegate.onFailure(error);
30+
} else {
31+
delegate.onResponse(null);
32+
}
33+
});
34+
}
35+
36+
public ActionListener<Void> acquire() {
37+
return refs.acquireListener().delegateResponse((l, e) -> {
38+
failureCollector.unwrapAndCollect(e);
39+
l.onFailure(e);
40+
});
41+
}
42+
43+
@Override
44+
public void close() {
45+
refs.close();
46+
}
47+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16-
import java.util.List;
1716
import java.util.Queue;
18-
import java.util.concurrent.atomic.AtomicInteger;
17+
import java.util.concurrent.Semaphore;
1918

2019
/**
2120
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
@@ -26,12 +25,11 @@
2625
*/
2726
public final class FailureCollector {
2827
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
29-
private final AtomicInteger cancelledExceptionsCount = new AtomicInteger();
28+
private final Semaphore cancelledExceptionsPermits;
3029

3130
private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
32-
private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger();
31+
private final Semaphore nonCancelledExceptionsPermits;
3332

34-
private final int maxExceptions;
3533
private volatile boolean hasFailure = false;
3634
private Exception finalFailure = null;
3735

@@ -43,7 +41,8 @@ public FailureCollector(int maxExceptions) {
4341
if (maxExceptions <= 0) {
4442
throw new IllegalArgumentException("maxExceptions must be at least one");
4543
}
46-
this.maxExceptions = maxExceptions;
44+
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
45+
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
4746
}
4847

4948
private static Exception unwrapTransportException(TransportException te) {
@@ -60,13 +59,12 @@ private static Exception unwrapTransportException(TransportException te) {
6059
public void unwrapAndCollect(Exception e) {
6160
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
6261
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
63-
if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
62+
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
6463
cancelledExceptions.add(e);
6564
}
66-
} else {
67-
if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
68-
nonCancelledExceptions.add(e);
69-
}
65+
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
66+
nonCancelledExceptions.add(e);
67+
cancelledExceptions.clear();
7068
}
7169
hasFailure = true;
7270
}
@@ -99,20 +97,22 @@ public Exception getFailure() {
9997
private Exception buildFailure() {
10098
assert hasFailure;
10199
assert Thread.holdsLock(this);
102-
int total = 0;
103100
Exception first = null;
104-
for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) {
105-
for (Exception e : exceptions) {
106-
if (first == null) {
107-
first = e;
108-
total++;
109-
} else if (first != e) {
110-
first.addSuppressed(e);
111-
total++;
112-
}
113-
if (total >= maxExceptions) {
114-
return first;
115-
}
101+
for (Exception e : nonCancelledExceptions) {
102+
if (first == null) {
103+
first = e;
104+
} else if (first != e) {
105+
first.addSuppressed(e);
106+
}
107+
}
108+
if (first != null) {
109+
return first;
110+
}
111+
for (Exception e : cancelledExceptions) {
112+
if (first == null) {
113+
first = e;
114+
} else if (first != e) {
115+
first.addSuppressed(e);
116116
}
117117
}
118118
assert first != null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99

1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.action.ActionListener;
12-
import org.elasticsearch.action.support.RefCountingListener;
12+
import org.elasticsearch.action.support.RefCountingRunnable;
1313
import org.elasticsearch.action.support.SubscribableListener;
1414
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
15+
import org.elasticsearch.compute.EsqlRefCountingListener;
1516
import org.elasticsearch.compute.data.Page;
1617
import org.elasticsearch.compute.operator.FailureCollector;
1718
import org.elasticsearch.compute.operator.IsBlockedResult;
@@ -54,20 +55,20 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
5455
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
5556
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
5657
buffer.addCompletionListener(ActionListener.running(() -> {
57-
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener).delegateFailure((l, unused) -> {
58+
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
59+
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
5860
final Exception e = failure.getFailure();
5961
if (e != null) {
60-
l.onFailure(e);
62+
listener.onFailure(e);
6163
} else {
62-
l.onResponse(null);
64+
listener.onResponse(null);
6365
}
64-
});
65-
try (RefCountingListener refs = new RefCountingListener(listener)) {
66+
})) {
6667
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
6768
// Create an outstanding instance and then finish to complete the completionListener
6869
// if we haven't registered any instances of exchange sinks or exchange sources before.
6970
pending.trackNewInstance();
70-
pending.completion.addListener(refs.acquire());
71+
pending.completion.addListener(refs.acquireListener());
7172
pending.finishInstance();
7273
}
7374
}
@@ -269,7 +270,7 @@ public void onFailure(Exception e) {
269270

270271
@Override
271272
protected void doRun() {
272-
try (RefCountingListener refs = new RefCountingListener(sinkListener)) {
273+
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
273274
for (int i = 0; i < instances; i++) {
274275
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
275276
fetcher.fetchPage();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1112
import org.elasticsearch.common.Randomness;
1213
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -86,6 +87,14 @@ public void testCollect() throws Exception {
8687
assertNotNull(failure);
8788
assertThat(failure, Matchers.in(nonCancelledExceptions));
8889
assertThat(failure.getSuppressed().length, lessThan(maxExceptions));
90+
assertTrue(
91+
"cancellation exceptions must be ignored",
92+
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TaskCancelledException).isEmpty()
93+
);
94+
assertTrue(
95+
"remote transport exception must be unwrapped",
96+
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TransportException).isEmpty()
97+
);
8998
}
9099

91100
public void testEmpty() {

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.lucene.document.InetAddressPoint;
1111
import org.apache.lucene.sandbox.document.HalfFloatPoint;
1212
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.ExceptionsHelper;
1314
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.common.breaker.CircuitBreaker;
1516
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@@ -30,7 +31,9 @@
3031
import org.elasticsearch.geo.ShapeTestUtils;
3132
import org.elasticsearch.index.IndexMode;
3233
import org.elasticsearch.license.XPackLicenseState;
34+
import org.elasticsearch.tasks.TaskCancelledException;
3335
import org.elasticsearch.test.ESTestCase;
36+
import org.elasticsearch.transport.RemoteTransportException;
3437
import org.elasticsearch.xcontent.json.JsonXContent;
3538
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3639
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
@@ -129,6 +132,8 @@
129132
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.PATTERN;
130133
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE;
131134
import static org.hamcrest.Matchers.instanceOf;
135+
import static org.junit.Assert.assertNotNull;
136+
import static org.junit.Assert.assertNull;
132137

133138
public final class EsqlTestUtils {
134139

@@ -784,4 +789,17 @@ public static QueryParam paramAsIdentifier(String name, Object value) {
784789
public static QueryParam paramAsPattern(String name, Object value) {
785790
return new QueryParam(name, value, NULL, PATTERN);
786791
}
792+
793+
/**
794+
* Asserts that:
795+
* 1. Cancellation exceptions are ignored when more relevant exceptions exist.
796+
* 2. Transport exceptions are unwrapped, and the actual causes are reported to users.
797+
*/
798+
public static void assertEsqlFailure(Exception e) {
799+
assertNotNull(e);
800+
var cancellationFailure = ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof TaskCancelledException).orElse(null);
801+
assertNull("cancellation exceptions must be ignored", cancellationFailure);
802+
ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException)
803+
.ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
804+
}
787805
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
143143
return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
144144
} catch (Exception e) {
145145
logger.info("request failed", e);
146+
EsqlTestUtils.assertEsqlFailure(e);
146147
ensureBlocksReleased();
147148
} finally {
148149
setRequestCircuitBreakerLimit(null);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.plugins.Plugin;
2424
import org.elasticsearch.rest.RestStatus;
2525
import org.elasticsearch.test.junit.annotations.TestLogging;
26+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2627

2728
import java.util.ArrayList;
2829
import java.util.Collection;
@@ -85,6 +86,7 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
8586
} catch (Exception e) {
8687
logger.info("request failed", e);
8788
ensureBlocksReleased();
89+
EsqlTestUtils.assertEsqlFailure(e);
8890
throw e;
8991
} finally {
9092
setRequestCircuitBreakerLimit(null);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.test.transport.MockTransportService;
3737
import org.elasticsearch.threadpool.ThreadPool;
3838
import org.elasticsearch.transport.TransportService;
39+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
3940
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
4041
import org.junit.Before;
4142

@@ -338,7 +339,15 @@ private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Ex
338339
*/
339340
assertThat(
340341
cancelException.getMessage(),
341-
in(List.of("test cancel", "task cancelled", "request cancelled test cancel", "parent task was cancelled [test cancel]"))
342+
in(
343+
List.of(
344+
"test cancel",
345+
"task cancelled",
346+
"request cancelled test cancel",
347+
"parent task was cancelled [test cancel]",
348+
"cancelled on failure"
349+
)
350+
)
342351
);
343352
assertBusy(
344353
() -> assertThat(
@@ -434,6 +443,7 @@ protected void doRun() throws Exception {
434443
allowedFetching.countDown();
435444
}
436445
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
446+
EsqlTestUtils.assertEsqlFailure(failure);
437447
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
438448
// If we proceed without waiting for pages, we might cancel the main request before starting the data-node request.
439449
// As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
2424
import org.elasticsearch.test.transport.MockTransportService;
2525
import org.elasticsearch.transport.TransportSettings;
26+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2627

2728
import java.util.ArrayList;
2829
import java.util.Collection;
@@ -111,6 +112,7 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
111112
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
112113
ensureBlocksReleased();
113114
logger.info("--> failed to execute esql query with disruption; retrying...", e);
115+
EsqlTestUtils.assertEsqlFailure(e);
114116
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
115117
}
116118
}

0 commit comments

Comments
 (0)