Skip to content

Commit 328cdbe

Browse files
committed
qualify joins from resolved main indices remotes
1 parent 20db58a commit 328cdbe

File tree

2 files changed

+40
-91
lines changed

2 files changed

+40
-91
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Set;
4343
import java.util.stream.Collectors;
4444

45+
import static java.util.stream.Collectors.joining;
4546
import static java.util.stream.Collectors.toSet;
4647

4748
public class EsqlCCSUtils {
@@ -155,30 +156,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
155156
}
156157
}
157158

158-
static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
159-
StringBuilder sb = new StringBuilder();
160-
for (String clusterAlias : executionInfo.clusterAliases()) {
161-
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
162-
// Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out.
163-
if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) {
164-
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
165-
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
166-
} else {
167-
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
168-
for (String index : indexExpression.split(",")) {
169-
sb.append(clusterAlias).append(':').append(index).append(',');
170-
}
171-
}
172-
}
173-
}
174-
175-
if (sb.length() > 0) {
176-
return sb.substring(0, sb.length() - 1);
177-
} else {
178-
return "";
179-
}
180-
}
181-
182159
static void updateExecutionInfoWithUnavailableClusters(
183160
EsqlExecutionInfo execInfo,
184161
Map<String, List<FieldCapabilitiesFailure>> failures
@@ -421,5 +398,14 @@ public static Set<String> getRemotesOf(Set<String> concreteIndices) {
421398
return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet());
422399
}
423400

424-
401+
/**
402+
* Given input like index=lookup-1 and remotes=[r1,r2,r3] this constructs output like `r1:lookup-1,r2:lookup-1,r3-lookup-1`
403+
* This is needed in order to require lookup index is present on every remote in order to correctly execute a query with join.
404+
*/
405+
public static String qualifyWithRunningRemotes(String index, Set<String> remotes, EsqlExecutionInfo executionInfo) {
406+
return remotes.stream().filter(remote -> {
407+
var cluster = executionInfo.getCluster(remote);
408+
return cluster == null || cluster.getStatus() == Cluster.Status.RUNNING;
409+
}).map(remote -> RemoteClusterAware.buildRemoteIndexName(remote, index)).collect(joining(","));
410+
}
425411
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 29 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,14 @@
8282
import java.util.List;
8383
import java.util.Map;
8484
import java.util.Set;
85-
import java.util.stream.Collectors;
8685
import java.util.stream.Stream;
8786

8887
import static java.util.stream.Collectors.toSet;
8988
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
9089
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
9190
import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan;
91+
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.getRemotesOf;
92+
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.qualifyWithRunningRemotes;
9293

9394
public class EsqlSession {
9495

@@ -407,36 +408,20 @@ private void preAnalyzeLookupIndex(
407408
EsqlExecutionInfo executionInfo,
408409
ActionListener<PreAnalysisResult> listener
409410
) {
410-
String localPattern = lookupIndexPattern.indexPattern();
411-
assert RemoteClusterAware.isRemoteIndexName(localPattern) == false
412-
: "Lookup index name should not include remote, but got: " + localPattern;
411+
String lookupJoinPattern = lookupIndexPattern.indexPattern();
412+
assert RemoteClusterAware.isRemoteIndexName(lookupJoinPattern) == false
413+
: "Lookup index name should not include remote, but got: " + lookupJoinPattern;
413414
assert ThreadPool.assertCurrentThreadPool(
414415
ThreadPool.Names.SEARCH,
415416
ThreadPool.Names.SEARCH_COORDINATION,
416417
ThreadPool.Names.SYSTEM_READ
417418
);
418-
Set<String> fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames;
419-
420-
String patternWithRemotes;
421-
422-
if (executionInfo.getClusters().isEmpty()) {
423-
patternWithRemotes = localPattern;
424-
} else {
425-
// convert index -> cluster1:index,cluster2:index, etc.for each running cluster
426-
patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING)
427-
.map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern))
428-
.collect(Collectors.joining(","));
429-
}
430-
if (patternWithRemotes.isEmpty()) {
431-
return;
432-
}
433-
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
434419
indexResolver.resolveAsMergedMapping(
435-
patternWithRemotes,
436-
fieldNames,
420+
qualifyWithRunningRemotes(lookupJoinPattern, getRemotesOf(result.indices.resolvedIndices()), executionInfo),
421+
result.wildcardJoinIndices().contains(lookupJoinPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
437422
null,
438423
false,
439-
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
424+
listener.map(indexResolution -> receiveLookupIndexResolution(result, lookupJoinPattern, executionInfo, indexResolution))
440425
);
441426
}
442427

@@ -634,51 +619,29 @@ private void preAnalyzeMainIndices(
634619
ThreadPool.Names.SYSTEM_READ
635620
);
636621
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
637-
List<IndexPattern> indices = preAnalysis.indices;
638-
if (indices.size() > 1) {
639-
// Note: JOINs are not supported but we detect them when
640-
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
641-
} else if (indices.size() == 1) {
642-
IndexPattern table = indices.getFirst();
643-
644-
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
645-
// based only on available clusters (which could now be an empty list)
646-
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
647-
if (indexExpressionToResolve.isEmpty()) {
648-
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
649-
listener.onResponse(
650-
result.withIndexResolution(IndexResolution.valid(new EsIndex(table.indexPattern(), Map.of(), Map.of())))
651-
);
652-
} else {
653-
boolean includeAllDimensions = false;
654-
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
655-
if (preAnalysis.indexMode == IndexMode.TIME_SERIES) {
656-
includeAllDimensions = true;
657-
// TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
658-
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
659-
if (requestFilter != null) {
660-
requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter);
661-
} else {
662-
requestFilter = indexModeFilter;
663-
}
664-
}
665-
indexResolver.resolveAsMergedMapping(
666-
indexExpressionToResolve,
667-
result.fieldNames,
622+
switch (preAnalysis.indices.size()) {
623+
case 0 -> listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
624+
case 1 -> indexResolver.resolveAsMergedMapping(
625+
preAnalysis.indices.getFirst().indexPattern(),
626+
result.fieldNames,
627+
merge(
668628
requestFilter,
669-
includeAllDimensions,
670-
listener.delegateFailure((l, indexResolution) -> {
671-
l.onResponse(result.withIndexResolution(indexResolution));
672-
})
673-
);
674-
}
629+
preAnalysis.indexMode == IndexMode.TIME_SERIES
630+
? new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName())
631+
: null
632+
),
633+
preAnalysis.indexMode == IndexMode.TIME_SERIES,
634+
listener.delegateFailure((l, indexResolution) -> l.onResponse(result.withIndexResolution(indexResolution)))
635+
);
636+
default -> listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
637+
}
638+
}
639+
640+
private static QueryBuilder merge(QueryBuilder q1, QueryBuilder q2) {
641+
if (q1 != null && q2 != null) {
642+
return new BoolQueryBuilder().filter(q1).filter(q2);
675643
} else {
676-
try {
677-
// occurs when dealing with local relations (row a = 1)
678-
listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
679-
} catch (Exception ex) {
680-
listener.onFailure(ex);
681-
}
644+
return q1 != null ? q1 : q2;
682645
}
683646
}
684647

0 commit comments

Comments
 (0)