Skip to content

Commit c75e92a

Browse files
authored
ESQL: Allow using the same index in FROM and LOOKUP (#118768)
Changed the logic so that, instead of getting all the indices of the plan and then subtracting the lookup ones, it will now directly ignore the lookup part in the initial calculation.
1 parent 7cf28a9 commit c75e92a

File tree

4 files changed

+47
-48
lines changed

4 files changed

+47
-48
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,22 @@ language_code:integer | language_name:keyword | country:keyword
182182
2 | [German, German, German] | [Austria, Germany, Switzerland]
183183
;
184184

185+
repeatedIndexOnFrom
186+
required_capability: join_lookup_v7
187+
required_capability: join_lookup_repeated_index_from
188+
189+
FROM languages_lookup
190+
| LOOKUP JOIN languages_lookup ON language_code
191+
| SORT language_code
192+
;
193+
194+
language_code:integer | language_name:keyword
195+
1 | English
196+
2 | French
197+
3 | Spanish
198+
4 | German
199+
;
200+
185201
###############################################
186202
# Filtering tests with languages_lookup index
187203
###############################################
@@ -1061,4 +1077,3 @@ ignoreOrder:true
10611077
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | QA | null
10621078
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA | null
10631079
;
1064-

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,11 @@ public enum Cap {
557557
*/
558558
JOIN_LOOKUP_V7(Build.current().isSnapshot()),
559559

560+
/**
561+
* LOOKUP JOIN with the same index as the FROM
562+
*/
563+
JOIN_LOOKUP_REPEATED_INDEX_FROM(JOIN_LOOKUP_V7.isEnabled()),
564+
560565
/**
561566
* Fix for https://github.com/elastic/elasticsearch/issues/117054
562567
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
2323
import org.elasticsearch.xpack.esql.core.expression.Expression;
2424
import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
25+
import org.elasticsearch.xpack.esql.core.tree.Node;
2526
import org.elasticsearch.xpack.esql.core.tree.Source;
2627
import org.elasticsearch.xpack.esql.core.type.DataType;
2728
import org.elasticsearch.xpack.esql.core.util.Holder;
@@ -40,6 +41,7 @@
4041
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
4142
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
4243
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
44+
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
4345
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
4446
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
4547
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
@@ -48,9 +50,12 @@
4850
import org.elasticsearch.xpack.esql.stats.SearchStats;
4951

5052
import java.util.ArrayList;
53+
import java.util.Collection;
5154
import java.util.LinkedHashSet;
5255
import java.util.List;
5356
import java.util.Set;
57+
import java.util.function.Consumer;
58+
import java.util.function.Function;
5459

5560
import static java.util.Arrays.asList;
5661
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -105,10 +110,27 @@ public static Set<String> planConcreteIndices(PhysicalPlan plan) {
105110
return Set.of();
106111
}
107112
var indices = new LinkedHashSet<String>();
108-
plan.forEachUp(FragmentExec.class, f -> f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices())));
113+
// TODO: This only works for LEFT join, we still need to support RIGHT join
114+
forEachUpWithChildren(plan, node -> {
115+
if (node instanceof FragmentExec f) {
116+
f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices()));
117+
}
118+
}, node -> node instanceof LookupJoinExec join ? List.of(join.left()) : node.children());
109119
return indices;
110120
}
111121

122+
/**
123+
* Similar to {@link Node#forEachUp(Consumer)}, but with a custom callback to get the node children.
124+
*/
125+
private static <T extends Node<T>> void forEachUpWithChildren(
126+
T node,
127+
Consumer<? super T> action,
128+
Function<? super T, Collection<T>> childrenGetter
129+
) {
130+
childrenGetter.apply(node).forEach(c -> forEachUpWithChildren(c, action, childrenGetter));
131+
action.accept(node);
132+
}
133+
112134
/**
113135
* Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters.
114136
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,8 @@
6363
import org.elasticsearch.xpack.esql.core.expression.Attribute;
6464
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
6565
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
66-
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
67-
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
6866
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
6967
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
70-
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
71-
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
7268
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
7369
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
7470
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -81,7 +77,6 @@
8177
import java.util.Arrays;
8278
import java.util.Collections;
8379
import java.util.HashMap;
84-
import java.util.HashSet;
8580
import java.util.List;
8681
import java.util.Map;
8782
import java.util.Set;
@@ -167,11 +162,9 @@ public void execute(
167162
Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService()
168163
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
169164
QueryPragmas queryPragmas = configuration.pragmas();
170-
Set<String> lookupIndexNames = findLookupIndexNames(physicalPlan);
171-
Set<String> concreteIndexNames = selectConcreteIndices(clusterToConcreteIndices, lookupIndexNames);
172165
if (dataNodePlan == null) {
173-
if (concreteIndexNames.isEmpty() == false) {
174-
String error = "expected no concrete indices without data node plan; got " + concreteIndexNames;
166+
if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0) == false) {
167+
String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
175168
assert false : error;
176169
listener.onFailure(new IllegalStateException(error));
177170
return;
@@ -194,7 +187,7 @@ public void execute(
194187
return;
195188
}
196189
} else {
197-
if (concreteIndexNames.isEmpty()) {
190+
if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) {
198191
var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
199192
assert false : error;
200193
listener.onFailure(new IllegalStateException(error));
@@ -268,42 +261,6 @@ public void execute(
268261
}
269262
}
270263

271-
private Set<String> selectConcreteIndices(Map<String, OriginalIndices> clusterToConcreteIndices, Set<String> indexesToIgnore) {
272-
Set<String> concreteIndexNames = new HashSet<>();
273-
clusterToConcreteIndices.forEach((clusterAlias, concreteIndices) -> {
274-
for (String index : concreteIndices.indices()) {
275-
if (indexesToIgnore.contains(index) == false) {
276-
concreteIndexNames.add(index);
277-
}
278-
}
279-
});
280-
return concreteIndexNames;
281-
}
282-
283-
private Set<String> findLookupIndexNames(PhysicalPlan physicalPlan) {
284-
Set<String> lookupIndexNames = new HashSet<>();
285-
// When planning JOIN on the coordinator node: "LookupJoinExec.lookup()->FragmentExec.fragment()->EsRelation.index()"
286-
physicalPlan.forEachDown(
287-
LookupJoinExec.class,
288-
lookupJoinExec -> lookupJoinExec.lookup()
289-
.forEachDown(
290-
FragmentExec.class,
291-
frag -> frag.fragment().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
292-
)
293-
);
294-
// When planning JOIN on the data node: "FragmentExec.fragment()->Join.right()->EsRelation.index()"
295-
// TODO this only works for LEFT join, so we still need to support RIGHT join
296-
physicalPlan.forEachDown(
297-
FragmentExec.class,
298-
fragmentExec -> fragmentExec.fragment()
299-
.forEachDown(
300-
Join.class,
301-
join -> join.right().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
302-
)
303-
);
304-
return lookupIndexNames;
305-
}
306-
307264
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
308265
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
309266
if (execInfo.isCrossClusterSearch()) {

0 commit comments

Comments
 (0)