Skip to content

Commit 00b41ed

Browse files
MatchConfig refactoring and add serialization test
1 parent 1dbe524 commit 00b41ed

File tree

8 files changed

+194
-49
lines changed

8 files changed

+194
-49
lines changed

test/framework/src/main/java/org/elasticsearch/test/AbstractWireSerializingTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected Writeable.Writer<T> instanceWriter() {
3737
* Copy the {@link Writeable} by round tripping it through {@linkplain StreamInput} and {@linkplain StreamOutput}.
3838
*/
3939
@Override
40-
protected final T copyInstance(T instance, TransportVersion version) throws IOException {
40+
protected T copyInstance(T instance, TransportVersion version) throws IOException {
4141
return copyInstance(instance, getNamedWriteableRegistry(), instanceWriter(), instanceReader(), version);
4242
}
4343
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.xpack.esql.core.tree.Source;
6060
import org.elasticsearch.xpack.esql.core.type.DataType;
6161
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
62+
import org.elasticsearch.xpack.esql.enrich.MatchConfig;
6263
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
6364
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
6465
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -315,9 +316,9 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
315316
TEST_REQUEST_TIMEOUT
316317
);
317318
final String finalNodeWithShard = nodeWithShard;
318-
List<LookupFromIndexOperator.MatchConfig> matchFields = new ArrayList<>();
319+
List<MatchConfig> matchFields = new ArrayList<>();
319320
for (int i = 0; i < keyTypes.size(); i++) {
320-
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName("key" + i), i + 1, keyTypes.get(i)));
321+
matchFields.add(new MatchConfig(new FieldAttribute.FieldName("key" + i), i + 1, keyTypes.get(i)));
321322
}
322323
LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
323324
matchFields,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
15-
import org.elasticsearch.common.io.stream.Writeable;
1615
import org.elasticsearch.compute.data.Block;
1716
import org.elasticsearch.compute.data.Page;
1817
import org.elasticsearch.compute.operator.AsyncOperator;
@@ -24,11 +23,8 @@
2423
import org.elasticsearch.core.Releasables;
2524
import org.elasticsearch.tasks.CancellableTask;
2625
import org.elasticsearch.xcontent.XContentBuilder;
27-
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
2826
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
2927
import org.elasticsearch.xpack.esql.core.tree.Source;
30-
import org.elasticsearch.xpack.esql.core.type.DataType;
31-
import org.elasticsearch.xpack.esql.planner.Layout;
3228

3329
import java.io.IOException;
3430
import java.util.Iterator;
@@ -39,25 +35,6 @@
3935

4036
// TODO rename package
4137
public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndexOperator.OngoingJoin> {
42-
public record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) implements Writeable {
43-
public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) {
44-
// TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup
45-
// indices, so the call to exactAttribute looks redundant now.
46-
this(match.exactAttribute().fieldName(), input.channel(), input.type());
47-
}
48-
49-
public MatchConfig(StreamInput in) throws IOException {
50-
this(new FieldAttribute.FieldName(in.readString()), in.readInt(), DataType.readFrom(in));
51-
}
52-
53-
@Override
54-
public void writeTo(StreamOutput out) throws IOException {
55-
out.writeString(fieldName.string());
56-
out.writeInt(channel);
57-
type.writeTo(out);
58-
}
59-
60-
}
6138

