Skip to content

Commit f789277

Browse files
authored
Prefer client errors while collecting ES|QL failures (#122290)
Currently, the ES|QL failure collectors categorize errors into non-cancellation and cancellation errors, preferring to return non-cancellation errors to users. With the retry on shard-level failure, the failure collector can now collect more categories of errors: client errors, server errors, shard-unavailable errors, and cancellation errors. For easier diagnostics and operations (especially on serverless), the failure collectors prefer returning client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors. Relates #120774
1 parent ee3542a commit f789277

File tree

4 files changed

+77
-58
lines changed

4 files changed

+77
-58
lines changed

muted-tests.yml

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -368,12 +368,6 @@ tests:
368368
- class: org.elasticsearch.xpack.security.authz.IndexAliasesTests
369369
method: testRemoveIndex
370370
issue: https://github.com/elastic/elasticsearch/issues/122221
371-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
372-
method: testGroupingMultiValueByOrdinals
373-
issue: https://github.com/elastic/elasticsearch/issues/122228
374-
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
375-
method: testFailureLoadingFields
376-
issue: https://github.com/elastic/elasticsearch/issues/122132
377371
- class: org.elasticsearch.blocks.SimpleBlocksIT
378372
method: testConcurrentAddBlock
379373
issue: https://github.com/elastic/elasticsearch/issues/122324
@@ -385,9 +379,6 @@ tests:
385379
- class: org.elasticsearch.xpack.esql.action.CrossClusterCancellationIT
386380
method: testCloseSkipUnavailable
387381
issue: https://github.com/elastic/elasticsearch/issues/122336
388-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
389-
method: testUnsupportedTypesOrdinalGrouping
390-
issue: https://github.com/elastic/elasticsearch/issues/122342
391382
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
392383
method: test {yaml=reference/alias/line_260}
393384
issue: https://github.com/elastic/elasticsearch/issues/122343
@@ -398,9 +389,6 @@ tests:
398389
issue: https://github.com/elastic/elasticsearch/issues/122377
399390
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.HdfsRepositoryAnalysisRestIT
400391
issue: https://github.com/elastic/elasticsearch/issues/122378
401-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
402-
method: testStatsMissingFieldWithStats
403-
issue: https://github.com/elastic/elasticsearch/issues/122327
404392
- class: org.elasticsearch.xpack.esql.CsvTests
405393
issue: https://github.com/elastic/elasticsearch/issues/122440
406394

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 53 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,35 @@
99

1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.ExceptionsHelper;
12-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
12+
import org.elasticsearch.action.support.TransportActions;
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16+
import java.util.EnumMap;
17+
import java.util.List;
18+
import java.util.Map;
1619
import java.util.Queue;
17-
import java.util.concurrent.Semaphore;
20+
import java.util.concurrent.ArrayBlockingQueue;
1821

1922
/**
2023
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
21-
* The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
22-
* To limit memory usage, this class collects only the first 10 exceptions in each category by default.
23-
* When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
24-
* over task-cancelled ones as they are more useful for diagnosing issues.
24+
* The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors,
25+
* and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in
26+
* each category by default. When returning the accumulated failures to the caller, this class prefers
27+
* client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
28+
* as they are more useful for diagnosing issues.
2529
*/
2630
public final class FailureCollector {
27-
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
28-
private final Semaphore cancelledExceptionsPermits;
2931

30-
private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
31-
private final Semaphore nonCancelledExceptionsPermits;
32+
private enum Category {
33+
CLIENT,
34+
SERVER,
35+
SHARD_UNAVAILABLE,
36+
CANCELLATION
37+
}
38+
39+
private final Map<Category, Queue<Exception>> categories;
40+
private final int maxExceptions;
3241

3342
private volatile boolean hasFailure = false;
3443
private Exception finalFailure = null;
@@ -41,8 +50,11 @@ public FailureCollector(int maxExceptions) {
4150
if (maxExceptions <= 0) {
4251
throw new IllegalArgumentException("maxExceptions must be at least one");
4352
}
44-
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
45-
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
53+
this.maxExceptions = maxExceptions;
54+
this.categories = new EnumMap<>(Category.class);
55+
for (Category c : Category.values()) {
56+
this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions));
57+
}
4658
}
4759

4860
public static Exception unwrapTransportException(TransportException te) {
@@ -56,16 +68,24 @@ public static Exception unwrapTransportException(TransportException te) {
5668
}
5769
}
5870

59-
public void unwrapAndCollect(Exception e) {
60-
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
71+
private static Category getErrorCategory(Exception e) {
6172
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
62-
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
63-
cancelledExceptions.add(e);
73+
return Category.CANCELLATION;
74+
} else if (TransportActions.isShardNotAvailableException(e)) {
75+
return Category.SHARD_UNAVAILABLE;
76+
} else {
77+
final int status = ExceptionsHelper.status(e).getStatus();
78+
if (400 <= status && status < 500) {
79+
return Category.CLIENT;
80+
} else {
81+
return Category.SERVER;
6482
}
65-
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
66-
nonCancelledExceptions.add(e);
67-
cancelledExceptions.clear();
6883
}
84+
}
85+
86+
public void unwrapAndCollect(Exception e) {
87+
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
88+
categories.get(getErrorCategory(e)).offer(e);
6989
hasFailure = true;
7090
}
7191

