-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Push compute engine value loading for longs down to tsdb codec. #132622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
dd4c80d
d8ea150
0d73a27
b32cf93
173f113
5d84f19
e6014ec
150e06a
ffd2de6
be0c77c
a5c877a
1c15d39
054b12e
f097f4a
c26e575
cc3614b
8636124
212ec5d
6ca5c66
2ef68f4
115e4e6
dcbcaf2
5e207e1
206703b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 132622 | ||
summary: Push compute engine value loading for longs down to tsdb codec | ||
area: "Codec" | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.index.codec.tsdb.es819; | ||
|
||
import org.apache.lucene.index.NumericDocValues; | ||
|
||
/** | ||
* An es819 doc values Specialization that allows retrieving a {@link BulkReader}. | ||
*/ | ||
public abstract class BulkNumericDocValues extends NumericDocValues { | ||
|
||
/** | ||
* @return a bulk reader instance or <code>null</code> if field or implementation doesn't support bulk loading. | ||
*/ | ||
public abstract BulkReader getBulkReader(); | ||
dnhatn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.index.codec.tsdb.es819; | ||
|
||
import org.elasticsearch.index.mapper.BlockLoader; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Low level abstraction for es819 doc values to allow for bulk reading doc values into compute engine's block builders. | ||
*/ | ||
public interface BulkReader { | ||
|
||
/** | ||
* Appends values into the provided builder for the specified docs from the specified offset. | ||
*/ | ||
void bulkRead(BlockLoader.SingletonBulkLongBuilder builder, BlockLoader.Docs docs, int offset) throws IOException; | ||
|
||
/** | ||
* @return the current docid of this bulk reader. | ||
*/ | ||
int docID(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ | |
import org.apache.lucene.util.packed.PackedInts; | ||
import org.elasticsearch.core.IOUtils; | ||
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder; | ||
import org.elasticsearch.index.mapper.BlockLoader; | ||
|
||
import java.io.IOException; | ||
|
||
|
@@ -1140,13 +1141,14 @@ public long longValue() { | |
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1; | ||
if (entry.docsWithFieldOffset == -1) { | ||
// dense | ||
return new NumericDocValues() { | ||
return new BulkNumericDocValues() { | ||
|
||
private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc; | ||
private int doc = -1; | ||
private final TSDBDocValuesEncoder decoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE); | ||
private long currentBlockIndex = -1; | ||
private final long[] currentBlock = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE]; | ||
private BulkReader bulkReader; | ||
|
||
@Override | ||
public int docID() { | ||
|
@@ -1155,11 +1157,13 @@ public int docID() { | |
|
||
@Override | ||
public int nextDoc() throws IOException { | ||
assert bulkReader == null : "can't use this method if bulk loader has been initialized"; | ||
return advance(doc + 1); | ||
} | ||
|
||
@Override | ||
public int advance(int target) throws IOException { | ||
assert bulkReader == null : "can't use this method if bulk loader has been initialized"; | ||
if (target >= maxDoc) { | ||
return doc = NO_MORE_DOCS; | ||
} | ||
|
@@ -1168,6 +1172,7 @@ public int advance(int target) throws IOException { | |
|
||
@Override | ||
public boolean advanceExact(int target) { | ||
assert bulkReader == null : "can't use this method if bulk loader has been initialized"; | ||
doc = target; | ||
return true; | ||
} | ||
|
@@ -1179,6 +1184,7 @@ public long cost() { | |
|
||
@Override | ||
public long longValue() throws IOException { | ||
assert bulkReader == null : "can't use this method if bulk loader has been initialized"; | ||
final int index = doc; | ||
final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT; | ||
final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK; | ||
|
@@ -1197,6 +1203,61 @@ public long longValue() throws IOException { | |
} | ||
return currentBlock[blockInIndex]; | ||
} | ||
|
||
@Override
public BulkReader getBulkReader() {
    // Lazily create and cache a single BulkReader; once handed out, the regular
    // per-document iteration methods must no longer be used (see asserts there).
    if (bulkReader == null) {
        bulkReader = new BulkReader() {

            @Override
            public void bulkRead(BlockLoader.SingletonBulkLongBuilder builder, BlockLoader.Docs docs, int offset)
                throws IOException {
                // Only plain numerics are expected here; ordinal-encoded fields are not supported.
                assert maxOrd == -1 : "unexpected maxOrd[" + maxOrd + "]";
                final int docsCount = docs.count();
                // Record the last requested doc so docID() reflects forward progress
                // (used by canReuse-style checks in the caller).
                doc = docs.get(docsCount - 1);
                for (int i = offset; i < docsCount;) {
                    int index = docs.get(i);
                    // Split the doc index into the encoded block and the position within it.
                    final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
                    final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
                    if (blockIndex != currentBlockIndex) {
                        // Docs must be requested in non-decreasing order; this reader only moves forward.
                        assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
                        // no need to seek if the loading block is the next block
                        if (currentBlockIndex + 1 != blockIndex) {
                            valuesData.seek(indexReader.get(blockIndex));
                        }
                        currentBlockIndex = blockIndex;
                        // Decode the whole block into currentBlock for reuse across docs.
                        decoder.decode(valuesData, currentBlock);
                    }

                    // Try to append more than just one value:
                    // Instead of iterating over docs and find the max length, take an optimistic approach to avoid as
                    // many comparisons as there are remaining docs and instead do at most 7 comparisons:
                    int length = 1;
                    int remainingBlockLength = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - blockInIndex;
                    // Halve the candidate run length until a dense run of docids is found
                    // (dense: consecutive docids, so values are contiguous in currentBlock).
                    for (int newLength = remainingBlockLength; newLength > 1; newLength = newLength >> 1) {
                        int lastIndex = i + newLength - 1;
                        if (lastIndex < docsCount && isDense(index, docs.get(lastIndex), newLength)) {
                            length = newLength;
                            break;
                        }
                    }
                    builder.appendLongs(currentBlock, blockInIndex, length);
                    i += length;
                }
            }

            // A run of docids is dense when first..last covers exactly `length` consecutive docs.
            static boolean isDense(int firstDocId, int lastDocId, int length) {
                return lastDocId - firstDocId == length - 1;
            }

            @Override
            public int docID() {
                return doc;
            }
        };
    }
    return bulkReader;
}
}; | ||
} else { | ||
final IndexedDISI disi = new IndexedDISI( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,10 @@ | |
import org.apache.lucene.index.SortedSetDocValues; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.common.io.stream.ByteArrayStreamInput; | ||
import org.elasticsearch.index.IndexMode; | ||
import org.elasticsearch.index.IndexVersion; | ||
import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues; | ||
import org.elasticsearch.index.codec.tsdb.es819.BulkReader; | ||
import org.elasticsearch.index.mapper.BlockLoader.BlockFactory; | ||
import org.elasticsearch.index.mapper.BlockLoader.BooleanBuilder; | ||
import org.elasticsearch.index.mapper.BlockLoader.Builder; | ||
|
@@ -62,6 +65,12 @@ public abstract static class DocValuesBlockLoader implements BlockLoader { | |
|
||
@Override | ||
public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { | ||
if (supportsOptimizedColumnAtTimeReader()) { | ||
var optimizedColumnAtTimeReader = optimizedColumnAtTimeReader(context); | ||
if (optimizedColumnAtTimeReader != null) { | ||
return optimizedColumnAtTimeReader; | ||
} | ||
} | ||
return reader(context); | ||
} | ||
|
||
|
@@ -84,13 +93,23 @@ public boolean supportsOrdinals() { | |
public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
protected boolean supportsOptimizedColumnAtTimeReader() { | ||
return false; | ||
} | ||
|
||
protected ColumnAtATimeReader optimizedColumnAtTimeReader(LeafReaderContext context) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
} | ||
|
||
public static class LongsBlockLoader extends DocValuesBlockLoader { | ||
private final String fieldName; | ||
private final IndexMode indexMode; | ||
|
||
public LongsBlockLoader(String fieldName) { | ||
public LongsBlockLoader(String fieldName, IndexMode indexMode) { | ||
this.fieldName = fieldName; | ||
this.indexMode = indexMode; | ||
} | ||
|
||
@Override | ||
|
@@ -114,9 +133,57 @@ public AllReader reader(LeafReaderContext context) throws IOException { | |
} | ||
return new ConstantNullsReader(); | ||
} | ||
|
||
@Override | ||
protected boolean supportsOptimizedColumnAtTimeReader() { | ||
return indexMode.supportOptimizedDocValueLoading(); | ||
} | ||
|
||
protected ColumnAtATimeReader optimizedColumnAtTimeReader(LeafReaderContext context) throws IOException { | ||
NumericDocValues singleton = context.reader().getNumericDocValues(fieldName); | ||
if (singleton == null) { | ||
SortedNumericDocValues docValues = context.reader().getSortedNumericDocValues(fieldName); | ||
singleton = DocValues.unwrapSingleton(docValues); | ||
} | ||
|
||
if (singleton instanceof BulkNumericDocValues bulkDv) { | ||
var bulkLoader = bulkDv.getBulkReader(); | ||
return new BulkSingletonLong(bulkLoader); | ||
} | ||
|
||
return null; | ||
} | ||
} | ||
|
||
static final class BulkSingletonLong implements BlockLoader.ColumnAtATimeReader { | ||
private final Thread creationThread; | ||
private final BulkReader bulkReader; | ||
|
||
BulkSingletonLong(BulkReader bulkReader) { | ||
this.creationThread = Thread.currentThread(); | ||
this.bulkReader = bulkReader; | ||
} | ||
|
||
@Override | ||
public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { | ||
try (BlockLoader.SingletonBulkLongBuilder builder = factory.singletonLongs(docs.count() - offset)) { | ||
bulkReader.bulkRead(builder, docs, offset); | ||
return builder.build(); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean canReuse(int startingDocID) { | ||
return creationThread == Thread.currentThread() && bulkReader.docID() <= startingDocID; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "BlockDocValuesReader.BulkSingletonLong"; | ||
} | ||
} | ||
|
||
private static class SingletonLongs extends BlockDocValuesReader { | ||
static class SingletonLongs extends BlockDocValuesReader { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we enable the optimization in
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that should work as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I took me a while, but this works out and it is now even simpler! 054b12e |
||
private final NumericDocValues numericDocValues; | ||
|
||
SingletonLongs(NumericDocValues numericDocValues) { | ||
|
@@ -164,7 +231,7 @@ public String toString() { | |
} | ||
} | ||
|
||
private static class Longs extends BlockDocValuesReader { | ||
static class Longs extends BlockDocValuesReader { | ||
private final SortedNumericDocValues numericDocValues; | ||
private int docID = -1; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the summary doesn't match with the PR title?