Skip to content

Commit dd4c80d

Browse files
committed
Push compute engine value loading for longs down to tsdb codec.
This is the first of many changes that pushes loading of field values to the es819 doc values codec in case of logsdb/tsdb and when the field supports it. This change first targets reading field values in bulk mode at codec level when doc values type is numeric doc values or sorted doc values, there is only one value per document, and the field is dense (all documents have a value). Multivalued and sparse fields are more complex to support bulk reading for, but it is possible. With this change, the following field types will support bulk read mode at codec level under the described conditions: long, date, geo_point, point and unsigned_long. Other number types like integer, short, double, float, scaled_float will be supported in a followup, but would be similar to long based fields, but required an additional conversion step to either an int or float vector. This change originates from elastic#132460 (which adds bulk reading to `@timestamp`, `_tsid` and dimension fields) and is basically the timestamp support part of it. In another followup, support for single valued, dense sorted (set) doc values will be added for field like _tsid. Relates to elastic#128445
1 parent 7bd95dc commit dd4c80d

File tree

25 files changed

+1030
-29
lines changed

25 files changed

+1030
-29
lines changed

server/src/main/java/org/elasticsearch/index/IndexMode.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,11 @@ public void validateSourceFieldMapper(SourceFieldMapper sourceFieldMapper) {
234234
public SourceFieldMapper.Mode defaultSourceMode() {
235235
return SourceFieldMapper.Mode.SYNTHETIC;
236236
}
237+
238+
@Override
239+
public boolean supportOptimizedDocValueLoading() {
240+
return true;
241+
}
237242
},
238243
LOGSDB("logsdb") {
239244
@Override
@@ -320,6 +325,11 @@ public SourceFieldMapper.Mode defaultSourceMode() {
320325
public String getDefaultCodec() {
321326
return CodecService.BEST_COMPRESSION_CODEC;
322327
}
328+
329+
@Override
330+
public boolean supportOptimizedDocValueLoading() {
331+
return true;
332+
}
323333
},
324334
LOOKUP("lookup") {
325335
@Override
@@ -564,6 +574,13 @@ public boolean useDefaultPostingsFormat() {
564574
return false;
565575
}
566576

577+
/**
578+
* @return Whether the index mode uses a doc value codec that may support optimized doc value loading for one or more fields.
579+
*/
580+
public boolean supportOptimizedDocValueLoading() {
581+
return false;
582+
}
583+
567584
/**
568585
* Parse a string into an {@link IndexMode}.
569586
*/

server/src/main/java/org/elasticsearch/index/codec/tsdb/DocValuesForUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import java.io.IOException;
1818

19-
public class DocValuesForUtil {
19+
public final class DocValuesForUtil {
2020
private static final int BITS_IN_FOUR_BYTES = 4 * Byte.SIZE;
2121
private static final int BITS_IN_FIVE_BYTES = 5 * Byte.SIZE;
2222
private static final int BITS_IN_SIX_BYTES = 6 * Byte.SIZE;

server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBDocValuesEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
*
5555
* Of course, decoding follows the opposite order with respect to encoding.
5656
*/
57-
public class TSDBDocValuesEncoder {
57+
public final class TSDBDocValuesEncoder {
5858
private final DocValuesForUtil forUtil;
5959
private final int numericBlockSize;
6060

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.es819;
11+
12+
import org.apache.lucene.index.NumericDocValues;
13+
14+
/**
15+
* An es819 doc values Specialization that allows retrieving a {@link BulkReader}.
16+
*/
17+
public abstract class BulkNumericDocValues extends NumericDocValues {
18+
19+
/**
20+
* @return a bulk reader instance or <code>null</code> if field or implementation doesn't support bulk loading.
21+
*/
22+
public abstract BulkReader getBulkReader();
23+
24+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.es819;
11+
12+
import org.elasticsearch.index.mapper.BlockLoader;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* Low level abstraction for es819 doc values to allow for bulk reading doc values into compute engine's block builders.
18+
*/
19+
public interface BulkReader {
20+
21+
/**
22+
* Appends values into the provided builder for the specified docs from the specified offset.
23+
*/
24+
void bulkRead(BlockLoader.SingletonBulkLongBuilder builder, BlockLoader.Docs docs, int offset) throws IOException;
25+
26+
/**
27+
* @return the current docid of this bulk reader.
28+
*/
29+
int docID();
30+
31+
}

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.lucene.util.packed.PackedInts;
4444
import org.elasticsearch.core.IOUtils;
4545
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
46+
import org.elasticsearch.index.mapper.BlockLoader;
4647

4748
import java.io.IOException;
4849

@@ -1140,13 +1141,14 @@ public long longValue() {
11401141
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
11411142
if (entry.docsWithFieldOffset == -1) {
11421143
// dense
1143-
return new NumericDocValues() {
1144+
return new BulkNumericDocValues() {
11441145

11451146
private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc;
11461147
private int doc = -1;
11471148
private final TSDBDocValuesEncoder decoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
11481149
private long currentBlockIndex = -1;
11491150
private final long[] currentBlock = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
1151+
private BulkReader bulkReader;
11501152

11511153
@Override
11521154
public int docID() {
@@ -1155,11 +1157,13 @@ public int docID() {
11551157

11561158
@Override
11571159
public int nextDoc() throws IOException {
1160+
assert bulkReader == null : "can't use this method if bulk loader has been initialized";
11581161
return advance(doc + 1);
11591162
}
11601163

11611164
@Override
11621165
public int advance(int target) throws IOException {
1166+
assert bulkReader == null : "can't use this method if bulk loader has been initialized";
11631167
if (target >= maxDoc) {
11641168
return doc = NO_MORE_DOCS;
11651169
}
@@ -1168,6 +1172,7 @@ public int advance(int target) throws IOException {
11681172

11691173
@Override
11701174
public boolean advanceExact(int target) {
1175+
assert bulkReader == null : "can't use this method if bulk loader has been initialized";
11711176
doc = target;
11721177
return true;
11731178
}
@@ -1179,6 +1184,7 @@ public long cost() {
11791184

11801185
@Override
11811186
public long longValue() throws IOException {
1187+
assert bulkReader == null : "can't use this method if bulk loader has been initialized";
11821188
final int index = doc;
11831189
final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
11841190
final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
@@ -1197,6 +1203,126 @@ public long longValue() throws IOException {
11971203
}
11981204
return currentBlock[blockInIndex];
11991205
}
1206+
1207+
@Override
1208+
public BulkReader getBulkReader() {
1209+
if (bulkReader == null) {
1210+
bulkReader = new BulkReader() {
1211+
1212+
@Override
1213+
public void bulkRead(BlockLoader.SingletonBulkLongBuilder builder, BlockLoader.Docs docs, int offset)
1214+
throws IOException {
1215+
assert maxOrd == -1;
1216+
doc = docs.get(docs.count() - 1);
1217+
boolean isDense = doc - docs.get(0) == docs.count() - 1;
1218+
if (isDense) {
1219+
// Figure out where we start and whether the previous block needs to be read:
1220+
int firstDocId = docs.get(offset);
1221+
int firstBlockIndex = firstDocId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
1222+
int firstBlockInIndex = firstDocId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1223+
1224+
int start;
1225+
if (currentBlockIndex != firstBlockIndex) {
1226+
// different block, so seek:
1227+
valuesData.seek(indexReader.get(firstBlockIndex));
1228+
if (firstBlockInIndex == 0) {
1229+
// start is a full block, defer consuming later with complete blocks.
1230+
start = offset;
1231+
} else {
1232+
// partial block, consume it here
1233+
currentBlockIndex = firstBlockInIndex;
1234+
decoder.decode(valuesData, currentBlock);
1235+
int length = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - firstBlockInIndex;
1236+
if (docs.count() < length) {
1237+
builder.appendLongs(currentBlock, firstBlockInIndex, docs.count());
1238+
return;
1239+
} else {
1240+
builder.appendLongs(currentBlock, firstBlockInIndex, length);
1241+
start = offset + length;
1242+
}
1243+
}
1244+
} else {
1245+
// consume remaining
1246+
int docsLength = docs.count() - offset;
1247+
int blockLength = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - firstBlockInIndex;
1248+
if (docsLength < blockLength) {
1249+
builder.appendLongs(currentBlock, firstBlockInIndex, docsLength);
1250+
return;
1251+
} else {
1252+
builder.appendLongs(currentBlock, firstBlockInIndex, blockLength);
1253+
start = offset + blockLength;
1254+
}
1255+
}
1256+
1257+
// Figure out how many complete blocks we can read:
1258+
int completeBlockSize = 0;
1259+
int[] completeBlocks = new int[(docs.count() / ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) + 1];
1260+
int docsIndex = start;
1261+
while (docsIndex < docs.count()) {
1262+
int docId = docs.get(docsIndex);
1263+
if (docsIndex + ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE >= docs.count()) {
1264+
break;
1265+
}
1266+
1267+
int nextIndex = docs.get(docsIndex + ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
1268+
if (nextIndex - docId == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
1269+
completeBlocks[completeBlockSize++] = docId;
1270+
docsIndex += ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE;
1271+
} else {
1272+
break;
1273+
}
1274+
}
1275+
1276+
// Read those complete blocks:
1277+
for (int i = 0; i < completeBlockSize; i++) {
1278+
int docId = completeBlocks[i];
1279+
currentBlockIndex = docId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
1280+
decoder.decode(valuesData, currentBlock);
1281+
int blockInIndex = docId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1282+
int length = ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE - blockInIndex;
1283+
assert length == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE : "unexpected length [" + length + "]";
1284+
builder.appendLongs(currentBlock, blockInIndex, length);
1285+
}
1286+
1287+
// Check for a remainder and if so read it:
1288+
if (docsIndex < docs.count()) {
1289+
int docId = docs.get(docsIndex);
1290+
currentBlockIndex = docId >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
1291+
decoder.decode(valuesData, currentBlock);
1292+
1293+
int blockInIndex = docId & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1294+
int lastBlockInIndex = doc & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1295+
int length = lastBlockInIndex - blockInIndex + 1;
1296+
1297+
builder.appendLongs(currentBlock, blockInIndex, length);
1298+
}
1299+
} else {
1300+
for (int i = offset; i < docs.count(); i++) {
1301+
int index = docs.get(i);
1302+
final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
1303+
final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
1304+
if (blockIndex != currentBlockIndex) {
1305+
assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
1306+
// no need to seek if the loading block is the next block
1307+
if (currentBlockIndex + 1 != blockIndex) {
1308+
valuesData.seek(indexReader.get(blockIndex));
1309+
}
1310+
currentBlockIndex = blockIndex;
1311+
decoder.decode(valuesData, currentBlock);
1312+
}
1313+
builder.appendLong(currentBlock[blockInIndex]);
1314+
}
1315+
}
1316+
}
1317+
1318+
@Override
1319+
public int docID() {
1320+
return doc;
1321+
}
1322+
};
1323+
}
1324+
return bulkReader;
1325+
}
12001326
};
12011327
} else {
12021328
final IndexedDISI disi = new IndexedDISI(

0 commit comments

Comments
 (0)