Skip to content

Commit 5510b7f

Browse files
authored
Ensure the set of remote clusters is consistent over the life of ES|QL query (elastic#126000) (elastic#126161)
* Ensure the set of remote clusters is consistent over the life of ES|QL query (cherry picked from commit d84b65d) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
1 parent c60b69f commit 5510b7f

File tree

7 files changed

+82
-53
lines changed

7 files changed

+82
-53
lines changed

server/src/main/java/org/elasticsearch/indices/IndicesExpressionGrouper.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.Strings;
1515

1616
import java.util.Map;
17+
import java.util.Set;
1718

1819
/**
1920
* Interface for grouping index expressions, along with IndicesOptions by cluster alias.
@@ -29,23 +30,36 @@
2930
public interface IndicesExpressionGrouper {
3031

3132
/**
33+
* @param remoteClusterNames Set of configured remote cluster names.
3234
* @param indicesOptions IndicesOptions to clarify how the index expression should be parsed/applied
3335
* @param indexExpressionCsv Multiple index expressions as CSV string (with no spaces), e.g., "logs1,logs2,cluster-a:logs1".
3436
* A single index expression is also supported.
3537
* @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
3638
* and the value for that cluster from the index expression is an OriginalIndices object.
3739
*/
38-
default Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String indexExpressionCsv) {
39-
return groupIndices(indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv));
40+
default Map<String, OriginalIndices> groupIndices(
41+
Set<String> remoteClusterNames,
42+
IndicesOptions indicesOptions,
43+
String indexExpressionCsv
44+
) {
45+
return groupIndices(remoteClusterNames, indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv));
4046
}
4147

4248
/**
4349
* Same behavior as the other groupIndices, except the incoming multiple index expressions must already be
4450
* parsed into a String array.
51+
* @param remoteClusterNames Set of configured remote cluster names.
4552
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
4653
* @param indexExpressions Multiple index expressions as string[].
4754
* @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
4855
* and the value for that cluster from the index expression is an OriginalIndices object.
4956
*/
50-
Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions);
57+
Map<String, OriginalIndices> groupIndices(Set<String> remoteClusterNames, IndicesOptions indicesOptions, String[] indexExpressions);
58+
59+
/**
60+
* Returns a set of currently configured remote clusters.
61+
*/
62+
default Set<String> getConfiguredClusters() {
63+
return Set.of();
64+
}
5165
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
185185

186186
/**
187187
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
188+
* @param remoteClusterNames Set of configured remote cluster names.
188189
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
189190
* @param indices Multiple index expressions as string[].
190191
* @param returnLocalAll whether to support the _all functionality needed by _search
@@ -193,9 +194,14 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
193194
* If false, an empty map is returned when no indices are specified.
194195
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
195196
*/
196-
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
197+
public Map<String, OriginalIndices> groupIndices(
198+
Set<String> remoteClusterNames,
199+
IndicesOptions indicesOptions,
200+
String[] indices,
201+
boolean returnLocalAll
202+
) {
197203
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
198-
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices);
204+
final Map<String, List<String>> groupedIndices = groupClusterIndices(remoteClusterNames, indices);
199205
if (groupedIndices.isEmpty()) {
200206
if (returnLocalAll) {
201207
// search on _all in the local cluster if neither local indices nor remote indices were specified
@@ -214,12 +220,26 @@ public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions,
214220
/**
215221
* If no indices are specified, then a Map with one entry for the local cluster with an empty index array is returned.
216222
* For details see {@code groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll)}
223+
* @param remoteClusterNames Set of configured remote cluster names.
217224
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
218225
* @param indices Multiple index expressions as string[].
219226
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
220227
*/
228+
public Map<String, OriginalIndices> groupIndices(Set<String> remoteClusterNames, IndicesOptions indicesOptions, String[] indices) {
229+
return groupIndices(remoteClusterNames, indicesOptions, indices, true);
230+
}
231+
232+
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
233+
return groupIndices(getRemoteClusterNames(), indicesOptions, indices, returnLocalAll);
234+
}
235+
221236
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices) {
222-
return groupIndices(indicesOptions, indices, true);
237+
return groupIndices(getRemoteClusterNames(), indicesOptions, indices, true);
238+
}
239+
240+
@Override
241+
public Set<String> getConfiguredClusters() {
242+
return getRemoteClusterNames();
223243
}
224244

225245
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,8 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
446446
);
447447
}
448448

