diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java index 337075edbdcf6..e2b1cf955ab90 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java @@ -9,26 +9,35 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportException; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; import java.util.Queue; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ArrayBlockingQueue; /** * {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine. - * The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions. - * To limit memory usage, this class collects only the first 10 exceptions in each category by default. - * When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions - * over task-cancelled ones as they are more useful for diagnosing issues. + * The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors, + * and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in + * each category by default. When returning the accumulated failures to the caller, this class prefers + * client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors, + * as they are more useful for diagnosing issues. */ public final class FailureCollector { - private final Queue cancelledExceptions = ConcurrentCollections.newQueue(); - private final Semaphore cancelledExceptionsPermits; - private final Queue nonCancelledExceptions = ConcurrentCollections.newQueue(); - private final Semaphore nonCancelledExceptionsPermits; + private enum Category { + CLIENT, + SERVER, + SHARD_UNAVAILABLE, + CANCELLATION + } + + private final Map> categories; + private final int maxExceptions; private volatile boolean hasFailure = false; private Exception finalFailure = null; @@ -41,8 +50,11 @@ public FailureCollector(int maxExceptions) { if (maxExceptions <= 0) { throw new IllegalArgumentException("maxExceptions must be at least one"); } - this.cancelledExceptionsPermits = new Semaphore(maxExceptions); - this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions); + this.maxExceptions = maxExceptions; + this.categories = new EnumMap<>(Category.class); + for (Category c : Category.values()) { + this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions)); + } } private static Exception unwrapTransportException(TransportException te) { @@ -56,16 +68,24 @@ private static Exception unwrapTransportException(TransportException te) { } } - public void unwrapAndCollect(Exception e) { - e = e instanceof TransportException te ? unwrapTransportException(te) : e; + private static Category getErrorCategory(Exception e) { if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) { - if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) { - cancelledExceptions.add(e); + return Category.CANCELLATION; + } else if (TransportActions.isShardNotAvailableException(e)) { + return Category.SHARD_UNAVAILABLE; + } else { + final int status = ExceptionsHelper.status(e).getStatus(); + if (400 <= status && status < 500) { + return Category.CLIENT; + } else { + return Category.SERVER; } - } else if (nonCancelledExceptionsPermits.tryAcquire()) { - nonCancelledExceptions.add(e); - cancelledExceptions.clear(); } + } + + public void unwrapAndCollect(Exception e) { + e = e instanceof TransportException te ? unwrapTransportException(te) : e; + categories.get(getErrorCategory(e)).offer(e); hasFailure = true; } @@ -77,8 +97,8 @@ public boolean hasFailure() { } /** - * Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones. - * Once this method builds the failure, incoming failures are discarded. + * Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors and cancellation errors, + * as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded. * * @return the accumulated failure, or {@code null} if no failure has been collected */ @@ -98,21 +118,19 @@ private Exception buildFailure() { assert hasFailure; assert Thread.holdsLock(this); Exception first = null; - for (Exception e : nonCancelledExceptions) { - if (first == null) { - first = e; - } else if (first != e) { - first.addSuppressed(e); + int collected = 0; + for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) { + if (first != null && category == Category.CANCELLATION) { + continue; // do not add cancellation errors if other errors present } - } - if (first != null) { - return first; - } - for (Exception e : cancelledExceptions) { - if (first == null) { - first = e; - } else if (first != e) { - first.addSuppressed(e); + for (Exception e : categories.get(category)) { + if (++collected <= maxExceptions) { + if (first == null) { + first = e; + } else if (first != e) { + first.addSuppressed(e); + } + } } } assert first != null; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java index 5fec82b32ddac..4007d4d433f5e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java @@ -8,10 +8,12 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.NodeDisconnectedException; @@ -106,13 +108,28 @@ public void testEmpty() { public void testTransportExceptions() { FailureCollector collector = new FailureCollector(5); collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps")); - collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT))); + collector.unwrapAndCollect(new TransportException(new IOException("disk issue"))); Exception failure = collector.getFailure(); assertNotNull(failure); assertThat(failure, instanceOf(NodeDisconnectedException.class)); assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected")); Throwable[] suppressed = failure.getSuppressed(); assertThat(suppressed, arrayWithSize(1)); - assertThat(suppressed[0], instanceOf(CircuitBreakingException.class)); + assertThat(suppressed[0], instanceOf(IOException.class)); + } + + public void testErrorCategory() { + FailureCollector collector = new FailureCollector(5); + collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready")); + collector.unwrapAndCollect( + new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT)) + ); + Exception failure = collector.getFailure(); + assertNotNull(failure); + assertThat(failure, instanceOf(CircuitBreakingException.class)); + assertThat(failure.getMessage(), equalTo("request is too large")); + assertThat(failure.getSuppressed(), arrayWithSize(1)); + assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class)); + assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready")); } }