Skip to content

Commit 472ca2e

Browse files
committed
Prefer client errors while collecting ES|QL failures
1 parent 6d25cbf commit 472ca2e

File tree

4 files changed

+91
-50
lines changed

4 files changed

+91
-50
lines changed

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -403,12 +403,6 @@ tests:
403403
issue: https://github.com/elastic/elasticsearch/issues/122221
404404
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
405405
issue: https://github.com/elastic/elasticsearch/issues/121737
406-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
407-
method: testGroupingMultiValueByOrdinals
408-
issue: https://github.com/elastic/elasticsearch/issues/122228
409-
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
410-
method: testFailureLoadingFields
411-
issue: https://github.com/elastic/elasticsearch/issues/122132
412406
- class: org.elasticsearch.blocks.SimpleBlocksIT
413407
method: testConcurrentAddBlock
414408
issue: https://github.com/elastic/elasticsearch/issues/122324

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,51 @@
99

1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.action.support.TransportActions;
1213
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1314
import org.elasticsearch.tasks.TaskCancelledException;
1415
import org.elasticsearch.transport.TransportException;
1516

17+
import java.util.EnumMap;
18+
import java.util.List;
19+
import java.util.Map;
1620
import java.util.Queue;
1721
import java.util.concurrent.Semaphore;
1822

