Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
*/
public interface PartitionedDocValues {
/**
* @param numPartitions the actual number of prefixes available in the partition
* @param prefixes the prefix keys
* @param startDocs the startDocs of corresponding prefix keys
* @param numPartitions the actual number of prefixes available in the partition
*/
record PrefixPartitions(int[] prefixes, int[] startDocs, int numPartitions) {
// Holds the tsid-prefix partition table of a segment: for each of the first
// numPartitions entries, prefixes[i] is the prefix key and startDocs[i] the
// first docid of that prefix's range. Arrays may be oversized; only the first
// numPartitions entries are valid.
record PrefixPartitions(int numPartitions, int[] prefixes, int[] startDocs) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ static PartitionedDocValues.PrefixPartitions prefixPartitions(IndexInput dataIn,
startDocs[i] = doc;
last = doc;
}
return new PartitionedDocValues.PrefixPartitions(prefixes, startDocs, numPartitions);
return new PartitionedDocValues.PrefixPartitions(numPartitions, prefixes, startDocs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9327000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.4.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
azure_openai_oauth_settings,9326000
time_series_partitioning,9327000
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -21,18 +23,23 @@
import org.elasticsearch.compute.lucene.PartialLeafReaderContext;
import org.elasticsearch.compute.lucene.ShardContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.codec.tsdb.PartitionedDocValues;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.internal.ContextIndexSearcher;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;
import java.util.function.IntFunction;
Expand Down Expand Up @@ -75,6 +82,8 @@
* </p>
*/
public final class LuceneSliceQueue {
public static final TransportVersion TIME_SERIES_PARTITIONING = TransportVersion.fromName("time_series_partitioning");

/**
* Query to run and tags to add to the results.
*/
Expand Down Expand Up @@ -132,7 +141,7 @@ public record QueryAndTags(Query query, List<Object> tags) {}
for (LuceneSlice slice : sliceList) {
if (slice.queryHead()) {
queryHeads.add(slice.slicePosition());
} else if (slice.getLeaf(0).minDoc() == 0) {
} else if (slice.leaves().stream().allMatch(l -> l.minDoc() == 0)) {
segmentHeads.add(slice.slicePosition());
} else {
stealableSlices.add(slice.slicePosition());
Expand Down Expand Up @@ -303,6 +312,22 @@ List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConc
int desiredSliceSize = Math.clamp(Math.ceilDiv(totalDocCount, taskConcurrency), 1, MAX_DOCS_PER_SLICE);
return new AdaptivePartitioner(Math.max(1, desiredSliceSize), MAX_SEGMENTS_PER_SLICE).partition(searcher.getLeafContexts());
}
},
/**
 * Partition using the prefix of tsid: documents sharing a tsid prefix are kept in the
 * same slice (see {@code TimeSeriesPartitioner}), so a slice may span multiple segments.
 */
TIME_SERIES(3) {
    @Override
    List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency) {
        final int totalDocCount = searcher.getIndexReader().maxDoc();
        // Cap at 4 * MAX_DOCS_PER_SLICE since each slice spans multiple segments, reducing per-slice overhead.
        final int docsPerSlice = Math.clamp(Math.ceilDiv(totalDocCount, taskConcurrency), 1, MAX_DOCS_PER_SLICE * 4);
        try {
            return new TimeSeriesPartitioner().partition(searcher.getLeafContexts(), docsPerSlice);
        } catch (IOException e) {
            // partition() reads doc values from the index; groups() declares no checked
            // exceptions, so wrap the IOException.
            throw new UncheckedIOException(e);
        }
    }
};

private final byte id;
Expand All @@ -317,13 +342,18 @@ public static PartitioningStrategy readFrom(StreamInput in) throws IOException {
case 0 -> SHARD;
case 1 -> SEGMENT;
case 2 -> DOC;
case 3 -> TIME_SERIES;
default -> throw new IllegalArgumentException("invalid PartitioningStrategyId [" + id + "]");
};
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(id);
byte val = id;
if (this == TIME_SERIES && out.getTransportVersion().supports(TIME_SERIES_PARTITIONING) == false) {
val = DOC.id; // make time-series as DOC
Copy link
Contributor

@kkrik-es kkrik-es Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val = DOC.id; // make time-series as DOC
val = DOC.id; // fall back to DOC partitioning strategy for time-series

}
out.writeByte(val);
}

abstract List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency);
Expand Down Expand Up @@ -359,8 +389,9 @@ private static PartitioningStrategy forAuto(
record WeightAndCache(Weight weight, LuceneSlice.BlockedOnCaching blockedOnCaching) {}

private static WeightAndCache weight(ShardContext ctx, Query query, ScoreMode scoreMode, PartitioningStrategy partitioning) {
final boolean intraSegment = partitioning == PartitioningStrategy.DOC || partitioning == PartitioningStrategy.TIME_SERIES;
try {
if (scoreMode == ScoreMode.COMPLETE_NO_SCORES && partitioning == PartitioningStrategy.DOC) {
if (scoreMode == ScoreMode.COMPLETE_NO_SCORES && intraSegment) {
DocPartitioningQueryCache queryCache = new DocPartitioningQueryCache(ctx.searcher().getQueryCache());
ContextIndexSearcher searcher = new ContextIndexSearcher(
ctx.searcher().getIndexReader(),
Expand Down Expand Up @@ -440,4 +471,84 @@ List<List<PartialLeafReaderContext>> partitionSmallSegments(List<LeafReaderConte
}
}

static final class TimeSeriesPartitioner {

private static class PrefixGroup {
final List<PartialLeafReaderContext> leaves;
int numDocs = 0;

PrefixGroup(int size) {
leaves = new ArrayList<>(size);
}

void add(LeafReaderContext context, int minDoc, int maxDoc) {
leaves.add(new PartialLeafReaderContext(context, minDoc, maxDoc));
numDocs += (maxDoc - minDoc);
}
}

List<List<PartialLeafReaderContext>> partition(List<LeafReaderContext> leaves, int docsPerSlice) throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change.

final Map<Integer, PrefixGroup> groups = new TreeMap<>(); // ordered by prefixes
PartitionedDocValues.PrefixPartitions prefixPartitions = null;
for (LeafReaderContext leaf : leaves) {
var tsid = leaf.reader().getSortedDocValues(TimeSeriesIdFieldMapper.NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: rename to tsidValues?

if (tsid == null) {
continue; // empty
}
prefixPartitions = ((PartitionedDocValues) tsid).prefixPartitions(prefixPartitions);
final int maxDoc = leaf.reader().maxDoc();
assert prefixPartitions != null;
int pendingPrefix = -1;
int pendingStartDoc = -1;
int numPartitions = prefixPartitions.numPartitions();
for (int i = 0; i < numPartitions; i++) {
int startDoc = prefixPartitions.startDocs()[i];
int prefix = prefixPartitions.prefixes()[i];
if (pendingPrefix != -1 && pendingStartDoc < startDoc) {
groups.computeIfAbsent(pendingPrefix, k -> new PrefixGroup(leaves.size())).add(leaf, pendingStartDoc, startDoc);
}
pendingStartDoc = startDoc;
pendingPrefix = prefix;
}
if (pendingPrefix >= 0 && pendingStartDoc < maxDoc) {
groups.computeIfAbsent(pendingPrefix, k -> new PrefixGroup(leaves.size())).add(leaf, pendingStartDoc, maxDoc);
}
}
return combineGroups(groups.values().stream().toList(), docsPerSlice);
}

private List<List<PartialLeafReaderContext>> combineGroups(List<PrefixGroup> groups, int docsPerSlice) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: let's add a comment outlining what this does. If I understand correctly, it combines groups to create chunkier slices so that they can be assigned to separate threads and be processed in parallel efficiently.

Map<LeafReaderContext, PartialLeafReaderContext> current = new IdentityHashMap<>();
List<List<PartialLeafReaderContext>> results = new ArrayList<>(groups.size());
final int minDocsPerSlice = Math.max(docsPerSlice * 2 / 3, 1);
final int maxDocsPerSlice = Math.max(docsPerSlice * 3 / 2, 1);
int pendingDocs = 0;
for (PrefixGroup slice : groups) {
if (pendingDocs >= docsPerSlice || (pendingDocs > minDocsPerSlice && (pendingDocs + slice.numDocs) > maxDocsPerSlice)) {
results.add(shuffle(current.values()));
current.clear();
pendingDocs = 0;
}
for (PartialLeafReaderContext leaf : slice.leaves) {
final LeafReaderContext ctx = leaf.leafReaderContext();
current.merge(ctx, leaf, (curr, next) -> {
assert curr.maxDoc() == next.minDoc() : "current=" + curr + "; next=" + next;
return new PartialLeafReaderContext(ctx, curr.minDoc(), next.maxDoc());
});
}
pendingDocs += slice.numDocs;
}
if (current.isEmpty() == false) {
results.add(shuffle(current.values()));
}
return results;
}

private List<PartialLeafReaderContext> shuffle(Collection<PartialLeafReaderContext> leaves) {
List<PartialLeafReaderContext> shuffled = new ArrayList<>(leaves);
// Shuffle so different drivers prefetch different segments concurrently, avoiding contention when a segment is being cached.
Randomness.shuffle(shuffled);
return shuffled;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import org.elasticsearch.compute.operator.Limiter;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.index.codec.tsdb.PartitionedDocValues;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Function;

Expand All @@ -31,6 +34,7 @@ public static final class Factory extends LuceneSourceOperator.Factory {
public Factory(
IndexedByShardId<? extends ShardContext> contexts,
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int docThresholdForAutoStrategy,
int taskConcurrency,
int maxPageSize,
Expand All @@ -39,8 +43,8 @@ public Factory(
super(
contexts,
queryFunction,
DataPartitioning.SHARD,
query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
dataPartitioning(dataPartitioning),
partitioningStrategy(contexts),
docThresholdForAutoStrategy,
taskConcurrency,
maxPageSize,
Expand Down Expand Up @@ -95,4 +99,26 @@ public static int pageSize(long estimateRowSizeInBytes, long maxPageSizeInBytes)
long pageSize = Math.clamp(numChunks * CHUNK_SIZE, CHUNK_SIZE, MAX_TARGET_PAGE_SIZE);
return Math.toIntExact(pageSize);
}

private static DataPartitioning dataPartitioning(DataPartitioning dataPartitioning) {
    // SHARD is honored as-is. Time-series can't run with segment or doc partitioning,
    // so anything else resolves through AUTO to either shard or time-series partitioning.
    return dataPartitioning == DataPartitioning.SHARD ? DataPartitioning.SHARD : DataPartitioning.AUTO;
}

// Resolves the AUTO strategy: TIME_SERIES only when every shard supports tsid-prefix
// partitioning, otherwise fall back to SHARD.
private static DataPartitioning.AutoStrategy partitioningStrategy(IndexedByShardId<? extends ShardContext> contexts) {
    boolean tsidPartitionable = true;
    try {
        for (ShardContext ctx : contexts.iterable()) {
            if (PartitionedDocValues.canPartitionByTsidPrefix(ctx.searcher()) == false) {
                tsidPartitionable = false;
                break;
            }
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    final LuceneSliceQueue.PartitioningStrategy strategy = tsidPartitionable
        ? LuceneSliceQueue.PartitioningStrategy.TIME_SERIES
        : LuceneSliceQueue.PartitioningStrategy.SHARD;
    return limit -> q -> strategy;
}
}
Loading
Loading