6239
public record Factory(
6340
List<MatchConfig> matchFields,
@@ -76,11 +53,11 @@ public String describe() {
7653
stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields);
7754
for (MatchConfig matchField : matchFields) {
7855
stringBuilder.append(" input_type=")
79-
.append(matchField.type)
56+
.append(matchField.type())
8057
.append(" match_field=")
81-
.append(matchField.fieldName.string())
58+
.append(matchField.fieldName().string())
8259
.append(" inputChannel=")
83-
.append(matchField.channel);
60+
.append(matchField.channel());
8461
}
8562
stringBuilder.append("]");
8663
return stringBuilder.toString();
@@ -149,7 +126,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
149126
Block[] inputBlockArray = new Block[matchFields.size()];
150127
for (int i = 0; i < matchFields.size(); i++) {
151128
MatchConfig matchField = matchFields.get(i);
152-
int inputChannel = matchField.channel;
129+
int inputChannel = matchField.channel();
153130
final Block inputBlock = inputPage.getBlock(inputChannel);
154131
inputBlockArray[i] = inputBlock;
155132
}
@@ -214,11 +191,11 @@ public String toString() {
214191
stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields);
215192
for (MatchConfig matchField : matchFields) {
216193
stringBuilder.append(" input_type=")
217-
.append(matchField.type)
194+
.append(matchField.type())
218195
.append(" match_field=")
219-
.append(matchField.fieldName.string())
196+
.append(matchField.fieldName().string())
220197
.append(" inputChannel=")
221-
.append(matchField.channel);
198+
.append(matchField.channel());
222199
}
223200
stringBuilder.append("]");
224201
return stringBuilder.toString();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,12 @@ protected LookupEnrichQueryGenerator queryList(
102102
) {
103103
List<QueryList> queryLists = new ArrayList<>();
104104
for (int i = 0; i < request.matchFields.size(); i++) {
105-
LookupFromIndexOperator.MatchConfig matchField = request.matchFields.get(i);
105+
MatchConfig matchField = request.matchFields.get(i);
106106
QueryList q = termQueryList(
107107
context.getFieldType(matchField.fieldName().string()),
108108
context,
109109
aliasFilter,
110-
request.inputPage.getBlock(i),
110+
request.inputPage.getBlock(matchField.channel()),
111111
matchField.type()
112112
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
113113
queryLists.add(q);
@@ -129,13 +129,13 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
129129
}
130130

131131
public static class Request extends AbstractLookupService.Request {
132-
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
132+
private final List<MatchConfig> matchFields;
133133

134134
Request(
135135
String sessionId,
136136
String index,
137137
String indexPattern,
138-
List<LookupFromIndexOperator.MatchConfig> matchFields,
138+
List<MatchConfig> matchFields,
139139
Page inputPage,
140140
List<NamedExpression> extractFields,
141141
Source source
@@ -146,20 +146,18 @@ public static class Request extends AbstractLookupService.Request {
146146
}
147147

148148
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
149-
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
149+
private final List<MatchConfig> matchFields;
150150

151151
// Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order
152-
// so we are not passing any explicit channel information for now
153-
// In the future it might be possible to include channel information in the matchFields, or expressions, or both,
154-
// so that the same channel can be reused for multiple match fields or multiple times inside the same expression
152+
// The channel information inside the MatchConfig, should say the same thing
155153
TransportRequest(
156154
String sessionId,
157155
ShardId shardId,
158156
String indexPattern,
159157
Page inputPage,
160158
Page toRelease,
161159
List<NamedExpression> extractFields,
162-
List<LookupFromIndexOperator.MatchConfig> matchFields,
160+
List<MatchConfig> matchFields,
163161
Source source
164162
) {
165163
super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source);
@@ -190,14 +188,14 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
190188
}
191189
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
192190
List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
193-
List<LookupFromIndexOperator.MatchConfig> matchFields = null;
191+
List<MatchConfig> matchFields = null;
194192
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
195-
matchFields = planIn.readCollectionAsList(LookupFromIndexOperator.MatchConfig::new);
193+
matchFields = planIn.readCollectionAsList(MatchConfig::new);
196194
} else {
197195
String matchField = in.readString();
198196
// For older versions, we only support a single match field.
199197
matchFields = new ArrayList<>(1);
200-
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType));
198+
matchFields.add(new MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType));
201199
}
202200
var source = Source.EMPTY;
203201
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.enrich;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
14+
import org.elasticsearch.xpack.esql.core.type.DataType;
15+
import org.elasticsearch.xpack.esql.planner.Layout;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
public final class MatchConfig implements Writeable {
21+
private final FieldAttribute.FieldName fieldName;
22+
private final int channel;
23+
private final DataType type;
24+
25+
public MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) {
26+
this.fieldName = fieldName;
27+
this.channel = channel;
28+
this.type = type;
29+
}
30+
31+
public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) {
32+
// TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup
33+
// indices, so the call to exactAttribute looks redundant now.
34+
this(match.exactAttribute().fieldName(), input.channel(), input.type());
35+
}
36+
37+
public MatchConfig(StreamInput in) throws IOException {
38+
this(new FieldAttribute.FieldName(in.readString()), in.readInt(), DataType.readFrom(in));
39+
}
40+
41+
@Override
42+
public void writeTo(StreamOutput out) throws IOException {
43+
out.writeString(fieldName.string());
44+
out.writeInt(channel);
45+
type.writeTo(out);
46+
}
47+
48+
public FieldAttribute.FieldName fieldName() {
49+
return fieldName;
50+
}
51+
52+
public int channel() {
53+
return channel;
54+
}
55+
56+
public DataType type() {
57+
return type;
58+
}
59+
60+
@Override
61+
public boolean equals(Object obj) {
62+
if (obj == this) return true;
63+
if (obj == null || obj.getClass() != this.getClass()) return false;
64+
var that = (MatchConfig) obj;
65+
return Objects.equals(this.fieldName, that.fieldName) && this.channel == that.channel && Objects.equals(this.type, that.type);
66+
}
67+
68+
@Override
69+
public int hashCode() {
70+
return Objects.hash(fieldName, channel, type);
71+
}
72+
73+
@Override
74+
public String toString() {
75+
return "MatchConfig[" + "fieldName=" + fieldName + ", " + "channel=" + channel + ", " + "type=" + type + ']';
76+
}
77+
78+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
8484
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
8585
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
86+
import org.elasticsearch.xpack.esql.enrich.MatchConfig;
8687
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
8788
import org.elasticsearch.xpack.esql.evaluator.command.GrokEvaluatorExtracter;
8889
import org.elasticsearch.xpack.esql.expression.Order;
@@ -769,15 +770,15 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
769770
if (join.leftFields().size() != join.rightFields().size()) {
770771
throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count");
771772
}
772-
List<LookupFromIndexOperator.MatchConfig> matchFields = new ArrayList<>(join.leftFields().size());
773+
List<MatchConfig> matchFields = new ArrayList<>(join.leftFields().size());
773774
for (int i = 0; i < join.leftFields().size(); i++) {
774775
TypedAttribute left = (TypedAttribute) join.leftFields().get(i);
775776
FieldAttribute right = (FieldAttribute) join.rightFields().get(i);
776777
Layout.ChannelAndType input = source.layout.get(left.id());
777778
if (input == null) {
778779
throw new IllegalArgumentException("can't plan [" + join + "][" + left + "]");
779780
}
780-
matchFields.add(new LookupFromIndexOperator.MatchConfig(right, input));
781+
matchFields.add(new MatchConfig(right, input));
781782
}
782783

