Skip to content

Commit d11201d

Browse files
Lookup Join on Multiple Columns POC WIP
1 parent 90699d3 commit d11201d

File tree

6 files changed

+137
-109
lines changed

6 files changed

+137
-109
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ static TransportVersion def(int id) {
342342
public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
343343
public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00);
344344
public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
345+
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_124_0_00);
345346

346347
/*
347348
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.io.stream.Writeable;
1516
import org.elasticsearch.compute.data.Block;
1617
import org.elasticsearch.compute.data.Page;
1718
import org.elasticsearch.compute.operator.AsyncOperator;
@@ -27,6 +28,7 @@
2728
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
2829
import org.elasticsearch.xpack.esql.core.tree.Source;
2930
import org.elasticsearch.xpack.esql.core.type.DataType;
31+
import org.elasticsearch.xpack.esql.planner.Layout;
3032

3133
import java.io.IOException;
3234
import java.util.Iterator;
@@ -37,47 +39,64 @@
3739

3840
// TODO rename package
3941
public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndexOperator.OngoingJoin> {
42+
public record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) implements Writeable {
43+
public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) {
44+
// TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup
45+
// indices, so the call to exactAttribute looks redundant now.
46+
this(match.exactAttribute().fieldName(), input.channel(), input.type());
47+
}
48+
49+
public MatchConfig(StreamInput in) throws IOException {
50+
this(new FieldAttribute.FieldName(in.readString()), in.readInt(), DataType.readFrom(in));
51+
}
52+
53+
@Override
54+
public void writeTo(StreamOutput out) throws IOException {
55+
out.writeString(fieldName.string());
56+
out.writeInt(channel);
57+
type.writeTo(out);
58+
}
59+
60+
}
61+
4062
public record Factory(
63+
List<MatchConfig> matchFields,
4164
String sessionId,
4265
CancellableTask parentTask,
4366
int maxOutstandingRequests,
44-
int inputChannel,
4567
Function<DriverContext, LookupFromIndexService> lookupService,
46-
DataType inputDataType,
4768
String lookupIndexPattern,
4869
String lookupIndex,
49-
FieldAttribute.FieldName matchField,
5070
List<NamedExpression> loadFields,
5171
Source source
5272
) implements OperatorFactory {
5373
@Override
5474
public String describe() {
55-
return "LookupOperator[index="
56-
+ lookupIndex
57-
+ " input_type="
58-
+ inputDataType
59-
+ " match_field="
60-
+ matchField.string()
61-
+ " load_fields="
62-
+ loadFields
63-
+ " inputChannel="
64-
+ inputChannel
65-
+ "]";
75+
StringBuilder stringBuilder = new StringBuilder();
76+
stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields);
77+
for (MatchConfig matchField : matchFields) {
78+
stringBuilder.append(" input_type=")
79+
.append(matchField.type)
80+
.append(" match_field=")
81+
.append(matchField.fieldName.string())
82+
.append(" inputChannel=")
83+
.append(matchField.channel);
84+
}
85+
stringBuilder.append("]");
86+
return stringBuilder.toString();
6687
}
6788

6889
@Override
6990
public Operator get(DriverContext driverContext) {
7091
return new LookupFromIndexOperator(
92+
matchFields,
7193
sessionId,
7294
driverContext,
7395
parentTask,
7496
maxOutstandingRequests,
75-
inputChannel,
7697
lookupService.apply(driverContext),
77-
inputDataType,
7898
lookupIndexPattern,
7999
lookupIndex,
80-
matchField.string(),
81100
loadFields,
82101
source
83102
);
@@ -87,14 +106,12 @@ public Operator get(DriverContext driverContext) {
87106
private final LookupFromIndexService lookupService;
88107
private final String sessionId;
89108
private final CancellableTask parentTask;
90-
private final int inputChannel;
91-
private final DataType inputDataType;
92109
private final String lookupIndexPattern;
93110
private final String lookupIndex;
94-
private final String matchField;
95111
private final List<NamedExpression> loadFields;
96112
private final Source source;
97113
private long totalTerms = 0L;
114+
private List<MatchConfig> matchFields;
98115
/**
99116
* Total number of pages emitted by this {@link Operator}.
100117
*/
@@ -105,43 +122,47 @@ public Operator get(DriverContext driverContext) {
105122
private OngoingJoin ongoing = null;
106123

107124
public LookupFromIndexOperator(
125+
List<MatchConfig> matchFields,
108126
String sessionId,
109127
DriverContext driverContext,
110128
CancellableTask parentTask,
111129
int maxOutstandingRequests,
112-
int inputChannel,
113130
LookupFromIndexService lookupService,
114-
DataType inputDataType,
115131
String lookupIndexPattern,
116132
String lookupIndex,
117-
String matchField,
118133
List<NamedExpression> loadFields,
119134
Source source
120135
) {
121136
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
137+
this.matchFields = matchFields;
122138
this.sessionId = sessionId;
123139
this.parentTask = parentTask;
124-
this.inputChannel = inputChannel;
125140
this.lookupService = lookupService;
126-
this.inputDataType = inputDataType;
127141
this.lookupIndexPattern = lookupIndexPattern;
128142
this.lookupIndex = lookupIndex;
129-
this.matchField = matchField;
130143
this.loadFields = loadFields;
131144
this.source = source;
132145
}
133146

134147
@Override
135148
protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener) {
136-
final Block inputBlock = inputPage.getBlock(inputChannel);
137-
totalTerms += inputBlock.getTotalValueCount();
149+
// what is happening here?
150+
// should I be getting multiple bloks, and send them using the LookupFromIndexService.Request
151+
// is the totalTerms supposed to be the total number of terms in all blocks?
152+
Block[] inputBlockArray = new Block[matchFields.size()];
153+
for (int i = 0; i < matchFields.size(); i++) {
154+
MatchConfig matchField = matchFields.get(i);
155+
int inputChannel = matchField.channel;
156+
final Block inputBlock = inputPage.getBlock(inputChannel);
157+
totalTerms += inputBlock.getTotalValueCount();
158+
inputBlockArray[i] = inputBlock;
159+
}
138160
LookupFromIndexService.Request request = new LookupFromIndexService.Request(
139161
sessionId,
140162
lookupIndex,
141163
lookupIndexPattern,
142-
inputDataType,
143-
matchField,
144-
new Page(inputBlock),
164+
matchFields,
165+
new Page(inputBlockArray),
145166
loadFields,
146167
source
147168
);
@@ -190,17 +211,18 @@ protected void releaseFetchedOnAnyThread(OngoingJoin ongoingJoin) {
190211

191212
@Override
192213
public String toString() {
193-
return "LookupOperator[index="
194-
+ lookupIndex
195-
+ " input_type="
196-
+ inputDataType
197-
+ " match_field="
198-
+ matchField
199-
+ " load_fields="
200-
+ loadFields
201-
+ " inputChannel="
202-
+ inputChannel
203-
+ "]";
214+
StringBuilder stringBuilder = new StringBuilder();
215+
stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields);
216+
for (MatchConfig matchField : matchFields) {
217+
stringBuilder.append(" input_type=")
218+
.append(matchField.type)
219+
.append(" match_field=")
220+
.append(matchField.fieldName.string())
221+
.append(" inputChannel=")
222+
.append(matchField.channel);
223+
}
224+
stringBuilder.append("]");
225+
return stringBuilder.toString();
204226
}
205227

206228
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.io.IOException;
4141
import java.util.List;
4242
import java.util.Objects;
43+
import java.util.stream.Collectors;
4344

4445
/**
4546
* {@link LookupFromIndexService} performs lookup against a Lookup index for
@@ -84,7 +85,7 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
8485
request.inputPage,
8586
null,
8687
request.extractFields,
87-
request.matchField,
88+
request.matchFields,
8889
request.source
8990
);
9091
}
@@ -98,10 +99,14 @@ protected QueryList queryList(
9899
@Nullable DataType inputDataType,
99100
Warnings warnings
100101
) {
101-
return termQueryList(context.getFieldType(request.matchField), context, aliasFilter, inputBlock, inputDataType).onlySingleValues(
102-
warnings,
103-
"LOOKUP JOIN encountered multi-value"
104-
);
102+
// TODO: THIS NEEDS IMPLEMENTATION FOR MULTI-FIELD MATCHING
103+
return termQueryList(
104+
context.getFieldType(request.matchFields.get(0).fieldName().string()),
105+
context,
106+
aliasFilter,
107+
inputBlock,
108+
inputDataType
109+
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
105110
}
106111

107112
@Override
@@ -115,25 +120,24 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
115120
}
116121

117122
public static class Request extends AbstractLookupService.Request {
118-
private final String matchField;
123+
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
119124

120125
Request(
121126
String sessionId,
122127
String index,
123128
String indexPattern,
124-
DataType inputDataType,
125-
String matchField,
129+
List<LookupFromIndexOperator.MatchConfig> matchFields,
126130
Page inputPage,
127131
List<NamedExpression> extractFields,
128132
Source source
129133
) {
130-
super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, source);
131-
this.matchField = matchField;
134+
super(sessionId, index, indexPattern, null, inputPage, extractFields, source);
135+
this.matchFields = matchFields;
132136
}
133137
}
134138

135139
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
136-
private final String matchField;
140+
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
137141

138142
TransportRequest(
139143
String sessionId,
@@ -143,11 +147,11 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
143147
Page inputPage,
144148
Page toRelease,
145149
List<NamedExpression> extractFields,
146-
String matchField,
150+
List<LookupFromIndexOperator.MatchConfig> matchFields,
147151
Source source
148152
) {
149153
super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source);
150-
this.matchField = matchField;
154+
this.matchFields = matchFields;
151155
}
152156

153157
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -181,6 +185,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
181185
String sourceText = in.readString();
182186
source = new Source(source.source(), sourceText);
183187
}
188+
List<LookupFromIndexOperator.MatchConfig> matchFields = null;
189+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
190+
matchFields = in.readCollectionAsList(LookupFromIndexOperator.MatchConfig::new);
191+
}
184192
TransportRequest result = new TransportRequest(
185193
sessionId,
186194
shardId,
@@ -189,7 +197,7 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
189197
inputPage,
190198
inputPage,
191199
extractFields,
192-
matchField,
200+
matchFields,
193201
source
194202
);
195203
result.setParentTask(parentTaskId);
@@ -213,18 +221,21 @@ public void writeTo(StreamOutput out) throws IOException {
213221
out.writeWriteable(inputPage);
214222
PlanStreamOutput planOut = new PlanStreamOutput(out, null);
215223
planOut.writeNamedWriteableCollection(extractFields);
216-
out.writeString(matchField);
224+
out.writeString(matchFields.get(0).fieldName().string());
217225
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
218226
source.writeTo(planOut);
219227
}
220228
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) {
221229
out.writeString(source.text());
222230
}
231+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
232+
out.writeCollection(matchFields, (o, matchConfig) -> matchConfig.writeTo(o));
233+
}
223234
}
224235

225236
@Override
226237
protected String extraDescription() {
227-
return " ,match_field=" + matchField;
238+
return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", "));
228239
}
229240
}
230241

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,6 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) {
634634
}
635635
}
636636

637-
var matchFieldsCount = joinFields.size();
638-
if (matchFieldsCount > 1) {
639-
throw new ParsingException(source, "JOIN ON clause only supports one field at the moment, found [{}]", matchFieldsCount);
640-
}
641-
642637
return p -> {
643638
boolean hasRemotes = p.anyMatch(node -> {
644639
if (node instanceof UnresolvedRelation r) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -769,48 +769,33 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
769769
if (join.leftFields().size() != join.rightFields().size()) {
770770
throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count");
771771
}
772-
List<MatchConfig> matchFields = new ArrayList<>(join.leftFields().size());
772+
List<LookupFromIndexOperator.MatchConfig> matchFields = new ArrayList<>(join.leftFields().size());
773773
for (int i = 0; i < join.leftFields().size(); i++) {
774774
TypedAttribute left = (TypedAttribute) join.leftFields().get(i);
775775
FieldAttribute right = (FieldAttribute) join.rightFields().get(i);
776776
Layout.ChannelAndType input = source.layout.get(left.id());
777777
if (input == null) {
778778
throw new IllegalArgumentException("can't plan [" + join + "][" + left + "]");
779779
}
780-
matchFields.add(new MatchConfig(right, input));
780+
matchFields.add(new LookupFromIndexOperator.MatchConfig(right, input));
781781
}
782-
if (matchFields.size() != 1) {
783-
throw new IllegalArgumentException("can't plan [" + join + "]: multiple join predicates are not supported");
784-
}
785-
// TODO support multiple match fields, and support more than equality predicates
786-
MatchConfig matchConfig = matchFields.getFirst();
787782

788783
return source.with(
789784
new LookupFromIndexOperator.Factory(
785+
matchFields,
790786
sessionId,
791787
parentTask,
792788
context.queryPragmas().enrichMaxWorkers(),
793-
matchConfig.channel(),
794789
ctx -> lookupFromIndexService,
795-
matchConfig.type(),
796790
localSourceExec.indexPattern(),
797791
indexName,
798-
matchConfig.fieldName(),
799792
join.addedFields().stream().map(f -> (NamedExpression) f).toList(),
800793
join.source()
801794
),
802795
layout
803796
);
804797
}
805798

806-
private record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) {
807-
private MatchConfig(FieldAttribute match, Layout.ChannelAndType input) {
808-
// TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup
809-
// indices, so the call to exactAttribute looks redundant now.
810-
this(match.exactAttribute().fieldName(), input.channel(), input.type());
811-
}
812-
}
813-
814799
private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) {
815800
Layout.Builder layout = new Layout.Builder();
816801
layout.append(localSourceExec.output());

0 commit comments

Comments
 (0)