@@ -77,8 +97,8 @@ public boolean hasFailure() {
7797
}
7898

7999
/**
80-
* Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
81-
* Once this method builds the failure, incoming failures are discarded.
100+
* Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors and cancellation errors,
101+
* as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded.
82102
*
83103
* @return the accumulated failure, or {@code null} if no failure has been collected
84104
*/
@@ -98,21 +118,19 @@ private Exception buildFailure() {
98118
assert hasFailure;
99119
assert Thread.holdsLock(this);
100120
Exception first = null;
101-
for (Exception e : nonCancelledExceptions) {
102-
if (first == null) {
103-
first = e;
104-
} else if (first != e) {
105-
first.addSuppressed(e);
121+
int collected = 0;
122+
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) {
123+
if (first != null && category == Category.CANCELLATION) {
124+
continue; // do not add cancellation errors if other errors present
106125
}
107-
}
108-
if (first != null) {
109-
return first;
110-
}
111-
for (Exception e : cancelledExceptions) {
112-
if (first == null) {
113-
first = e;
114-
} else if (first != e) {
115-
first.addSuppressed(e);
126+
for (Exception e : categories.get(category)) {
127+
if (++collected <= maxExceptions) {
128+
if (first == null) {
129+
first = e;
130+
} else if (first != e) {
131+
first.addSuppressed(e);
132+
}
133+
}
116134
}
117135
}
118136
assert first != null;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
package org.elasticsearch.compute.operator;
99

1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.action.NoShardAvailableActionException;
1112
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1213
import org.elasticsearch.common.Randomness;
1314
import org.elasticsearch.common.breaker.CircuitBreaker;
1415
import org.elasticsearch.common.breaker.CircuitBreakingException;
16+
import org.elasticsearch.index.shard.ShardId;
1517
import org.elasticsearch.tasks.TaskCancelledException;
1618
import org.elasticsearch.test.ESTestCase;
1719
import org.elasticsearch.transport.NodeDisconnectedException;
@@ -106,13 +108,28 @@ public void testEmpty() {
106108
public void testTransportExceptions() {
107109
FailureCollector collector = new FailureCollector(5);
108110
collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps"));
109-
collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT)));
111+
collector.unwrapAndCollect(new TransportException(new IOException("disk issue")));
110112
Exception failure = collector.getFailure();
111113
assertNotNull(failure);
112114
assertThat(failure, instanceOf(NodeDisconnectedException.class));
113115
assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected"));
114116
Throwable[] suppressed = failure.getSuppressed();
115117
assertThat(suppressed, arrayWithSize(1));
116-
assertThat(suppressed[0], instanceOf(CircuitBreakingException.class));
118+
assertThat(suppressed[0], instanceOf(IOException.class));
119+
}
120+
121+
public void testErrorCategory() {
122+
FailureCollector collector = new FailureCollector(5);
123+
collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready"));
124+
collector.unwrapAndCollect(
125+
new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT))
126+
);
127+
Exception failure = collector.getFailure();
128+
assertNotNull(failure);
129+
assertThat(failure, instanceOf(CircuitBreakingException.class));
130+
assertThat(failure.getMessage(), equalTo("request is too large"));
131+
assertThat(failure.getSuppressed(), arrayWithSize(1));
132+
assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class));
133+
assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready"));
117134
}
118135
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,6 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
124124
reportedFailure = true;
125125
reportFailures(computeListener);
126126
} else {
127-
pendingShardIds.removeIf(shr -> {
128-
var failure = shardFailures.get(shr);
129-
return failure != null && failure.fatal;
130-
});
131127
var nodeRequests = selectNodeRequests(targetShards);
132128
for (NodeRequest request : nodeRequests) {
133129
sendOneNodeRequest(targetShards, computeListener, request);
@@ -238,10 +234,6 @@ record TargetShards(Map<ShardId, TargetShard> shards, int totalShards, int skipp
238234
TargetShard getShard(ShardId shardId) {
239235
return shards.get(shardId);
240236
}
241-
242-
Set<ShardId> shardIds() {
243-
return shards.keySet();
244-
}
245237
}
246238

247239
/**
@@ -270,7 +262,11 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
270262
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
271263
while (shardsIt.hasNext()) {
272264
ShardId shardId = shardsIt.next();
273-
assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false;
265+
ShardFailure failure = shardFailures.get(shardId);
266+
if (failure != null && failure.fatal) {
267+
shardsIt.remove();
268+
continue;
269+
}
274270
TargetShard shard = targetShards.getShard(shardId);
275271
Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
276272
DiscoveryNode selectedNode = null;

0 commit comments

Comments
 (0)