From 2eb3d30b9f79290820a9c7d38127c34dea2d45f6 Mon Sep 17 00:00:00 2001
From: Julian Kiryakov
Date: Wed, 22 Oct 2025 14:55:04 -0400
Subject: [PATCH 01/11] POC BulkKeywordQueryList
(cherry picked from commit 9b73572ac21a0a26f67e28453b874a5dc381d279)
# Conflicts:
# x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java
---
.../operator/lookup/BulkKeywordQueryList.java | 101 ++++++++++++++++++
.../lookup/EnrichQuerySourceOperator.java | 15 +++
.../lookup/LookupEnrichQueryGenerator.java | 7 ++
.../esql/enrich/ExpressionQueryList.java | 63 +++++++++++
4 files changed, 186 insertions(+)
create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordQueryList.java
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordQueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordQueryList.java
new file mode 100644
index 0000000000000..9d966c4bfefc3
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordQueryList.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.lookup;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.operator.Warnings;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.search.internal.AliasFilter;
+
+import java.util.function.IntFunction;
+
+public class BulkKeywordQueryList {
+ private final MappedFieldType rightFieldType;
+ private final SearchExecutionContext context;
+ private final BytesRefBlock block;
+ private final ClusterService clusterService;
+ private final AliasFilter aliasFilter;
+ private final Warnings warnings;
+ private final String fieldName;
+ private final IntFunction
*
* The join process spawns a {@link Driver} per incoming page which runs in
- * two or three stages:
+ * two, three or four stages:
*
*
* Stage 1: Finding matching document IDs for the input page. This stage is done
@@ -114,7 +117,11 @@
* {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}.
*
*
- * Stage 3: Optionally this combines the extracted values based on positions and filling
+ * Stage 3: Optionally the BulkLookupMvFilterOperator removes false-positive
+ * multivalue matches when the {@link BulkKeywordLookup} optimization.
+ *
+ *
+ * Stage 4: Optionally this combines the extracted values based on positions and filling
* nulls for positions without matches. This is done by {@link MergePositionsOperator}.
* The output page is represented as {@code [Block: field1, Block: field2,...]}.
*
@@ -345,6 +352,8 @@ protected void doLookup(T request, CancellableTask task, ActionListener operators = new ArrayList<>();
if (request.extractFields.isEmpty() == false) {
var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
releasables.add(extractFieldsOperator);
operators.add(extractFieldsOperator);
}
+
+ // Stage 3 - 137269
+ Operator bulkLookupMvFilterOperator = bulkLookupMvFilterOperator(queryList, driverContext, warnings);
+ if (bulkLookupMvFilterOperator != null) {
+ operators.add(bulkLookupMvFilterOperator);
+ }
+
+ // Stage 4
operators.add(finishPages);
/*
@@ -493,6 +511,30 @@ private Operator dropDocBlockOperator(List extractFields) {
return new ProjectOperator(projection);
}
+ /**
+ * Returns an operator to remove false-positive multivalue matches from
+ * BulkKeywordLookup or null when that optimization is not used.
+ */
+ private static Operator bulkLookupMvFilterOperator(LookupEnrichQueryGenerator queryList, DriverContext driverContext, Warnings warnings)
+ {
+ final BulkKeywordLookup bulkLookup = queryList.getBulkKeywordLookup();
+ if (bulkLookup != null) {
+
+ // at this point the output page [DocVector, IntBlock: positions, Block: field1, Block: field2,...]
+ // get the channel ignoreing the DocVector and IntBlock
+ //
+ final int channelOffset = 2 + bulkLookup.getExtractChannelOffset();
+ return new FilterOperator(
+ new BulkLookupSingleValued(
+ driverContext,
+ channelOffset,
+ warnings
+ )
+ );
+ }
+ return null;
+ }
+
protected Page createNullResponse(int positionCount, List extractFields) {
final Block[] blocks = new Block[extractFields.size()];
try {
From 993c37d0c98b7b409d9324e1c9335bebda14e6c0 Mon Sep 17 00:00:00 2001
From: cimequinox
Date: Mon, 9 Mar 2026 07:49:57 -0700
Subject: [PATCH 10/11] Update csv tests
Add near-duplicate row with multivalues for testing
---
.../data/languages_non_unique_key.csv | 1 +
.../src/main/resources/inlinestats.csv-spec | 3 +-
.../src/main/resources/lookup-join.csv-spec | 35 +++++++++++++++++++
.../src/main/resources/views.csv-spec | 6 ++--
4 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv
index d6381b174d739..6d4d34c4d84ea 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv
@@ -4,6 +4,7 @@ language_code:integer,language_name:keyword,country:keyword
1,,United Kingdom
1,English,United States of America
2,German,[Germany,Austria]
+2,German,[Germany,Belgium]
2,German,Switzerland
2,German,
4,Quenya,
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 336d198fd58bc..6a8f065b05127 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -4180,12 +4180,13 @@ FROM languages_lookup_non_unique_key
| EVAL language_code = null::integer
| INLINE STATS MAX(language_code) BY language_code
| SORT country
-| LIMIT 5
+| LIMIT 6
;
country:keyword |language_name:keyword |MAX(language_code):integer |language_code:integer
Atlantis |null |null |null
[Austria, Germany]|German |null |null
+[Belgium, Germany]|German |null |null
Canada |English |null |null
Mv-Land |Mv-Lang |null |null
Mv-Land2 |Mv-Lang2 |null |null
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
index 62d17789ce0f0..89dae2fc85694 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
@@ -592,6 +592,41 @@ language_code:integer | language_name:keyword | country:text
8 | null | null
;
+mvKeywordJoinWarningsFromLeftSide
+required_capability: join_lookup_v12
+required_capability: async_operator_warnings_fix
+
+ROW name = ["English", "Spanish"]
+| LOOKUP JOIN languages_lookup_non_unique_key ON name == language_name
+| KEEP name, country.keyword, language_name
+| LIMIT 1
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN languages_lookup_non_unique_key ON name == language_name] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+name:keyword | country.keyword:keyword | language_name:keyword
+[English, Spanish] | null | null
+;
+
+mvKeywordJoinWarningsFromRightSide
+required_capability: join_lookup_v12
+required_capability: async_operator_warnings_fix
+
+ROW name = "Germany"
+| LOOKUP JOIN languages_lookup_non_unique_key ON name == country.keyword
+| KEEP name, country.keyword, language_name
+| LIMIT 1
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN languages_lookup_non_unique_key ON name == language_name] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+name:keyword | country.keyword:keyword | language_name:keyword
+Germany | null | null
+;
+
+
###############################################
# Filtering tests with languages_lookup index
###############################################
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec
index 22a160b641663..e38f6210d7d7e 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec
@@ -159,8 +159,9 @@ ignoreOrder:true
count:long | country:keyword
1 | Atlantis
1 | Austria
+1 | Belgium
1 | Canada
-1 | Germany
+2 | Germany
1 | Switzerland
1 | United Kingdom
1 | United States
@@ -176,8 +177,9 @@ ignoreOrder:true
count:long | country:keyword
1 | Atlantis
1 | Austria
+1 | Belgium
1 | Canada
-1 | Germany
+2 | Germany
1 | Switzerland
1 | United Kingdom
1 | United States
From 42db6bd96757f5774e5e513dea640dba1e3323b6 Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Mon, 9 Mar 2026 15:16:30 +0000
Subject: [PATCH 11/11] [CI] Auto commit changes from spotless
---
.../operator/lookup/BulkKeywordLookup.java | 6 ++---
.../lookup/BulkLookupSingleValued.java | 8 +++----
.../lookup/EnrichQuerySourceOperator.java | 23 ++++++++++---------
.../lookup/BulkLookupSingleValuedTests.java | 14 ++++-------
.../esql/enrich/AbstractLookupService.java | 15 +++++-------
.../esql/enrich/ExpressionQueryList.java | 2 +-
6 files changed, 28 insertions(+), 40 deletions(-)
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java
index 39f61ce149fcf..bc0f1f1e8ec4f 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java
@@ -55,7 +55,7 @@ public BulkKeywordLookup(
) {
this.rightFieldType = rightFieldType;
this.context = context;
- this.matchChannelOffset = matchChannelOffset; // offset of field in left (input) page
+ this.matchChannelOffset = matchChannelOffset; // offset of field in left (input) page
this.extractChannelOffset = extractChannelOffset; // offset of field in right (output) page
this.clusterService = clusterService;
this.aliasFilter = aliasFilter;
@@ -81,9 +81,7 @@ public int processQuery(
final BytesRefBlock block = inputPage.getBlock(matchChannelOffset);
final int valueCount = block.getValueCount(position);
if (valueCount > 1) {
- warnings.registerException(
- new IllegalArgumentException("LOOKUP JOIN encountered multi-value")
- );
+ warnings.registerException(new IllegalArgumentException("LOOKUP JOIN encountered multi-value"));
return 0; // Skip multi-value positions
}
if (valueCount < 1) {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java
index b12ad850d0ad2..1203551180945 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java
@@ -20,8 +20,8 @@
* Used in AbstractLookupService to filter out false-positive matches when using BulkKeywordLookup optimization.
*/
public record BulkLookupSingleValued(DriverContext context, int channelOffset, Warnings warnings)
- implements EvalOperator.ExpressionEvaluator
-{
+ implements
+ EvalOperator.ExpressionEvaluator {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(BulkLookupSingleValued.class);
@Override
@@ -39,9 +39,7 @@ public Block eval(Page page) {
singles.appendBoolean(valueCount == 1);
}
if (encounteredMultiValue) {
- warnings.registerException(
- new IllegalArgumentException("LOOKUP JOIN encountered multi-value")
- );
+ warnings.registerException(new IllegalArgumentException("LOOKUP JOIN encountered multi-value"));
}
final Block result = singles.build().asBlock();
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java
index 3a404d585f133..87d17ba3b3e59 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java
@@ -196,10 +196,11 @@ public Page getOutput() {
}
}
- private Page processBulkQueries(Page inputPage,
- IntVector.Builder positionsBuilder,
- IntVector.Builder segmentsBuilder,
- IntVector.Builder docsBuilder
+ private Page processBulkQueries(
+ Page inputPage,
+ IntVector.Builder positionsBuilder,
+ IntVector.Builder segmentsBuilder,
+ IntVector.Builder docsBuilder
) throws IOException {
queryPosition++;
BulkKeywordLookup bulkKeywordLookup = queryList.getBulkKeywordLookup();
@@ -207,18 +208,18 @@ private Page processBulkQueries(Page inputPage,
bulkKeywordLookup.initializeCaches(indexReader);
while (queryPosition < queryList.getPositionCount(inputPage)) {
int matches = bulkKeywordLookup.processQuery(
- inputPage,
- queryPosition,
- indexReader,
- docsBuilder,
- segmentsBuilder,
- positionsBuilder
+ inputPage,
+ queryPosition,
+ indexReader,
+ docsBuilder,
+ segmentsBuilder,
+ positionsBuilder
);
totalMatches += matches;
queryPosition++;
}
final Page result = buildPage(totalMatches, positionsBuilder, segmentsBuilder, docsBuilder);
-
+
return result;
}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java
index 656ee558be848..b2391e3947692 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java
@@ -33,22 +33,16 @@ public class BulkLookupSingleValuedTests extends OperatorTestCase {
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
- final Object[] possibilities = {
- "single",
- List.of("multiple", "values")
- };
+ final Object[] possibilities = { "single", List.of("multiple", "values") };
// returns pages with two blocks
- // in first block even rows have single values, odd rows have multi values
+ // in first block even rows have single values, odd rows have multi values
// in second block even rows have value == true, odd rows have value == false
//
return new ListRowsBlockSourceOperator(
blockFactory,
List.of(ElementType.BYTES_REF, ElementType.BOOLEAN),
- IntStream
- .range(0, size)
- .mapToObj(l -> List.of(possibilities[l % 2], (l % 2) == 0))
- .toList()
+ IntStream.range(0, size).mapToObj(l -> List.of(possibilities[l % 2], (l % 2) == 0)).toList()
);
}
@@ -58,7 +52,7 @@ protected void assertSimpleOutput(List input, List results) {
final BytesRef scratch = new BytesRef();
for (var page : results) {
final BytesRefBlock b0 = page.getBlock(0);
- final BooleanBlock b1 = page.getBlock(1);
+ final BooleanBlock b1 = page.getBlock(1);
for (int p = 0; p < page.getPositionCount(); p++) {
final BytesRef bytesValue = b0.getBytesRef(p, scratch);
final Boolean boolValue = b1.getBoolean(p);
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
index aa272b6a297b0..e63b39968cc02 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
@@ -515,8 +515,11 @@ private Operator dropDocBlockOperator(List extractFields) {
* Returns an operator to remove false-positive multivalue matches from
* BulkKeywordLookup or null when that optimization is not used.
*/
- private static Operator bulkLookupMvFilterOperator(LookupEnrichQueryGenerator queryList, DriverContext driverContext, Warnings warnings)
- {
+ private static Operator bulkLookupMvFilterOperator(
+ LookupEnrichQueryGenerator queryList,
+ DriverContext driverContext,
+ Warnings warnings
+ ) {
final BulkKeywordLookup bulkLookup = queryList.getBulkKeywordLookup();
if (bulkLookup != null) {
@@ -524,13 +527,7 @@ private static Operator bulkLookupMvFilterOperator(LookupEnrichQueryGenerator qu
// get the channel ignoreing the DocVector and IntBlock
//
final int channelOffset = 2 + bulkLookup.getExtractChannelOffset();
- return new FilterOperator(
- new BulkLookupSingleValued(
- driverContext,
- channelOffset,
- warnings
- )
- );
+ return new FilterOperator(new BulkLookupSingleValued(driverContext, channelOffset, warnings));
}
return null;
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java
index bc9ca91aaef29..49521f5a1f318 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java
@@ -208,7 +208,7 @@ private boolean applyAsFastKeywordFilter(
// BulkLookupMvFilterOperator needs the extractChannelOffset later
// when filtering out false-positive multivalue matches
- //
+ //
int extractChannelOffset = -1;
for (int i = 0; i < extractFields.size(); i++) {
if (extractFields.get(i).name().equals(rightAttribute.name())) {