Skip to content

Commit 1bc1890

Browse files
authored
[8.19] Refactor and simplify missing index & unavailability handling (#131252) (#131305)
* Refactor and simplify missing index & unavailability handling (#131252) (cherry picked from commit 366bc00) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java * add getClusterStates
1 parent 172a9ee commit 1bc1890

File tree

4 files changed

+45
-52
lines changed

4 files changed

+45
-52
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.ConcurrentMap;
3939
import java.util.function.BiFunction;
4040
import java.util.function.Predicate;
41+
import java.util.stream.Stream;
4142

4243
/**
4344
* Holds execution metadata about ES|QL queries for cross-cluster searches in order to display
@@ -311,6 +312,15 @@ public int getClusterStateCount(Cluster.Status status) {
311312
return (int) clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status).count();
312313
}
313314

315+
/**
316+
* @param status the status you want to access
317+
* @return a stream of clusters with that status
318+
*/
319+
public Stream<Cluster> getClusterStates(Cluster.Status status) {
320+
assert clusterInfo.isEmpty() == false : "ClusterMap in EsqlExecutionInfo must not be empty";
321+
return clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status);
322+
}
323+
314324
@Override
315325
public String toString() {
316326
return "EsqlExecutionInfo{"

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.operator.DriverCompletionInfo;
1919
import org.elasticsearch.core.Nullable;
2020
import org.elasticsearch.index.IndexNotFoundException;
21-
import org.elasticsearch.index.query.QueryBuilder;
2221
import org.elasticsearch.indices.IndicesExpressionGrouper;
2322
import org.elasticsearch.license.XPackLicenseState;
2423
import org.elasticsearch.transport.ConnectTransportException;
@@ -37,11 +36,11 @@
3736
import java.util.ArrayList;
3837
import java.util.Collections;
3938
import java.util.HashMap;
40-
import java.util.HashSet;
4139
import java.util.List;
4240
import java.util.Map;
4341
import java.util.Objects;
4442
import java.util.Set;
43+
import java.util.stream.Collectors;
4544

4645
public class EsqlCCSUtils {
4746

@@ -177,7 +176,11 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
177176
}
178177
}
179178

180-
static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
179+
static void updateExecutionInfoWithUnavailableClusters(
180+
EsqlExecutionInfo execInfo,
181+
Map<String, List<FieldCapabilitiesFailure>> failures
182+
) {
183+
Map<String, FieldCapabilitiesFailure> unavailable = determineUnavailableRemoteClusters(failures);
181184
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
182185
String clusterAlias = entry.getKey();
183186
boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
@@ -196,14 +199,16 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf
196199
static void updateExecutionInfoWithClustersWithNoMatchingIndices(
197200
EsqlExecutionInfo executionInfo,
198201
IndexResolution indexResolution,
199-
Set<String> unavailableClusters,
200-
QueryBuilder filter
202+
boolean usedFilter
201203
) {
202-
final Set<String> clustersWithNoMatchingIndices = new HashSet<>(executionInfo.clusterAliases());
204+
// Get the clusters which are still running, and we will check whether they have any matching indices.
205+
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
206+
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
207+
.map(Cluster::getClusterAlias)
208+
.collect(Collectors.toSet());
203209
for (String indexName : indexResolution.resolvedIndices()) {
204210
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
205211
}
206-
clustersWithNoMatchingIndices.removeAll(unavailableClusters);
207212
/*
208213
* Rules enforced at planning time around non-matching indices
209214
* 1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere
@@ -216,24 +221,20 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
216221
* Mark it as SKIPPED with 0 shards searched and took=0.
217222
*/
218223
for (String c : clustersWithNoMatchingIndices) {
219-
if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) {
220-
// if cluster was already in a terminal state, we don't need to check it again
221-
continue;
222-
}
223224
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
224225
if (concreteIndexRequested(executionInfo.getCluster(c).getIndexExpression())) {
225226
String error = Strings.format(
226227
"Unknown index [%s]",
227228
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
228229
);
229-
if (executionInfo.isSkipUnavailable(c) == false || filter != null) {
230+
if (executionInfo.isSkipUnavailable(c) == false || usedFilter) {
230231
if (fatalErrorMessage == null) {
231232
fatalErrorMessage = error;
232233
} else {
233234
fatalErrorMessage += "; " + error;
234235
}
235236
}
236-
if (filter == null) {
237+
if (usedFilter == false) {
237238
// We check for filter since the filter may be the reason why the index is missing, and then we don't want to mark yet
238239
markClusterWithFinalStateAndNoShards(
239240
executionInfo,
@@ -269,8 +270,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
269270

270271
// Filter-less version, mainly for testing where we don't need filter support
271272
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
272-
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()).keySet();
273-
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, unavailableClusters, null);
273+
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, false);
274274
}
275275

276276
// visible for testing

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -426,14 +426,8 @@ public void analyzedPlan(
426426
try {
427427
// the order here is tricky - if the cluster has been filtered and later became unavailable,
428428
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
429-
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures());
430-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters);
431-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(
432-
executionInfo,
433-
result.indices,
434-
unavailableClusters.keySet(),
435-
null
436-
);
429+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
430+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
437431
plan = analyzeAction.apply(result);
438432
} catch (Exception e) {
439433
l.onFailure(e);
@@ -532,11 +526,9 @@ private boolean allCCSClustersSkipped(
532526
ActionListener<LogicalPlan> logicalPlanListener
533527
) {
534528
IndexResolution indexResolution = result.indices;
535-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(
536-
executionInfo,
537-
EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures())
538-
);
539-
if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
529+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
530+
if (executionInfo.isCrossClusterSearch()
531+
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
540532
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
541533
// to let the LogicalPlanActionListener decide how to proceed
542534
LOGGER.debug("No more clusters to search, ending analysis stage");
@@ -564,13 +556,7 @@ private static void analyzeAndMaybeRetry(
564556
if (result.indices.isValid() || requestFilter != null) {
565557
// We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
566558
// when the resolution result is not valid for a different reason.
567-
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()).keySet();
568-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(
569-
executionInfo,
570-
result.indices,
571-
unavailableClusters,
572-
requestFilter
573-
);
559+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
574560
}
575561
plan = analyzeAction.apply(result);
576562
} catch (Exception e) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() {
153153
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
154154

155155
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
156-
var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure);
156+
var unvailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure), REMOTE2_ALIAS, List.of(failure));
157157
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters);
158158

159159
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS)));
@@ -185,7 +185,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() {
185185
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
186186
RemoteTransportException e = expectThrows(
187187
RemoteTransportException.class,
188-
() -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure))
188+
() -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, List.of(failure)))
189189
);
190190
assertThat(e.status().getStatus(), equalTo(500));
191191
assertThat(
@@ -338,8 +338,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
338338
);
339339
// remote1 is unavailable
340340
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
341-
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
342-
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
341+
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
342+
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
343343

344344
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
345345

@@ -349,9 +349,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
349349

350350
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
351351
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
352-
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed
353-
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
354-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
352+
// since remote1 is in the failures Map (passed to IndexResolution.valid),
353+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
355354

356355
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
357356
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));
@@ -381,18 +380,17 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
381380
);
382381

383382
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
384-
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
385-
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
383+
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
384+
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
386385
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
387386

388387
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
389388
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
390389
assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
391390

392391
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
393-
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed
394-
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
395-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
392+
// skipped since remote1 is in the failures Map
393+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
396394

397395
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
398396
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
@@ -430,8 +428,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
430428

431429
// remote1 is unavailable
432430
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
433-
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
434-
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
431+
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
432+
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
435433

436434
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
437435

@@ -441,9 +439,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
441439

442440
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
443441
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
444-
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed
445-
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
446-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
442+
// skipped since remote1 is in the failures Map
443+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
447444

448445
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
449446
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));

0 commit comments

Comments
 (0)