From 472ca2e4b671815340810691c45568637227295f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 11 Feb 2025 11:16:32 -0800 Subject: [PATCH 1/2] Prefer client errors while collecting ES|QL failures --- muted-tests.yml | 6 -- .../compute/operator/FailureCollector.java | 100 ++++++++++++------ .../operator/FailureCollectorTests.java | 21 +++- .../esql/plugin/DataNodeRequestSender.java | 14 +-- 4 files changed, 91 insertions(+), 50 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index be974ca2860f1..0a6d8eef7514c 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -403,12 +403,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/122221 - class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT issue: https://github.com/elastic/elasticsearch/issues/121737 -- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT - method: testGroupingMultiValueByOrdinals - issue: https://github.com/elastic/elasticsearch/issues/122228 -- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT - method: testFailureLoadingFields - issue: https://github.com/elastic/elasticsearch/issues/122132 - class: org.elasticsearch.blocks.SimpleBlocksIT method: testConcurrentAddBlock issue: https://github.com/elastic/elasticsearch/issues/122324 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java index c492ba6796350..16ce15c5fc78e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java @@ -9,26 +9,51 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.TransportActions; import 
org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportException; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.Semaphore; /** * {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine. - * The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions. - * To limit memory usage, this class collects only the first 10 exceptions in each category by default. - * When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions - * over task-cancelled ones as they are more useful for diagnosing issues. + * The collected exceptions are categorized into client (4xx), server (5xx), shard-unavailable errors, + * and cancellation errors. To limit memory usage, this class collects only the first 10 exceptions in + * each category by default. When returning the accumulated failures to the caller, this class prefers + * client (4xx) errors over server (5xx) errors, shard-unavailable errors, and cancellation errors, + * as they are more useful for diagnosing issues. 
*/ public final class FailureCollector { - private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue(); - private final Semaphore cancelledExceptionsPermits; - private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue(); - private final Semaphore nonCancelledExceptionsPermits; + private enum Category { + CLIENT, + SERVER, + SHARD_UNAVAILABLE, + CANCELLATION + } + + private static final class CategorizedErrors { + final Queue<Exception> exceptions = ConcurrentCollections.newQueue(); + final Semaphore permits; + + CategorizedErrors(int permits) { + this.permits = new Semaphore(permits); + } + + void maybeCollect(Exception e) { + if (permits.tryAcquire()) { + exceptions.add(e); + } + } + } + + private final Map<Category, CategorizedErrors> categories; + private final int maxExceptions; private volatile boolean hasFailure = false; private Exception finalFailure = null; @@ -41,8 +66,11 @@ public FailureCollector(int maxExceptions) { if (maxExceptions <= 0) { throw new IllegalArgumentException("maxExceptions must be at least one"); } - this.cancelledExceptionsPermits = new Semaphore(maxExceptions); - this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions); + this.maxExceptions = maxExceptions; + this.categories = new EnumMap<>(Category.class); + for (Category c : Category.values()) { + this.categories.put(c, new CategorizedErrors(maxExceptions)); + } } public static Exception unwrapTransportException(TransportException te) { @@ -56,16 +84,24 @@ public static Exception unwrapTransportException(TransportException te) { } } - public void unwrapAndCollect(Exception e) { - e = e instanceof TransportException te ?
unwrapTransportException(te) : e; + private static Category getErrorCategory(Exception e) { if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) { - if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) { - cancelledExceptions.add(e); + return Category.CANCELLATION; + } else if (TransportActions.isShardNotAvailableException(e)) { + return Category.SHARD_UNAVAILABLE; + } else { + final int status = ExceptionsHelper.status(e).getStatus(); + if (400 <= status && status < 500) { + return Category.CLIENT; + } else { + return Category.SERVER; } - } else if (nonCancelledExceptionsPermits.tryAcquire()) { - nonCancelledExceptions.add(e); - cancelledExceptions.clear(); } + } + + public void unwrapAndCollect(Exception e) { + e = e instanceof TransportException te ? unwrapTransportException(te) : e; + categories.get(getErrorCategory(e)).maybeCollect(e); hasFailure = true; } @@ -77,8 +113,8 @@ public boolean hasFailure() { } /** - * Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones. - * Once this method builds the failure, incoming failures are discarded. + * Returns the accumulated failure, preferring client (4xx) errors over server (5xx), shard-unavailable, and cancellation errors, + * as they are more useful for diagnosing issues. Once this method builds the failure, incoming failures are discarded.
* * @return the accumulated failure, or {@code null} if no failure has been collected */ @@ -98,21 +134,19 @@ private Exception buildFailure() { assert hasFailure; assert Thread.holdsLock(this); Exception first = null; - for (Exception e : nonCancelledExceptions) { - if (first == null) { - first = e; - } else if (first != e) { - first.addSuppressed(e); + int collected = 0; + for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE, Category.CANCELLATION)) { + if (first != null && category == Category.CANCELLATION) { + continue; // do not add cancellation errors if other errors present } - } - if (first != null) { - return first; - } - for (Exception e : cancelledExceptions) { - if (first == null) { - first = e; - } else if (first != e) { - first.addSuppressed(e); + for (Exception e : categories.get(category).exceptions) { + if (++collected <= maxExceptions) { + if (first == null) { + first = e; + } else if (first != e) { + first.addSuppressed(e); + } + } } } assert first != null; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java index 5fec82b32ddac..4007d4d433f5e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java @@ -8,10 +8,12 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.index.shard.ShardId; import 
org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.NodeDisconnectedException; @@ -106,13 +108,28 @@ public void testEmpty() { public void testTransportExceptions() { FailureCollector collector = new FailureCollector(5); collector.unwrapAndCollect(new NodeDisconnectedException(DiscoveryNodeUtils.builder("node-1").build(), "/field_caps")); - collector.unwrapAndCollect(new TransportException(new CircuitBreakingException("too large", CircuitBreaker.Durability.TRANSIENT))); + collector.unwrapAndCollect(new TransportException(new IOException("disk issue"))); Exception failure = collector.getFailure(); assertNotNull(failure); assertThat(failure, instanceOf(NodeDisconnectedException.class)); assertThat(failure.getMessage(), equalTo("[][0.0.0.0:1][/field_caps] disconnected")); Throwable[] suppressed = failure.getSuppressed(); assertThat(suppressed, arrayWithSize(1)); - assertThat(suppressed[0], instanceOf(CircuitBreakingException.class)); + assertThat(suppressed[0], instanceOf(IOException.class)); + } + + public void testErrorCategory() { + FailureCollector collector = new FailureCollector(5); + collector.unwrapAndCollect(new NoShardAvailableActionException(new ShardId("test", "n/a", 1), "not ready")); + collector.unwrapAndCollect( + new TransportException(new CircuitBreakingException("request is too large", CircuitBreaker.Durability.TRANSIENT)) + ); + Exception failure = collector.getFailure(); + assertNotNull(failure); + assertThat(failure, instanceOf(CircuitBreakingException.class)); + assertThat(failure.getMessage(), equalTo("request is too large")); + assertThat(failure.getSuppressed(), arrayWithSize(1)); + assertThat(failure.getSuppressed()[0], instanceOf(NoShardAvailableActionException.class)); + assertThat(failure.getSuppressed()[0].getMessage(), equalTo("not ready")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 8c2a6bb06da9b..2d5b4169c0215 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -124,10 +124,6 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu reportedFailure = true; reportFailures(computeListener); } else { - pendingShardIds.removeIf(shr -> { - var failure = shardFailures.get(shr); - return failure != null && failure.fatal; - }); var nodeRequests = selectNodeRequests(targetShards); for (NodeRequest request : nodeRequests) { sendOneNodeRequest(targetShards, computeListener, request); @@ -238,10 +234,6 @@ record TargetShards(Map<ShardId, TargetShard> shards, int totalShards, int skipp TargetShard getShard(ShardId shardId) { return shards.get(shardId); } - - Set<ShardId> shardIds() { - return shards.keySet(); - } } /** @@ -270,7 +262,11 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) { final Iterator<ShardId> shardsIt = pendingShardIds.iterator(); while (shardsIt.hasNext()) { ShardId shardId = shardsIt.next(); - assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false; + ShardFailure failure = shardFailures.get(shardId); + if (failure != null && failure.fatal) { + shardsIt.remove(); + continue; + } TargetShard shard = targetShards.getShard(shardId); Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator(); DiscoveryNode selectedNode = null; From ae8a308d1859a8fb25d32b093c47652a3644d44c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Feb 2025 17:54:00 -0800 Subject: [PATCH 2/2] Use blocking queue --- .../compute/operator/FailureCollector.java | 26 ++++--------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java index 16ce15c5fc78e..7040f8712e616 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java @@ -10,7 +10,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportException; @@ -18,7 +17,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ArrayBlockingQueue; /** * {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine. @@ -37,22 +36,7 @@ private enum Category { CANCELLATION } - private static final class CategorizedErrors { - final Queue<Exception> exceptions = ConcurrentCollections.newQueue(); - final Semaphore permits; - - CategorizedErrors(int permits) { - this.permits = new Semaphore(permits); - } - - void maybeCollect(Exception e) { - if (permits.tryAcquire()) { - exceptions.add(e); - } - } - } - - private final Map<Category, CategorizedErrors> categories; + private final Map<Category, Queue<Exception>> categories; private final int maxExceptions; private volatile boolean hasFailure = false; @@ -69,7 +53,7 @@ public FailureCollector(int maxExceptions) { this.maxExceptions = maxExceptions; this.categories = new EnumMap<>(Category.class); for (Category c : Category.values()) { - this.categories.put(c, new CategorizedErrors(maxExceptions)); + this.categories.put(c, new ArrayBlockingQueue<>(maxExceptions)); } } @@ -101,7 +85,7 @@ private static Category getErrorCategory(Exception e) { public void unwrapAndCollect(Exception e) { e = e instanceof TransportException te ?
unwrapTransportException(te) : e; - categories.get(getErrorCategory(e)).maybeCollect(e); + categories.get(getErrorCategory(e)).offer(e); hasFailure = true; } @@ -139,7 +123,7 @@ private Exception buildFailure() { if (first != null && category == Category.CANCELLATION) { continue; // do not add cancellation errors if other errors present } - for (Exception e : categories.get(category).exceptions) { + for (Exception e : categories.get(category)) { if (++collected <= maxExceptions) { if (first == null) { first = e;