Skip to content

Commit 44bdc79

Browse files
MatchConfig serialization refactor
1 parent 98c4242 commit 44bdc79

File tree

10 files changed

+317
-56
lines changed

10 files changed

+317
-56
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 132 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,100 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
163163
new EsField("rkey3", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
164164
);
165165

166+
// Precreate left-side key attributes (from source index) - up to 4 keys as seen in tests
167+
private static final FieldAttribute KEY0_KEYWORD_ATTR = new FieldAttribute(
168+
Source.EMPTY,
169+
null,
170+
null,
171+
"key0",
172+
new EsField("key0", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
173+
);
174+
private static final FieldAttribute KEY0_LONG_ATTR = new FieldAttribute(
175+
Source.EMPTY,
176+
null,
177+
null,
178+
"key0",
179+
new EsField("key0", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
180+
);
181+
private static final FieldAttribute KEY1_KEYWORD_ATTR = new FieldAttribute(
182+
Source.EMPTY,
183+
null,
184+
null,
185+
"key1",
186+
new EsField("key1", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
187+
);
188+
private static final FieldAttribute KEY1_LONG_ATTR = new FieldAttribute(
189+
Source.EMPTY,
190+
null,
191+
null,
192+
"key1",
193+
new EsField("key1", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
194+
);
195+
private static final FieldAttribute KEY2_KEYWORD_ATTR = new FieldAttribute(
196+
Source.EMPTY,
197+
null,
198+
null,
199+
"key2",
200+
new EsField("key2", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
201+
);
202+
private static final FieldAttribute KEY2_LONG_ATTR = new FieldAttribute(
203+
Source.EMPTY,
204+
null,
205+
null,
206+
"key2",
207+
new EsField("key2", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
208+
);
209+
private static final FieldAttribute KEY3_KEYWORD_ATTR = new FieldAttribute(
210+
Source.EMPTY,
211+
null,
212+
null,
213+
"key3",
214+
new EsField("key3", DataType.KEYWORD, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
215+
);
216+
private static final FieldAttribute KEY3_INTEGER_ATTR = new FieldAttribute(
217+
Source.EMPTY,
218+
null,
219+
null,
220+
"key3",
221+
new EsField("key3", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
222+
);
223+
224+
// Index all attributes by name and type for easy lookup - built dynamically from attribute names
225+
// Key format: "name:type" (e.g., "key0:KEYWORD", "key0:LONG", "rkey0:KEYWORD", "l:LONG")
226+
private static final Map<String, FieldAttribute> ATTRIBUTES_BY_NAME_AND_TYPE = Map.ofEntries(
227+
// Left-side attributes (from source index)
228+
Map.entry(KEY0_KEYWORD_ATTR.name() + ":" + KEY0_KEYWORD_ATTR.dataType(), KEY0_KEYWORD_ATTR),
229+
Map.entry(KEY0_LONG_ATTR.name() + ":" + KEY0_LONG_ATTR.dataType(), KEY0_LONG_ATTR),
230+
Map.entry(KEY1_KEYWORD_ATTR.name() + ":" + KEY1_KEYWORD_ATTR.dataType(), KEY1_KEYWORD_ATTR),
231+
Map.entry(KEY1_LONG_ATTR.name() + ":" + KEY1_LONG_ATTR.dataType(), KEY1_LONG_ATTR),
232+
Map.entry(KEY2_KEYWORD_ATTR.name() + ":" + KEY2_KEYWORD_ATTR.dataType(), KEY2_KEYWORD_ATTR),
233+
Map.entry(KEY2_LONG_ATTR.name() + ":" + KEY2_LONG_ATTR.dataType(), KEY2_LONG_ATTR),
234+
Map.entry(KEY3_KEYWORD_ATTR.name() + ":" + KEY3_KEYWORD_ATTR.dataType(), KEY3_KEYWORD_ATTR),
235+
Map.entry(KEY3_INTEGER_ATTR.name() + ":" + KEY3_INTEGER_ATTR.dataType(), KEY3_INTEGER_ATTR),
236+
// Right-side attributes (from lookup index)
237+
Map.entry(RKEY0_KEYWORD_ATTR.name() + ":" + RKEY0_KEYWORD_ATTR.dataType(), RKEY0_KEYWORD_ATTR),
238+
Map.entry(RKEY0_LONG_ATTR.name() + ":" + RKEY0_LONG_ATTR.dataType(), RKEY0_LONG_ATTR),
239+
Map.entry(RKEY1_KEYWORD_ATTR.name() + ":" + RKEY1_KEYWORD_ATTR.dataType(), RKEY1_KEYWORD_ATTR),
240+
Map.entry(RKEY1_LONG_ATTR.name() + ":" + RKEY1_LONG_ATTR.dataType(), RKEY1_LONG_ATTR),
241+
Map.entry(RKEY2_KEYWORD_ATTR.name() + ":" + RKEY2_KEYWORD_ATTR.dataType(), RKEY2_KEYWORD_ATTR),
242+
Map.entry(RKEY2_LONG_ATTR.name() + ":" + RKEY2_LONG_ATTR.dataType(), RKEY2_LONG_ATTR),
243+
Map.entry(RKEY3_KEYWORD_ATTR.name() + ":" + RKEY3_KEYWORD_ATTR.dataType(), RKEY3_KEYWORD_ATTR),
244+
Map.entry(RKEY3_INTEGER_ATTR.name() + ":" + RKEY3_INTEGER_ATTR.dataType(), RKEY3_INTEGER_ATTR),
245+
Map.entry(R_FIELD_ATTR.name() + ":" + R_FIELD_ATTR.dataType(), R_FIELD_ATTR)
246+
);
247+
248+
/**
249+
* Gets a FieldAttribute by name and type. Throws IllegalArgumentException if not found.
250+
*/
251+
private static FieldAttribute getAttribute(String name, DataType type) {
252+
String key = name + ":" + type;
253+
FieldAttribute attr = ATTRIBUTES_BY_NAME_AND_TYPE.get(key);
254+
if (attr == null) {
255+
throw new IllegalArgumentException("Attribute not found: " + key);
256+
}
257+
return attr;
258+
}
259+
166260
public void testKeywordKey() throws IOException {
167261
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }), null);
168262
}
@@ -340,26 +434,40 @@ private PhysicalPlan buildRightPreJoinPlan(List<DataType> keyTypes, Expression f
340434
return new FragmentExec(esRelation);
341435
}
342436

437+
/**
438+
* Gets the left-side attribute for a given index and type.
439+
* This ensures consistent NameId usage across MatchConfig and join conditions.
440+
*/
441+
private FieldAttribute getLeftSideAttribute(int index, DataType keyType) {
442+
String name = "key" + index;
443+
// Handle special case for key3 which can be INTEGER or KEYWORD
444+
if (index == 3 && keyType == DataType.INTEGER) {
445+
return getAttribute(name, DataType.INTEGER);
446+
}
447+
// Default to KEYWORD or LONG based on keyType
448+
if (keyType == DataType.KEYWORD) {
449+
return getAttribute(name, DataType.KEYWORD);
450+
} else {
451+
return getAttribute(name, DataType.LONG);
452+
}
453+
}
454+
343455
/**
344456
* Gets the right-side attribute for a given index and type.
345457
* This ensures consistent NameId usage across join conditions and rightPreJoinPlan.
346458
*/
347459
private FieldAttribute getRightSideAttribute(int index, DataType keyType) {
348-
return switch (index) {
349-
case 0 -> keyType == DataType.KEYWORD ? RKEY0_KEYWORD_ATTR : RKEY0_LONG_ATTR;
350-
case 1 -> keyType == DataType.KEYWORD ? RKEY1_KEYWORD_ATTR : RKEY1_LONG_ATTR;
351-
case 2 -> keyType == DataType.KEYWORD ? RKEY2_KEYWORD_ATTR : RKEY2_LONG_ATTR;
352-
case 3 -> {
353-
if (keyType == DataType.INTEGER) {
354-
yield RKEY3_INTEGER_ATTR;
355-
} else if (keyType == DataType.KEYWORD) {
356-
yield RKEY3_KEYWORD_ATTR;
357-
} else {
358-
throw new IllegalArgumentException("Unsupported key type for rkey3: " + keyType);
359-
}
360-
}
361-
default -> throw new IllegalArgumentException("Unsupported number of keys: " + index);
362-
};
460+
String name = "rkey" + index;
461+
// Handle special case for rkey3 which can be INTEGER or KEYWORD
462+
if (index == 3 && keyType == DataType.INTEGER) {
463+
return getAttribute(name, DataType.INTEGER);
464+
}
465+
// Default to KEYWORD or LONG based on keyType
466+
if (keyType == DataType.KEYWORD) {
467+
return getAttribute(name, DataType.KEYWORD);
468+
} else {
469+
return getAttribute(name, DataType.LONG);
470+
}
363471
}
364472

365473
private List<Attribute> buildRightSideAttributes(List<DataType> keyTypes) {
@@ -491,22 +599,19 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
491599
TEST_REQUEST_TIMEOUT
492600
);
493601
final String finalNodeWithShard = nodeWithShard;
494-
boolean expressionJoin = EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() ? randomBoolean() : false;
602+
boolean expressionJoin = true;// EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() ? randomBoolean() : false;
495603
List<MatchConfig> matchFields = new ArrayList<>();
496604
List<Expression> joinOnConditions = new ArrayList<>();
497605
if (expressionJoin) {
498606
for (int i = 0; i < keyTypes.size(); i++) {
499-
FieldAttribute leftAttr = new FieldAttribute(
500-
Source.EMPTY,
501-
"key" + i,
502-
new EsField("key" + i, keyTypes.get(0), Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
503-
);
607+
// Use precreated static attributes to ensure NameId consistency
608+
FieldAttribute leftAttr = getLeftSideAttribute(i, keyTypes.get(i));
504609
FieldAttribute rightAttr = getRightSideAttribute(i, keyTypes.get(i));
505610
joinOnConditions.add(new Equals(Source.EMPTY, leftAttr, rightAttr));
506611
// randomly decide to apply the filter as additional join on filter instead of pushed down filter
507-
boolean applyAsJoinOnCondition = EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_FULL_TEXT_FUNCTION.isEnabled()
508-
? randomBoolean()
509-
: false;
612+
boolean applyAsJoinOnCondition = true;// EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_FULL_TEXT_FUNCTION.isEnabled()
613+
// ? randomBoolean()
614+
// : false;
510615
if (applyAsJoinOnCondition
511616
&& pushedDownFilter instanceof FragmentExec fragmentExec
512617
&& fragmentExec.fragment() instanceof Filter filter) {
@@ -516,8 +621,10 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
516621
}
517622
}
518623
// the matchFields are shared for both types of join
624+
// Use precreated static attributes to ensure NameId consistency with join conditions
519625
for (int i = 0; i < keyTypes.size(); i++) {
520-
matchFields.add(new MatchConfig("key" + i, i + 1, keyTypes.get(i)));
626+
FieldAttribute keyAttr = getLeftSideAttribute(i, keyTypes.get(i));
627+
matchFields.add(new MatchConfig(keyAttr, i + 1, keyTypes.get(i)));
521628
}
522629
PhysicalPlan rightPreJoinPlan = pushedDownFilter != null ? pushedDownFilter : buildRightPreJoinPlan(keyTypes, null);
523630
LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinGeneralExpressionIT.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
5252
private void ensureIndicesAndData() {
5353
assumeTrue(
5454
"LOOKUP JOIN ON general expressions requires capability",
55-
EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled()
55+
EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_GENERAL_EXPRESSION.isEnabled()
5656
);
5757
if (indexExists(MAIN_INDEX) == false) {
5858
createIndices();
@@ -63,7 +63,7 @@ private void ensureIndicesAndData() {
6363
private void ensureMultiColumnIndicesAndData() {
6464
assumeTrue(
6565
"LOOKUP JOIN ON general expressions requires capability",
66-
EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled()
66+
EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_GENERAL_EXPRESSION.isEnabled()
6767
);
6868
if (indexExists(MULTI_COL_MAIN_INDEX) == false) {
6969
loadMultiColumnDataFromCsv();
@@ -1004,4 +1004,44 @@ public void testComplexExpressionRelatingLeftAndRightMultiColumn() {
10041004
);
10051005
}
10061006
}
1007+
1008+
public void testLookupJoinWithLikeCondition() {
1009+
ensureMultiColumnIndicesAndData();
1010+
// Test LOOKUP JOIN with LIKE condition in the ON clause
1011+
// Matches rows where other1 ends with "ta" (pattern "*ta")
1012+
String query = String.format(Locale.ROOT, """
1013+
FROM %s
1014+
| RENAME id_int AS id_left, is_active_bool AS is_active_left
1015+
| LOOKUP JOIN %s ON id_int == id_left AND is_active_left == is_active_bool AND other1 LIKE "*ta"
1016+
| KEEP id_left, name_str, extra1, other1, other2
1017+
| SORT id_left, name_str, extra1, other1, other2
1018+
""", MULTI_COL_MAIN_INDEX, MULTI_COL_LOOKUP_INDEX);
1019+
1020+
try (EsqlQueryResponse response = runQuery(query)) {
1021+
List<List<Object>> values = getValuesList(response);
1022+
// Expected results based on CSV spec: rows where other1 matches "*ta" pattern
1023+
// Note: Some rows may have null values when the LIKE condition doesn't match
1024+
verifyResults(
1025+
values,
1026+
new Object[] { 1, "Alice", "foo", "beta", 2000 }, // other1="beta" ends with "ta"
1027+
new Object[] { List.of(1, 19, 21), null, "zyx", null, null }, // no match for multi-value id
1028+
new Object[] { 2, null, "bar", null, null }, // no match (other1 doesn't end with "ta")
1029+
new Object[] { 3, "Charlie", "baz", "delta", 4000 }, // other1="delta" ends with "ta"
1030+
new Object[] { 4, "David", "qux", "zeta", 6000 }, // other1="zeta" ends with "ta"
1031+
new Object[] { 5, "Eve", "quux", "eta", 7000 }, // other1="eta" ends with "ta"
1032+
new Object[] { 5, "Eve", "quux", "theta", 8000 }, // other1="theta" ends with "ta"
1033+
new Object[] { 6, null, "corge", "iota", 9000 }, // other1="iota" ends with "ta"
1034+
new Object[] { 7, null, "grault", null, null }, // no match
1035+
new Object[] { 8, null, "garply", null, null }, // no match
1036+
new Object[] { 9, null, "waldo", null, null }, // no match
1037+
new Object[] { 10, null, "fred", null, null }, // no match
1038+
new Object[] { 12, null, "xyzzy", null, null }, // no match
1039+
new Object[] { 13, null, "thud", null, null }, // no match
1040+
new Object[] { 14, null, "foo2", null, null }, // no match
1041+
new Object[] { 15, null, "bar2", null, null }, // no match
1042+
new Object[] { List.of(17, 18), null, "xyz", null, null }, // no match for multi-value id
1043+
new Object[] { null, null, "plugh", null, null } // no match for null id
1044+
);
1045+
}
1046+
}
10071047
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ private List<MatchConfig> collectLeftSideFieldsToBroadcast(LookupEnrichQueryGene
696696
// we do match by just name
697697
// we made sure the same attribute is not on the right side with the checks above
698698
for (MatchConfig matchField : lookupRequest.getMatchFields()) {
699-
if (attr.name().equals(matchField.fieldName())) {
699+
if (attr.equals(matchField.fieldName())) {
700700
builder.append(attr);
701701
allLeftFieldsToBroadcast.add(matchField);
702702
addedNameIds.add(nameId);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,10 +313,11 @@ private boolean applyAsLeftRightBinaryComparison(
313313
// The right side is the field from the lookup index
314314
// Check if the left side is in the matchFields
315315
// If it is its corresponding page is the corresponding number in inputPage
316+
// Compare by attribute (equals compares by NameId) to ensure we match the same attribute instance
316317
Block block = null;
317318
DataType dataType = null;
318319
for (int i = 0; i < matchFields.size(); i++) {
319-
if (matchFields.get(i).fieldName().equals(leftAttribute.name())) {
320+
if (matchFields.get(i).fieldName().equals(leftAttribute)) {
320321
block = inputPage.getBlock(i);
321322
dataType = matchFields.get(i).type();
322323
break;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ private List<MatchConfig> uniqueMatchFieldsByName(List<MatchConfig> matchFields)
184184
return matchFields;
185185
}
186186
List<MatchConfig> uniqueFields = new ArrayList<>();
187-
Set<String> seenFieldNames = new HashSet<>();
187+
Set<NamedExpression> seenFieldNames = new HashSet<>();
188188
for (MatchConfig matchField : matchFields) {
189189
if (seenFieldNames.add(matchField.fieldName())) {
190190
uniqueFields.add(matchField);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
3535
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
3636
import org.elasticsearch.xpack.esql.core.expression.Expression;
37+
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
3738
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
3839
import org.elasticsearch.xpack.esql.core.tree.Source;
3940
import org.elasticsearch.xpack.esql.core.type.DataType;
41+
import org.elasticsearch.xpack.esql.core.type.EsField;
4042
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
4143
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
4244
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
@@ -47,6 +49,7 @@
4749
import java.io.IOException;
4850
import java.util.ArrayList;
4951
import java.util.List;
52+
import java.util.Map;
5053
import java.util.Objects;
5154
import java.util.stream.Collectors;
5255

@@ -118,7 +121,7 @@ protected LookupEnrichQueryGenerator queryList(
118121
for (int i = 0; i < request.matchFields.size(); i++) {
119122
MatchConfig matchField = request.matchFields.get(i);
120123
QueryList q = termQueryList(
121-
context.getFieldType(matchField.fieldName()),
124+
context.getFieldType(matchField.fieldName().name()),
122125
context,
123126
aliasFilter,
124127
request.inputPage.getBlock(matchField.channel()),
@@ -244,9 +247,17 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
244247
matchFields = planIn.readCollectionAsList(MatchConfig::new);
245248
} else {
246249
String matchField = in.readString();
250+
FieldAttribute fieldName = new FieldAttribute(
251+
Source.EMPTY,
252+
null,
253+
null,
254+
matchField,
255+
new EsField(matchField, inputDataType, Map.of(), true, EsField.TimeSeriesFieldType.NONE)
256+
);
257+
;
247258
// For older versions, we only support a single match field.
248259
matchFields = new ArrayList<>(1);
249-
matchFields.add(new MatchConfig(matchField, 0, inputDataType));
260+
matchFields.add(new MatchConfig(fieldName, 0, inputDataType));
250261
}
251262
var source = Source.readFrom(planIn);
252263
// Source.readFrom() requires the query from the Configuration passed to PlanStreamInput.
@@ -319,7 +330,7 @@ public void writeTo(StreamOutput out) throws IOException {
319330
} else {
320331
// older versions only support a single match field, we already checked this above when writing the datatype
321332
// send the field name of the first and only match field here
322-
out.writeString(matchFields.get(0).fieldName());
333+
out.writeString(matchFields.get(0).fieldName().name());
323334
}
324335
source.writeTo(planOut);
325336
if (out.getTransportVersion().supports(ESQL_LOOKUP_JOIN_SOURCE_TEXT)) {
@@ -340,7 +351,7 @@ public void writeTo(StreamOutput out) throws IOException {
340351
@Override
341352
protected String extraDescription() {
342353
return " ,match_fields="
343-
+ matchFields.stream().map(MatchConfig::fieldName).collect(Collectors.joining(", "))
354+
+ matchFields.stream().map(x -> x.fieldName().name()).collect(Collectors.joining(", "))
344355
+ ", right_pre_join_plan="
345356
+ (rightPreJoinPlan == null ? "null" : rightPreJoinPlan.toString());
346357
}

0 commit comments

Comments
 (0)