1923
/**
2024
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
21-
* The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
22-
* To limit memory usage, this class collects only the first 10 exceptions in each category by default.
23-
* When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
24-
* over task-cancelled ones as they are more useful for diagnosing issues.
25+
* The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors,
26+
* and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in
27+
* each category by default. When returning the accumulated failures to the caller, this class prefers
28+
* client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
29+
* as they are more useful for diagnosing issues.
2530
*/
2631
public final class FailureCollector {
27-
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
28-
private final Semaphore cancelledExceptionsPermits;
2932

30-
private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
31-
private final Semaphore nonCancelledExceptionsPermits;
33+
private enum Category {
34+
CLIENT,
35+
SERVER,
36+
SHARD_UNAVAILABLE,
37+
CANCELLATION
38+
}
39+
40+
private static final class CategorizedErrors {
41+
final Queue<Exception> exceptions = ConcurrentCollections.newQueue();
42+
final Semaphore permits;
43+
44+
CategorizedErrors(int permits) {
45+
this.permits = new Semaphore(permits);
46+
}
47+
48+
void maybeCollect(Exception e) {
49+
if (permits.tryAcquire()) {
50+
exceptions.add(e);
51+
}
52+
}
53+
}
54+
55+
private final Map<Category, CategorizedErrors> categories;
56+
private final int maxExceptions;
3257

3358
private volatile boolean hasFailure = false;
3459
private Exception finalFailure = null;
@@ -41,8 +66,11 @@ public FailureCollector(int maxExceptions) {
4166
if (maxExceptions <= 0) {
4267
throw new IllegalArgumentException("maxExceptions must be at least one");
4368
}
44-
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
45-
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
69+
this.maxExceptions = maxExceptions;
70+
this.categories = new EnumMap<>(Category.class);
71+
for (Category c : Category.values()) {
72+
this.categories.put(c, new CategorizedErrors(maxExceptions));
73+
}
4674
}
4775

4876
public static Exception unwrapTransportException(TransportException te) {
@@ -56,16 +84,24 @@ public static Exception unwrapTransportException(TransportException te) {
5684
}
5785
}
5886

59-
public void unwrapAndCollect(Exception e) {
60-
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
87+
private static Category getErrorCategory(Exception e) {
6188
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
62-
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
63-
cancelledExceptions.add(e);
89+
return Category.CANCELLATION;
90+
} else if (TransportActions.isShardNotAvailableException(e)) {
91+
return Category.SHARD_UNAVAILABLE;
92+
} else {
93+
final int status = ExceptionsHelper.status(e).getStatus();
94+
if (400 <= status && status < 500) {
95+
return Category.CLIENT;
96+
} else {
97+
return Category.SERVER;
6498
}
65-
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
66-
nonCancelledExceptions.add(e);
67-
cancelledExceptions.clear();
6899
}
100+
}
101+
102+
public void unwrapAndCollect(Exception e) {
103+
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
104+
categories.get(getErrorCategory(e)).maybeCollect(e);
69105
hasFailure = true;
70106
}
71107

@@ -77,8 +113,8 @@ public boolean hasFailure() {
77113
}
78114

79115
/**
80-
* Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
81-
* Once this method builds the failure, incoming failures are discarded.
116+
* Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
117+
* as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded.
82118
*
83119
* @return the accumulated failure, or {@code null} if no failure has been collected
84120
*/
@@ -98,21 +134,19 @@ private Exception buildFailure() {
98134
assert hasFailure;
99135
assert Thread.holdsLock(this);
100136
Exception first = null;
101-
for (Exception e : nonCancelledExceptions) {
102-
if (first == null) {
103-
first = e;
104-
} else if (first != e) {
105-
first.addSuppressed(e);
137+
int collected = 0;
138+
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) {
139+
if (first != null && category == Category.CANCELLATION) {
140+
continue; // do not add cancellation errors if other errors present
106141
}
107-
}
108-
if (first != null) {
109-
return first;
110-
}
111-
for (Exception e : cancelledExceptions) {
112-
if (first == null) {
113-
first = e;
114-
} else if (first != e) {
115-
first.addSuppressed(e);
142+
for (Exception e : categories.get(category).exceptions) {
143+
if (++collected <= maxExceptions) {
144+
if (first == null) {
145+
first = e;
146+
} else if (first != e) {
147+
first.addSuppressed(e);
148+
}
149+
}
116150
}
117151
}
118152
assert first != null;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
package org.elasticsearch.compute.operator;
99

1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.action.NoShardAvailableActionException;
1112
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1213
import org.elasticsearch.common.Randomness;
1314
import org.elasticsearch.common.breaker.CircuitBreaker;
1415
import org.elasticsearch.common.breaker.CircuitBreakingException;
16+
import org.elasticsearch.index.shard.ShardId;
1517
import org.elasticsearch.tasks.TaskCancelledException;
1618
import org.elasticsearch.test.ESTestCase;
1719
import org.elasticsearch.transport.NodeDisconnectedException;
@@ -106,13 +108,28 @@ public void testEmpty() {
106108
public void testTransportExceptions() {
107109
FailureCollector collector = new FailureCollector(5);
108110
collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps"));
109-
collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT)));
111+
collector.unwrapAndCollect(new TransportException(new IOException("disk issue")));
110112
Exception failure = collector.getFailure();
111113
assertNotNull(failure);
112114
assertThat(failure, instanceOf(NodeDisconnectedException.class));
113115
assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected"));
114116
Throwable[] suppressed = failure.getSuppressed();
115117
assertThat(suppressed, arrayWithSize(1));
116-
assertThat(suppressed[0], instanceOf(CircuitBreakingException.class));
118+
assertThat(suppressed[0], instanceOf(IOException.class));
119+
}
120+
121+
public void testErrorCategory() {
122+
FailureCollector collector = new FailureCollector(5);
123+
collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready"));
124+
collector.unwrapAndCollect(
125+
new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT))
126+
);
127+
Exception failure = collector.getFailure();
128+
assertNotNull(failure);
129+
assertThat(failure, instanceOf(CircuitBreakingException.class));
130+
assertThat(failure.getMessage(), equalTo("request is too large"));
131+
assertThat(failure.getSuppressed(), arrayWithSize(1));
132+
assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class));
133+
assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready"));
117134
}
118135
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,6 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
124124
reportedFailure = true;
125125
reportFailures(computeListener);
126126
} else {
127-
pendingShardIds.removeIf(shr -> {
128-
var failure = shardFailures.get(shr);
129-
return failure != null && failure.fatal;
130-
});
131127
var nodeRequests = selectNodeRequests(targetShards);
132128
for (NodeRequest request : nodeRequests) {
133129
sendOneNodeRequest(targetShards, computeListener, request);
@@ -238,10 +234,6 @@ record TargetShards(Map<ShardId, TargetShard> shards, int totalShards, int skipp
238234
TargetShard getShard(ShardId shardId) {
239235
return shards.get(shardId);
240236
}
241-
242-
Set<ShardId> shardIds() {
243-
return shards.keySet();
244-
}
245237
}
246238

247239
/**
@@ -270,7 +262,11 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
270262
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
271263
while (shardsIt.hasNext()) {
272264
ShardId shardId = shardsIt.next();
273-
assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false;
265+
ShardFailure failure = shardFailures.get(shardId);
266+
if (failure != null && failure.fatal) {
267+
shardsIt.remove();
268+
continue;
269+
}
274270
TargetShard shard = targetShards.getShard(shardId);
275271
Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
276272
DiscoveryNode selectedNode = null;

0 commit comments

Comments
 (0)