Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.Strings;

import java.util.Map;
import java.util.Set;

/**
* Interface for grouping index expressions, along with IndicesOptions by cluster alias.
Expand All @@ -29,23 +30,36 @@
public interface IndicesExpressionGrouper {

/**
* @param remoteClusterNames Set of configured remote cluster names.
* @param indicesOptions IndicesOptions to clarify how the index expression should be parsed/applied
* @param indexExpressionCsv Multiple index expressions as CSV string (with no spaces), e.g., "logs1,logs2,cluster-a:logs1".
* A single index expression is also supported.
* @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
* and the value for that cluster from the index expression is an OriginalIndices object.
*/
default Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String indexExpressionCsv) {
return groupIndices(indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv));
default Map<String, OriginalIndices> groupIndices(
Set<String> remoteClusterNames,
IndicesOptions indicesOptions,
String indexExpressionCsv
) {
return groupIndices(remoteClusterNames, indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv));
}

/**
* Same behavior as the other groupIndices, except the incoming multiple index expressions must already be
* parsed into a String array.
* @param remoteClusterNames Set of configured remote cluster names.
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
* @param indexExpressions Multiple index expressions as string[].
* @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
* and the value for that cluster from the index expression is an OriginalIndices object.
*/
Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions);
Map<String, OriginalIndices> groupIndices(Set<String> remoteClusterNames, IndicesOptions indicesOptions, String[] indexExpressions);

/**
* Returns a set of currently configured remote clusters.
*/
default Set<String> getConfiguredClusters() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid the default implementation here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could but it would make some tests a lot more verbose.

return Set.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no

/**
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
* @param remoteClusterNames Set of configured remote cluster names.
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
* @param indices Multiple index expressions as string[].
* @param returnLocalAll whether to support the _all functionality needed by _search
Expand All @@ -193,9 +194,14 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
* If false, an empty map is returned when no indices are specified.
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
*/
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
public Map<String, OriginalIndices> groupIndices(
Set<String> remoteClusterNames,
IndicesOptions indicesOptions,
String[] indices,
boolean returnLocalAll
) {
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices);
final Map<String, List<String>> groupedIndices = groupClusterIndices(remoteClusterNames, indices);
if (groupedIndices.isEmpty()) {
if (returnLocalAll) {
// search on _all in the local cluster if neither local indices nor remote indices were specified
Expand All @@ -214,12 +220,26 @@ public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions,
/**
* If no indices are specified, then a Map with one entry for the local cluster with an empty index array is returned.
* For details see {@code groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll)}
* @param remoteClusterNames Set of configured remote cluster names.
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
* @param indices Multiple index expressions as string[].
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
*/
public Map<String, OriginalIndices> groupIndices(Set<String> remoteClusterNames, IndicesOptions indicesOptions, String[] indices) {
return groupIndices(remoteClusterNames, indicesOptions, indices, true);
}

public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
return groupIndices(getRemoteClusterNames(), indicesOptions, indices, returnLocalAll);
}

public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices) {
return groupIndices(indicesOptions, indices, true);
return groupIndices(getRemoteClusterNames(), indicesOptions, indices, true);
}

@Override
public Set<String> getConfiguredClusters() {
return getRemoteClusterNames();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
);
}

