Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,35 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;

import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ArrayBlockingQueue;

/**
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
* The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
* To limit memory usage, this class collects only the first 10 exceptions in each category by default.
* When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
* over task-cancelled ones as they are more useful for diagnosing issues.
* The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors,
* and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in
* each category by default. When returning the accumulated failures to the caller, this class prefers
* client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors,
* as they are more useful for diagnosing issues.
*/
public final class FailureCollector {
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
private final Semaphore cancelledExceptionsPermits;

private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
private final Semaphore nonCancelledExceptionsPermits;
private enum Category {
CLIENT,
SERVER,
SHARD_UNAVAILABLE,
CANCELLATION
}

private final Map<Category, Queue<Exception>> categories;
private final int maxExceptions;

private volatile boolean hasFailure = false;
private Exception finalFailure = null;
Expand All @@ -41,8 +50,11 @@ public FailureCollector(int maxExceptions) {
if (maxExceptions <= 0) {
throw new IllegalArgumentException("maxExceptions must be at least one");
}
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
this.maxExceptions = maxExceptions;
this.categories = new EnumMap<>(Category.class);
for (Category c : Category.values()) {
this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions));
}
}

private static Exception unwrapTransportException(TransportException te) {
Expand All @@ -56,16 +68,24 @@ private static Exception unwrapTransportException(TransportException te) {
}
}

public void unwrapAndCollect(Exception e) {
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
private static Category getErrorCategory(Exception e) {
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
cancelledExceptions.add(e);
return Category.CANCELLATION;
} else if (TransportActions.isShardNotAvailableException(e)) {
return Category.SHARD_UNAVAILABLE;
} else {
final int status = ExceptionsHelper.status(e).getStatus();
if (400 <= status && status < 500) {
return Category.CLIENT;
} else {
return Category.SERVER;
}
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
nonCancelledExceptions.add(e);
cancelledExceptions.clear();
}
}

public void unwrapAndCollect(Exception e) {
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
categories.get(getErrorCategory(e)).offer(e);
hasFailure = true;
}

Expand All @@ -77,8 +97,8 @@ public boolean hasFailure() {
}

/**
* Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
* Once this method builds the failure, incoming failures are discarded.
* Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors and cancellation errors,
* as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded.
*
* @return the accumulated failure, or {@code null} if no failure has been collected
*/
Expand All @@ -98,21 +118,19 @@ private Exception buildFailure() {
assert hasFailure;
assert Thread.holdsLock(this);
Exception first = null;
for (Exception e : nonCancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
int collected = 0;
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) {
if (first != null && category == Category.CANCELLATION) {
continue; // do not add cancellation errors if other errors present
}
}
if (first != null) {
return first;
}
for (Exception e : cancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
for (Exception e : categories.get(category)) {
if (++collected <= maxExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
}
}
}
}
assert first != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
package org.elasticsearch.compute.operator;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.NodeDisconnectedException;
Expand Down Expand Up @@ -106,13 +108,28 @@ public void testEmpty() {
public void testTransportExceptions() {
FailureCollector collector = new FailureCollector(5);
collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps"));
collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT)));
collector.unwrapAndCollect(new TransportException(new IOException("disk issue")));
Exception failure = collector.getFailure();
assertNotNull(failure);
assertThat(failure, instanceOf(NodeDisconnectedException.class));
assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected"));
Throwable[] suppressed = failure.getSuppressed();
assertThat(suppressed, arrayWithSize(1));
assertThat(suppressed[0], instanceOf(CircuitBreakingException.class));
assertThat(suppressed[0], instanceOf(IOException.class));
}

public void testErrorCategory() {
FailureCollector collector = new FailureCollector(5);
collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready"));
collector.unwrapAndCollect(
new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT))
);
Exception failure = collector.getFailure();
assertNotNull(failure);
assertThat(failure, instanceOf(CircuitBreakingException.class));
assertThat(failure.getMessage(), equalTo("request is too large"));
assertThat(failure.getSuppressed(), arrayWithSize(1));
assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class));
assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready"));
}
}