Skip to content

Commit 78baad4

Browse files
Make LookupFromIndexIT closer to production
1 parent f00978c commit 78baad4

File tree

1 file changed

+137
-21
lines changed

1 file changed

+137
-21
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 137 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@
5656
import org.elasticsearch.threadpool.ThreadPool;
5757
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
5858
import org.elasticsearch.xpack.esql.core.expression.Alias;
59+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
5960
import org.elasticsearch.xpack.esql.core.expression.Expression;
6061
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
6162
import org.elasticsearch.xpack.esql.core.expression.Literal;
63+
import org.elasticsearch.xpack.esql.core.expression.NameId;
6264
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
6365
import org.elasticsearch.xpack.esql.core.tree.Source;
6466
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -83,6 +85,7 @@
8385
import java.util.ArrayList;
8486
import java.util.Collections;
8587
import java.util.HashMap;
88+
import java.util.HashSet;
8689
import java.util.List;
8790
import java.util.Map;
8891
import java.util.Set;
@@ -95,6 +98,71 @@
9598
import static org.hamcrest.Matchers.hasSize;
9699

97100
public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
101+
// Precreate all attributes statically to ensure NameId matching
102+
private static final FieldAttribute R_FIELD_ATTR = new FieldAttribute(
103+
Source.EMPTY,
104+
null,
105+
null,
106+
"l",
107+
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
108+
);
109+
private static final FieldAttribute RKEY0_KEYWORD_ATTR = new FieldAttribute(
110+
Source.EMPTY,
111+
null,
112+
null,
113+
"rkey0",
114+
new EsField("rkey0", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
115+
);
116+
private static final FieldAttribute RKEY0_LONG_ATTR = new FieldAttribute(
117+
Source.EMPTY,
118+
null,
119+
null,
120+
"rkey0",
121+
new EsField("rkey0", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
122+
);
123+
private static final FieldAttribute RKEY1_KEYWORD_ATTR = new FieldAttribute(
124+
Source.EMPTY,
125+
null,
126+
null,
127+
"rkey1",
128+
new EsField("rkey1", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
129+
);
130+
private static final FieldAttribute RKEY1_LONG_ATTR = new FieldAttribute(
131+
Source.EMPTY,
132+
null,
133+
null,
134+
"rkey1",
135+
new EsField("rkey1", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
136+
);
137+
private static final FieldAttribute RKEY2_KEYWORD_ATTR = new FieldAttribute(
138+
Source.EMPTY,
139+
null,
140+
null,
141+
"rkey2",
142+
new EsField("rkey2", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
143+
);
144+
private static final FieldAttribute RKEY2_LONG_ATTR = new FieldAttribute(
145+
Source.EMPTY,
146+
null,
147+
null,
148+
"rkey2",
149+
new EsField("rkey2", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
150+
);
151+
private static final FieldAttribute RKEY3_KEYWORD_ATTR = new FieldAttribute(
152+
Source.EMPTY,
153+
null,
154+
null,
155+
"rkey3",
156+
new EsField("rkey3", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
157+
);
158+
private static final FieldAttribute RKEY3_INTEGER_ATTR = new FieldAttribute(
159+
Source.EMPTY,
160+
null,
161+
null,
162+
"rkey3",
163+
new EsField("rkey3", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
164+
);
165+
98166
public void testKeywordKey() throws IOException {
99167
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }), null);
100168
}
@@ -121,35 +189,38 @@ public void testJoinOnThreeKeys() throws IOException {
121189
}
122190

123191
public void testJoinOnFourKeys() throws IOException {
192+
List<DataType> keyTypes = List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD, DataType.INTEGER);
124193
runLookup(
125-
List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD, DataType.INTEGER),
194+
keyTypes,
126195
new UsingSingleLookupTable(
127196
new Object[][] {
128197
new String[] { "aa", "bb", "cc", "dd" },
129198
new Long[] { 12L, 33L, 1L, 42L },
130199
new String[] { "one", "two", "three", "four" },
131200
new Integer[] { 1, 2, 3, 4 }, }
132201
),
133-
buildGreaterThanFilter(1L)
202+
buildGreaterThanFilter(1L, keyTypes)
134203
);
135204
}
136205

137206
public void testLongKey() throws IOException {
207+
List<DataType> keyTypes = List.of(DataType.LONG);
138208
runLookup(
139-
List.of(DataType.LONG),
209+
keyTypes,
140210
new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }),
141-
buildGreaterThanFilter(0L)
211+
buildGreaterThanFilter(0L, keyTypes)
142212
);
143213
}
144214

145215
/**
146216
* LOOKUP multiple results match.
147217
*/
148218
public void testLookupIndexMultiResults() throws IOException {
219+
List<DataType> keyTypes = List.of(DataType.KEYWORD);
149220
runLookup(
150-
List.of(DataType.KEYWORD),
221+
keyTypes,
151222
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }),
152-
buildGreaterThanFilter(-1L)
223+
buildGreaterThanFilter(-1L, keyTypes)
153224
);
154225
}
155226

@@ -239,18 +310,66 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
239310
}
240311
}
241312

