diff --git a/docs/changelog/132622.yaml b/docs/changelog/132622.yaml
new file mode 100644
index 0000000000000..497e0b3ba18a4
--- /dev/null
+++ b/docs/changelog/132622.yaml
@@ -0,0 +1,5 @@
+pr: 132622
+summary: Push down compute engine value loading of long-based singleton numeric doc values to the ES819 TSDB doc values codec.
+area: "Codec"
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesForUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesForUtil.java
index db9c352ee30f8..04ac6fff604a9 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesForUtil.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesForUtil.java
@@ -16,7 +16,7 @@
import java.io.IOException;
-public class DocValuesForUtil {
+public final class DocValuesForUtil {
private static final int BITS_IN_FOUR_BYTES = 4 * Byte.SIZE;
private static final int BITS_IN_FIVE_BYTES = 5 * Byte.SIZE;
private static final int BITS_IN_SIX_BYTES = 6 * Byte.SIZE;
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java
index 18513c32003ab..5c369e8226f09 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java
@@ -54,7 +54,7 @@
*
* Of course, decoding follows the opposite order with respect to encoding.
*/
-public class TSDBDocValuesEncoder {
+public final class TSDBDocValuesEncoder {
private final DocValuesForUtil forUtil;
private final int numericBlockSize;
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BulkNumericDocValues.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BulkNumericDocValues.java
new file mode 100644
index 0000000000000..9ac2e636810b5
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BulkNumericDocValues.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.es819;
+
+import org.apache.lucene.index.NumericDocValues;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+import java.io.IOException;
+
+/**
+ * An es819 doc values specialization that allows bulk loading of values, optimized for use by the compute engine.
+ */
+public abstract class BulkNumericDocValues extends NumericDocValues {
+
+ /**
+ * Reads the values of the documents in {@code docs}, starting at position {@code offset}, into a new block.
+ */
+ public abstract BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException;
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
index 31d65bde1be0e..163e4c729bb95 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
@@ -43,6 +43,7 @@
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
+import org.elasticsearch.index.mapper.BlockLoader;
import java.io.IOException;
@@ -1140,7 +1141,7 @@ public long longValue() {
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
if (entry.docsWithFieldOffset == -1) {
// dense
- return new NumericDocValues() {
+ return new BulkNumericDocValues() {
private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc;
private int doc = -1;
@@ -1197,6 +1198,53 @@ public long longValue() throws IOException {
}
return currentBlock[blockInIndex];
}
+
+ @Override
+ public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
+ assert maxOrd == -1 : "unexpected maxOrd[" + maxOrd + "]";
+ final int docsCount = docs.count();
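+ // Position the iterator on the last doc that will be read, so that docID() remains consistent after this bulk read.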
+ doc = docs.get(docsCount - 1);
+ try (BlockLoader.SingletonLongBuilder builder = factory.singletonLongs(docs.count() - offset)) {
+ for (int i = offset; i < docsCount;) {
+ int index = docs.get(i);
+ final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+ final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+ if (blockIndex != currentBlockIndex) {
+ assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
+ // no need to seek if the loading block is the next block
+ if (currentBlockIndex + 1 != blockIndex) {
+ valuesData.seek(indexReader.get(blockIndex));
+ }
+ currentBlockIndex = blockIndex;
+ decoder.decode(valuesData, currentBlock);
+ }
+
+ // Try to append more than just one value:
+ // Instead of iterating over docs to find the max run length, take an optimistic approach that avoids doing as
+ // many comparisons as there are remaining docs and instead does at most 7 comparisons:
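+ // For example: with 100 docs remaining in the current block, the candidate run lengths tried are 100, 50, 25, 12, 6 and 3;
+ // the first candidate whose first and last docids span exactly length - 1 is appended below in a single appendLongs call.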
+ int length = 1;
+ int remainingBlockLength = Math.min(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - blockInIndex, docsCount - i);
+ for (int newLength = remainingBlockLength; newLength > 1; newLength = newLength >> 1) {
+ int lastIndex = i + newLength - 1;
+ if (isDense(index, docs.get(lastIndex), newLength)) {
+ length = newLength;
+ break;
+ }
+ }
+ builder.appendLongs(currentBlock, blockInIndex, length);
+ i += length;
+ }
+ return builder.build();
+ }
+ }
+
+ static boolean isDense(int firstDocId, int lastDocId, int length) {
+ // This does not detect duplicate docids (e.g. [1, 1, 2, 4] would be detected as dense),
+ // which can happen with enrich or lookup. However, this codec isn't used for enrich / lookup;
+ // it is only used in the context of logsdb and tsdb, so this is fine here.
+ return lastDocId - firstDocId == length - 1;
+ }
+
};
} else {
final IndexedDISI disi = new IndexedDISI(
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
index 809bad5145fe6..9e0574985f4db 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
@@ -21,6 +21,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
import org.elasticsearch.index.mapper.BlockLoader.BlockFactory;
import org.elasticsearch.index.mapper.BlockLoader.BooleanBuilder;
import org.elasticsearch.index.mapper.BlockLoader.Builder;
@@ -84,6 +85,7 @@ public boolean supportsOrdinals() {
public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException();
}
+
}
public static class LongsBlockLoader extends DocValuesBlockLoader {
@@ -116,8 +118,8 @@ public AllReader reader(LeafReaderContext context) throws IOException {
}
}
- private static class SingletonLongs extends BlockDocValuesReader {
- private final NumericDocValues numericDocValues;
+ static class SingletonLongs extends BlockDocValuesReader {
+ final NumericDocValues numericDocValues;
SingletonLongs(NumericDocValues numericDocValues) {
this.numericDocValues = numericDocValues;
@@ -125,6 +127,9 @@ private static class SingletonLongs extends BlockDocValuesReader {
@Override
public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
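+ // The ES819 TSDB doc values codec exposes dense singleton longs as BulkNumericDocValues, which can load an entire
+ // block of values at once instead of advancing document by document.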
+ if (numericDocValues instanceof BulkNumericDocValues bulkDv) {
+ return bulkDv.read(factory, docs, offset);
+ }
try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) {
int lastDoc = -1;
for (int i = offset; i < docs.count(); i++) {
@@ -164,7 +169,7 @@ public String toString() {
}
}
- private static class Longs extends BlockDocValuesReader {
+ static class Longs extends BlockDocValuesReader {
private final SortedNumericDocValues numericDocValues;
private int docID = -1;
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
index a4a498e4048db..36c0b8bfb062f 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
@@ -400,6 +400,17 @@ interface BlockFactory {
*/
LongBuilder longs(int expectedCount);
+ /**
+ * Build a specialized builder for singleton, dense, long-based fields with the following constraints:
+ *
+ * - Only one value per document can be collected
+ * - No more than {@code expectedCount} values can be collected
+ *
+ * @param expectedCount The maximum number of values to be collected.
+ */
+ SingletonLongBuilder singletonLongs(int expectedCount);
+
/**
* Build a builder to load only {@code null}s.
*/
@@ -498,6 +509,16 @@ interface IntBuilder extends Builder {
IntBuilder appendInt(int value);
}
+ /**
+ * Specialized builder for collecting dense, single-valued arrays of long values.
+ */
+ interface SingletonLongBuilder extends Builder {
+
+ /**
+ * Appends a single long value.
+ */
+ SingletonLongBuilder appendLong(long value);
+
+ /**
+ * Appends {@code length} values from {@code values}, starting at offset {@code from}.
+ */
+ SingletonLongBuilder appendLongs(long[] values, int from, int length);
+ }
+
interface LongBuilder extends Builder {
/**
* Appends a long to the current entry.
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
index 46b46fda11d56..6eb291c386885 100644
--- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
@@ -27,6 +27,7 @@
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
@@ -36,6 +37,7 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
+import org.elasticsearch.index.mapper.TestBlock;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
@@ -705,14 +707,277 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
}
}
+ public void testBulkLoading() throws Exception {
+ final String counterField = "counter";
+ final String timestampField = "@timestamp";
+ final String gaugeField = "gauge";
+ long currentTimestamp = 1704067200000L;
+ long currentCounter = 10_000_000;
+
+ var config = getTimeSeriesIndexWriterConfig(null, timestampField);
+ try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
+ long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
+ int numDocs = 256 + random().nextInt(8096);
+
+ for (int i = 0; i < numDocs; i++) {
+ var d = new Document();
+ long timestamp = currentTimestamp;
+ // Index sorting doesn't work with NumericDocValuesField:
+ d.add(SortedNumericDocValuesField.indexedField(timestampField, timestamp));
+ d.add(new SortedNumericDocValuesField(counterField, currentCounter));
+ d.add(new SortedNumericDocValuesField(gaugeField, gauge1Values[i % gauge1Values.length]));
+
+ iw.addDocument(d);
+ if (i % 100 == 0) {
+ iw.commit();
+ }
+ if (i < numDocs - 1) {
+ currentTimestamp += 1000L;
+ currentCounter++;
+ }
+ }
+ iw.commit();
+ var factory = TestBlock.factory();
+ final long lastIndexedTimestamp = currentTimestamp;
+ final long lastIndexedCounter = currentCounter;
+ try (var reader = DirectoryReader.open(iw)) {
+ int gaugeIndex = numDocs;
+ for (var leaf : reader.leaves()) {
+ var timestampDV = getBulkNumericDocValues(leaf.reader(), timestampField);
+ var counterDV = getBulkNumericDocValues(leaf.reader(), counterField);
+ var gaugeDV = getBulkNumericDocValues(leaf.reader(), gaugeField);
+ int maxDoc = leaf.reader().maxDoc();
+ for (int i = 0; i < maxDoc;) {
+ int size = Math.max(1, random().nextInt(0, maxDoc - i));
+ var docs = TestBlock.docs(IntStream.range(i, i + size).toArray());
+
+ {
+ // bulk loading timestamp:
+ var block = (TestBlock) timestampDV.read(factory, docs, 0);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualTimestamp = (long) block.get(j);
+ long expectedTimestamp = currentTimestamp;
+ assertEquals(expectedTimestamp, actualTimestamp);
+ currentTimestamp -= 1000L;
+ }
+ }
+ {
+ // bulk loading counter field:
+ var block = (TestBlock) counterDV.read(factory, docs, 0);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualCounter = (long) block.get(j);
+ long expectedCounter = currentCounter;
+ assertEquals(expectedCounter, actualCounter);
+ currentCounter--;
+ }
+ }
+ {
+ // bulk loading gauge field:
+ var block = (TestBlock) gaugeDV.read(factory, docs, 0);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualGauge = (long) block.get(j);
+ long expectedGauge = gauge1Values[--gaugeIndex % gauge1Values.length];
+ assertEquals(expectedGauge, actualGauge);
+ }
+ }
+
+ i += size;
+ }
+ }
+ }
+
+ // Now bulk read from one big segment and use a random offset:
+ iw.forceMerge(1);
+ var blockFactory = TestBlock.factory();
+ try (var reader = DirectoryReader.open(iw)) {
+ int randomOffset = random().nextInt(numDocs / 4);
+ currentTimestamp = lastIndexedTimestamp - (randomOffset * 1000L);
+ currentCounter = lastIndexedCounter - randomOffset;
+ assertEquals(1, reader.leaves().size());
+ assertEquals(numDocs, reader.maxDoc());
+ var leafReader = reader.leaves().get(0).reader();
+ int maxDoc = leafReader.maxDoc();
+ int size = maxDoc - randomOffset;
+ int gaugeIndex = size;
+
+ var timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+ var counterDV = getBulkNumericDocValues(leafReader, counterField);
+ var gaugeDV = getBulkNumericDocValues(leafReader, gaugeField);
+
+ var docs = TestBlock.docs(IntStream.range(0, maxDoc).toArray());
+
+ {
+ // bulk loading timestamp:
+ var block = (TestBlock) timestampDV.read(blockFactory, docs, randomOffset);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualTimestamp = (long) block.get(j);
+ long expectedTimestamp = currentTimestamp;
+ assertEquals(expectedTimestamp, actualTimestamp);
+ currentTimestamp -= 1000L;
+ }
+ }
+ {
+ // bulk loading counter field:
+ var block = (TestBlock) counterDV.read(factory, docs, randomOffset);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualCounter = (long) block.get(j);
+ long expectedCounter = currentCounter;
+ assertEquals(expectedCounter, actualCounter);
+ currentCounter--;
+ }
+ }
+ {
+ // bulk loading gauge field:
+ var block = (TestBlock) gaugeDV.read(factory, docs, randomOffset);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualGauge = (long) block.get(j);
+ long expectedGauge = gauge1Values[--gaugeIndex % gauge1Values.length];
+ assertEquals(expectedGauge, actualGauge);
+ }
+ }
+
+ // And finally docs with gaps:
+ docs = TestBlock.docs(IntStream.range(0, maxDoc).filter(docId -> docId == 0 || docId % 64 != 0).toArray());
+ size = docs.count();
+ // Test against values loaded using normal doc value apis:
+ long[] expectedCounters = new long[size];
+ counterDV = getBulkNumericDocValues(leafReader, counterField);
+ for (int i = 0; i < docs.count(); i++) {
+ int docId = docs.get(i);
+ counterDV.advanceExact(docId);
+ expectedCounters[i] = counterDV.longValue();
+ }
+ counterDV = getBulkNumericDocValues(leafReader, counterField);
+ {
+ // bulk loading counter field:
+ var block = (TestBlock) counterDV.read(factory, docs, 0);
+ assertEquals(size, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualCounter = (long) block.get(j);
+ long expectedCounter = expectedCounters[j];
+ assertEquals(expectedCounter, actualCounter);
+ }
+ }
+ }
+ }
+ }
+
+ public void testBulkLoadingWithSparseDocs() throws Exception {
+ final String counterField = "counter";
+ final String timestampField = "@timestamp";
+ String queryField = "query_field";
+ long currentTimestamp = 1704067200000L;
+ long currentCounter = 10_000_000;
+
+ var config = getTimeSeriesIndexWriterConfig(null, timestampField);
+ try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
+ int numDocsPerQValue = 120;
+ int numDocs = numDocsPerQValue * (1 + random().nextInt(40));
+
+ long q = 1;
+ for (int i = 1; i <= numDocs; i++) {
+ var d = new Document();
+ long timestamp = currentTimestamp;
+ // Index sorting doesn't work with NumericDocValuesField:
+ d.add(SortedNumericDocValuesField.indexedField(timestampField, timestamp));
+ d.add(new SortedNumericDocValuesField(counterField, currentCounter));
+ d.add(new SortedNumericDocValuesField(queryField, q));
+ if (i % 120 == 0) {
+ q++;
+ }
+
+ iw.addDocument(d);
+ if (i % 100 == 0) {
+ iw.commit();
+ }
+ if (i < numDocs - 1) {
+ currentTimestamp += 1000L;
+ currentCounter++;
+ }
+ }
+ iw.commit();
+
+ // Now bulk read from one big segment:
+ iw.forceMerge(1);
+ var factory = TestBlock.factory();
+ try (var reader = DirectoryReader.open(iw)) {
+ assertEquals(1, reader.leaves().size());
+ assertEquals(numDocs, reader.maxDoc());
+ var leafReader = reader.leaves().get(0).reader();
+
+ for (int query = 1; query < q; query++) {
+ IndexSearcher searcher = new IndexSearcher(reader);
+ var topDocs = searcher.search(
+ SortedNumericDocValuesField.newSlowExactQuery(queryField, query),
+ numDocsPerQValue,
+ new Sort(SortField.FIELD_DOC),
+ false
+ );
+ assertEquals(numDocsPerQValue, topDocs.totalHits.value());
+ var timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+ long[] expectedTimestamps = new long[numDocsPerQValue];
+ var counterDV = getBulkNumericDocValues(leafReader, counterField);
+ long[] expectedCounters = new long[numDocsPerQValue];
+ int[] docIds = new int[numDocsPerQValue];
+ for (int i = 0; i < topDocs.scoreDocs.length; i++) {
+ var scoreDoc = topDocs.scoreDocs[i];
+ docIds[i] = scoreDoc.doc;
+
+ assertTrue(timestampDV.advanceExact(scoreDoc.doc));
+ expectedTimestamps[i] = timestampDV.longValue();
+
+ assertTrue(counterDV.advanceExact(scoreDoc.doc));
+ expectedCounters[i] = counterDV.longValue();
+ }
+
+ var docs = TestBlock.docs(docIds);
+ {
+ timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+ var block = (TestBlock) timestampDV.read(factory, docs, 0);
+ assertEquals(numDocsPerQValue, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualTimestamp = (long) block.get(j);
+ long expectedTimestamp = expectedTimestamps[j];
+ assertEquals(expectedTimestamp, actualTimestamp);
+ }
+ }
+ {
+ counterDV = getBulkNumericDocValues(leafReader, counterField);
+ var block = (TestBlock) counterDV.read(factory, docs, 0);
+ assertEquals(numDocsPerQValue, block.size());
+ for (int j = 0; j < block.size(); j++) {
+ long actualCounter = (long) block.get(j);
+ long expectedCounter = expectedCounters[j];
+ assertEquals(expectedCounter, actualCounter);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static BulkNumericDocValues getBulkNumericDocValues(LeafReader leafReader, String fieldName) throws IOException {
+ return (BulkNumericDocValues) DocValues.unwrapSingleton(leafReader.getSortedNumericDocValues(fieldName));
+ }
+
private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
var config = new IndexWriterConfig();
- config.setIndexSort(
- new Sort(
- new SortField(hostnameField, SortField.Type.STRING, false),
- new SortedNumericSortField(timestampField, SortField.Type.LONG, true)
- )
- );
+ if (hostnameField != null) {
+ config.setIndexSort(
+ new Sort(
+ new SortField(hostnameField, SortField.Type.STRING, false),
+ new SortedNumericSortField(timestampField, SortField.Type.LONG, true)
+ )
+ );
+ } else {
+ config.setIndexSort(new Sort(new SortedNumericSortField(timestampField, SortField.Type.LONG, true)));
+ }
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
config.setMergePolicy(new LogByteSizeMergePolicy());
config.setCodec(getCodec());
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java
index fdf2503036d8b..f0e384d1cf272 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java
@@ -9,15 +9,30 @@
package org.elasticsearch.index.mapper;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.analysis.MockAnalyzer;
+import org.apache.lucene.tests.util.TestUtil;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Strings;
+import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
+import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
+import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
import org.elasticsearch.script.DateFieldScript;
import org.elasticsearch.script.ScriptService;
@@ -33,6 +48,7 @@
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.List;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
@@ -41,11 +57,13 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DateFieldMapperTests extends MapperTestCase {
@@ -786,4 +804,107 @@ public void testLegacyDateFormatName() {
);
}
+
+ @Override
+ protected boolean supportsBulkBlockReading() {
+ return true;
+ }
+
+ public void testSingletonLongBulkBlockReadingManyValues() throws Exception {
+ final String mappings = """
+ {
+ "_doc" : {
+ "properties": {
+ "@timestamp": {
+ "type": "date",
+ "ignore_malformed": false
+ }
+ }
+ }
+ }
+ """;
+ Settings settings = indexSettings(IndexVersion.current(), 1, 1).put("index.mode", "logsdb").build();
+ var mapperService = createMapperService(settings, mappings);
+ try (Directory directory = newDirectory()) {
+ int from = 0;
+ int to = 10_000;
+ IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+ iwc.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
+ iwc.setIndexSort(new Sort(new SortField("@timestamp", SortField.Type.LONG, true)));
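+ // The index sort is @timestamp descending, so doc 0 holds the largest indexed timestamp (to - 1) and values
+ // decrease by one per document; the assertions below rely on this ordering.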
+ iwc.setCodec(TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()));
+ try (IndexWriter iw = new IndexWriter(directory, iwc)) {
+ for (long i = from; i < to; i++) {
+ LuceneDocument doc = new LuceneDocument();
+ doc.add(new NumericDocValuesField("@timestamp", i));
+ iw.addDocument(doc);
+ }
+ iw.forceMerge(1);
+ }
+ var mockBlockContext = mock(MappedFieldType.BlockLoaderContext.class);
+ IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
+ IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
+ when(mockBlockContext.indexSettings()).thenReturn(indexSettings);
+ var blockLoader = mapperService.fieldType("@timestamp").blockLoader(mockBlockContext);
+ try (DirectoryReader reader = DirectoryReader.open(directory)) {
+ LeafReaderContext context = reader.leaves().get(0);
+ {
+ // One big doc block
+ var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+ var docBlock = TestBlock.docs(IntStream.range(from, to).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.size(), equalTo(to - from));
+ for (int i = 0; i < block.size(); i++) {
+ assertThat(block.get(i), equalTo(to - i - 1L));
+ }
+ }
+ {
+ // Smaller doc blocks
+ int docBlockSize = 1000;
+ var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+ for (int i = from; i < to; i += docBlockSize) {
+ var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.size(), equalTo(docBlockSize));
+ for (int j = 0; j < block.size(); j++) {
+ long expected = to - ((long) docBlockSize * (i / docBlockSize)) - j - 1L;
+ assertThat(block.get(j), equalTo(expected));
+ }
+ }
+ }
+ {
+ // One smaller doc block:
+ var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+ var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.size(), equalTo(1010));
+ for (int i = 0; i < block.size(); i++) {
+ long expected = 8990 - i - 1L;
+ assertThat(block.get(i), equalTo(expected));
+ }
+ }
+ {
+ // Read two tiny blocks:
+ var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+ var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.size(), equalTo(32));
+ for (int i = 0; i < block.size(); i++) {
+ long expected = 9968 - i - 1L;
+ assertThat(block.get(i), equalTo(expected));
+ }
+
+ docBlock = TestBlock.docs(IntStream.range(64, 96).toArray());
+ block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.size(), equalTo(32));
+ for (int i = 0; i < block.size(); i++) {
+ long expected = 9936 - i - 1L;
+ assertThat(block.get(i), equalTo(expected));
+ }
+ }
+ }
+ }
+ }
}
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java
index 9a0f4baa8f21a..0cdce5f1aa28a 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java
@@ -162,4 +162,9 @@ public void execute() {
}
};
}
+
+ @Override
+ protected boolean supportsBulkBlockReading() {
+ return true;
+ }
+
}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java
index 7e127ba307942..04e158a703af2 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java
@@ -43,6 +43,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
+import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot;
import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -91,7 +92,9 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -1498,6 +1501,112 @@ public void testSyntheticSourceKeepArrays() throws IOException {
assertThat(actual, equalTo(expected));
}
+ protected boolean supportsBulkBlockReading() {
+ return false;
+ }
+
+ /**
+ * Three sample values to index into the field under test (they are sent as strings in the source).
+ */
+ protected Object[] getThreeSampleValues() {
+ return new Object[] { 1L, 2L, 3L };
+ }
+
+ /**
+ * The expected doc value representation when the values from {@link #getThreeSampleValues()} are read back into a block.
+ */
+ protected Object[] getThreeEncodedSampleValues() {
+ return getThreeSampleValues();
+ }
+
+ public void testSingletonLongBulkBlockReading() throws IOException {
+ assumeTrue("field type supports bulk singleton long reading", supportsBulkBlockReading());
+ var settings = indexSettings(IndexVersion.current(), 1, 1).put("index.mode", "logsdb").build();
+ var mapperService = createMapperService(settings, fieldMapping(this::minimalMapping));
+ var mapper = mapperService.documentMapper();
+ var mockBlockContext = mock(MappedFieldType.BlockLoaderContext.class);
+ when(mockBlockContext.fieldExtractPreference()).thenReturn(MappedFieldType.FieldExtractPreference.DOC_VALUES);
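+ // Prefer doc values so that the field type hands out a doc values based block loader.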
+ IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
+ IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
+ when(mockBlockContext.indexSettings()).thenReturn(indexSettings);
+
+ var sampleValuesForIndexing = getThreeSampleValues();
+ var expectedSampleValues = getThreeEncodedSampleValues();
+ {
+ // Dense
+ CheckedConsumer<RandomIndexWriter, IOException> builder = iw -> {
+ for (int i = 0; i < 3; i++) {
+ var value = sampleValuesForIndexing[i];
+ var doc = mapper.parse(source(b -> b.field("@timestamp", 1L).field("field", "" + value))).rootDoc();
+ iw.addDocument(doc);
+ }
+ };
+ CheckedConsumer<IndexReader, IOException> test = reader -> {
+ assertThat(reader.leaves(), hasSize(1));
+ assertThat(reader.numDocs(), equalTo(3));
+ LeafReaderContext context = reader.leaves().get(0);
+ var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
+ var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+ var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ for (int i = 0; i < block.size(); i++) {
+ assertThat(block.get(i), equalTo(expectedSampleValues[i]));
+ }
+ };
+ withLuceneIndex(mapperService, builder, test);
+ }
+ {
+ // Sparse
+ CheckedConsumer<RandomIndexWriter, IOException> builder = iw -> {
+ var doc = mapper.parse(source(b -> b.field("@timestamp", 1L).field("field", "" + sampleValuesForIndexing[0]))).rootDoc();
+ iw.addDocument(doc);
+ doc = mapper.parse(source(b -> b.field("@timestamp", 1L))).rootDoc();
+ iw.addDocument(doc);
+ doc = mapper.parse(source(b -> b.field("@timestamp", 1L).field("field", "" + sampleValuesForIndexing[2]))).rootDoc();
+ iw.addDocument(doc);
+ };
+ CheckedConsumer<IndexReader, IOException> test = reader -> {
+ assertThat(reader.leaves(), hasSize(1));
+ assertThat(reader.numDocs(), equalTo(3));
+ LeafReaderContext context = reader.leaves().get(0);
+ var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
+ var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader.numericDocValues, not(instanceOf(BulkNumericDocValues.class)));
+ var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.get(0), equalTo(expectedSampleValues[0]));
+ assertThat(block.get(1), nullValue());
+ assertThat(block.get(2), equalTo(expectedSampleValues[2]));
+ };
+ withLuceneIndex(mapperService, builder, test);
+ }
+ {
+ // Multi-value
+ CheckedConsumer<RandomIndexWriter, IOException> builder = iw -> {
+ var doc = mapper.parse(source(b -> b.field("@timestamp", 1L).field("field", "" + sampleValuesForIndexing[0]))).rootDoc();
+ iw.addDocument(doc);
+ doc = mapper.parse(
+ source(
+ b -> b.field("@timestamp", 1L)
+ .field("field", List.of("" + sampleValuesForIndexing[0], "" + sampleValuesForIndexing[1]))
+ )
+ ).rootDoc();
+ iw.addDocument(doc);
+ doc = mapper.parse(source(b -> b.field("@timestamp", 1L).field("field", "" + sampleValuesForIndexing[2]))).rootDoc();
+ iw.addDocument(doc);
+ };
+ CheckedConsumer<IndexReader, IOException> test = reader -> {
+ assertThat(reader.leaves(), hasSize(1));
+ assertThat(reader.numDocs(), equalTo(3));
+ LeafReaderContext context = reader.leaves().get(0);
+ var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
+ var columnReader = blockLoader.columnAtATimeReader(context);
+ assertThat(columnReader, instanceOf(BlockDocValuesReader.Longs.class));
+ var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
+ var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
+ assertThat(block.get(0), equalTo(expectedSampleValues[0]));
+ assertThat(block.get(1), equalTo(List.of(expectedSampleValues[0], expectedSampleValues[1])));
+ assertThat(block.get(2), equalTo(expectedSampleValues[2]));
+ };
+ withLuceneIndex(mapperService, builder, test);
+ }
+ }
+
protected String randomSyntheticSourceKeep() {
return randomFrom("all", "arrays");
}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java
index cb73dc96f69b2..f28aa7af3d228 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java
@@ -18,8 +18,10 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.stream.Collectors;
import static org.elasticsearch.test.ESTestCase.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -197,6 +199,51 @@ public LongsBuilder appendLong(long value) {
return new LongsBuilder();
}
+ @Override
+ public BlockLoader.SingletonLongBuilder singletonLongs(int expectedCount) {
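+ // Test-only builder backed by a fixed-size long array; count tracks how many values have actually been appended.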
+ final long[] values = new long[expectedCount];
+ return new BlockLoader.SingletonLongBuilder() {
+
+ private int count;
+
+ @Override
+ public BlockLoader.Block build() {
+ return new TestBlock(Arrays.stream(values, 0, count).boxed().collect(Collectors.toUnmodifiableList()));
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLongs(long[] newValues, int from, int length) {
+ System.arraycopy(newValues, from, values, count, length);
+ count += length;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLong(long value) {
+ values[count++] = value;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.Builder appendNull() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLoader.Builder beginPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLoader.Builder endPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
@Override
public BlockLoader.Builder nulls(int expectedCount) {
return longs(expectedCount);
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java
index c5e3628b268d4..5b23aceda7db1 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java
@@ -76,6 +76,11 @@ public BlockLoader.LongBuilder longs(int expectedCount) {
return factory.newLongBlockBuilder(expectedCount);
}
+ @Override
+ public BlockLoader.SingletonLongBuilder singletonLongs(int expectedCount) {
+ return new SingletonLongBuilder(expectedCount, factory);
+ }
+
@Override
public BlockLoader.Builder nulls(int expectedCount) {
return ElementType.NULL.newBlockBuilder(expectedCount, factory);
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonLongBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonLongBuilder.java
new file mode 100644
index 0000000000000..e82ed921d2c45
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonLongBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+/**
+ * Like {@link org.elasticsearch.compute.data.LongBlockBuilder} but optimized for collecting dense, single-valued long values.
+ * Additionally, this builder doesn't grow its backing array.
+ */
+public final class SingletonLongBuilder implements BlockLoader.SingletonLongBuilder, Releasable, Block.Builder {
+
+ private final long[] values;
+ private final BlockFactory blockFactory;
+
+ private int count;
+
+ public SingletonLongBuilder(int expectedCount, BlockFactory blockFactory) {
+ this.blockFactory = blockFactory;
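+ // Reserve memory for the full backing array up front; close() releases the same number of bytes again.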
+ blockFactory.adjustBreaker(valuesSize(expectedCount));
+ this.values = new long[expectedCount];
+ }
+
+ @Override
+ public Block.Builder appendNull() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder beginPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder endPositionEntry() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long estimatedBytes() {
+ return (long) values.length * Long.BYTES;
+ }
+
+ @Override
+ public Block build() {
+ return blockFactory.newLongArrayVector(values, count, 0L).asBlock();
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLong(long value) {
+ values[count++] = value;
+ return this;
+ }
+
+ @Override
+ public BlockLoader.SingletonLongBuilder appendLongs(long[] values, int from, int length) {
+ System.arraycopy(values, from, this.values, count, length);
+ count += length;
+ return this;
+ }
+
+ @Override
+ public void close() {
+ blockFactory.adjustBreaker(-valuesSize(values.length));
+ }
+
+ static long valuesSize(int count) {
+ return (long) count * Long.BYTES;
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonLongBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonLongBuilderTests.java
new file mode 100644
index 0000000000000..e75a84e6e5202
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonLongBuilderTests.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.analysis.MockAnalyzer;
+import org.apache.lucene.tests.util.TestUtil;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.test.ComputeTestCase;
+import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
+import org.elasticsearch.indices.CrankyCircuitBreakerService;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.equalTo;
+
+public class SingletonLongBuilderTests extends ComputeTestCase {
+
+ public void testReader() throws IOException {
+ testRead(blockFactory());
+ }
+
+ public void testReadWithCranky() throws IOException {
+ var factory = crankyBlockFactory();
+ try {
+ testRead(factory);
+ // If we made it this far cranky didn't fail us!
+ } catch (CircuitBreakingException e) {
+ logger.info("cranky", e);
+ assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
+ }
+ assertThat(factory.breaker().getUsed(), equalTo(0L));
+ }
+
+ private void testRead(BlockFactory factory) throws IOException {
+ Long[] values = new Long[] { 1L, 2L, 3L, 4L };
+
+ int count = 1000;
+ try (Directory directory = newDirectory()) {
+ try (IndexWriter indexWriter = createIndexWriter(directory)) {
+ for (int i = 0; i < count; i++) {
+ Long v = values[i % values.length];
+ indexWriter.addDocument(List.of(new NumericDocValuesField("field", v)));
+ }
+ }
+ Map<Long, Integer> counts = new HashMap<>();
+ try (IndexReader reader = DirectoryReader.open(directory)) {
+ for (LeafReaderContext ctx : reader.leaves()) {
+ var docValues = ctx.reader().getNumericDocValues("field");
+ try (SingletonLongBuilder builder = new SingletonLongBuilder(ctx.reader().numDocs(), factory)) {
+ for (int i = 0; i < ctx.reader().maxDoc(); i++) {
+ assertThat(docValues.advanceExact(i), equalTo(true));
+ long value = docValues.longValue();
+ if (randomBoolean()) {
+ builder.appendLongs(new long[] { value }, 0, 1);
+ } else {
+ builder.appendLong(value);
+ }
+ }
+ try (LongVector build = (LongVector) builder.build().asVector()) {
+ for (int i = 0; i < build.getPositionCount(); i++) {
+ long key = build.getLong(i);
+ counts.merge(key, 1, Integer::sum);
+ }
+ }
+ }
+ }
+ }
+ int expectedCount = count / values.length;
+ assertMap(
+ counts,
+ matchesMap().entry(1L, expectedCount).entry(2L, expectedCount).entry(3L, expectedCount).entry(4L, expectedCount)
+ );
+ }
+ }
+
+ public void testMoreValues() throws IOException {
+ int count = 1_000;
+ try (Directory directory = newDirectory()) {
+ try (IndexWriter indexWriter = createIndexWriter(directory)) {
+ for (int i = 0; i < count; i++) {
+ indexWriter.addDocument(List.of(new NumericDocValuesField("field", i)));
+ }
+ indexWriter.forceMerge(1);
+ }
+ try (IndexReader reader = DirectoryReader.open(directory)) {
+ assertThat(reader.leaves().size(), equalTo(1));
+ LeafReader leafReader = reader.leaves().get(0).reader();
+ var docValues = leafReader.getNumericDocValues("field");
+ int offset = 850;
+ try (SingletonLongBuilder builder = new SingletonLongBuilder(count - offset, blockFactory())) {
+ for (int i = offset; i < leafReader.maxDoc(); i++) {
+ assertThat(docValues.advanceExact(i), equalTo(true));
+ long value = docValues.longValue();
+ if (randomBoolean()) {
+ builder.appendLongs(new long[] { value }, 0, 1);
+ } else {
+ builder.appendLong(value);
+ }
+ }
+ try (LongVector build = (LongVector) builder.build().asVector()) {
+ assertThat(build.getPositionCount(), equalTo(count - offset));
+ for (int i = 0; i < build.getPositionCount(); i++) {
+ Long key = build.getLong(i);
+ assertThat(key, equalTo((long) offset + i));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ static IndexWriter createIndexWriter(Directory directory) throws IOException {
+ IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+ iwc.setCodec(TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()));
+ return new IndexWriter(directory, iwc);
+ }
+
+}
diff --git a/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java b/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java
index d2aabf8b82400..538d8efe97446 100644
--- a/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java
+++ b/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.math.BigInteger;
import java.time.Instant;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -509,4 +510,16 @@ public List invalidExample() {
return List.of();
}
}
+
+ @Override
+ protected Object[] getThreeEncodedSampleValues() {
+ return Arrays.stream(super.getThreeEncodedSampleValues())
+ .map(v -> UnsignedLongFieldMapper.sortableSignedLongToUnsigned((Long) v))
+ .toArray();
+ }
+
+ @Override
+ protected boolean supportsBulkBlockReading() {
+ return true;
+ }
}
diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/PointFieldMapperTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/PointFieldMapperTests.java
index ac9d6a0f5f87b..5faccab610530 100644
--- a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/PointFieldMapperTests.java
+++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/index/mapper/PointFieldMapperTests.java
@@ -536,4 +536,21 @@ public void testSyntheticSourceKeepArrays() {
protected IngestScriptSupport ingestScriptSupport() {
throw new AssumptionViolatedException("not supported");
}
+
+ @Override
+ protected Object[] getThreeSampleValues() {
+ return new Object[] { "1,1", "1,2", "1,3" };
+ }
+
+ @Override
+ protected Object[] getThreeEncodedSampleValues() {
+ return new Object[] {
+ new PointFieldMapper.XYFieldWithDocValues(FIELD_NAME, 1f, 1f).numericValue(),
+ new PointFieldMapper.XYFieldWithDocValues(FIELD_NAME, 1f, 2f).numericValue(),
+ new PointFieldMapper.XYFieldWithDocValues(FIELD_NAME, 1f, 3f).numericValue() };
+ }
+
+ @Override
+ protected boolean supportsBulkBlockReading() {
+ return true;
+ }
}