Skip to content

Commit 4db37fd

Browse files
Looking join on multiple fields WIP
1 parent 2c90817 commit 4db37fd

File tree

10 files changed

+193
-40
lines changed

10 files changed

+193
-40
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*/
3838
public final class EnrichQuerySourceOperator extends SourceOperator {
3939
private final BlockFactory blockFactory;
40-
private final QueryList queryList;
40+
private final LookupEnrichQueryGenerator queryList;
4141
private int queryPosition = -1;
4242
private final ShardContext shardContext;
4343
private final IndexReader indexReader;
@@ -51,7 +51,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
5151
public EnrichQuerySourceOperator(
5252
BlockFactory blockFactory,
5353
int maxPageSize,
54-
QueryList queryList,
54+
LookupEnrichQueryGenerator queryList,
5555
ShardContext shardContext,
5656
Warnings warnings
5757
) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.lookup;
9+
10+
import org.apache.lucene.search.BooleanClause;
11+
import org.apache.lucene.search.BooleanQuery;
12+
import org.apache.lucene.search.Query;
13+
14+
import java.util.List;
15+
16+
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
17+
List<QueryList> queryLists;
18+
19+
public ExpressionQueryList(List<QueryList> queryLists) {
20+
if (queryLists.size() < 2) {
21+
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
22+
}
23+
this.queryLists = queryLists;
24+
}
25+
26+
@Override
27+
public Query getQuery(int position) {
28+
// JULIAN: Do we have a guarantee that the positions will match across all QueryLists?
29+
// for now we only support AND of the queries in the lists
30+
BooleanQuery.Builder builder = new BooleanQuery.Builder();
31+
for (QueryList queryList : queryLists) {
32+
Query q = queryList.getQuery(position);
33+
builder.add(q, BooleanClause.Occur.MUST);
34+
35+
}
36+
return builder.build();
37+
}
38+
39+
@Override
40+
public int getPositionCount() {
41+
int positionCount = queryLists.get(0).getPositionCount();
42+
for (QueryList queryList : queryLists) {
43+
if (queryList.getPositionCount() != positionCount) {
44+
throw new IllegalArgumentException(
45+
"All QueryLists must have the same position count, expected: "
46+
+ positionCount
47+
+ ", but got: "
48+
+ queryList.getPositionCount()
49+
);
50+
}
51+
}
52+
return queryLists.get(0).getPositionCount();
53+
}
54+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.lookup;
9+
10+
import org.apache.lucene.search.Query;
11+
import org.elasticsearch.core.Nullable;
12+
13+
public interface LookupEnrichQueryGenerator {
14+
15+
/**
16+
* Returns the query at the given position.
17+
*/
18+
@Nullable
19+
Query getQuery(int position);
20+
21+
int getPositionCount();
22+
23+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
/**
5050
* Generates a list of Lucene queries based on the input block.
5151
*/
52-
public abstract class QueryList {
52+
public abstract class QueryList implements LookupEnrichQueryGenerator {
5353
protected final SearchExecutionContext searchExecutionContext;
5454
protected final AliasFilter aliasFilter;
5555
protected final MappedFieldType field;
@@ -74,7 +74,7 @@ protected QueryList(
7474
/**
7575
* Returns the number of positions in this query list
7676
*/
77-
int getPositionCount() {
77+
public int getPositionCount() {
7878
return block.getPositionCount();
7979
}
8080

@@ -87,7 +87,7 @@ int getPositionCount() {
8787
*/
8888
public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage);
8989

90-
final Query getQuery(int position) {
90+
public final Query getQuery(int position) {
9191
final int valueCount = block.getValueCount(position);
9292
if (onlySingleValueParams != null && valueCount != 1) {
9393
if (valueCount > 1) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.compute.operator.ProjectOperator;
4444
import org.elasticsearch.compute.operator.Warnings;
4545
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
46+
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
4647
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
4748
import org.elasticsearch.compute.operator.lookup.QueryList;
4849
import org.elasticsearch.core.AbstractRefCounted;
@@ -191,7 +192,7 @@ public ThreadContext getThreadContext() {
191192
/**
192193
* Build a list of queries to perform inside the actual lookup.
193194
*/
194-
protected abstract QueryList queryList(
195+
protected abstract LookupEnrichQueryGenerator queryList(
195196
T request,
196197
SearchExecutionContext context,
197198
AliasFilter aliasFilter,
@@ -271,17 +272,20 @@ protected void sendChildRequest(
271272
}
272273

273274
private void doLookup(T request, CancellableTask task, ActionListener<List<Page>> listener) {
274-
Block inputBlock = request.inputPage.getBlock(0);
275-
if (inputBlock.areAllValuesNull()) {
276-
List<Page> nullResponse = mergePages
277-
? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
278-
: List.of();
279-
listener.onResponse(nullResponse);
280-
return;
281-
}
282-
final List<Releasable> releasables = new ArrayList<>(6);
283275
boolean started = false;
276+
final List<Releasable> releasables = new ArrayList<>(6);
284277
try {
278+
for (int j = 0; j < request.inputPage.getBlockCount(); j++) {
279+
Block inputBlock = request.inputPage.getBlock(j);
280+
if (inputBlock.areAllValuesNull()) {
281+
List<Page> nullResponse = mergePages
282+
? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
283+
: List.of();
284+
listener.onResponse(nullResponse);
285+
return;
286+
}
287+
}
288+
285289
var projectState = projectResolver.getProjectState(clusterService.state());
286290
AliasFilter aliasFilter = indicesService.buildAliasFilter(
287291
projectState,
@@ -305,6 +309,8 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
305309
final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray();
306310
final Operator finishPages;
307311
final OrdinalBytesRefBlock ordinalsBytesRefBlock;
312+
// JULIAN: SHOULD WE DO THE NEXT CODE FOR EACH BLOCK IN THE PAGE?
313+
Block inputBlock = request.inputPage.getBlock(0);
308314
if (mergePages // TODO fix this optimization for Lookup.
309315
&& inputBlock instanceof BytesRefBlock bytesRefBlock
310316
&& (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
@@ -334,7 +340,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
334340
request.source.source().getColumnNumber(),
335341
request.source.text()
336342
);
337-
QueryList queryList = queryList(
343+
LookupEnrichQueryGenerator queryList = queryList(
338344
request,
339345
shardContext.executionContext,
340346
aliasFilter,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.compute.data.BlockStreamInput;
2626
import org.elasticsearch.compute.data.Page;
2727
import org.elasticsearch.compute.operator.Warnings;
28+
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
2829
import org.elasticsearch.compute.operator.lookup.QueryList;
2930
import org.elasticsearch.core.Nullable;
3031
import org.elasticsearch.core.Releasables;
@@ -110,7 +111,7 @@ protected TransportRequest transportRequest(EnrichLookupService.Request request,
110111
}
111112

112113
@Override
113-
protected QueryList queryList(
114+
protected LookupEnrichQueryGenerator queryList(
114115
TransportRequest request,
115116
SearchExecutionContext context,
116117
AliasFilter aliasFilter,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,8 @@ public LookupFromIndexOperator(
146146

147147
@Override
148148
protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener) {
149-
// what is happening here?
150-
// should I be getting multiple bloks, and send them using the LookupFromIndexService.Request
151-
// is the totalTerms supposed to be the total number of terms in all blocks?
149+
// JULIAN: should I be getting multiple blocks, and send them using the LookupFromIndexService.Request
150+
// is the totalTerms supposed to be the total number of terms in all blocks combined?
152151
Block[] inputBlockArray = new Block[matchFields.size()];
153152
for (int i = 0; i < matchFields.size(); i++) {
154153
MatchConfig matchField = matchFields.get(i);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.elasticsearch.compute.data.BlockStreamInput;
2121
import org.elasticsearch.compute.data.Page;
2222
import org.elasticsearch.compute.operator.Warnings;
23+
import org.elasticsearch.compute.operator.lookup.ExpressionQueryList;
24+
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
2325
import org.elasticsearch.compute.operator.lookup.QueryList;
2426
import org.elasticsearch.core.Nullable;
2527
import org.elasticsearch.core.Releasables;
@@ -38,6 +40,7 @@
3840
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
3941

4042
import java.io.IOException;
43+
import java.util.ArrayList;
4144
import java.util.List;
4245
import java.util.Objects;
4346
import java.util.stream.Collectors;
@@ -91,22 +94,36 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
9194
}
9295

9396
@Override
94-
protected QueryList queryList(
97+
protected LookupEnrichQueryGenerator queryList(
9598
TransportRequest request,
9699
SearchExecutionContext context,
97100
AliasFilter aliasFilter,
98101
Block inputBlock,
99102
@Nullable DataType inputDataType,
100103
Warnings warnings
101104
) {
102-
// TODO: THIS NEEDS IMPLEMENTATION FOR MULTI-FIELD MATCHING
103-
return termQueryList(
104-
context.getFieldType(request.matchFields.get(0).fieldName().string()),
105-
context,
106-
aliasFilter,
107-
inputBlock,
108-
inputDataType
109-
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
105+
if (request.matchFields.size() == 1) {
106+
return termQueryList(
107+
context.getFieldType(request.matchFields.get(0).fieldName().string()),
108+
context,
109+
aliasFilter,
110+
inputBlock,
111+
inputDataType
112+
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
113+
}
114+
List<QueryList> queryLists = new ArrayList<>();
115+
for (int i = 0; i < request.matchFields.size(); i++) {
116+
LookupFromIndexOperator.MatchConfig matchField = request.matchFields.get(i);
117+
QueryList q = termQueryList(
118+
context.getFieldType(matchField.fieldName().string()),
119+
context,
120+
aliasFilter,
121+
request.inputPage.getBlock(i),
122+
matchField.type()
123+
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
124+
queryLists.add(q);
125+
}
126+
return new ExpressionQueryList(queryLists);
110127
}
111128

112129
@Override

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,17 @@ protected Operator.OperatorFactory simple(SimpleOptions options) {
142142
new ReferenceAttribute(Source.EMPTY, "lkwd", DataType.KEYWORD),
143143
new ReferenceAttribute(Source.EMPTY, "lint", DataType.INTEGER)
144144
);
145+
List<LookupFromIndexOperator.MatchConfig> matchFields = List.of(
146+
new LookupFromIndexOperator.MatchConfig(matchField, inputChannel, inputDataType)
147+
);
145148
return new LookupFromIndexOperator.Factory(
149+
matchFields,
146150
sessionId,
147151
parentTask,
148152
maxOutstandingRequests,
149-
inputChannel,
150153
this::lookupService,
151-
inputDataType,
152154
lookupIndex,
153155
lookupIndex,
154-
matchField,
155156
loadFields,
156157
Source.EMPTY
157158
);
@@ -165,7 +166,7 @@ protected Matcher<String> expectedDescriptionOfSimple() {
165166
@Override
166167
protected Matcher<String> expectedToStringOfSimple() {
167168
return matchesPattern(
168-
"LookupOperator\\[index=idx input_type=LONG match_field=match load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] inputChannel=0]"
169+
"LookupOperator\\[index=idx load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] input_type=LONG match_field=match inputChannel=0]"
169170
);
170171
}
171172

0 commit comments

Comments
 (0)