Skip to content

Commit 2dca22e

Browse files
committed
Ensure the set of remote clusters is consistent over the life of ES|QL query
1 parent e6096a2 commit 2dca22e

File tree

8 files changed

+73
-33
lines changed

8 files changed

+73
-33
lines changed

server/src/main/java/org/elasticsearch/indices/IndicesExpressionGrouper.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.Strings;
1515

1616
import java.util.Map;
17+
import java.util.Set;
1718

1819
/**
1920
* Interface for grouping index expressions, along with IndicesOptions by cluster alias.
@@ -29,23 +30,36 @@
2930
public interface IndicesExpressionGrouper {
3031

3132
/**
33+
* @param remoteClusterNames Set of configured remote cluster names.
3234
* @param indicesOptions IndicesOptions to clarify how the index expression should be parsed/applied
3335
* @param indexExpressionCsv Multiple index expressions as CSV string (with no spaces), e.g., "logs1,logs2,cluster-a:logs1".
3436
* A single index expression is also supported.
3537
* @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
3638
* and the value for that cluster from the index expression is an OriginalIndices object.
3739
*/
38-
default Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String indexExpressionCsv) {
39-
return groupIndices(indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv));
40+
default Map<String, OriginalIndices> groupIndices(
41+
Set<String> remoteClusterNames,
42+
IndicesOptions indicesOptions,
43+
String indexExpressionCsv
44+
) {
45+
return groupIndices(remoteClusterNames, indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv));
4046
}
4147

4248
/**
4349
* Same behavior as the other groupIndices, except the incoming multiple index expressions must already be
4450
* parsed into a String array.
51+
* @param remoteClusterNames Set of configured remote cluster names.
4552
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
4653
* @param indexExpressions Multiple index expressions as string[].
4754
* @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
4855
* and the value for that cluster from the index expression is an OriginalIndices object.
4956
*/
50-
Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions);
57+
Map<String, OriginalIndices> groupIndices(Set<String> remoteClusterNames, IndicesOptions indicesOptions, String[] indexExpressions);
58+
59+
/**
60+
* Returns a set of currently configured remote clusters.
61+
*/
62+
default Set<String> getConfiguredClusters() {
63+
return Set.of();
64+
}
5165
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
185185

186186
/**
187187
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
188+
* @param remoteClusterNames Set of configured remote cluster names.
188189
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
189190
* @param indices Multiple index expressions as string[].
190191
* @param returnLocalAll whether to support the _all functionality needed by _search
@@ -193,9 +194,14 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
193194
* If false, an empty map is returned when no indices are specified.
194195
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
195196
*/
196-
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
197+
public Map<String, OriginalIndices> groupIndices(
198+
Set<String> remoteClusterNames,
199+
IndicesOptions indicesOptions,
200+
String[] indices,
201+
boolean returnLocalAll
202+
) {
197203
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
198-
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices);
204+
final Map<String, List<String>> groupedIndices = groupClusterIndices(remoteClusterNames, indices);
199205
if (groupedIndices.isEmpty()) {
200206
if (returnLocalAll) {
201207
// search on _all in the local cluster if neither local indices nor remote indices were specified
@@ -214,12 +220,26 @@ public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions,
214220
/**
215221
* If no indices are specified, then a Map with one entry for the local cluster with an empty index array is returned.
216222
* For details see {@code groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll)}
223+
* @param remoteClusterNames Set of configured remote cluster names.
217224
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
218225
* @param indices Multiple index expressions as string[].
219226
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
220227
*/
228+
public Map<String, OriginalIndices> groupIndices(Set<String> remoteClusterNames, IndicesOptions indicesOptions, String[] indices) {
229+
return groupIndices(remoteClusterNames, indicesOptions, indices, true);
230+
}
231+
232+
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
233+
return groupIndices(getRemoteClusterNames(), indicesOptions, indices, returnLocalAll);
234+
}
235+
221236
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices) {
222-
return groupIndices(indicesOptions, indices, true);
237+
return groupIndices(getRemoteClusterNames(), indicesOptions, indices, true);
238+
}
239+
240+
@Override
241+
public Set<String> getConfiguredClusters() {
242+
return getRemoteClusterNames();
223243
}
224244

225245
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
7777
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute
7878
private volatile boolean isPartial; // Does this request have partial results?
7979
private transient volatile boolean isStopped; // Have we received stop command?
80+
private transient Set<String> configuredClusters; // Set of configured remote clusters
8081

