Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,6 @@ tests:
- class: org.elasticsearch.xpack.security.authz.IndexAliasesTests
method: testRemoveIndex
issue: https://github.com/elastic/elasticsearch/issues/122221
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
method: testGroupingMultiValueByOrdinals
issue: https://github.com/elastic/elasticsearch/issues/122228
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
method: testFailureLoadingFields
issue: https://github.com/elastic/elasticsearch/issues/122132
- class: org.elasticsearch.blocks.SimpleBlocksIT
method: testConcurrentAddBlock
issue: https://github.com/elastic/elasticsearch/issues/122324
Expand All @@ -388,9 +382,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.CrossClusterCancellationIT
method: testCloseSkipUnavailable
issue: https://github.com/elastic/elasticsearch/issues/122336
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
method: testUnsupportedTypesOrdinalGrouping
issue: https://github.com/elastic/elasticsearch/issues/122342
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/alias/line_260}
issue: https://github.com/elastic/elasticsearch/issues/122343
Expand All @@ -401,9 +392,6 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/122377
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.HdfsRepositoryAnalysisRestIT
issue: https://github.com/elastic/elasticsearch/issues/122378
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
method: testStatsMissingFieldWithStats
issue: https://github.com/elastic/elasticsearch/issues/122327
- class: org.elasticsearch.xpack.esql.CsvTests
issue: https://github.com/elastic/elasticsearch/issues/122440

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,35 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;

import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ArrayBlockingQueue;

/**
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
* The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
* To limit memory usage, this class collects only the first 10 exceptions in each category by default.
* When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
* over task-cancelled ones as they are more useful for diagnosing issues.
* The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors,
* and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in
* each category by default. When returning the accumulated failures to the caller, this class prefers
* client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
* as they are more useful for diagnosing issues.
*/
public final class FailureCollector {
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
private final Semaphore cancelledExceptionsPermits;

private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
private final Semaphore nonCancelledExceptionsPermits;
private enum Category {
CLIENT,
SERVER,
SHARD_UNAVAILABLE,
CANCELLATION
}

private final Map<Category, Queue<Exception>> categories;
private final int maxExceptions;

private volatile boolean hasFailure = false;
private Exception finalFailure = null;
Expand All @@ -41,8 +50,11 @@ public FailureCollector(int maxExceptions) {
if (maxExceptions <= 0) {
throw new IllegalArgumentException("maxExceptions must be at least one");
}
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
this.maxExceptions = maxExceptions;
this.categories = new EnumMap<>(Category.class);
for (Category c : Category.values()) {
this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions));
}
}

public static Exception unwrapTransportException(TransportException te) {
Expand All @@ -56,16 +68,24 @@ public static Exception unwrapTransportException(TransportException te) {
}
}

public void unwrapAndCollect(Exception e) {
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
private static Category getErrorCategory(Exception e) {
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
cancelledExceptions.add(e);
return Category.CANCELLATION;
} else if (TransportActions.isShardNotAvailableException(e)) {
return Category.SHARD_UNAVAILABLE;
} else {
final int status = ExceptionsHelper.status(e).getStatus();
if (400 <= status && status < 500) {
return Category.CLIENT;
} else {
return Category.SERVER;
}
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
nonCancelledExceptions.add(e);
cancelledExceptions.clear();
}
}

public void unwrapAndCollect(Exception e) {
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
categories.get(getErrorCategory(e)).offer(e);
hasFailure = true;
}

Expand All @@ -77,8 +97,8 @@ public boolean hasFailure() {
}

/**
* Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
* Once this method builds the failure, incoming failures are discarded.
* Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors and cancellation errors,
* as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded.
*
* @return the accumulated failure, or {@code null} if no failure has been collected
*/
Expand All @@ -98,21 +118,19 @@ private Exception buildFailure() {
assert hasFailure;
assert Thread.holdsLock(this);
Exception first = null;
for (Exception e : nonCancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
int collected = 0;
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) {
if (first != null && category == Category.CANCELLATION) {
continue; // do not add cancellation errors if other errors present
}
}
if (first != null) {
return first;
}
for (Exception e : cancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
for (Exception e : categories.get(category)) {
if (++collected <= maxExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
}
}
}
}
assert first != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
package org.elasticsearch.compute.operator;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.NodeDisconnectedException;
Expand Down Expand Up @@ -106,13 +108,28 @@ public void testEmpty() {
public void testTransportExceptions() {
FailureCollector collector = new FailureCollector(5);
collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps"));
collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT)));
collector.unwrapAndCollect(new TransportException(new IOException("disk issue")));
Exception failure = collector.getFailure();
assertNotNull(failure);
assertThat(failure, instanceOf(NodeDisconnectedException.class));
assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected"));
Throwable[] suppressed = failure.getSuppressed();
assertThat(suppressed, arrayWithSize(1));
assertThat(suppressed[0], instanceOf(CircuitBreakingException.class));
assertThat(suppressed[0], instanceOf(IOException.class));
}

public void testErrorCategory() {
FailureCollector collector = new FailureCollector(5);
collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready"));
collector.unwrapAndCollect(
new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT))
);
Exception failure = collector.getFailure();
assertNotNull(failure);
assertThat(failure, instanceOf(CircuitBreakingException.class));
assertThat(failure.getMessage(), equalTo("request is too large"));
assertThat(failure.getSuppressed(), arrayWithSize(1));
assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class));
assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
reportedFailure = true;
reportFailures(computeListener);
} else {
pendingShardIds.removeIf(shr -> {
var failure = shardFailures.get(shr);
return failure != null && failure.fatal;
});
var nodeRequests = selectNodeRequests(targetShards);
for (NodeRequest request : nodeRequests) {
sendOneNodeRequest(targetShards, computeListener, request);
Expand Down Expand Up @@ -238,10 +234,6 @@ record TargetShards(Map<ShardId, TargetShard> shards, int totalShards, int skipp
TargetShard getShard(ShardId shardId) {
return shards.get(shardId);
}

Set<ShardId> shardIds() {
return shards.keySet();
}
}

/**
Expand Down Expand Up @@ -270,7 +262,11 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
while (shardsIt.hasNext()) {
ShardId shardId = shardsIt.next();
assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false;
ShardFailure failure = shardFailures.get(shardId);
if (failure != null && failure.fatal) {
shardsIt.remove();
continue;
}
TargetShard shard = targetShards.getShard(shardId);
Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
DiscoveryNode selectedNode = null;
Expand Down