Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.lookup;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.internal.AliasFilter;

import java.io.IOException;
import java.util.function.BiFunction;

/**
 * Fast-path lookup for keyword join keys that bypasses Lucene's Query framework and
 * walks the inverted index directly with {@link TermsEnum} / {@link PostingsEnum}.
 *
 * <p>Not thread-safe: the shared {@code scratch} {@link BytesRef} and the per-leaf enum
 * caches are mutable state, so each instance must be confined to a single driver thread.
 */
public class BulkKeywordLookup {
    private final MappedFieldType rightFieldType;
    private final int matchChannelOffset;
    private final int extractChannelOffset;
    private final SearchExecutionContext context;
    private final ClusterService clusterService;
    private final AliasFilter aliasFilter;
    private final Warnings warnings;
    private final String fieldName;
    private final BiFunction<Block, Integer, Object> blockValueReader;

    // Lazily built per-leaf caches, indexed by LeafReaderContext.ord. Entries stay null
    // for segments that have no terms for the joined field; such segments are skipped.
    // NOTE(review): the cache is never invalidated — confirm callers always pass the same
    // IndexReader after initializeCaches(...) has run.
    private TermsEnum[] termsEnumCache = null;
    private PostingsEnum[] postingsCache = null;
    private final BytesRef scratch = new BytesRef();

    public BulkKeywordLookup(
        MappedFieldType rightFieldType,
        ElementType leftElementType,
        SearchExecutionContext context,
        int matchChannelOffset,
        int extractChannelOffset,
        ClusterService clusterService,
        AliasFilter aliasFilter,
        Warnings warnings
    ) {
        this.rightFieldType = rightFieldType;
        this.context = context;
        this.matchChannelOffset = matchChannelOffset; // offset of field in left (input) page
        this.extractChannelOffset = extractChannelOffset; // offset of field in right (output) page
        this.clusterService = clusterService;
        this.aliasFilter = aliasFilter;
        this.warnings = warnings;
        this.fieldName = rightFieldType.name();
        this.blockValueReader = QueryList.createBlockValueReaderForType(leftElementType);
    }

    /**
     * Process a single query at the given position using direct Lucene index access.
     * This method bypasses Lucene's query framework entirely and directly accesses
     * the inverted index using TermsEnum and PostingsEnum for maximum performance.
     *
     * @return the number of matching documents appended to the builders; 0 for null or
     *         multi-valued positions and on any failure (recorded via {@link Warnings})
     */
    public int processQuery(
        Page inputPage,
        int position,
        IndexReader indexReader,
        IntVector.Builder docsBuilder,
        IntVector.Builder segmentsBuilder,
        IntVector.Builder positionsBuilder
    ) {
        try {
            final BytesRefBlock block = inputPage.getBlock(matchChannelOffset);
            final int valueCount = block.getValueCount(position);
            if (valueCount > 1) {
                warnings.registerException(new IllegalArgumentException("LOOKUP JOIN encountered multi-value"));
                return 0; // Skip multi-value positions
            }
            if (valueCount < 1) {
                return 0; // Skip null positions
            }
            final int firstValueIndex = block.getFirstValueIndex(position);
            final BytesRef termBytes = block.getBytesRef(firstValueIndex, scratch);
            int totalMatches = 0;
            for (LeafReaderContext leafContext : indexReader.leaves()) {
                final int leafOrd = leafContext.ord;
                final TermsEnum termsEnum = termsEnumCache[leafOrd];
                if (termsEnum == null) {
                    continue; // Segment has no terms for this field (see initializeCaches)
                }
                if (termsEnum.seekExact(termBytes) == false) {
                    continue; // Term doesn't exist in this segment
                }
                // Reposition on the current term, reusing the cached PostingsEnum when
                // possible; a null reuse argument makes postings(...) allocate a fresh one,
                // so no separate null-check/allocation step is needed.
                final PostingsEnum postings = termsEnum.postings(postingsCache[leafOrd], PostingsEnum.NONE);
                postingsCache[leafOrd] = postings;

                final Bits liveDocs = leafContext.reader().getLiveDocs();
                int docId;
                while ((docId = postings.nextDoc()) != PostingsEnum.NO_MORE_DOCS) {
                    // Check if document is not deleted
                    if (liveDocs != null && liveDocs.get(docId) == false) {
                        continue; // Skip deleted documents
                    }
                    docsBuilder.appendInt(docId);
                    if (segmentsBuilder != null) {
                        segmentsBuilder.appendInt(leafOrd);
                    }
                    positionsBuilder.appendInt(position);
                    totalMatches++;
                }
            }
            return totalMatches;
        } catch (Exception e) {
            // Record and return "no matches" rather than failing the whole lookup.
            warnings.registerException(e);
            return 0;
        }
    }