8182
public EsqlExecutionInfo(boolean includeCCSMetadata) {
8283
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
@@ -354,6 +355,14 @@ public boolean isStopped() {
354355
return isStopped;
355356
}
356357

358+
public Set<String> getConfiguredClusters() {
359+
return configuredClusters;
360+
}
361+
362+
public void setConfiguredClusters(Set<String> configuredClusters) {
363+
this.configuredClusters = configuredClusters;
364+
}
365+
357366
/**
358367
* Represents the search metadata about a particular cluster involved in a cross-cluster search.
359368
* The Cluster object can represent either the local cluster or a remote cluster.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,8 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
446446
);
447447
}
448448

449-
public Map<String, List<String>> groupIndicesPerCluster(String[] indices) {
450-
return remoteClusterService.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
449+
public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
450+
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
451451
.entrySet()
452452
.stream()
453453
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,11 @@ public static void checkForCcsLicense(
283283
for (TableInfo tableInfo : indices) {
284284
Map<String, OriginalIndices> groupedIndices;
285285
try {
286-
groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, tableInfo.id().indexPattern());
286+
groupedIndices = indicesGrouper.groupIndices(
287+
executionInfo.getConfiguredClusters(),
288+
IndicesOptions.DEFAULT,
289+
tableInfo.id().indexPattern()
290+
);
287291
} catch (NoSuchRemoteClusterException e) {
288292
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
289293
throw e;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,8 @@ public void analyzedPlan(
343343
plan.setAnalyzed();
344344
return plan;
345345
};
346+
// Capture configured remotes list to ensure consistency throughout the session
347+
executionInfo.setConfiguredClusters(indicesExpressionGrouper.getConfiguredClusters());
346348

347349
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
348350
var unresolvedPolicies = preAnalysis.enriches.stream()
@@ -358,6 +360,7 @@ public void analyzedPlan(
358360
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
359361

360362
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
363+
executionInfo.getConfiguredClusters(),
361364
indices.stream()
362365
.flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().indexPattern())))
363366
.toArray(String[]::new)
@@ -378,7 +381,7 @@ public void analyzedPlan(
378381
// invalid index resolution to updateExecutionInfo
379382
if (result.indices.isValid()) {
380383
// CCS indices and skip_unavailable cluster values can stop the analysis right here
381-
if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return;
384+
if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
382385
}
383386
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
384387
l.onResponse(result);
@@ -442,6 +445,7 @@ private void preAnalyzeIndices(
442445
IndexPattern table = tableInfo.id();
443446

444447
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
448+
executionInfo.getConfiguredClusters(),
445449
IndicesOptions.DEFAULT,
446450
table.indexPattern()
447451
);
@@ -501,13 +505,14 @@ private void preAnalyzeIndices(
501505
}
502506
}
503507

504-
private boolean analyzeCCSIndices(
508+
/**
509+
* Check if there are any clusters to search.
510+
* @return true if there are no clusters to search, false otherwise
511+
*/
512+
private boolean allCCSClustersSkipped(
505513
EsqlExecutionInfo executionInfo,
506-
Set<String> targetClusters,
507-
Set<EnrichPolicyResolver.UnresolvedPolicy> unresolvedPolicies,
508514
PreAnalysisResult result,
509-
ActionListener<LogicalPlan> logicalPlanListener,
510-
ActionListener<PreAnalysisResult> l
515+
ActionListener<LogicalPlan> logicalPlanListener
511516
) {
512517
IndexResolution indexResolution = result.indices;
513518
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
@@ -520,22 +525,6 @@ private boolean analyzeCCSIndices(
520525
return true;
521526
}
522527

523-
Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
524-
indexResolution.get().concreteIndices().toArray(String[]::new)
525-
).keySet();
526-
// If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
527-
// or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again.
528-
// TODO: add a test for this
529-
if (targetClusters.containsAll(newClusters) == false
530-
// do not bother with a re-resolution if only remotes were requested and all were offline
531-
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isPresent()) {
532-
enrichPolicyResolver.resolvePolicies(
533-
newClusters,
534-
unresolvedPolicies,
535-
l.map(enrichResolution -> result.withEnrichResolution(enrichResolution))
536-
);
537-
return true;
538-
}
539528
return false;
540529
}
541530

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,11 @@ private void assertLicenseCheckFails(
825825

826826
static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper {
827827
@Override
828-
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions) {
828+
public Map<String, OriginalIndices> groupIndices(
829+
Set<String> remoteClusterNames,
830+
IndicesOptions indicesOptions,
831+
String[] indexExpressions
832+
) {
829833
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
830834
final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
831835

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public void testFailedMetric() {
157157
// test a failed query: xyz field doesn't exist
158158
request.query("from test | stats m = max(xyz)");
159159
EsqlSession.PlanRunner runPhase = (p, r) -> fail("this shouldn't happen");
160-
IndicesExpressionGrouper groupIndicesByCluster = (indicesOptions, indexExpressions) -> Map.of(
160+
IndicesExpressionGrouper groupIndicesByCluster = (remoteClusterNames, indicesOptions, indexExpressions) -> Map.of(
161161
"",
162162
new OriginalIndices(new String[] { "test" }, IndicesOptions.DEFAULT)
163163
);

0 commit comments

Comments
 (0)