Skip to content

Commit 899947c

Browse files
committed
Load constant blocks for ordinal values with index sorts
1 parent 2864dd8 commit 899947c

File tree

9 files changed

+142
-67
lines changed

9 files changed

+142
-67
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BulkNumericDocValues.java

Lines changed: 0 additions & 27 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
final class ES819TSDBDocValuesProducer extends DocValuesProducer {
5555
final IntObjectHashMap<NumericEntry> numerics;
56+
private int primarySortFieldNumber = -1;
5657
final IntObjectHashMap<BinaryEntry> binaries;
5758
final IntObjectHashMap<SortedEntry> sorted;
5859
final IntObjectHashMap<SortedSetEntry> sortedSets;
@@ -91,7 +92,11 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
9192
);
9293

9394
readFields(in, state.fieldInfos);
94-
95+
final var indexSort = state.segmentInfo.getIndexSort();
96+
if (indexSort != null) {
97+
var primaryField = indexSort.getSort()[0];
98+
primarySortFieldNumber = state.fieldInfos.fieldInfo(primaryField.getField()).number;
99+
}
95100
} catch (Throwable exception) {
96101
priorE = exception;
97102
} finally {
@@ -333,10 +338,10 @@ public boolean advanceExact(int target) throws IOException {
333338
@Override
334339
public SortedDocValues getSorted(FieldInfo field) throws IOException {
335340
SortedEntry entry = sorted.get(field.number);
336-
return getSorted(entry);
341+
return getSorted(entry, field.number == primarySortFieldNumber);
337342
}
338343

339-
private SortedDocValues getSorted(SortedEntry entry) throws IOException {
344+
private SortedDocValues getSorted(SortedEntry entry, boolean valuesSorted) throws IOException {
340345
final NumericDocValues ords = getNumeric(entry.ordsEntry, entry.termsDictEntry.termsDictSize);
341346
return new BaseSortedDocValues(entry) {
342347

@@ -369,10 +374,27 @@ public int advance(int target) throws IOException {
369374
public long cost() {
370375
return ords.cost();
371376
}
377+
378+
@Override
379+
public BlockLoader.Block tryReadColumnAtATime(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
380+
final int docCount = docs.count();
381+
if (valuesSorted && ords instanceof BaseDenseNumericValues denseOrds) {
382+
int firstDoc = docs.get(offset);
383+
denseOrds.advanceExact(firstDoc);
384+
long startValue = denseOrds.longValue();
385+
int lastDoc = docs.get(docCount - 1);
386+
long lastValue = denseOrds.lookAheadValueAt(lastDoc);
387+
if (lastValue == startValue) {
388+
BytesRef b = lookupOrd(Math.toIntExact(startValue));
389+
return factory.constantBytes(BytesRef.deepCopyOf(b), docCount);
390+
}
391+
}
392+
return null;
393+
}
372394
};
373395
}
374396

375-
abstract class BaseSortedDocValues extends SortedDocValues {
397+
abstract class BaseSortedDocValues extends SortedDocValues implements BlockLoader.OptionalColumnAtATimeReader {
376398

377399
final SortedEntry entry;
378400
final TermsEnum termsEnum;
@@ -406,6 +428,15 @@ public int lookupTerm(BytesRef key) throws IOException {
406428
public TermsEnum termsEnum() throws IOException {
407429
return new TermsDict(entry.termsDictEntry, data, merging);
408430
}
431+
432+
@Override
433+
public BlockLoader.Block tryReadColumnAtATime(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
434+
return null;
435+
}
436+
}
437+
438+
abstract static class BaseDenseNumericValues extends NumericDocValues implements BlockLoader.OptionalColumnAtATimeReader {
439+
abstract long lookAheadValueAt(int targetDoc) throws IOException;
409440
}
410441

411442
abstract static class BaseSortedSetDocValues extends SortedSetDocValues {
@@ -695,7 +726,7 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
695726
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
696727
SortedSetEntry entry = sortedSets.get(field.number);
697728
if (entry.singleValueEntry != null) {
698-
return DocValues.singleton(getSorted(entry.singleValueEntry));
729+
return DocValues.singleton(getSorted(entry.singleValueEntry, field.number == primarySortFieldNumber));
699730
}
700731

701732
SortedNumericEntry ordsEntry = entry.ordsEntry;
@@ -1141,13 +1172,18 @@ public long longValue() {
11411172
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
11421173
if (entry.docsWithFieldOffset == -1) {
11431174
// dense
1144-
return new BulkNumericDocValues() {
1175+
return new BaseDenseNumericValues() {
11451176

11461177
private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc;
11471178
private int doc = -1;
11481179
private final TSDBDocValuesEncoder decoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
11491180
private long currentBlockIndex = -1;
11501181
private final long[] currentBlock = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
1182+
// lookahead block
1183+
private long lookaheadBlockIndex = -1;
1184+
private long[] lookaheadBlock;
1185+
private IndexInput lookaheadData = null;
1186+
11511187

11521188
@Override
11531189
public int docID() {
@@ -1183,24 +1219,28 @@ public long longValue() throws IOException {
11831219
final int index = doc;
11841220
final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
11851221
final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1186-
if (blockIndex != currentBlockIndex) {
1187-
assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
1188-
// no need to seek if the loading block is the next block
1189-
if (currentBlockIndex + 1 != blockIndex) {
1190-
valuesData.seek(indexReader.get(blockIndex));
1191-
}
1192-
currentBlockIndex = blockIndex;
1193-
if (maxOrd >= 0) {
1194-
decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
1195-
} else {
1196-
decoder.decode(valuesData, currentBlock);
1197-
}
1222+
if(blockIndex == currentBlockIndex) {
1223+
return currentBlock[blockInIndex];
1224+
}
1225+
if (blockIndex == lookaheadBlockIndex) {
1226+
return lookaheadBlock[blockInIndex];
1227+
}
1228+
assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
1229+
// no need to seek if the block being loaded immediately follows the current block
1230+
if (currentBlockIndex + 1 != blockIndex) {
1231+
valuesData.seek(indexReader.get(blockIndex));
1232+
}
1233+
currentBlockIndex = blockIndex;
1234+
if (maxOrd >= 0) {
1235+
decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
1236+
} else {
1237+
decoder.decode(valuesData, currentBlock);
11981238
}
11991239
return currentBlock[blockInIndex];
12001240
}
12011241

12021242
@Override
1203-
public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
1243+
public BlockLoader.Block tryReadColumnAtATime(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
12041244
assert maxOrd == -1 : "unexpected maxOrd[" + maxOrd + "]";
12051245
final int docsCount = docs.count();
12061246
doc = docs.get(docsCount - 1);
@@ -1238,6 +1278,32 @@ public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs
12381278
}
12391279
}
12401280

1281+
@Override
1282+
long lookAheadValueAt(int targetDoc) throws IOException {
1283+
final int blockIndex = targetDoc >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
1284+
final int valueIndex = targetDoc & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1285+
if(blockIndex == currentBlockIndex) {
1286+
return currentBlock[valueIndex];
1287+
}
1288+
// load data to the lookahead block
1289+
if (lookaheadBlockIndex != blockIndex) {
1290+
if (lookaheadBlock == null) {
1291+
lookaheadBlock = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
1292+
lookaheadData = data.slice("look_ahead_values", entry.valuesOffset, entry.valuesLength);
1293+
}
1294+
if (lookaheadBlockIndex + 1 != blockIndex) {
1295+
lookaheadData.seek(indexReader.get(blockIndex));
1296+
}
1297+
if (maxOrd >= 0) {
1298+
decoder.decodeOrdinals(lookaheadData, lookaheadBlock, bitsPerOrd);
1299+
} else {
1300+
decoder.decode(lookaheadData, lookaheadBlock);
1301+
}
1302+
lookaheadBlockIndex = blockIndex;
1303+
}
1304+
return lookaheadBlock[valueIndex];
1305+
}
1306+
12411307
static boolean isDense(int firstDocId, int lastDocId, int length) {
12421308
// This does not detect duplicate docids (e.g [1, 1, 2, 4] would be detected as dense),
12431309
// this can happen with enrich or lookup. However this codec isn't used for enrich / lookup.

server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.lucene.util.BytesRef;
2222
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
2323
import org.elasticsearch.index.IndexVersion;
24-
import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
2524
import org.elasticsearch.index.mapper.BlockLoader.BlockFactory;
2625
import org.elasticsearch.index.mapper.BlockLoader.BooleanBuilder;
2726
import org.elasticsearch.index.mapper.BlockLoader.Builder;
@@ -127,8 +126,11 @@ static class SingletonLongs extends BlockDocValuesReader {
127126

128127
@Override
129128
public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
130-
if (numericDocValues instanceof BulkNumericDocValues bulkDv) {
131-
return bulkDv.read(factory, docs, offset);
129+
if (numericDocValues instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
130+
BlockLoader.Block result = direct.tryReadColumnAtATime(factory, docs, offset);
131+
if (result != null) {
132+
return result;
133+
}
132134
}
133135
try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) {
134136
int lastDoc = -1;
@@ -659,6 +661,12 @@ public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throw
659661
if (docs.count() - offset == 1) {
660662
return readSingleDoc(factory, docs.get(offset));
661663
}
664+
if (ordinals instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
665+
BlockLoader.Block block = direct.tryReadColumnAtATime(factory, docs, offset);
666+
if (block != null) {
667+
return block;
668+
}
669+
}
662670
try (var builder = factory.singletonOrdinalsBuilder(ordinals, docs.count() - offset)) {
663671
for (int i = offset; i < docs.count(); i++) {
664672
int doc = docs.get(i);

server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.index.SortedDocValues;
1414
import org.apache.lucene.index.SortedSetDocValues;
1515
import org.apache.lucene.util.BytesRef;
16+
import org.elasticsearch.core.Nullable;
1617
import org.elasticsearch.core.Releasable;
1718
import org.elasticsearch.search.fetch.StoredFieldsSpec;
1819
import org.elasticsearch.search.lookup.Source;
@@ -46,6 +47,22 @@ interface ColumnAtATimeReader extends Reader {
4647
BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException;
4748
}
4849

50+
/**
51+
* An interface for readers that attempt to load all document values in a column-at-a-time fashion.
52+
* <p>
53+
* Unlike {@link ColumnAtATimeReader}, implementations may return {@code null} if they are unable
54+
* to load the requested values, for example due to unsupported underlying data.
55+
* This allows callers to optimistically try optimized loading strategies first, and fall back if necessary.
56+
*/
57+
interface OptionalColumnAtATimeReader {
58+
/**
59+
* Attempts to read the values of the documents in {@code docs}, starting at position {@code offset}.
60+
* Returns {@code null} if unable to load the values.
61+
*/
62+
@Nullable
63+
BlockLoader.Block tryReadColumnAtATime(BlockFactory factory, Docs docs, int offset) throws IOException;
64+
}
65+
4966
interface RowStrideReader extends Reader {
5067
/**
5168
* Reads the values of the given document into the builder.
@@ -549,6 +566,5 @@ interface AggregateMetricDoubleBuilder extends Builder {
549566
DoubleBuilder sum();
550567

551568
IntBuilder count();
552-
553569
}
554570
}

server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.index.IndexSettings;
3232
import org.elasticsearch.index.IndexVersion;
3333
import org.elasticsearch.index.IndexVersions;
34-
import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
3534
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
3635
import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
3736
import org.elasticsearch.script.DateFieldScript;

test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.elasticsearch.index.IndexSettings;
4444
import org.elasticsearch.index.IndexVersion;
4545
import org.elasticsearch.index.IndexVersions;
46-
import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
4746
import org.elasticsearch.index.engine.Engine;
4847
import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot;
4948
import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -1541,7 +1540,7 @@ public void testSingletonLongBulkBlockReading() throws IOException {
15411540
LeafReaderContext context = reader.leaves().get(0);
15421541
var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
15431542
var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
1544-
assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
1543+
assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
15451544
var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
15461545
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
15471546
for (int i = 0; i < block.size(); i++) {
@@ -1566,7 +1565,7 @@ public void testSingletonLongBulkBlockReading() throws IOException {
15661565
LeafReaderContext context = reader.leaves().get(0);
15671566
var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
15681567
var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
1569-
assertThat(columnReader.numericDocValues, not(instanceOf(BulkNumericDocValues.class)));
1568+
assertThat(columnReader.numericDocValues, not(instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)));
15701569
var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
15711570
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
15721571
assertThat(block.get(0), equalTo(expectedSampleValues[0]));

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@
77

88
package org.elasticsearch.compute.lucene.read;
99

10-
import org.apache.lucene.util.BytesRef;
1110
import org.elasticsearch.compute.data.Block;
1211
import org.elasticsearch.compute.data.BlockFactory;
13-
import org.elasticsearch.compute.data.BytesRefBlock;
1412
import org.elasticsearch.core.Releasable;
1513

1614
class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
@@ -35,9 +33,4 @@ public void close() {
3533
nullBlock.close();
3634
}
3735
}
38-
39-
@Override
40-
public BytesRefBlock constantBytes(BytesRef value, int count) {
41-
return factory.newConstantBytesRefBlockWith(value, count);
42-
}
4336
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@
99

1010
import org.apache.lucene.index.SortedDocValues;
1111
import org.apache.lucene.index.SortedSetDocValues;
12+
import org.apache.lucene.util.BytesRef;
1213
import org.elasticsearch.compute.data.Block;
1314
import org.elasticsearch.compute.data.BlockFactory;
15+
import org.elasticsearch.compute.data.BytesRefBlock;
16+
import org.elasticsearch.compute.data.BytesRefVector;
1417
import org.elasticsearch.compute.data.ElementType;
18+
import org.elasticsearch.compute.data.IntVector;
19+
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
20+
import org.elasticsearch.core.Releasables;
1521
import org.elasticsearch.index.mapper.BlockLoader;
1622

1723
public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
@@ -41,6 +47,27 @@ public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) {
4147
return factory.newBytesRefBlockBuilder(expectedCount);
4248
}
4349

50+
@Override
51+
public BytesRefBlock constantBytes(BytesRef value, int count) {
52+
if (count == 1) {
53+
return factory.newConstantBytesRefBlockWith(value, count);
54+
}
55+
BytesRefVector dict = null;
56+
IntVector ordinals = null;
57+
boolean success = false;
58+
try {
59+
dict = factory.newConstantBytesRefVector(value, 1);
60+
ordinals = factory.newConstantIntVector(0, count);
61+
var result = new OrdinalBytesRefVector(ordinals, dict).asBlock();
62+
success = true;
63+
return result;
64+
} finally {
65+
if (success == false) {
66+
Releasables.closeExpectNoException(dict, ordinals);
67+
}
68+
}
69+
}
70+
4471
@Override
4572
public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
4673
return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);

0 commit comments

Comments
 (0)