Skip to content

Commit 72bd5a0

Browse files
authored
Simplify cluster state management in ESQL (#131635)
1 parent 558cc7a commit 72bd5a0

File tree

3 files changed

+100
-156
lines changed

3 files changed

+100
-156
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
package org.elasticsearch.xpack.esql.session;
99

1010
import org.elasticsearch.ElasticsearchSecurityException;
11+
import org.elasticsearch.ElasticsearchStatusException;
1112
import org.elasticsearch.ExceptionsHelper;
1213
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.action.OriginalIndices;
1414
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1515
import org.elasticsearch.action.search.ShardSearchFailure;
1616
import org.elasticsearch.action.support.IndicesOptions;
@@ -313,48 +313,53 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
313313
}
314314

315315
/**
316-
* Checks the index expression for the presence of remote clusters. If found, it will ensure that the caller
317-
* has a valid Enterprise (or Trial) license on the querying cluster.
318-
* @param indices index expression requested by user
319-
* @param indicesGrouper grouper of index expressions by cluster alias
320-
* @param licenseState license state on the querying cluster
316+
* Checks the index expression for the presence of remote clusters.
317+
* If found, it will ensure that the caller has a valid Enterprise (or Trial) license on the querying cluster
318+
* as well as initialize corresponding cluster state in execution info.
321319
* @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search.
322320
*/
323-
public static void checkForCcsLicense(
324-
EsqlExecutionInfo executionInfo,
325-
List<IndexPattern> indices,
321+
public static void initCrossClusterState(
326322
IndicesExpressionGrouper indicesGrouper,
327-
Set<String> configuredClusters,
328-
XPackLicenseState licenseState
329-
) {
330-
for (IndexPattern index : indices) {
331-
Map<String, OriginalIndices> groupedIndices;
332-
try {
333-
groupedIndices = indicesGrouper.groupIndices(configuredClusters, IndicesOptions.DEFAULT, index.indexPattern());
334-
} catch (NoSuchRemoteClusterException e) {
335-
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
336-
throw e;
337-
} else {
338-
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
339-
}
323+
XPackLicenseState licenseState,
324+
List<IndexPattern> patterns,
325+
EsqlExecutionInfo executionInfo
326+
) throws ElasticsearchStatusException {
327+
if (patterns.isEmpty()) {
328+
return;
329+
}
330+
assert patterns.size() == 1 : "Only single index pattern is supported";
331+
try {
332+
var groupedIndices = indicesGrouper.groupIndices(
333+
// indicesGrouper.getConfiguredClusters() might return mutable set that changes as clusters connect or disconnect.
334+
// it is copied here so that we have the same resolution when request contains multiple remote cluster patterns with *
335+
Set.copyOf(indicesGrouper.getConfiguredClusters()),
336+
IndicesOptions.DEFAULT,
337+
patterns.getFirst().indexPattern()
338+
);
339+
340+
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
341+
// so that the CCS telemetry handler can recognize that this error is CCS-related
342+
for (var entry : groupedIndices.entrySet()) {
343+
final String clusterAlias = entry.getKey();
344+
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
345+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
346+
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
347+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
348+
});
340349
}
350+
341351
// check if it is a cross-cluster query
342352
if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) {
343353
if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
344-
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
345-
// so that the CCS telemetry handler can recognize that this error is CCS-related
346-
for (Map.Entry<String, OriginalIndices> entry : groupedIndices.entrySet()) {
347-
executionInfo.swapCluster(
348-
entry.getKey(),
349-
(k, v) -> new EsqlExecutionInfo.Cluster(
350-
entry.getKey(),
351-
Strings.arrayToCommaDelimitedString(entry.getValue().indices())
352-
)
353-
);
354-
}
355354
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
356355
}
357356
}
357+
} catch (NoSuchRemoteClusterException e) {
358+
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
359+
throw e;
360+
} else {
361+
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
362+
}
358363
}
359364
}
360365

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@
1111
import org.elasticsearch.ExceptionsHelper;
1212
import org.elasticsearch.TransportVersions;
1313
import org.elasticsearch.action.ActionListener;
14-
import org.elasticsearch.action.OriginalIndices;
1514
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1615
import org.elasticsearch.action.search.ShardSearchFailure;
17-
import org.elasticsearch.action.support.IndicesOptions;
1816
import org.elasticsearch.action.support.SubscribableListener;
19-
import org.elasticsearch.common.Strings;
2017
import org.elasticsearch.common.collect.Iterators;
2118
import org.elasticsearch.common.lucene.BytesRefs;
2219
import org.elasticsearch.common.regex.Regex;
@@ -156,7 +153,6 @@ public interface PlanRunner {
156153
private final PhysicalPlanOptimizer physicalPlanOptimizer;
157154
private final PlanTelemetry planTelemetry;
158155
private final IndicesExpressionGrouper indicesExpressionGrouper;
159-
private Set<String> configuredClusters;
160156
private final InferenceRunner inferenceRunner;
161157
private final RemoteClusterService remoteClusterService;
162158

@@ -410,8 +406,6 @@ public void analyzedPlan(
410406
plan.setAnalyzed();
411407
return plan;
412408
};
413-
// Capture configured remotes list to ensure consistency throughout the session
414-
configuredClusters = Set.copyOf(indicesExpressionGrouper.getConfiguredClusters());
415409

416410
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
417411
var unresolvedPolicies = preAnalysis.enriches.stream()
@@ -422,10 +416,8 @@ public void analyzedPlan(
422416
)
423417
)
424418
.collect(Collectors.toSet());
425-
final List<IndexPattern> indices = preAnalysis.indices;
426419

427-
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
428-
initializeClusterData(indices, executionInfo);
420+
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
429421

430422
var listener = SubscribableListener.<EnrichResolution>newForked(
431423
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
@@ -660,26 +652,6 @@ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
660652
});
661653
}
662654

663-
private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {
664-
if (indices.isEmpty()) {
665-
return;
666-
}
667-
assert indices.size() == 1 : "Only single index pattern is supported";
668-
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
669-
configuredClusters,
670-
IndicesOptions.DEFAULT,
671-
indices.getFirst().indexPattern()
672-
);
673-
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
674-
final String clusterAlias = entry.getKey();
675-
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
676-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
677-
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
678-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
679-
});
680-
}
681-
}
682-
683655
private void preAnalyzeMainIndices(
684656
PreAnalyzer.PreAnalysis preAnalysis,
685657
EsqlExecutionInfo executionInfo,

0 commit comments

Comments
 (0)