Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/133397.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133397
summary: Push down loading of singleton dense double based field types to the …
area: "Compute Engine, Codec"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
import org.elasticsearch.index.mapper.BlockDocValuesReader;
import org.elasticsearch.index.mapper.BlockLoader;

import java.io.IOException;
Expand Down Expand Up @@ -383,7 +384,12 @@ public long cost() {
}

@Override
public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
public BlockLoader.Block tryRead(
BlockLoader.BlockFactory factory,
BlockLoader.Docs docs,
int offset,
BlockDocValuesReader.ToDouble toDouble
) throws IOException {
if (ords instanceof BaseDenseNumericValues denseOrds) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert toDouble is null here?

var block = tryReadAHead(factory, docs, offset);
if (block != null) {
Expand Down Expand Up @@ -457,7 +463,12 @@ public TermsEnum termsEnum() throws IOException {
}

@Override
public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
public BlockLoader.Block tryRead(
BlockLoader.BlockFactory factory,
BlockLoader.Docs docs,
int offset,
BlockDocValuesReader.ToDouble toDouble
) throws IOException {
return null;
}

Expand Down Expand Up @@ -504,7 +515,12 @@ public final long cost() {
}

@Override
public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
public BlockLoader.Block tryRead(
BlockLoader.BlockFactory factory,
BlockLoader.Docs docs,
int offset,
BlockDocValuesReader.ToDouble toDouble
) throws IOException {
return null;
}

Expand Down Expand Up @@ -1365,7 +1381,18 @@ public long longValue() throws IOException {
}

@Override
public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
public BlockLoader.Block tryRead(
BlockLoader.BlockFactory factory,
BlockLoader.Docs docs,
int offset,
BlockDocValuesReader.ToDouble toDouble
) throws IOException {
if (toDouble != null) {
try (BlockLoader.SingletonDoubleBuilder builder = factory.singletonDoubles(docs.count() - offset)) {
SingletonLongToDoubleDelegate delegate = new SingletonLongToDoubleDelegate(builder, toDouble);
return tryRead(delegate, docs, offset);
}
}
try (BlockLoader.SingletonLongBuilder builder = factory.singletonLongs(docs.count() - offset)) {
return tryRead(builder, docs, offset);
}
Expand Down Expand Up @@ -1772,4 +1799,55 @@ public BlockLoader.Builder endPositionEntry() {
public void close() {}
}

// Block builder that consumes long values and converts them to double using the provided converter function.
static final class SingletonLongToDoubleDelegate implements BlockLoader.SingletonLongBuilder {
private final BlockLoader.SingletonDoubleBuilder doubleBuilder;
private final BlockDocValuesReader.ToDouble toDouble;
private final double[] buffer = new double[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];

// The passed builder is used to store the converted double values and produce the final block containing them.
SingletonLongToDoubleDelegate(BlockLoader.SingletonDoubleBuilder doubleBuilder, BlockDocValuesReader.ToDouble toDouble) {
this.doubleBuilder = doubleBuilder;
this.toDouble = toDouble;
}

@Override
public BlockLoader.SingletonLongBuilder appendLong(long value) {
throw new UnsupportedOperationException();
}

@Override
public BlockLoader.SingletonLongBuilder appendLongs(long[] values, int from, int length) {
assert length <= buffer.length : "length " + length + " > " + buffer.length;
for (int i = 0; i < length; i++) {
buffer[i] = toDouble.convert(values[from + i]);
}
doubleBuilder.appendDoubles(buffer, 0, length);
return this;
}

@Override
public BlockLoader.Block build() {
return doubleBuilder.build();
}

@Override
public BlockLoader.Builder appendNull() {
throw new UnsupportedOperationException();
}

@Override
public BlockLoader.Builder beginPositionEntry() {
throw new UnsupportedOperationException();
}

@Override
public BlockLoader.Builder endPositionEntry() {
throw new UnsupportedOperationException();
}

@Override
public void close() {}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ static class SingletonLongs extends BlockDocValuesReader {
@Override
public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
if (numericDocValues instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
BlockLoader.Block result = direct.tryRead(factory, docs, offset);
BlockLoader.Block result = direct.tryRead(factory, docs, offset, null);
if (result != null) {
return result;
}
Expand Down Expand Up @@ -398,6 +398,12 @@ private static class SingletonDoubles extends BlockDocValuesReader {

@Override
public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
if (docValues instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
BlockLoader.Block result = direct.tryRead(factory, docs, offset, toDouble);
if (result != null) {
return result;
}
}
try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count() - offset)) {
for (int i = offset; i < docs.count(); i++) {
int doc = docs.get(i);
Expand Down Expand Up @@ -715,7 +721,7 @@ public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boole
return readSingleDoc(factory, docs.get(offset));
}
if (ordinals instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
BlockLoader.Block block = direct.tryRead(factory, docs, offset);
BlockLoader.Block block = direct.tryRead(factory, docs, offset, null);
if (block != null) {
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ interface OptionalColumnAtATimeReader {
/**
* Attempts to read the values of all documents in {@code docs}
* Returns {@code null} if unable to load the values.
*
* @param toDouble a function to convert long values to double, or null if no conversion is needed/supported
*/
@Nullable
BlockLoader.Block tryRead(BlockFactory factory, Docs docs, int offset) throws IOException;
BlockLoader.Block tryRead(BlockFactory factory, Docs docs, int offset, BlockDocValuesReader.ToDouble toDouble) throws IOException;
}

interface RowStrideReader extends Reader {
Expand Down Expand Up @@ -435,6 +437,17 @@ interface BlockFactory {
*/
SingletonLongBuilder singletonLongs(int expectedCount);

/**
* Build a specialized builder for singleton dense double based fields with the following constraints:
* <ul>
* <li>Only one value per document can be collected</li>
* <li>No more than expectedCount values can be collected</li>
* </ul>
*
* @param expectedCount The maximum number of values to be collected.
*/
SingletonDoubleBuilder singletonDoubles(int expectedCount);

/**
* Build a builder to load only {@code null}s.
*/
Expand Down Expand Up @@ -537,12 +550,20 @@ interface IntBuilder extends Builder {
* Specialized builder for collecting dense arrays of long values.
*/
interface SingletonLongBuilder extends Builder {

SingletonLongBuilder appendLong(long value);

SingletonLongBuilder appendLongs(long[] values, int from, int length);
}

/**
* Specialized builder for collecting dense arrays of double values.
*/
interface SingletonDoubleBuilder extends Builder {
SingletonDoubleBuilder appendDouble(double value);

SingletonDoubleBuilder appendDoubles(double[] values, int from, int length);
}

interface LongBuilder extends Builder {
/**
* Appends a long to the current entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ public void testOptionalColumnAtATimeReader() throws Exception {

{
// bulk loading timestamp:
var block = (TestBlock) timestampDV.tryRead(factory, docs, 0);
var block = (TestBlock) timestampDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(size, block.size());
for (int j = 0; j < block.size(); j++) {
Expand All @@ -785,10 +785,10 @@ public void testOptionalColumnAtATimeReader() throws Exception {
}
{
// bulk loading counter field:
var block = (TestBlock) counterDV.tryRead(factory, docs, 0);
var block = (TestBlock) counterDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(size, block.size());
var stringBlock = (TestBlock) stringCounterDV.tryRead(factory, docs, 0);
var stringBlock = (TestBlock) stringCounterDV.tryRead(factory, docs, 0, null);
assertNotNull(stringBlock);
assertEquals(size, stringBlock.size());
for (int j = 0; j < block.size(); j++) {
Expand All @@ -805,7 +805,7 @@ public void testOptionalColumnAtATimeReader() throws Exception {
}
{
// bulk loading gauge field:
var block = (TestBlock) gaugeDV.tryRead(factory, docs, 0);
var block = (TestBlock) gaugeDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(size, block.size());
for (int j = 0; j < block.size(); j++) {
Expand Down Expand Up @@ -843,7 +843,7 @@ public void testOptionalColumnAtATimeReader() throws Exception {

{
// bulk loading timestamp:
var block = (TestBlock) timestampDV.tryRead(blockFactory, docs, randomOffset);
var block = (TestBlock) timestampDV.tryRead(blockFactory, docs, randomOffset, null);
assertNotNull(block);
assertEquals(size, block.size());
for (int j = 0; j < block.size(); j++) {
Expand All @@ -855,11 +855,11 @@ public void testOptionalColumnAtATimeReader() throws Exception {
}
{
// bulk loading counter field:
var block = (TestBlock) counterDV.tryRead(factory, docs, randomOffset);
var block = (TestBlock) counterDV.tryRead(factory, docs, randomOffset, null);
assertNotNull(block);
assertEquals(size, block.size());

var stringBlock = (TestBlock) stringCounterDV.tryRead(factory, docs, randomOffset);
var stringBlock = (TestBlock) stringCounterDV.tryRead(factory, docs, randomOffset, null);
assertNotNull(stringBlock);
assertEquals(size, stringBlock.size());

Expand All @@ -877,7 +877,7 @@ public void testOptionalColumnAtATimeReader() throws Exception {
}
{
// bulk loading gauge field:
var block = (TestBlock) gaugeDV.tryRead(factory, docs, randomOffset);
var block = (TestBlock) gaugeDV.tryRead(factory, docs, randomOffset, null);
assertNotNull(block);
assertEquals(size, block.size());
for (int j = 0; j < block.size(); j++) {
Expand All @@ -902,11 +902,11 @@ public void testOptionalColumnAtATimeReader() throws Exception {
stringCounterDV = getBaseSortedDocValues(leafReader, counterFieldAsString);
{
// bulk loading counter field:
var block = (TestBlock) counterDV.tryRead(factory, docs, 0);
var block = (TestBlock) counterDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(size, block.size());

var stringBlock = (TestBlock) stringCounterDV.tryRead(factory, docs, 0);
var stringBlock = (TestBlock) stringCounterDV.tryRead(factory, docs, 0, null);
assertNotNull(stringBlock);
assertEquals(size, stringBlock.size());

Expand Down Expand Up @@ -1001,7 +1001,7 @@ public void testOptionalColumnAtATimeReaderWithSparseDocs() throws Exception {
var docs = TestBlock.docs(docIds);
{
timestampDV = getBaseDenseNumericValues(leafReader, timestampField);
var block = (TestBlock) timestampDV.tryRead(factory, docs, 0);
var block = (TestBlock) timestampDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(numDocsPerQValue, block.size());
for (int j = 0; j < block.size(); j++) {
Expand All @@ -1012,7 +1012,7 @@ public void testOptionalColumnAtATimeReaderWithSparseDocs() throws Exception {
}
{
counterDV = getBaseDenseNumericValues(leafReader, counterField);
var block = (TestBlock) counterDV.tryRead(factory, docs, 0);
var block = (TestBlock) counterDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(numDocsPerQValue, block.size());
for (int j = 0; j < block.size(); j++) {
Expand All @@ -1023,7 +1023,7 @@ public void testOptionalColumnAtATimeReaderWithSparseDocs() throws Exception {
}
{
counterAsStringDV = getBaseSortedDocValues(leafReader, counterAsStringField);
var block = (TestBlock) counterAsStringDV.tryRead(factory, docs, 0);
var block = (TestBlock) counterAsStringDV.tryRead(factory, docs, 0, null);
assertNotNull(block);
assertEquals(numDocsPerQValue, block.size());
for (int j = 0; j < block.size(); j++) {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ public int get(int i) {
}
};
var idReader = ESTestCase.asInstanceOf(OptionalColumnAtATimeReader.class, leaf.reader().getNumericDocValues("id"));
TestBlock idBlock = (TestBlock) idReader.tryRead(factory, docs, 0);
TestBlock idBlock = (TestBlock) idReader.tryRead(factory, docs, 0, null);
assertNotNull(idBlock);

{
Expand All @@ -1100,7 +1100,7 @@ public int get(int i) {
block = (TestBlock) reader2.tryReadAHead(factory, docs, randomOffset);
} else {
assertNull(reader2.tryReadAHead(factory, docs, randomOffset));
block = (TestBlock) reader2.tryRead(factory, docs, randomOffset);
block = (TestBlock) reader2.tryRead(factory, docs, randomOffset, null);
}
assertNotNull(block);
assertThat(block.size(), equalTo(docs.count() - randomOffset));
Expand All @@ -1122,7 +1122,7 @@ public int get(int i) {
block = (TestBlock) reader3.tryReadAHead(factory, docs, randomOffset);
} else {
assertNull(reader3.tryReadAHead(factory, docs, randomOffset));
block = (TestBlock) reader3.tryRead(factory, docs, randomOffset);
block = (TestBlock) reader3.tryRead(factory, docs, randomOffset, null);
}
assertNotNull(reader3);
assertNotNull(block);
Expand Down
Loading