Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toSet;

public class EsqlCCSUtils {

Expand Down Expand Up @@ -153,30 +155,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
}
}

static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
// Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out.
if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) {
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
} else {
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
for (String index : indexExpression.split(",")) {
sb.append(clusterAlias).append(':').append(index).append(',');
}
}
}
}

if (sb.length() > 0) {
return sb.substring(0, sb.length() - 1);
} else {
return "";
}
}

static void updateExecutionInfoWithUnavailableClusters(
EsqlExecutionInfo execInfo,
Map<String, List<FieldCapabilitiesFailure>> failures
Expand Down Expand Up @@ -206,7 +184,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
.map(Cluster::getClusterAlias)
.collect(Collectors.toSet());
.collect(toSet());
for (String indexName : indexResolution.resolvedIndices()) {
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
}
Expand Down Expand Up @@ -414,4 +392,22 @@ public static String inClusterName(String clusterAlias) {
return "in remote cluster [" + clusterAlias + "]";
}
}

public static Set<String> getRemotesOf(Set<String> concreteIndices) {
return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet());
}

/**
* Given input like index=lookup-1 and remotes=[r1,r2,r3] this constructs output like `r1:lookup-1,r2:lookup-1,r3-lookup-1`
* This is needed in order to require lookup index is present on every remote in order to correctly execute a query with join.
*/
public static String qualifyWithRunningRemotes(String index, Set<String> remotes, EsqlExecutionInfo executionInfo) {
if (remotes.isEmpty()) {
return index;
}
return remotes.stream().filter(remote -> {
var cluster = executionInfo.getCluster(remote);
return cluster == null || cluster.getStatus() == Cluster.Status.RUNNING;
}).map(remote -> RemoteClusterAware.buildRemoteIndexName(remote, index)).collect(joining(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,18 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan;
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.getRemotesOf;
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.qualifyWithRunningRemotes;

public class EsqlSession {

Expand Down Expand Up @@ -372,58 +374,54 @@ public void analyzedPlan(
return;
}

PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
var preAnalysis = preAnalyzer.preAnalyze(parsed);
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);

var listener = SubscribableListener. //
SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
// first resolve the lookup indices, then the main indices
for (var index : preAnalysis.lookupIndices) {
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
}
listener.<PreAnalysisResult>andThen((l, result) -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
.<LogicalPlan>andThen((l, result) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, result, l))
.<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices.iterator(), r, executionInfo, l))
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
.addListener(logicalPlanListener);
}

private void preAnalyzeLookupIndices(
Iterator<IndexPattern> lookupIndices,
PreAnalysisResult preAnalysisResult,
EsqlExecutionInfo executionInfo,
ActionListener<PreAnalysisResult> listener
) {
if (lookupIndices.hasNext()) {
preAnalyzeLookupIndex(lookupIndices.next(), preAnalysisResult, executionInfo, listener.delegateFailureAndWrap((l, r) -> {
preAnalyzeLookupIndices(lookupIndices, r, executionInfo, l);
}));
} else {
listener.onResponse(preAnalysisResult);
}
}

private void preAnalyzeLookupIndex(
IndexPattern lookupIndexPattern,
PreAnalysisResult result,
EsqlExecutionInfo executionInfo,
ActionListener<PreAnalysisResult> listener
) {
String localPattern = lookupIndexPattern.indexPattern();
assert RemoteClusterAware.isRemoteIndexName(localPattern) == false
: "Lookup index name should not include remote, but got: " + localPattern;
String lookupJoinPattern = lookupIndexPattern.indexPattern();
assert RemoteClusterAware.isRemoteIndexName(lookupJoinPattern) == false
: "Lookup index name should not include remote, but got: " + lookupJoinPattern;
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
Set<String> fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames;

String patternWithRemotes;

if (executionInfo.getClusters().isEmpty()) {
patternWithRemotes = localPattern;
} else {
// convert index -> cluster1:index,cluster2:index, etc.for each running cluster
patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING)
.map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern))
.collect(Collectors.joining(","));
}
if (patternWithRemotes.isEmpty()) {
return;
}
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(
patternWithRemotes,
fieldNames,
qualifyWithRunningRemotes(lookupJoinPattern, getRemotesOf(result.indices.resolvedIndices()), executionInfo),
result.wildcardJoinIndices().contains(lookupJoinPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
null,
false,
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
listener.map(indexResolution -> receiveLookupIndexResolution(result, lookupJoinPattern, executionInfo, indexResolution))
);
}

Expand Down Expand Up @@ -621,51 +619,29 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SYSTEM_READ
);
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
List<IndexPattern> indices = preAnalysis.indices;
if (indices.size() > 1) {
// Note: JOINs are not supported but we detect them when
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
} else if (indices.size() == 1) {
IndexPattern table = indices.getFirst();

// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
// based only on available clusters (which could now be an empty list)
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(
result.withIndexResolution(IndexResolution.valid(new EsIndex(table.indexPattern(), Map.of(), Map.of())))
);
} else {
boolean includeAllDimensions = false;
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
if (preAnalysis.indexMode == IndexMode.TIME_SERIES) {
includeAllDimensions = true;
// TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
if (requestFilter != null) {
requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter);
} else {
requestFilter = indexModeFilter;
}
}
indexResolver.resolveAsMergedMapping(
indexExpressionToResolve,
result.fieldNames,
switch (preAnalysis.indices.size()) {
case 0 -> listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
case 1 -> indexResolver.resolveAsMergedMapping(
preAnalysis.indices.getFirst().indexPattern(),
result.fieldNames,
merge(
requestFilter,
includeAllDimensions,
listener.delegateFailure((l, indexResolution) -> {
l.onResponse(result.withIndexResolution(indexResolution));
})
);
}
preAnalysis.indexMode == IndexMode.TIME_SERIES
? new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName())
: null
),
preAnalysis.indexMode == IndexMode.TIME_SERIES,
listener.delegateFailure((l, indexResolution) -> l.onResponse(result.withIndexResolution(indexResolution)))
);
default -> listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
}
}

private static QueryBuilder merge(QueryBuilder q1, QueryBuilder q2) {
if (q1 != null && q2 != null) {
return new BoolQueryBuilder().filter(q1).filter(q2);
} else {
try {
// occurs when dealing with local relations (row a = 1)
listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
} catch (Exception ex) {
listener.onFailure(ex);
}
return q1 != null ? q1 : q2;
}
}

Expand Down
Loading