Skip to content

Commit 5842b3d

Browse files
authored
Prefer client errors while collecting ES|QL failures (#122290) (#122463)
Currently, the ES|QL failure collectors categorize errors into non-cancellation and cancellation errors, preferring to return non-cancellation errors to users. With the retry on shard-level failure, the failure collector can now collect more categories of errors: client errors, server errors, shard-unavailable errors, and cancellation errors. For easier diagnostics and operations (especially on serverless), the failure collectors prefer returning client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors. Relates #120774
1 parent 6690c97 commit 5842b3d

File tree

2 files changed

+72
-37
lines changed

2 files changed

+72
-37
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 53 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,35 @@
99

1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.ExceptionsHelper;
12-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
12+
import org.elasticsearch.action.support.TransportActions;
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16+
import java.util.EnumMap;
17+
import java.util.List;
18+
import java.util.Map;
1619
import java.util.Queue;
17-
import java.util.concurrent.Semaphore;
20+
import java.util.concurrent.ArrayBlockingQueue;
1821

1922
/**
2023
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
21-
* The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
22-
* To limit memory usage, this class collects only the first 10 exceptions in each category by default.
23-
* When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
24-
* over task-cancelled ones as they are more useful for diagnosing issues.
24+
* The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors,
25+
* and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in
26+
* each category by default. When returning the accumulated failures to the caller, this class prefers
27+
* client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
28+
* as they are more useful for diagnosing issues.
2529
*/
2630
public final class FailureCollector {
27-
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
28-
private final Semaphore cancelledExceptionsPermits;
2931

30-
private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
31-
private final Semaphore nonCancelledExceptionsPermits;
32+
private enum Category {
33+
CLIENT,
34+
SERVER,
35+
SHARD_UNAVAILABLE,
36+
CANCELLATION
37+
}
38+
39+
private final Map<Category, Queue<Exception>> categories;
40+
private final int maxExceptions;
3241

3342
private volatile boolean hasFailure = false;
3443
private Exception finalFailure = null;
@@ -41,8 +50,11 @@ public FailureCollector(int maxExceptions) {
4150
if (maxExceptions <= 0) {
4251
throw new IllegalArgumentException("maxExceptions must be at least one");
4352
}
44-
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
45-
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
53+
this.maxExceptions = maxExceptions;
54+
this.categories = new EnumMap<>(Category.class);
55+
for (Category c : Category.values()) {
56+
this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions));
57+
}
4658
}
4759

4860
private static Exception unwrapTransportException(TransportException te) {
@@ -56,16 +68,24 @@ private static Exception unwrapTransportException(TransportException te) {
5668
}
5769
}
5870

59-
public void unwrapAndCollect(Exception e) {
60-
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
71+
private static Category getErrorCategory(Exception e) {
6172
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
62-
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
63-
cancelledExceptions.add(e);
73+
return Category.CANCELLATION;
74+
} else if (TransportActions.isShardNotAvailableException(e)) {
75+
return Category.SHARD_UNAVAILABLE;
76+
} else {
77+
final int status = ExceptionsHelper.status(e).getStatus();
78+
if (400 <= status && status < 500) {
79+
return Category.CLIENT;
80+
} else {
81+
return Category.SERVER;
6482
}
65-
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
66-
nonCancelledExceptions.add(e);
67-
cancelledExceptions.clear();
6883
}
84+
}
85+
86+
public void unwrapAndCollect(Exception e) {
87+
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
88+
categories.get(getErrorCategory(e)).offer(e);
6989
hasFailure = true;
7090
}
7191

@@ -77,8 +97,8 @@ public boolean hasFailure() {
7797
}
7898

7999
/**
80-
* Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
81-
* Once this method builds the failure, incoming failures are discarded.
100+
* Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
101+
* as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded.
82102
*
83103
* @return the accumulated failure, or {@code null} if no failure has been collected
84104
*/
@@ -98,21 +118,19 @@ private Exception buildFailure() {
98118
assert hasFailure;
99119
assert Thread.holdsLock(this);
100120
Exception first = null;
101-
for (Exception e : nonCancelledExceptions) {
102-
if (first == null) {
103-
first = e;
104-
} else if (first != e) {
105-
first.addSuppressed(e);
121+
int collected = 0;
122+
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) {
123+
if (first != null && category == Category.CANCELLATION) {
124+
continue; // do not add cancellation errors if other errors present
106125
}
107-
}
108-
if (first != null) {
109-
return first;
110-
}
111-
for (Exception e : cancelledExceptions) {
112-
if (first == null) {
113-
first = e;
114-
} else if (first != e) {
115-
first.addSuppressed(e);
126+
for (Exception e : categories.get(category)) {
127+
if (++collected <= maxExceptions) {
128+
if (first == null) {
129+
first = e;
130+
} else if (first != e) {
131+
first.addSuppressed(e);
132+
}
133+
}
116134
}
117135
}
118136
assert first != null;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
package org.elasticsearch.compute.operator;
99

1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.action.NoShardAvailableActionException;
1112
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1213
import org.elasticsearch.common.Randomness;
1314
import org.elasticsearch.common.breaker.CircuitBreaker;
1415
import org.elasticsearch.common.breaker.CircuitBreakingException;
16+
import org.elasticsearch.index.shard.ShardId;
1517
import org.elasticsearch.tasks.TaskCancelledException;
1618
import org.elasticsearch.test.ESTestCase;
1719
import org.elasticsearch.transport.NodeDisconnectedException;
@@ -106,13 +108,28 @@ public void testEmpty() {
106108
public void testTransportExceptions() {
107109
FailureCollector collector = new FailureCollector(5);
108110
collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps"));
109-
collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT)));
111+
collector.unwrapAndCollect(new TransportException(new IOException("disk issue")));
110112
Exception failure = collector.getFailure();
111113
assertNotNull(failure);
112114
assertThat(failure, instanceOf(NodeDisconnectedException.class));
113115
assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected"));
114116
Throwable[] suppressed = failure.getSuppressed();
115117
assertThat(suppressed, arrayWithSize(1));
116-
assertThat(suppressed[0], instanceOf(CircuitBreakingException.class));
118+
assertThat(suppressed[0], instanceOf(IOException.class));
119+
}
120+
121+
public void testErrorCategory() {
122+
FailureCollector collector = new FailureCollector(5);
123+
collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready"));
124+
collector.unwrapAndCollect(
125+
new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT))
126+
);
127+
Exception failure = collector.getFailure();
128+
assertNotNull(failure);
129+
assertThat(failure, instanceOf(CircuitBreakingException.class));
130+
assertThat(failure.getMessage(), equalTo("request is too large"));
131+
assertThat(failure.getSuppressed(), arrayWithSize(1));
132+
assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class));
133+
assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready"));
117134
}
118135
}

0 commit comments

Comments
 (0)