449-
public Map<String, List<String>> groupIndicesPerCluster(String[] indices) {
450-
return remoteClusterService.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
449+
public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
450+
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
451451
.entrySet()
452452
.stream()
453453
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,13 @@ public static void checkForCcsLicense(
278278
EsqlExecutionInfo executionInfo,
279279
List<TableInfo> indices,
280280
IndicesExpressionGrouper indicesGrouper,
281+
Set<String> configuredClusters,
281282
XPackLicenseState licenseState
282283
) {
283284
for (TableInfo tableInfo : indices) {
284285
Map<String, OriginalIndices> groupedIndices;
285286
try {
286-
groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, tableInfo.id().indexPattern());
287+
groupedIndices = indicesGrouper.groupIndices(configuredClusters, IndicesOptions.DEFAULT, tableInfo.id().indexPattern());
287288
} catch (NoSuchRemoteClusterException e) {
288289
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
289290
throw e;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public interface PlanRunner {
117117
private final PhysicalPlanOptimizer physicalPlanOptimizer;
118118
private final PlanTelemetry planTelemetry;
119119
private final IndicesExpressionGrouper indicesExpressionGrouper;
120+
private Set<String> configuredClusters;
120121

121122
public EsqlSession(
122123
String sessionId,
@@ -303,6 +304,8 @@ public void analyzedPlan(
303304
plan.setAnalyzed();
304305
return plan;
305306
};
307+
// Capture configured remotes list to ensure consistency throughout the session
308+
configuredClusters = Set.copyOf(indicesExpressionGrouper.getConfiguredClusters());
306309

307310
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
308311
var unresolvedPolicies = preAnalysis.enriches.stream()
@@ -315,9 +318,10 @@ public void analyzedPlan(
315318
.collect(Collectors.toSet());
316319
final List<TableInfo> indices = preAnalysis.indices;
317320

318-
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
321+
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
319322

320323
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
324+
configuredClusters,
321325
indices.stream()
322326
.flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().indexPattern())))
323327
.toArray(String[]::new)
@@ -338,7 +342,7 @@ public void analyzedPlan(
338342
// invalid index resolution to updateExecutionInfo
339343
if (result.indices.isValid()) {
340344
// CCS indices and skip_unavailable cluster values can stop the analysis right here
341-
if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return;
345+
if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
342346
}
343347
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
344348
l.onResponse(result);
@@ -402,6 +406,7 @@ private void preAnalyzeIndices(
402406
IndexPattern table = tableInfo.id();
403407

404408
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
409+
configuredClusters,
405410
IndicesOptions.DEFAULT,
406411
table.indexPattern()
407412
);
@@ -461,13 +466,14 @@ private void preAnalyzeIndices(
461466
}
462467
}
463468

464-
private boolean analyzeCCSIndices(
469+
/**
470+
* Check if there are any clusters to search.
471+
* @return true if there are no clusters to search, false otherwise
472+
*/
473+
private boolean allCCSClustersSkipped(
465474
EsqlExecutionInfo executionInfo,
466-
Set<String> targetClusters,
467-
Set<EnrichPolicyResolver.UnresolvedPolicy> unresolvedPolicies,
468475
PreAnalysisResult result,
469-
ActionListener<LogicalPlan> logicalPlanListener,
470-
ActionListener<PreAnalysisResult> l
476+
ActionListener<LogicalPlan> logicalPlanListener
471477
) {
472478
IndexResolution indexResolution = result.indices;
473479
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
@@ -479,22 +485,6 @@ private boolean analyzeCCSIndices(
479485
return true;
480486
}
481487

482-
Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
483-
indexResolution.get().concreteIndices().toArray(String[]::new)
484-
).keySet();
485-
// If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
486-
// or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again.
487-
// TODO: add a test for this
488-
if (targetClusters.containsAll(newClusters) == false
489-
// do not bother with a re-resolution if only remotes were requested and all were offline
490-
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) {
491-
enrichPolicyResolver.resolvePolicies(
492-
newClusters,
493-
unresolvedPolicies,
494-
l.map(enrichResolution -> result.withEnrichResolution(enrichResolution))
495-
);
496-
return true;
497-
}
498488
return false;
499489
}
500490

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -712,22 +712,22 @@ public void testCheckForCcsLicense() {
712712
List<TableInfo> indices = new ArrayList<>();
713713
indices.add(new TableInfo(new IndexPattern(EMPTY, randomFrom("idx", "idx1,idx2*"))));
714714

715-
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid);
716-
checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseValid);
717-
checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseValid);
718-
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid);
719-
checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseValid);
720-
checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseValid);
721-
checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicense);
722-
checkForCcsLicense(executionInfo, indices, indicesGrouper, nullLicense);
723-
724-
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseInactive);
725-
checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseInactive);
726-
checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseInactive);
727-
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseInactive);
728-
checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseInactive);
729-
checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseInactive);
730-
checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicenseInactive);
715+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseValid);
716+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), platinumLicenseValid);
717+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), goldLicenseValid);
718+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseValid);
719+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), basicLicenseValid);
720+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), standardLicenseValid);
721+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), missingLicense);
722+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), nullLicense);
723+
724+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseInactive);
725+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), platinumLicenseInactive);
726+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), goldLicenseInactive);
727+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseInactive);
728+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), basicLicenseInactive);
729+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), standardLicenseInactive);
730+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), missingLicenseInactive);
731731
}
732732

733733
// cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license
@@ -742,8 +742,8 @@ public void testCheckForCcsLicense() {
742742
}
743743

744744
// licenses that work
745-
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid);
746-
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid);
745+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseValid);
746+
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseValid);
747747

748748
// all others fail ---
749749

@@ -812,7 +812,7 @@ private void assertLicenseCheckFails(
812812
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
813813
ElasticsearchStatusException e = expectThrows(
814814
ElasticsearchStatusException.class,
815-
() -> checkForCcsLicense(executionInfo, indices, indicesGrouper, licenseState)
815+
() -> checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), licenseState)
816816
);
817817
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
818818
assertThat(
@@ -825,7 +825,11 @@ private void assertLicenseCheckFails(
825825

826826
static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper {
827827
@Override
828-
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions) {
828+
public Map<String, OriginalIndices> groupIndices(
829+
Set<String> remoteClusterNames,
830+
IndicesOptions indicesOptions,
831+
String[] indexExpressions
832+
) {
829833
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
830834
final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
831835

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testFailedMetric() {
156156
// test a failed query: xyz field doesn't exist
157157
request.query("from test | stats m = max(xyz)");
158158
EsqlSession.PlanRunner runPhase = (p, r) -> fail("this shouldn't happen");
159-
IndicesExpressionGrouper groupIndicesByCluster = (indicesOptions, indexExpressions) -> Map.of(
159+
IndicesExpressionGrouper groupIndicesByCluster = (remoteClusterNames, indicesOptions, indexExpressions) -> Map.of(
160160
"",
161161
new OriginalIndices(new String[] { "test" }, IndicesOptions.DEFAULT)
162162
);

0 commit comments

Comments
 (0)