242-
private PhysicalPlan buildGreaterThanFilter(long value) {
243-
FieldAttribute filterAttribute = new FieldAttribute(
244-
Source.EMPTY,
245-
"l",
246-
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
247-
);
248-
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
249-
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of());
313+
private PhysicalPlan buildGreaterThanFilter(long value, List<DataType> keyTypes) {
314+
Expression greaterThan = new GreaterThan(Source.EMPTY, R_FIELD_ATTR, new Literal(Source.EMPTY, value, DataType.LONG));
315+
List<Attribute> rightSideAttributes = buildRightSideAttributes(keyTypes);
316+
rightSideAttributes.add(R_FIELD_ATTR);
317+
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), rightSideAttributes);
250318
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
251319
return new FragmentExec(filter);
252320
}
253321

322+
/**
323+
* Builds a rightPreJoinPlan with right-side attributes, optionally including fields referenced in a filter expression.
324+
* This ensures collectRightSideFieldNameIds can always find the right-side fields.
325+
*/
326+
private PhysicalPlan buildRightPreJoinPlan(List<DataType> keyTypes, Expression filterExpression) {
327+
List<Attribute> rightSideAttributes = buildRightSideAttributes(keyTypes);
328+
// If there's a filter expression, check if it references R_FIELD_ATTR and include it
329+
if (filterExpression != null) {
330+
Set<NameId> referencedIds = new HashSet<>();
331+
for (Attribute attr : filterExpression.references()) {
332+
referencedIds.add(attr.id());
333+
}
334+
// If R_FIELD_ATTR is referenced, add it to the EsRelation
335+
if (referencedIds.contains(R_FIELD_ATTR.id())) {
336+
rightSideAttributes.add(R_FIELD_ATTR);
337+
}
338+
}
339+
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), rightSideAttributes);
340+
return new FragmentExec(esRelation);
341+
}
342+
343+
/**
344+
* Gets the right-side attribute for a given index and type.
345+
* This ensures consistent NameId usage across join conditions and rightPreJoinPlan.
346+
*/
347+
private FieldAttribute getRightSideAttribute(int index, DataType keyType) {
348+
return switch (index) {
349+
case 0 -> keyType == DataType.KEYWORD ? RKEY0_KEYWORD_ATTR : RKEY0_LONG_ATTR;
350+
case 1 -> keyType == DataType.KEYWORD ? RKEY1_KEYWORD_ATTR : RKEY1_LONG_ATTR;
351+
case 2 -> keyType == DataType.KEYWORD ? RKEY2_KEYWORD_ATTR : RKEY2_LONG_ATTR;
352+
case 3 -> {
353+
if (keyType == DataType.INTEGER) {
354+
yield RKEY3_INTEGER_ATTR;
355+
} else if (keyType == DataType.KEYWORD) {
356+
yield RKEY3_KEYWORD_ATTR;
357+
} else {
358+
throw new IllegalArgumentException("Unsupported key type for rkey3: " + keyType);
359+
}
360+
}
361+
default -> throw new IllegalArgumentException("Unsupported number of keys: " + index);
362+
};
363+
}
364+
365+
private List<Attribute> buildRightSideAttributes(List<DataType> keyTypes) {
366+
List<Attribute> rightSideAttributes = new ArrayList<>();
367+
for (int i = 0; i < keyTypes.size(); i++) {
368+
rightSideAttributes.add(getRightSideAttribute(i, keyTypes.get(i)));
369+
}
370+
return rightSideAttributes;
371+
}
372+
254373
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException {
255374
String[] fieldMappers = new String[keyTypes.size() * 2];
256375
for (int i = 0; i < keyTypes.size(); i++) {
@@ -382,11 +501,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
382501
"key" + i,
383502
new EsField("key" + i, keyTypes.get(0), Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
384503
);
385-
FieldAttribute rightAttr = new FieldAttribute(
386-
Source.EMPTY,
387-
"rkey" + i,
388-
new EsField("rkey" + i, keyTypes.get(i), Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
389-
);
504+
FieldAttribute rightAttr = getRightSideAttribute(i, keyTypes.get(i));
390505
joinOnConditions.add(new Equals(Source.EMPTY, leftAttr, rightAttr));
391506
// randomly decide to apply the filter as additional join on filter instead of pushed down filter
392507
boolean applyAsJoinOnCondition = EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_FULL_TEXT_FUNCTION.isEnabled()
@@ -396,14 +511,15 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
396511
&& pushedDownFilter instanceof FragmentExec fragmentExec
397512
&& fragmentExec.fragment() instanceof Filter filter) {
398513
joinOnConditions.add(filter.condition());
399-
pushedDownFilter = null;
514+
pushedDownFilter = new FragmentExec(filter.child());
400515
}
401516
}
402517
}
403518
// the matchFields are shared for both types of join
404519
for (int i = 0; i < keyTypes.size(); i++) {
405520
matchFields.add(new MatchConfig("key" + i, i + 1, keyTypes.get(i)));
406521
}
522+
PhysicalPlan rightPreJoinPlan = pushedDownFilter != null ? pushedDownFilter : buildRightPreJoinPlan(keyTypes, null);
407523
LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
408524
matchFields,
409525
"test",
@@ -414,7 +530,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
414530
"lookup",
415531
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
416532
Source.EMPTY,
417-
pushedDownFilter,
533+
rightPreJoinPlan,
418534
Predicates.combineAnd(joinOnConditions)
419535
);
420536
DriverContext driverContext = driverContext();

0 commit comments

Comments
 (0)