Skip to content

Commit 1dbe524

Browse files
Address more code review comments, part 2
1 parent 326cb82 commit 1dbe524

File tree

4 files changed

+57
-48
lines changed

4 files changed

+57
-48
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -271,20 +271,19 @@ protected void sendChildRequest(
271271
}
272272

273273
private void doLookup(T request, CancellableTask task, ActionListener<List<Page>> listener) {
274-
boolean started = false;
274+
for (int j = 0; j < request.inputPage.getBlockCount(); j++) {
275+
Block inputBlock = request.inputPage.getBlock(j);
276+
if (inputBlock.areAllValuesNull()) {
277+
List<Page> nullResponse = mergePages
278+
? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
279+
: List.of();
280+
listener.onResponse(nullResponse);
281+
return;
282+
}
283+
}
275284
final List<Releasable> releasables = new ArrayList<>(6);
285+
boolean started = false;
276286
try {
277-
for (int j = 0; j < request.inputPage.getBlockCount(); j++) {
278-
Block inputBlock = request.inputPage.getBlock(j);
279-
if (inputBlock.areAllValuesNull()) {
280-
List<Page> nullResponse = mergePages
281-
? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
282-
: List.of();
283-
listener.onResponse(nullResponse);
284-
return;
285-
}
286-
}
287-
288287
var projectState = projectResolver.getProjectState(clusterService.state());
289288
AliasFilter aliasFilter = indicesService.buildAliasFilter(
290289
projectState,
@@ -533,11 +532,6 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
533532
final String sessionId;
534533
final ShardId shardId;
535534
final String indexPattern;
536-
/**
537-
* For mixed clusters with nodes &lt;8.14, this will be null.
538-
*/
539-
@Nullable
540-
final DataType inputDataType;
541535
final Page inputPage;
542536
final List<NamedExpression> extractFields;
543537
final Source source;
@@ -549,7 +543,6 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
549543
String sessionId,
550544
ShardId shardId,
551545
String indexPattern,
552-
DataType inputDataType,
553546
Page inputPage,
554547
Page toRelease,
555548
List<NamedExpression> extractFields,
@@ -558,7 +551,6 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
558551
this.sessionId = sessionId;
559552
this.shardId = shardId;
560553
this.indexPattern = indexPattern;
561-
this.inputDataType = inputDataType;
562554
this.inputPage = inputPage;
563555
this.toRelease = toRelease;
564556
this.extractFields = extractFields;
@@ -618,8 +610,6 @@ public final String toString() {
618610
+ sessionId
619611
+ " ,shard="
620612
+ shardId
621-
+ " ,input_type="
622-
+ inputDataType
623613
+ " ,extract_fields="
624614
+ extractFields
625615
+ " ,positions="

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ public static class Request extends AbstractLookupService.Request {
191191
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
192192
private final String matchType;
193193
private final String matchField;
194+
/**
195+
* For mixed clusters with nodes &lt;8.14, this will be null.
196+
*/
197+
@Nullable
198+
final DataType inputDataType;
194199

195200
TransportRequest(
196201
String sessionId,
@@ -203,9 +208,10 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
203208
List<NamedExpression> extractFields,
204209
Source source
205210
) {
206-
super(sessionId, shardId, shardId.getIndexName(), inputDataType, inputPage, toRelease, extractFields, source);
211+
super(sessionId, shardId, shardId.getIndexName(), inputPage, toRelease, extractFields, source);
207212
this.matchType = matchType;
208213
this.matchField = matchField;
214+
this.inputDataType = inputDataType;
209215
}
210216

211217
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -262,7 +268,7 @@ public void writeTo(StreamOutput out) throws IOException {
262268

263269
@Override
264270
protected String extraDescription() {
265-
return " ,match_type=" + matchType + " ,match_field=" + matchField;
271+
return " ,input_type=" + inputDataType + " ,match_type=" + matchType + " ,match_field=" + matchField;
266272
}
267273
}
268274

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,11 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
151151
MatchConfig matchField = matchFields.get(i);
152152
int inputChannel = matchField.channel;
153153
final Block inputBlock = inputPage.getBlock(inputChannel);
154-
if (i == 0) {
155-
// we only add to the totalRows once, so we can use the first block
156-
totalRows += inputBlock.getTotalValueCount();
157-
}
158154
inputBlockArray[i] = inputBlock;
159155
}
156+
// we only add to the totalRows once, so we can use the first block
157+
totalRows += inputPage.getBlock(0).getTotalValueCount();
158+
160159
LookupFromIndexService.Request request = new LookupFromIndexService.Request(
161160
sessionId,
162161
lookupIndex,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
8484
request.sessionId,
8585
shardId,
8686
request.indexPattern,
87-
request.inputDataType,
8887
request.inputPage,
8988
null,
9089
request.extractFields,
@@ -149,18 +148,21 @@ public static class Request extends AbstractLookupService.Request {
149148
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
150149
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
151150

151+
// Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order
152+
// so we are not passing any explicit channel information for now
153+
// In the future it might be possible to include channel information in the matchFields, or expressions, or both,
154+
// so that the same channel can be reused for multiple match fields or multiple times inside the same expression
152155
TransportRequest(
153156
String sessionId,
154157
ShardId shardId,
155158
String indexPattern,
156-
DataType inputDataType,
157159
Page inputPage,
158160
Page toRelease,
159161
List<NamedExpression> extractFields,
160162
List<LookupFromIndexOperator.MatchConfig> matchFields,
161163
Source source
162164
) {
163-
super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source);
165+
super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source);
164166
this.matchFields = matchFields;
165167
}
166168

@@ -177,14 +179,26 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
177179
indexPattern = shardId.getIndexName();
178180
}
179181

180-
DataType inputDataType = DataType.fromTypeName(in.readString());
182+
DataType inputDataType = null;
183+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS) == false) {
184+
inputDataType = DataType.fromTypeName(in.readString());
185+
}
186+
181187
Page inputPage;
182188
try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
183189
inputPage = new Page(bsi);
184190
}
185191
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
186192
List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
187-
String matchField = in.readString();
193+
List<LookupFromIndexOperator.MatchConfig> matchFields = null;
194+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
195+
matchFields = planIn.readCollectionAsList(LookupFromIndexOperator.MatchConfig::new);
196+
} else {
197+
String matchField = in.readString();
198+
// For older versions, we only support a single match field.
199+
matchFields = new ArrayList<>(1);
200+
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType));
201+
}
188202
var source = Source.EMPTY;
189203
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
190204
source = Source.readFrom(planIn);
@@ -195,19 +209,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
195209
String sourceText = in.readString();
196210
source = new Source(source.source(), sourceText);
197211
}
198-
List<LookupFromIndexOperator.MatchConfig> matchFields = null;
199-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
200-
matchFields = planIn.readCollectionAsList(LookupFromIndexOperator.MatchConfig::new);
201-
} else {
202-
// For older versions, we only support a single match field.
203-
matchFields = new ArrayList<>(1);
204-
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType));
205-
}
206212
TransportRequest result = new TransportRequest(
207213
sessionId,
208214
shardId,
209215
indexPattern,
210-
inputDataType,
211216
inputPage,
212217
inputPage,
213218
extractFields,
@@ -230,22 +235,31 @@ public void writeTo(StreamOutput out) throws IOException {
230235
} else if (indexPattern.equals(shardId.getIndexName()) == false) {
231236
throw new EsqlIllegalArgumentException("Aliases and index patterns are not allowed for LOOKUP JOIN [{}]", indexPattern);
232237
}
233-
out.writeString(inputDataType.typeName());
238+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS) == false) {
239+
// only write this for old versions
240+
// older versions only support a single match field
241+
if (matchFields.size() > 1) {
242+
throw new EsqlIllegalArgumentException("LOOKUP JOIN on multiple fields is not supported on remote node");
243+
}
244+
out.writeString(matchFields.get(0).type().typeName());
245+
}
234246
out.writeWriteable(inputPage);
235247
PlanStreamOutput planOut = new PlanStreamOutput(out, null);
236248
planOut.writeNamedWriteableCollection(extractFields);
237-
out.writeString(matchFields.get(0).fieldName().string());
249+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
250+
// serialize all match fields for new versions
251+
planOut.writeCollection(matchFields, (o, matchConfig) -> matchConfig.writeTo(o));
252+
} else {
253+
// older versions only support a single match field, we already checked this above when writing the datatype
254+
// send the field name of the first and only match field here
255+
out.writeString(matchFields.get(0).fieldName().string());
256+
}
238257
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
239258
source.writeTo(planOut);
240259
}
241260
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) {
242261
out.writeString(source.text());
243262
}
244-
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
245-
planOut.writeCollection(matchFields, (o, matchConfig) -> matchConfig.writeTo(o));
246-
} else if (matchFields.size() > 1) {
247-
throw new EsqlIllegalArgumentException("LOOKUP JOIN on multiple fields is not supported on remote node");
248-
}
249263
}
250264

251265
@Override

0 commit comments

Comments
 (0)