public Map<String, List<String>> groupIndicesPerCluster(String[] indices) {
return remoteClusterService.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,13 @@ public static void checkForCcsLicense(
EsqlExecutionInfo executionInfo,
List<TableInfo> indices,
IndicesExpressionGrouper indicesGrouper,
Set<String> configuredClusters,
XPackLicenseState licenseState
) {
for (TableInfo tableInfo : indices) {
Map<String, OriginalIndices> groupedIndices;
try {
groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, tableInfo.id().indexPattern());
groupedIndices = indicesGrouper.groupIndices(configuredClusters, IndicesOptions.DEFAULT, tableInfo.id().indexPattern());
} catch (NoSuchRemoteClusterException e) {
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public interface PlanRunner {
private final PhysicalPlanOptimizer physicalPlanOptimizer;
private final PlanTelemetry planTelemetry;
private final IndicesExpressionGrouper indicesExpressionGrouper;
private Set<String> configuredClusters;

public EsqlSession(
String sessionId,
Expand Down Expand Up @@ -343,6 +344,8 @@ public void analyzedPlan(
plan.setAnalyzed();
return plan;
};
// Capture configured remotes list to ensure consistency throughout the session
configuredClusters = Set.copyOf(indicesExpressionGrouper.getConfiguredClusters());

PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
var unresolvedPolicies = preAnalysis.enriches.stream()
Expand All @@ -355,9 +358,10 @@ public void analyzedPlan(
.collect(Collectors.toSet());
final List<TableInfo> indices = preAnalysis.indices;

EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());

final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
configuredClusters,
indices.stream()
.flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().indexPattern())))
.toArray(String[]::new)
Expand All @@ -378,7 +382,7 @@ public void analyzedPlan(
// invalid index resolution to updateExecutionInfo
if (result.indices.isValid()) {
// CCS indices and skip_unavailable cluster values can stop the analysis right here
if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return;
if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
}
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
l.onResponse(result);
Expand Down Expand Up @@ -442,6 +446,7 @@ private void preAnalyzeIndices(
IndexPattern table = tableInfo.id();

Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
configuredClusters,
IndicesOptions.DEFAULT,
table.indexPattern()
);
Expand Down Expand Up @@ -501,13 +506,14 @@ private void preAnalyzeIndices(
}
}

private boolean analyzeCCSIndices(
/**
* Check if there are any clusters to search.
* @return true if there are no clusters to search, false otherwise
*/
private boolean allCCSClustersSkipped(
EsqlExecutionInfo executionInfo,
Set<String> targetClusters,
Set<EnrichPolicyResolver.UnresolvedPolicy> unresolvedPolicies,
PreAnalysisResult result,
ActionListener<LogicalPlan> logicalPlanListener,
ActionListener<PreAnalysisResult> l
ActionListener<LogicalPlan> logicalPlanListener
) {
IndexResolution indexResolution = result.indices;
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
Expand All @@ -520,22 +526,6 @@ private boolean analyzeCCSIndices(
return true;
}

Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
indexResolution.get().concreteIndices().toArray(String[]::new)
).keySet();
// If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
// or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again.
// TODO: add a test for this
if (targetClusters.containsAll(newClusters) == false
// do not bother with a re-resolution if only remotes were requested and all were offline
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isPresent()) {
enrichPolicyResolver.resolvePolicies(
newClusters,
unresolvedPolicies,
l.map(enrichResolution -> result.withEnrichResolution(enrichResolution))
);
return true;
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,22 +712,22 @@ public void testCheckForCcsLicense() {
List<TableInfo> indices = new ArrayList<>();
indices.add(new TableInfo(new IndexPattern(EMPTY, randomFrom("idx", "idx1,idx2*"))));

checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicense);
checkForCcsLicense(executionInfo, indices, indicesGrouper, nullLicense);

checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), platinumLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), goldLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), basicLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), standardLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), missingLicense);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), nullLicense);

checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), platinumLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), goldLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), basicLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), standardLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), missingLicenseInactive);
}

// cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license
Expand All @@ -742,8 +742,8 @@ public void testCheckForCcsLicense() {
}

// licenses that work
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseValid);

// all others fail ---

Expand Down Expand Up @@ -812,7 +812,7 @@ private void assertLicenseCheckFails(
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
() -> checkForCcsLicense(executionInfo, indices, indicesGrouper, licenseState)
() -> checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), licenseState)
);
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(
Expand All @@ -825,7 +825,11 @@ private void assertLicenseCheckFails(

static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper {
@Override
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions) {
public Map<String, OriginalIndices> groupIndices(
Set<String> remoteClusterNames,
IndicesOptions indicesOptions,
String[] indexExpressions
) {
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void testFailedMetric() {
// test a failed query: xyz field doesn't exist
request.query("from test | stats m = max(xyz)");
EsqlSession.PlanRunner runPhase = (p, r) -> fail("this shouldn't happen");
IndicesExpressionGrouper groupIndicesByCluster = (indicesOptions, indexExpressions) -> Map.of(
IndicesExpressionGrouper groupIndicesByCluster = (remoteClusterNames, indicesOptions, indexExpressions) -> Map.of(
"",
new OriginalIndices(new String[] { "test" }, IndicesOptions.DEFAULT)
);
Expand Down