diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java new file mode 100644 index 0000000000000..bc0f1f1e8ec4f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkKeywordLookup.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.search.internal.AliasFilter; + +import java.io.IOException; +import java.util.function.BiFunction; + +public class BulkKeywordLookup { + private final MappedFieldType rightFieldType; + private final int matchChannelOffset; + private final int extractChannelOffset; + private final SearchExecutionContext context; + private final ClusterService clusterService; + private final AliasFilter aliasFilter; + private final Warnings warnings; + private final 
String fieldName; + private final BiFunction blockValueReader; + + private TermsEnum[] termsEnumCache = null; + private PostingsEnum[] postingsCache = null; + private final BytesRef scratch = new BytesRef(); + + public BulkKeywordLookup( + MappedFieldType rightFieldType, + ElementType leftElementType, + SearchExecutionContext context, + int matchChannelOffset, + int extractChannelOffset, + ClusterService clusterService, + AliasFilter aliasFilter, + Warnings warnings + ) { + this.rightFieldType = rightFieldType; + this.context = context; + this.matchChannelOffset = matchChannelOffset; // offset of field in left (input) page + this.extractChannelOffset = extractChannelOffset; // offset of field in right (output) page + this.clusterService = clusterService; + this.aliasFilter = aliasFilter; + this.warnings = warnings; + this.fieldName = rightFieldType.name(); + this.blockValueReader = QueryList.createBlockValueReaderForType(leftElementType); + } + + /** + * Process a single query at the given position using direct Lucene index access. + * This method bypasses Lucene's query framework entirely and directly accesses + * the inverted index using TermsEnum and PostingsEnum for maximum performance. 
+ */ + public int processQuery( + Page inputPage, + int position, + IndexReader indexReader, + IntVector.Builder docsBuilder, + IntVector.Builder segmentsBuilder, + IntVector.Builder positionsBuilder + ) { + try { + final BytesRefBlock block = inputPage.getBlock(matchChannelOffset); + final int valueCount = block.getValueCount(position); + if (valueCount > 1) { + warnings.registerException(new IllegalArgumentException("LOOKUP JOIN encountered multi-value")); + return 0; // Skip multi-value positions + } + if (valueCount < 1) { + return 0; // Skip null positions + } + final int firstValueIndex = block.getFirstValueIndex(position); + final BytesRef termBytes = block.getBytesRef(firstValueIndex, scratch); + int totalMatches = 0; + for (LeafReaderContext leafContext : indexReader.leaves()) { + int leafOrd = leafContext.ord; + TermsEnum termsEnum = termsEnumCache[leafOrd]; + if (termsEnum.seekExact(termBytes) == false) { + continue; // Term doesn't exist in this segment + } + PostingsEnum postings = postingsCache[leafOrd]; + if (postings == null) { + postings = termsEnum.postings(null, 0); + postingsCache[leafOrd] = postings; + } + + // Reset the postings to the current term (reuse the cached PostingsEnum) + postings = termsEnum.postings(postings, 0); + + Bits liveDocs = leafContext.reader().getLiveDocs(); + int docId; + while ((docId = postings.nextDoc()) != PostingsEnum.NO_MORE_DOCS) { + // Check if document is not deleted + if (liveDocs != null && liveDocs.get(docId) == false) { + continue; // Skip deleted documents + } + docsBuilder.appendInt(docId); + if (segmentsBuilder != null) { + segmentsBuilder.appendInt(leafContext.ord); + } + positionsBuilder.appendInt(position); + totalMatches++; + } + } + return totalMatches; + } catch (Exception e) { + warnings.registerException(e); + return 0; + } + } + + public int getPositionCount(Page inputPage) { + final Block block = inputPage.getBlock(matchChannelOffset); + return block.getPositionCount(); + } + + public int 
getExtractChannelOffset() { + return extractChannelOffset; + } + + /** + * Initialize caches for the given index reader. This should be called once + * before the first processQuery call for a given index reader. + */ + public void initializeCaches(IndexReader indexReader) throws IOException { + if (termsEnumCache == null) { + final int numLeaves = indexReader.leaves().size(); + termsEnumCache = new TermsEnum[numLeaves]; + postingsCache = new PostingsEnum[numLeaves]; + + // Pre-populate caches with TermsEnum for each leaf + for (int i = 0; i < numLeaves; i++) { + LeafReaderContext leafContext = indexReader.leaves().get(i); + Terms terms = leafContext.reader().terms(fieldName); + termsEnumCache[i] = terms.iterator(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java new file mode 100644 index 0000000000000..1203551180945 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValued.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.Warnings; + +/** + * Emit page of boolean values where corresponding position in specified channel contains a single value. 
+ * Used in AbstractLookupService to filter out false-positive matches when using BulkKeywordLookup optimization. + */ +public record BulkLookupSingleValued(DriverContext context, int channelOffset, Warnings warnings) + implements + EvalOperator.ExpressionEvaluator { + private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(BulkLookupSingleValued.class); + + @Override + public Block eval(Page page) { + final Block block = page.getBlock(channelOffset); + final int positionCount = block.getPositionCount(); + final BooleanVector.FixedBuilder singles = context.blockFactory().newBooleanVectorFixedBuilder(positionCount); + + boolean encounteredMultiValue = false; + for (int p = 0; p < positionCount; p++) { + final int valueCount = block.getValueCount(p); + if (valueCount > 1) { + encounteredMultiValue = true; + } + singles.appendBoolean(valueCount == 1); + } + if (encounteredMultiValue) { + warnings.registerException(new IllegalArgumentException("LOOKUP JOIN encountered multi-value")); + } + + final Block result = singles.build().asBlock(); + return result; + } + + @Override + public long baseRamBytesUsed() { + return BASE_RAM_BYTES_USED; + } + + @Override + public String toString() { + return "BulkLookupSingleValued[channelOffset=" + channelOffset + ']'; + } + + @Override + public void close() {} +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index 256d1b7544a60..87d17ba3b3e59 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -146,6 +146,9 @@ public Page getOutput() { if (indexReader.leaves().size() > 1) { segmentsBuilder = 
blockFactory.newIntVectorBuilder(estimatedSize); } + if (queryList.getBulkKeywordLookup() != null) { + return processBulkQueries(inputPage, positionsBuilder, segmentsBuilder, docsBuilder); + } int totalMatches = 0; do { Query query; @@ -193,6 +196,33 @@ public Page getOutput() { } } + private Page processBulkQueries( + Page inputPage, + IntVector.Builder positionsBuilder, + IntVector.Builder segmentsBuilder, + IntVector.Builder docsBuilder + ) throws IOException { + queryPosition++; + BulkKeywordLookup bulkKeywordLookup = queryList.getBulkKeywordLookup(); + int totalMatches = 0; + bulkKeywordLookup.initializeCaches(indexReader); + while (queryPosition < queryList.getPositionCount(inputPage)) { + int matches = bulkKeywordLookup.processQuery( + inputPage, + queryPosition, + indexReader, + docsBuilder, + segmentsBuilder, + positionsBuilder + ); + totalMatches += matches; + queryPosition++; + } + final Page result = buildPage(totalMatches, positionsBuilder, segmentsBuilder, docsBuilder); + + return result; + } + Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Builder segmentsBuilder, IntVector.Builder docsBuilder) { IntVector positionsVector = null; IntVector shardsVector = null; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java index 897c2aba4711b..7ca4e672933f9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -29,4 +29,11 @@ public interface LookupEnrichQueryGenerator { */ int getPositionCount(Page inputPage); + /** + * Returns a BulkKeywordLookup if applicable, null otherwise. 
+ */ + default BulkKeywordLookup getBulkKeywordLookup() { + return null; + }; + } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java new file mode 100644 index 0000000000000..b2391e3947692 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/BulkLookupSingleValuedTests.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.FilterOperator; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.compute.test.OperatorTestCase; +import org.elasticsearch.compute.test.operator.blocksource.ListRowsBlockSourceOperator; +import org.hamcrest.Matcher; + +import java.util.List; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; + +// based on FilterOperatorTests +public class BulkLookupSingleValuedTests extends OperatorTestCase { + + @Override + protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + + final Object[] possibilities = { "single", List.of("multiple", "values") }; 
+ + // returns pages with two blocks + // in first block even rows have single values, odd rows have multi values + // in second block even rows have value == true, odd rows have value == false + // + return new ListRowsBlockSourceOperator( + blockFactory, + List.of(ElementType.BYTES_REF, ElementType.BOOLEAN), + IntStream.range(0, size).mapToObj(l -> List.of(possibilities[l % 2], (l % 2) == 0)).toList() + ); + } + + @Override + protected void assertSimpleOutput(List input, List results) { + final BytesRef expected = new BytesRef("single"); + final BytesRef scratch = new BytesRef(); + for (var page : results) { + final BytesRefBlock b0 = page.getBlock(0); + final BooleanBlock b1 = page.getBlock(1); + for (int p = 0; p < page.getPositionCount(); p++) { + final BytesRef bytesValue = b0.getBytesRef(p, scratch); + final Boolean boolValue = b1.getBoolean(p); + + // only the single values should pass the filter + assertThat(bytesValue, equalTo(expected)); + assertThat(boolValue, equalTo(true)); + } + } + } + + @Override + protected Operator.OperatorFactory simple(SimpleOptions options) { + return new FilterOperator.FilterOperatorFactory(new EvalOperator.ExpressionEvaluator.Factory() { + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + return new BulkLookupSingleValued(context, 0, null); + } + + @Override + public String toString() { + return "BulkLookupSingleValued[channelOffset=0]"; + } + }); + } + + @Override + protected Matcher expectedDescriptionOfSimple() { + return equalTo("FilterOperator[evaluator=BulkLookupSingleValued[channelOffset=0]]"); + } + + @Override + protected Matcher expectedToStringOfSimple() { + return expectedDescriptionOfSimple(); + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv index d6381b174d739..6d4d34c4d84ea 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_non_unique_key.csv @@ -4,6 +4,7 @@ language_code:integer,language_name:keyword,country:keyword 1,,United Kingdom 1,English,United States of America 2,German,[Germany,Austria] +2,German,[Germany,Belgium] 2,German,Switzerland 2,German, 4,Quenya, diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index 336d198fd58bc..6a8f065b05127 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -4180,12 +4180,13 @@ FROM languages_lookup_non_unique_key | EVAL language_code = null::integer | INLINE STATS MAX(language_code) BY language_code | SORT country -| LIMIT 5 +| LIMIT 6 ; country:keyword |language_name:keyword |MAX(language_code):integer |language_code:integer Atlantis |null |null |null [Austria, Germany]|German |null |null +[Belgium, Germany]|German |null |null Canada |English |null |null Mv-Land |Mv-Lang |null |null Mv-Land2 |Mv-Lang2 |null |null diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 62d17789ce0f0..89dae2fc85694 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -592,6 +592,41 @@ language_code:integer | language_name:keyword | country:text 8 | null | null ; +mvKeywordJoinWarningsFromLeftSide +required_capability: join_lookup_v12 +required_capability: async_operator_warnings_fix + +ROW name = ["English", "Spanish"] +| LOOKUP JOIN languages_lookup_non_unique_key ON name == language_name +| KEEP name, country.keyword, language_name +| 
LIMIT 1 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN languages_lookup_non_unique_key ON name == language_name] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +name:keyword | country.keyword:keyword | language_name:keyword +[English, Spanish] | null | null +; + +mvKeywordJoinWarningsFromRightSide +required_capability: join_lookup_v12 +required_capability: async_operator_warnings_fix + +ROW name = "Germany" +| LOOKUP JOIN languages_lookup_non_unique_key ON name == country.keyword +| KEEP name, country.keyword, language_name +| LIMIT 1 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN languages_lookup_non_unique_key ON name == country.keyword] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +name:keyword | country.keyword:keyword | language_name:keyword +Germany | null | null +; + + ############################################### # Filtering tests with languages_lookup index ############################################### diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec index 22a160b641663..e38f6210d7d7e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/views.csv-spec @@ -159,8 +159,9 @@ ignoreOrder:true count:long | country:keyword 1 | Atlantis 1 | Austria +1 | Belgium 1 | Canada -1 | Germany +2 | Germany 1 | Switzerland 1 | United Kingdom 1 | United States @@ -176,8 +177,9 @@ ignoreOrder:true count:long | country:keyword 1 | Atlantis 1 | Austria +1 | Belgium 1 | Canada -1 | Germany +2 | Germany 1 | Switzerland 1 | United Kingdom 1 | United States diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 93dd6480c79c5..e63b39968cc02 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -37,11 +37,14 @@ import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.FilterOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OutputOperator; import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.BlockOptimization; +import org.elasticsearch.compute.operator.lookup.BulkKeywordLookup; +import org.elasticsearch.compute.operator.lookup.BulkLookupSingleValued; import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.MergePositionsOperator; @@ -101,7 +104,7 @@ *

*

* The join process spawns a {@link Driver} per incoming page which runs in - * two or three stages: + * two, three or four stages: *

*

* Stage 1: Finding matching document IDs for the input page. This stage is done @@ -114,7 +117,11 @@ * {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}. *

*

- * Stage 3: Optionally this combines the extracted values based on positions and filling + * Stage 3: Optionally the BulkLookupMvFilterOperator removes false-positive + * multivalue matches when the {@link BulkKeywordLookup} optimization. + *

+ *

+ * Stage 4: Optionally this combines the extracted values based on positions and filling * nulls for positions without matches. This is done by {@link MergePositionsOperator}. * The output page is represented as {@code [Block: field1, Block: field2,...]}. *

@@ -345,6 +352,8 @@ protected void doLookup(T request, CancellableTask task, ActionListener operators = new ArrayList<>(); if (request.extractFields.isEmpty() == false) { var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields); releasables.add(extractFieldsOperator); operators.add(extractFieldsOperator); } + + // Stage 3 - 137269 + Operator bulkLookupMvFilterOperator = bulkLookupMvFilterOperator(queryList, driverContext, warnings); + if (bulkLookupMvFilterOperator != null) { + operators.add(bulkLookupMvFilterOperator); + } + + // Stage 4 operators.add(finishPages); /* @@ -493,6 +511,27 @@ private Operator dropDocBlockOperator(List extractFields) { return new ProjectOperator(projection); } + /** + * Returns an operator to remove false-positive multivalue matches from + * BulkKeywordLookup or null when that optimization is not used. + */ + private static Operator bulkLookupMvFilterOperator( + LookupEnrichQueryGenerator queryList, + DriverContext driverContext, + Warnings warnings + ) { + final BulkKeywordLookup bulkLookup = queryList.getBulkKeywordLookup(); + if (bulkLookup != null) { + + // at this point the output page [DocVector, IntBlock: positions, Block: field1, Block: field2,...] 
+ // get the channel ignoring the DocVector and IntBlock + // + final int channelOffset = 2 + bulkLookup.getExtractChannelOffset(); + return new FilterOperator(new BulkLookupSingleValued(driverContext, channelOffset, warnings)); + } + return null; + } + protected Page createNullResponse(int positionCount, List extractFields) { final Block[] blocks = new Block[extractFields.size()]; try { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java index 890172942697f..49521f5a1f318 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -14,6 +14,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.compute.operator.lookup.BulkKeywordLookup; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.index.mapper.MappedFieldType; @@ -24,6 +25,7 @@ import org.elasticsearch.xpack.esql.capabilities.TranslationAware; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; @@ -62,6 +64,7 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator { private final List lucenePushableFilterBuilders = new ArrayList<>(); private final AliasFilter aliasFilter; private final ClusterService clusterService; 
+ private BulkKeywordLookup bulkKeywordLookup = null; private ExpressionQueryList( List queryLists, @@ -137,6 +140,7 @@ public static ExpressionQueryList expressionBasedJoin( expressionQueryList.buildJoinOnForExpressionJoin( request.getJoinOnConditions(), request.getMatchFields(), + request.getExtractFields(), context, lucenePushdownPredicates, warnings @@ -144,14 +148,24 @@ public static ExpressionQueryList expressionBasedJoin( return expressionQueryList; } + @Override + public BulkKeywordLookup getBulkKeywordLookup() { + return bulkKeywordLookup; + }; + private void buildJoinOnForExpressionJoin( Expression joinOnConditions, List matchFields, + List extractFields, SearchExecutionContext context, LucenePushdownPredicates lucenePushdownPredicates, Warnings warnings ) { List expressions = Predicates.splitAnd(joinOnConditions); + if (applyAsFastKeywordFilter(expressions, matchFields, extractFields, context, clusterService, warnings)) { + // we managed to apply the whole condition as a fast keyword filter + return; + } for (Expression expr : expressions) { boolean applied = applyAsLeftRightBinaryComparison(expr, matchFields, context, clusterService, warnings); if (applied == false) { @@ -163,6 +177,71 @@ private void buildJoinOnForExpressionJoin( } } + private boolean applyAsFastKeywordFilter( + List expressions, + List matchFields, + List extractFields, + SearchExecutionContext context, + ClusterService clusterService, + Warnings warnings + ) { + if (expressions.size() == 1) { + Expression expr = expressions.get(0); + if (expr instanceof EsqlBinaryComparison binaryComparison + && binaryComparison.left() instanceof Attribute leftAttribute + && binaryComparison.right() instanceof Attribute rightAttribute) { + + // the left side comes from the page that was sent to the lookup node + // the right side is the field from the lookup index + // check if the left side is in the matchFields + int matchChannelOffset = -1; + DataType dataType = null; + for (int i = 0; i < 
matchFields.size(); i++) { + if (matchFields.get(i).fieldName().equals(leftAttribute.name())) { + matchChannelOffset = i; + dataType = matchFields.get(i).type(); + break; + } + } + + if (matchChannelOffset != -1 && dataType == DataType.KEYWORD) { + + // BulkLookupMvFilterOperator needs the extractChannelOffset later + // when filtering out false-positive multivalue matches + // + int extractChannelOffset = -1; + for (int i = 0; i < extractFields.size(); i++) { + if (extractFields.get(i).name().equals(rightAttribute.name())) { + extractChannelOffset = i; + break; + } + } + MappedFieldType rightFieldType = context.getFieldType(rightAttribute.name()); + + if (extractChannelOffset != -1 && rightFieldType != null) { + // special handle Equals operator on keyword fields + // we can apply as a BulkKeywordLookup for better performance + if (binaryComparison instanceof Equals) { + ElementType leftElementType = PlannerUtils.toElementType(dataType); + bulkKeywordLookup = new BulkKeywordLookup( + rightFieldType, + leftElementType, + context, + matchChannelOffset, + extractChannelOffset, + clusterService, + aliasFilter, + warnings + ); + return true; + } + } + } + } + } + return false; + } + private boolean applyAsRightSidePushableFilter( Expression filter, SearchExecutionContext context, @@ -316,6 +395,9 @@ public Query getQuery(int position, Page inputPage, SearchExecutionContext searc */ @Override public int getPositionCount(Page inputPage) { + if (bulkKeywordLookup != null) { + return bulkKeywordLookup.getPositionCount(inputPage); + } int positionCount = queryLists.get(0).getPositionCount(inputPage); for (QueryList queryList : queryLists) { if (queryList.getPositionCount(inputPage) != positionCount) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 139a2f8a8fabb..c8d39fbec926d 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -308,6 +308,10 @@ public List getMatchFields() { return matchFields; } + public List getExtractFields() { + return extractFields; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out);