diff --git a/muted-tests.yml b/muted-tests.yml index 17c307d1bea50..e81baea950f36 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -371,12 +371,6 @@ tests: - class: org.elasticsearch.xpack.security.authz.IndexAliasesTests method: testRemoveIndex issue: https://github.com/elastic/elasticsearch/issues/122221 -- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT - method: testGroupingMultiValueByOrdinals - issue: https://github.com/elastic/elasticsearch/issues/122228 -- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT - method: testFailureLoadingFields - issue: https://github.com/elastic/elasticsearch/issues/122132 - class: org.elasticsearch.blocks.SimpleBlocksIT method: testConcurrentAddBlock issue: https://github.com/elastic/elasticsearch/issues/122324 @@ -388,9 +382,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.CrossClusterCancellationIT method: testCloseSkipUnavailable issue: https://github.com/elastic/elasticsearch/issues/122336 -- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT - method: testUnsupportedTypesOrdinalGrouping - issue: https://github.com/elastic/elasticsearch/issues/122342 - class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT method: test {yaml=reference/alias/line_260} issue: https://github.com/elastic/elasticsearch/issues/122343 @@ -401,9 +392,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/122377 - class: org.elasticsearch.repositories.blobstore.testkit.analyze.HdfsRepositoryAnalysisRestIT issue: https://github.com/elastic/elasticsearch/issues/122378 -- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT - method: testStatsMissingFieldWithStats - issue: https://github.com/elastic/elasticsearch/issues/122327 - class: org.elasticsearch.xpack.esql.CsvTests issue: https://github.com/elastic/elasticsearch/issues/122440 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java index c492ba6796350..7040f8712e616 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java @@ -9,26 +9,35 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportException; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; import java.util.Queue; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ArrayBlockingQueue; /** * {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine. - * The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions. - * To limit memory usage, this class collects only the first 10 exceptions in each category by default. - * When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions - * over task-cancelled ones as they are more useful for diagnosing issues. + * The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors, + * and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in + * each category by default. When returning the accumulated failures to the caller, this class prefers + * client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors, + * as they are more useful for diagnosing issues. */ public final class FailureCollector { - private final Queue cancelledExceptions = ConcurrentCollections.newQueue(); - private final Semaphore cancelledExceptionsPermits; - private final Queue nonCancelledExceptions = ConcurrentCollections.newQueue(); - private final Semaphore nonCancelledExceptionsPermits; + private enum Category { + CLIENT, + SERVER, + SHARD_UNAVAILABLE, + CANCELLATION + } + + private final Map> categories; + private final int maxExceptions; private volatile boolean hasFailure = false; private Exception finalFailure = null; @@ -41,8 +50,11 @@ public FailureCollector(int maxExceptions) { if (maxExceptions <= 0) { throw new IllegalArgumentException("maxExceptions must be at least one"); } - this.cancelledExceptionsPermits = new Semaphore(maxExceptions); - this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions); + this.maxExceptions = maxExceptions; + this.categories = new EnumMap<>(Category.class); + for (Category c : Category.values()) { + this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions)); + } } public static Exception unwrapTransportException(TransportException te) { @@ -56,16 +68,24 @@ public static Exception unwrapTransportException(TransportException te) { } } - public void unwrapAndCollect(Exception e) { - e = e instanceof TransportException te ? unwrapTransportException(te) : e; + private static Category getErrorCategory(Exception e) { if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) { - if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) { - cancelledExceptions.add(e); + return Category.CANCELLATION; + } else if (TransportActions.isShardNotAvailableException(e)) { + return Category.SHARD_UNAVAILABLE; + } else { + final int status = ExceptionsHelper.status(e).getStatus(); + if (400 <= status && status < 500) { + return Category.CLIENT; + } else { + return Category.SERVER; } - } else if (nonCancelledExceptionsPermits.tryAcquire()) { - nonCancelledExceptions.add(e); - cancelledExceptions.clear(); } + } + + public void unwrapAndCollect(Exception e) { + e = e instanceof TransportException te ? unwrapTransportException(te) : e; + categories.get(getErrorCategory(e)).offer(e); hasFailure = true; } @@ -77,8 +97,8 @@ public boolean hasFailure() { } /** - * Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones. - * Once this method builds the failure, incoming failures are discarded. + * Returns the accumulated failure, preferring client (4xx) errors over server (5xx) errors and cancellation errors, + * as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded. * * @return the accumulated failure, or {@code null} if no failure has been collected */ @@ -98,21 +118,19 @@ private Exception buildFailure() { assert hasFailure; assert Thread.holdsLock(this); Exception first = null; - for (Exception e : nonCancelledExceptions) { - if (first == null) { - first = e; - } else if (first != e) { - first.addSuppressed(e); + int collected = 0; + for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) { + if (first != null && category == Category.CANCELLATION) { + continue; // do not add cancellation errors if other errors present } - } - if (first != null) { - return first; - } - for (Exception e : cancelledExceptions) { - if (first == null) { - first = e; - } else if (first != e) { - first.addSuppressed(e); + for (Exception e : categories.get(category)) { + if (++collected <= maxExceptions) { + if (first == null) { + first = e; + } else if (first != e) { + first.addSuppressed(e); + } + } } } assert first != null; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java index 5fec82b32ddac..4007d4d433f5e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java @@ -8,10 +8,12 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.NodeDisconnectedException; @@ -106,13 +108,28 @@ public void testEmpty() { public void testTransportExceptions() { FailureCollector collector = new FailureCollector(5); collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps")); - collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT))); + collector.unwrapAndCollect(new TransportException(new IOException("disk issue"))); Exception failure = collector.getFailure(); assertNotNull(failure); assertThat(failure, instanceOf(NodeDisconnectedException.class)); assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected")); Throwable[] suppressed = failure.getSuppressed(); assertThat(suppressed, arrayWithSize(1)); - assertThat(suppressed[0], instanceOf(CircuitBreakingException.class)); + assertThat(suppressed[0], instanceOf(IOException.class)); + } + + public void testErrorCategory() { + FailureCollector collector = new FailureCollector(5); + collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready")); + collector.unwrapAndCollect( + new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT)) + ); + Exception failure = collector.getFailure(); + assertNotNull(failure); + assertThat(failure, instanceOf(CircuitBreakingException.class)); + assertThat(failure.getMessage(), equalTo("request is too large")); + assertThat(failure.getSuppressed(), arrayWithSize(1)); + assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class)); + assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 8c2a6bb06da9b..2d5b4169c0215 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -124,10 +124,6 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu reportedFailure = true; reportFailures(computeListener); } else { - pendingShardIds.removeIf(shr -> { - var failure = shardFailures.get(shr); - return failure != null && failure.fatal; - }); var nodeRequests = selectNodeRequests(targetShards); for (NodeRequest request : nodeRequests) { sendOneNodeRequest(targetShards, computeListener, request); @@ -238,10 +234,6 @@ record TargetShards(Map shards, int totalShards, int skipp TargetShard getShard(ShardId shardId) { return shards.get(shardId); } - - Set shardIds() { - return shards.keySet(); - } } /** @@ -270,7 +262,11 @@ private List selectNodeRequests(TargetShards targetShards) { final Iterator shardsIt = pendingShardIds.iterator(); while (shardsIt.hasNext()) { ShardId shardId = shardsIt.next(); - assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false; + ShardFailure failure = shardFailures.get(shardId); + if (failure != null && failure.fatal) { + shardsIt.remove(); + continue; + } TargetShard shard = targetShards.getShard(shardId); Iterator nodesIt = shard.remainingNodes.iterator(); DiscoveryNode selectedNode = null;