@@ -9,7 +9,6 @@

import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
@@ -44,7 +43,7 @@ public static class Factory extends LuceneOperator.Factory {

public Factory(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
int limit
@@ -121,6 +120,9 @@ protected Page getCheckedOutput() throws IOException {
if (scorer == null) {
remainingDocs = 0;
} else {
+if (scorer.tags().isEmpty() == false) {
[Member] I think we can leverage this and min/max later too.

[Member Author] ++

+throw new UnsupportedOperationException("tags not supported by " + getClass());
+}
Weight weight = scorer.weight();
var leafReaderContext = scorer.leafReaderContext();
// see org.apache.lucene.search.TotalHitCountCollector
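
For callers that still run a single untagged query, the new queryFunction shape is a mechanical wrap of the old one; a minimal sketch, assuming an existing oldQueryFunction of the previous Function<ShardContext, Query> type (hypothetical name):

// Wrap the old single-query function into a one-element, tag-free list.
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction =
    ctx -> List.of(new LuceneSliceQueue.QueryAndTags(oldQueryFunction.apply(ctx), List.of()));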
@@ -10,7 +10,6 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {

public LuceneMaxFactory(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
String fieldName,
@@ -10,7 +10,6 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {

public LuceneMinFactory(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
String fieldName,
@@ -102,6 +102,9 @@ public Page getCheckedOutput() throws IOException {
if (scorer == null) {
remainingDocs = 0;
} else {
+if (scorer.tags().isEmpty() == false) {
+throw new UnsupportedOperationException("tags not supported by " + getClass());
+}
final LeafReader reader = scorer.leafReaderContext().reader();
final Query query = scorer.weight().getQuery();
if (query == null || query instanceof MatchAllDocsQuery) {
@@ -97,7 +97,7 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac
*/
protected Factory(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
int taskConcurrency,
@@ -155,10 +155,13 @@ LuceneScorer getCurrentOrLoadNextScorer() {
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
logger.trace("Starting {}", partialLeaf);
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
-if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
+if (currentScorer == null // First time
+|| currentScorer.leafReaderContext() != leaf // Moved to a new leaf
+|| currentScorer.weight != currentSlice.weight() // Moved to a new query
[Member Author] This last bit of the if statement took most of a day to figure out I needed. But tests caught it.

+) {
final Weight weight = currentSlice.weight();
processedQueries.add(weight.getQuery());
-currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf);
+currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
}
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
currentScorer.maxPosition = partialLeaf.maxDoc();
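
The weight check in the condition above matters once a shard can produce slices for several queries: consecutive slices may share the same leaf while carrying weights for different queries, so leaf identity alone no longer proves the cached scorer is valid. A minimal self-contained sketch of the reuse rule (hypothetical types, not code from this PR):

// Rebuild the scorer whenever the leaf or the query's weight changes.
record ScorerKey(Object leaf, Object weight) {}

static boolean needsNewScorer(ScorerKey current, Object leaf, Object weight) {
    return current == null            // first time
        || current.leaf() != leaf     // moved to a new leaf
        || current.weight() != weight; // moved to a new query
}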
@@ -177,15 +180,17 @@ static final class LuceneScorer {
private final ShardContext shardContext;
private final Weight weight;
private final LeafReaderContext leafReaderContext;
+private final List<Object> tags;

private BulkScorer bulkScorer;
private int position;
private int maxPosition;
private Thread executingThread;

-LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) {
+LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
this.shardContext = shardContext;
this.weight = weight;
+this.tags = tags;
this.leafReaderContext = leafReaderContext;
reinitialize();
}
@@ -230,6 +235,13 @@ Weight weight() {
int position() {
return position;
}

+/**
+* Tags to add to the data returned by this query.
+*/
+List<Object> tags() {
[Member Author] I'm not entirely sure Object is the right thing. It works, but we might want Supplier<Block> or something more specific. But for now this is good enough.

[Member] Yes, the object list can provide a better debugging message, but the block supplier might be better; otherwise, we would need to provide the exact boxed type for numeric values.

[Member Author] Yeah, getting the boxed type perfect could be tricky. Suppliers are quite explicit. Let's keep it as is for now and rework when we find a rough edge.

+return tags;
+}
}

@Override
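
If the Supplier<Block> idea from the thread above were pursued, each tag could build its own constant block and sidestep boxed-type inference entirely; a sketch of one possible shape (an assumption, not part of this change):

// Hypothetical alternative to List<Object> tags: the tag itself knows how
// to materialize a constant block for a page of the given length.
interface TagBlockSupplier {
    Block block(BlockFactory blockFactory, int positions);
}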
@@ -14,7 +14,7 @@
/**
* Holds a list of multiple partial Lucene segments
*/
-public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) {
+public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
int numLeaves() {
return leaves.size();
}
@@ -32,8 +32,47 @@

/**
* Shared Lucene slices between Lucene operators.
+* <p>
+* Each shard is {@link #create built} with a list of queries to run and
+* tags to add to the results of each query ({@code List<QueryAndTags>}).
+* Some examples:
+* </p>
+* <ul>
+* <li>
+* For queries like {@code FROM foo} we'll use a one-element list
+* containing {@code match_all, []}. It loads all documents in the
+* index and appends no extra fields to the loaded documents.
+* </li>
+* <li>
+* For queries like {@code FROM foo | WHERE a > 10} we'll use a
+* one-element list containing {@code +single_value(a) +(a > 10), []}.
+* It loads all documents where {@code a} is single-valued and
+* greater than 10.
+* </li>
+* <li>
+* For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
+* we'll use a two-element list containing
+* <ul>
+* <li>{@code +single_value(a) +(a < 100), [0]}</li>
+* <li>{@code +single_value(a) +(a >= 100), [100]}</li>
+* </ul>
+* It loads all documents in the index where {@code a} is single-valued,
+* adding the constant {@code 0} to documents where {@code a < 100} and
+* the constant {@code 100} to documents where {@code a >= 100}.
+* </li>
+* </ul>
+* <p>
+* IMPORTANT: Runners make no effort to deduplicate the results from multiple
+* queries. If you need to see each document only once, make sure the
+* queries are mutually exclusive.
+* </p>
*/
public final class LuceneSliceQueue {
+/**
+* Query to run and tags to add to the results.
+*/
+public record QueryAndTags(Query query, List<Object> tags) {}

public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
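
The javadoc examples above translate into QueryAndTags lists along these lines; a sketch where rangeLessThan and rangeAtLeast are hypothetical stand-ins for the real single-value-plus-range queries:

// FROM foo: one match_all query, no tags.
List<LuceneSliceQueue.QueryAndTags> all =
    List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of()));

// FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100): two mutually exclusive
// queries, each tagged with the constant its documents should receive.
List<LuceneSliceQueue.QueryAndTags> rounded = List.of(
    new LuceneSliceQueue.QueryAndTags(rangeLessThan("a", 100), List.of(0)),
    new LuceneSliceQueue.QueryAndTags(rangeAtLeast("a", 100), List.of(100))
);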

@@ -69,7 +108,7 @@ public Collection<String> remainingShardsIdentifiers() {

public static LuceneSliceQueue create(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
Function<Query, PartitioningStrategy> autoStrategy,
int taskConcurrency,
@@ -78,27 +117,29 @@ public static LuceneSliceQueue create(
List<LuceneSlice> slices = new ArrayList<>();
Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
for (ShardContext ctx : contexts) {
-Query query = queryFunction.apply(ctx);
-query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-/*
-* Rewrite the query on the local index so things like fully
-* overlapping range queries become match all. It's important
-* to do this before picking the partitioning strategy so we
-* can pick more aggressive strategies when the query rewrites
-* into MatchAll.
-*/
-try {
-query = ctx.searcher().rewrite(query);
-} catch (IOException e) {
-throw new UncheckedIOException(e);
-}
-PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
-partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
-List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
-Weight weight = weight(ctx, query, scoreMode);
-for (List<PartialLeafReaderContext> group : groups) {
-if (group.isEmpty() == false) {
-slices.add(new LuceneSlice(ctx, group, weight));
+for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
+Query query = queryAndExtra.query;
+query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+/*
+* Rewrite the query on the local index so things like fully
+* overlapping range queries become match all. It's important
+* to do this before picking the partitioning strategy so we
+* can pick more aggressive strategies when the query rewrites
+* into MatchAll.
+*/
+try {
+query = ctx.searcher().rewrite(query);
+} catch (IOException e) {
+throw new UncheckedIOException(e);
+}
+PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
+partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
+List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
+Weight weight = weight(ctx, query, scoreMode);
+for (List<PartialLeafReaderContext> group : groups) {
+if (group.isEmpty() == false) {
+slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
+}
}
}
}
@@ -17,8 +17,9 @@
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
+import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.IntVector;
@@ -64,7 +65,7 @@ public static class Factory extends LuceneOperator.Factory {

public Factory(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
int maxPageSize,
@@ -320,28 +321,29 @@ public Page getCheckedOutput() throws IOException {
IntVector shard = null;
IntVector leaf = null;
IntVector docs = null;
-DoubleVector scores = null;
-DocBlock docBlock = null;
+Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
currentPagePos -= discardedDocs;
try {
shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
docs = buildDocsVector(currentPagePos);
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-docBlock = new DocVector(shard, leaf, docs, true).asBlock();
+int b = 0;
+blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock();
shard = null;
leaf = null;
docs = null;
-if (scoreBuilder == null) {
-page = new Page(currentPagePos, docBlock);
-} else {
-scores = buildScoresVector(currentPagePos);
+if (scoreBuilder != null) {
+blocks[b++] = buildScoresVector(currentPagePos).asBlock();
scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
-page = new Page(currentPagePos, docBlock, scores.asBlock());
}
+for (Object e : scorer.tags()) {
+blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
+}
+page = new Page(currentPagePos, blocks);
} finally {
if (page == null) {
-Releasables.closeExpectNoException(shard, leaf, docs, docBlock, scores);
+Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks));
}
}
currentPagePos = 0;
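
For a slice tagged [0] with scoring enabled, the block layout built above is: the doc vector first, then the scores, then one constant block per tag. A sketch of the resulting page, with positionCount standing in for the page's position count:

Block[] blocks = new Block[3];
blocks[0] = new DocVector(shard, leaf, docs, true).asBlock(); // doc ids
blocks[1] = buildScoresVector(positionCount).asBlock(); // per-doc scores
blocks[2] = BlockUtils.constantBlock(blockFactory, 0, positionCount); // the [0] tag
Page page = new Page(positionCount, blocks);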
@@ -58,7 +58,7 @@ public static class Factory extends LuceneOperator.Factory {

public Factory(
List<? extends ShardContext> contexts,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
int maxPageSize,
@@ -171,6 +171,9 @@ private Page collect() throws IOException {
return emit(true);
}
try {
+if (scorer.tags().isEmpty() == false) {
+throw new UnsupportedOperationException("tags not supported by " + getClass());
+}
if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
// TODO: share the bottom between shardCollectors
perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);
@@ -116,6 +116,9 @@ public Page getCheckedOutput() throws IOException {
doneCollecting = true;
return null;
}
+if (slice.tags().isEmpty() == false) {
+throw new UnsupportedOperationException("tags not supported by " + getClass());
+}
Releasables.close(fieldsReader);
fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);
iterator = new SegmentsIterator(slice);
@@ -7,7 +7,6 @@

package org.elasticsearch.compute.lucene;

-import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
@@ -37,7 +36,7 @@ private TimeSeriesSourceOperatorFactory(
List<? extends ShardContext> contexts,
boolean emitDocIds,
List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
-Function<ShardContext, Query> queryFunction,
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
int taskConcurrency,
int maxPageSize,
int limit
@@ -74,7 +73,7 @@ public static TimeSeriesSourceOperatorFactory create(
boolean emitDocIds,
List<? extends ShardContext> contexts,
List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
-Function<ShardContext, Query> queryFunction
+Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
) {
return new TimeSeriesSourceOperatorFactory(contexts, emitDocIds, fieldsToExact, queryFunction, taskConcurrency, maxPageSize, limit);
}