783784
return source.with(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ protected Operator.OperatorFactory simple(SimpleOptions options) {
156156
new ReferenceAttribute(Source.EMPTY, "lint", DataType.INTEGER)
157157
);
158158

159-
List<LookupFromIndexOperator.MatchConfig> matchFields = new ArrayList<>();
159+
List<MatchConfig> matchFields = new ArrayList<>();
160160
for (int i = 0; i < numberOfJoinColumns; i++) {
161161
FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match" + i);
162-
matchFields.add(new LookupFromIndexOperator.MatchConfig(matchField, i, inputDataType));
162+
matchFields.add(new MatchConfig(matchField, i, inputDataType));
163163
}
164164
return new LookupFromIndexOperator.Factory(
165165
matchFields,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.enrich;
9+
10+
/*
11+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
12+
* or more contributor license agreements. Licensed under the "Elastic License
13+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
14+
* Public License v 1"; you may not use this file except in compliance with, at
15+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
16+
* License v3.0 only", or the "Server Side Public License, v 1".
17+
*/
18+
19+
import org.elasticsearch.TransportVersion;
20+
import org.elasticsearch.common.io.stream.Writeable;
21+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
22+
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
23+
import org.elasticsearch.xpack.esql.core.type.DataType;
24+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
25+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
26+
import org.elasticsearch.xpack.esql.session.Configuration;
27+
import org.junit.Before;
28+
29+
import java.io.IOException;
30+
31+
import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
32+
33+
public class MatchConfigSerializationTests extends AbstractWireSerializingTestCase<MatchConfig> {
34+
35+
private Configuration config;
36+
37+
@Before
38+
public void initConfig() {
39+
this.config = randomConfiguration();
40+
}
41+
42+
@Override
43+
protected Writeable.Reader<MatchConfig> instanceReader() {
44+
return MatchConfig::new;
45+
}
46+
47+
@Override
48+
protected MatchConfig createTestInstance() {
49+
return randomMatchConfig();
50+
}
51+
52+
private MatchConfig randomMatchConfig() {
53+
// Implement logic to create a random MatchConfig instance
54+
String name = randomAlphaOfLengthBetween(1, 100);
55+
int channel = randomInt();
56+
DataType type = randomFrom(DataType.types());
57+
return new MatchConfig(new FieldAttribute.FieldName(name), channel, type);
58+
}
59+
60+
@Override
61+
protected MatchConfig mutateInstance(MatchConfig instance) {
62+
return mutateMatchConfig(instance);
63+
}
64+
65+
private MatchConfig mutateMatchConfig(MatchConfig instance) {
66+
int i = randomIntBetween(1, 3);
67+
return switch (i) {
68+
case 1 -> {
69+
String name = randomValueOtherThan(instance.fieldName().string(), () -> randomAlphaOfLengthBetween(1, 100));
70+
yield new MatchConfig(new FieldAttribute.FieldName(name), instance.channel(), instance.type());
71+
}
72+
case 2 -> {
73+
int channel = randomValueOtherThan(instance.channel(), () -> randomInt());
74+
yield new MatchConfig(instance.fieldName(), channel, instance.type());
75+
}
76+
default -> {
77+
DataType type = randomValueOtherThan(instance.type(), () -> randomFrom(DataType.types()));
78+
yield new MatchConfig(instance.fieldName(), instance.channel(), type);
79+
}
80+
};
81+
}
82+
83+
@Override
84+
protected MatchConfig copyInstance(MatchConfig instance, TransportVersion version) throws IOException {
85+
return copyInstance(instance, getNamedWriteableRegistry(), (out, v) -> v.writeTo(new PlanStreamOutput(out, config)), in -> {
86+
PlanStreamInput pin = new PlanStreamInput(in, in.namedWriteableRegistry(), config);
87+
return new MatchConfig(pin);
88+
}, version);
89+
}
90+
}

0 commit comments

Comments
 (0)