    /** Number of positions in the left (input) page's match-key block. */
    public int getPositionCount(Page inputPage) {
        final Block block = inputPage.getBlock(matchChannelOffset);
        return block.getPositionCount();
    }

    /** Offset of the extracted field in the right (output) page. */
    public int getExtractChannelOffset() {
        return extractChannelOffset;
    }

    /**
     * Initialize caches for the given index reader. This should be called once
     * before the first processQuery call for a given index reader. Idempotent:
     * subsequent calls are no-ops once the caches exist.
     */
    public void initializeCaches(IndexReader indexReader) throws IOException {
        if (termsEnumCache == null) {
            final int numLeaves = indexReader.leaves().size();
            termsEnumCache = new TermsEnum[numLeaves];
            postingsCache = new PostingsEnum[numLeaves];

            // Pre-populate caches with a TermsEnum for each leaf that has the field.
            for (int i = 0; i < numLeaves; i++) {
                final LeafReaderContext leafContext = indexReader.leaves().get(i);
                final Terms terms = leafContext.reader().terms(fieldName);
                // terms is null when this segment has no postings for the field;
                // leave the cache entry null so processQuery skips the segment.
                if (terms != null) {
                    termsEnumCache[i] = terms.iterator();
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.lookup;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.Warnings;

/**
 * Emit page of boolean values where corresponding position in specified channel contains a single value.
 * Used in AbstractLookupService to filter out false-positive matches when using BulkKeywordLookup optimization.
 * Registers a single warning per page if any position holds more than one value.
 */
public record BulkLookupSingleValued(DriverContext context, int channelOffset, Warnings warnings)
    implements
        EvalOperator.ExpressionEvaluator {
    private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(BulkLookupSingleValued.class);

    @Override
    public Block eval(Page page) {
        final Block input = page.getBlock(channelOffset);
        final int positions = input.getPositionCount();
        final BooleanVector.FixedBuilder builder = context.blockFactory().newBooleanVectorFixedBuilder(positions);

        // true exactly when the position holds one value; null (0) and multi-value (>1)
        // positions both map to false.
        boolean sawMultiValue = false;
        for (int pos = 0; pos < positions; pos++) {
            final int count = input.getValueCount(pos);
            sawMultiValue = sawMultiValue || count > 1;
            builder.appendBoolean(count == 1);
        }
        if (sawMultiValue) {
            warnings.registerException(new IllegalArgumentException("LOOKUP JOIN encountered multi-value"));
        }

        return builder.build().asBlock();
    }

    @Override
    public long baseRamBytesUsed() {
        return BASE_RAM_BYTES_USED;
    }

    @Override
    public String toString() {
        return "BulkLookupSingleValued[channelOffset=" + channelOffset + ']';
    }

    @Override
    public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public Page getOutput() {
if (indexReader.leaves().size() > 1) {
segmentsBuilder = blockFactory.newIntVectorBuilder(estimatedSize);
}
if (queryList.getBulkKeywordLookup() != null) {
return processBulkQueries(inputPage, positionsBuilder, segmentsBuilder, docsBuilder);
}
int totalMatches = 0;
do {
Query query;
Expand Down Expand Up @@ -193,6 +196,33 @@ public Page getOutput() {
}
}

/**
 * Drains every remaining position of {@code inputPage} through the
 * {@link BulkKeywordLookup} fast path, appending matched docs/segments/positions
 * to the supplied builders, then assembles the result page via
 * {@code buildPage(...)}.
 */
private Page processBulkQueries(
Page inputPage,
IntVector.Builder positionsBuilder,
IntVector.Builder segmentsBuilder,
IntVector.Builder docsBuilder
) throws IOException {
// NOTE(review): advances past the position the enclosing getOutput() loop has
// already consumed — confirm against the caller's do/while before relying on it.
queryPosition++;
BulkKeywordLookup bulkKeywordLookup = queryList.getBulkKeywordLookup();
int totalMatches = 0;
// Idempotent: builds the per-leaf enum caches only on the first call.
bulkKeywordLookup.initializeCaches(indexReader);
while (queryPosition < queryList.getPositionCount(inputPage)) {
// Appends one row per matching doc; returns how many rows were appended.
int matches = bulkKeywordLookup.processQuery(
inputPage,
queryPosition,
indexReader,
docsBuilder,
segmentsBuilder,
positionsBuilder
);
totalMatches += matches;
queryPosition++;
}
final Page result = buildPage(totalMatches, positionsBuilder, segmentsBuilder, docsBuilder);

return result;
}

Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Builder segmentsBuilder, IntVector.Builder docsBuilder) {
IntVector positionsVector = null;
IntVector shardsVector = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ public interface LookupEnrichQueryGenerator {
*/
int getPositionCount(Page inputPage);

/**
 * Returns the {@link BulkKeywordLookup} fast-path if this generator supports it,
 * or {@code null} to fall back to the regular per-position query path.
 */
default BulkKeywordLookup getBulkKeywordLookup() {
    return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.lookup;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.FilterOperator;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.compute.test.operator.blocksource.ListRowsBlockSourceOperator;
import org.hamcrest.Matcher;

import java.util.List;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;

// based on FilterOperatorTests
/**
 * Tests {@link BulkLookupSingleValued} wrapped in a {@link FilterOperator}: only
 * single-valued positions must survive the filter.
 */
public class BulkLookupSingleValuedTests extends OperatorTestCase {

    @Override
    protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {

        final Object[] possibilities = { "single", List.of("multiple", "values") };

        // returns pages with two blocks
        // in first block even rows have single values, odd rows have multi values
        // in second block even rows have value == true, odd rows have value == false
        //
        return new ListRowsBlockSourceOperator(
            blockFactory,
            List.of(ElementType.BYTES_REF, ElementType.BOOLEAN),
            IntStream.range(0, size).mapToObj(l -> List.of(possibilities[l % 2], (l % 2) == 0)).toList()
        );
    }

    @Override
    protected void assertSimpleOutput(List<Page> input, List<Page> results) {
        final BytesRef expected = new BytesRef("single");
        final BytesRef scratch = new BytesRef();
        for (var page : results) {
            final BytesRefBlock b0 = page.<BytesRefBlock>getBlock(0);
            final BooleanBlock b1 = page.<BooleanBlock>getBlock(1);
            for (int p = 0; p < page.getPositionCount(); p++) {
                final BytesRef bytesValue = b0.getBytesRef(p, scratch);
                final Boolean boolValue = b1.getBoolean(p);

                // only the single values should pass the filter
                assertThat(bytesValue, equalTo(expected));
                assertThat(boolValue, equalTo(true));
            }
        }
    }

    @Override
    protected Operator.OperatorFactory simple(SimpleOptions options) {
        return new FilterOperator.FilterOperatorFactory(new EvalOperator.ExpressionEvaluator.Factory() {

            @Override
            public EvalOperator.ExpressionEvaluator get(DriverContext context) {
                // Use no-op warnings instead of null: eval() registers a warning whenever it
                // sees a multi-valued position, and simpleInput() puts one on every odd row,
                // so a null Warnings would throw NPE during the test.
                return new BulkLookupSingleValued(context, 0, org.elasticsearch.compute.operator.Warnings.NOOP_WARNINGS);
            }

            @Override
            public String toString() {
                return "BulkLookupSingleValued[channelOffset=0]";
            }
        });
    }

    @Override
    protected Matcher<String> expectedDescriptionOfSimple() {
        return equalTo("FilterOperator[evaluator=BulkLookupSingleValued[channelOffset=0]]");
    }

    @Override
    protected Matcher<String> expectedToStringOfSimple() {
        return expectedDescriptionOfSimple();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ language_code:integer,language_name:keyword,country:keyword
1,,United Kingdom
1,English,United States of America
2,German,[Germany,Austria]
2,German,[Germany,Belgium]
2,German,Switzerland
2,German,
4,Quenya,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4180,12 +4180,13 @@ FROM languages_lookup_non_unique_key
| EVAL language_code = null::integer
| INLINE STATS MAX(language_code) BY language_code
| SORT country
| LIMIT 5
| LIMIT 6
;

country:keyword |language_name:keyword |MAX(language_code):integer |language_code:integer
Atlantis |null |null |null
[Austria, Germany]|German |null |null
[Belgium, Germany]|German |null |null
Canada |English |null |null
Mv-Land |Mv-Lang |null |null
Mv-Land2 |Mv-Lang2 |null |null
Expand Down
Loading