Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
final int maxDoc;
private byte[] termsDictBuffer;
private final int skipIndexIntervalSize;
private final int minDocsPerOrdinalForOrdinalRangeEncoding;
final boolean enableOptimizedMerge;
private int primarySortField = -1;

ES819TSDBDocValuesConsumer(
SegmentWriteState state,
int skipIndexIntervalSize,
int minDocsPerOrdinalForOrdinalRangeEncoding,
boolean enableOptimizedMerge,
String dataCodec,
String dataExtension,
Expand All @@ -75,6 +78,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
) throws IOException {
this.termsDictBuffer = new byte[1 << 14];
this.dir = state.directory;
this.minDocsPerOrdinalForOrdinalRangeEncoding = minDocsPerOrdinalForOrdinalRangeEncoding;
this.context = state.context;
boolean success = false;
try {
Expand All @@ -99,6 +103,13 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
maxDoc = state.segmentInfo.maxDoc();
this.skipIndexIntervalSize = skipIndexIntervalSize;
this.enableOptimizedMerge = enableOptimizedMerge;
final var indexSort = state.segmentInfo.getIndexSort();
if (indexSort != null && indexSort.getSort().length > 0 && indexSort.getSort()[0].getReverse() == false) {
var sortField = state.fieldInfos.fieldInfo(indexSort.getSort()[0].getField());
if (sortField != null) {
primarySortField = sortField.number;
}
}
success = true;
} finally {
if (success == false) {
Expand All @@ -124,6 +135,10 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
writeField(field, producer, -1, null);
}

private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDocsWithValue) {
return maxDoc > 1 && field.number == primarySortField && (numDocsWithValue / maxOrd) >= minDocsPerOrdinalForOrdinalRangeEncoding;
}

private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd, OffsetsAccumulator offsetsAccumulator)
throws IOException {
int numDocsWithValue = 0;
Expand All @@ -149,19 +164,52 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
try {
if (numValues > 0) {
assert numDocsWithValue > 0;
// Special case for maxOrd of 1, signal -1 that no blocks will be written
meta.writeInt(maxOrd != 1 ? ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
meta,
new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);
DirectMonotonicWriter indexWriter = null;

final long valuesDataOffset = data.getFilePointer();
// Special case for maxOrd of 1, skip writing the blocks
if (maxOrd != 1) {
if (maxOrd == 1) {
// Special case for maxOrd of 1, signal -1 that no blocks will be written
meta.writeInt(-1);
} else if (shouldEncodeOrdinalRange(field, maxOrd, numDocsWithValue)) {
// When a field is sorted, use ordinal range encode for long runs of the same ordinal.
meta.writeInt(-2);
meta.writeVInt(Math.toIntExact(maxOrd));
values = valuesProducer.getSortedNumeric(field);
if (enableOptimizedMerge && numDocsWithValue < maxDoc) {
disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
DirectMonotonicWriter startDocs = DirectMonotonicWriter.getInstance(
meta,
data,
maxOrd + 1,
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);
long lastOrd = 0;
startDocs.add(0);
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
if (disiAccumulator != null) {
disiAccumulator.addDocId(doc);
}
if (offsetsAccumulator != null) {
offsetsAccumulator.addDoc(1);
}
final long nextOrd = values.nextValue();
if (nextOrd != lastOrd) {
lastOrd = nextOrd;
startDocs.add(doc);
}
}
startDocs.add(maxDoc);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that we've already inserted maxDoc in the loop above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not, because startDocs should be at most maxDoc - 1.

startDocs.finish();
} else {
indexWriter = DirectMonotonicWriter.getInstance(
meta,
new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);
meta.writeInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
final long[] buffer = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
int bufferSize = 0;
final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
Expand Down Expand Up @@ -204,8 +252,7 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
}

final long valuesDataLength = data.getFilePointer() - valuesDataOffset;
if (maxOrd != 1) {
// Special case for maxOrd of 1, indexWriter isn't really used, so no need to invoke finish() method.
if (indexWriter != null) {
indexWriter.finish();
}
final long indexDataOffset = data.getFilePointer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,22 @@ private static boolean getOptimizedMergeEnabledDefault() {
}

final int skipIndexIntervalSize;
final int minDocsPerOrdinalForOrdinalRangeEncoding;
private final boolean enableOptimizedMerge;

/** Default constructor. */
public ES819TSDBDocValuesFormat() {
this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE, OPTIMIZED_MERGE_ENABLE_DEFAULT);
this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE, NUMERIC_BLOCK_SIZE, OPTIMIZED_MERGE_ENABLE_DEFAULT);
}

/** Doc values fields format with specified skipIndexIntervalSize. */
public ES819TSDBDocValuesFormat(int skipIndexIntervalSize, boolean enableOptimizedMerge) {
public ES819TSDBDocValuesFormat(int skipIndexIntervalSize, int minDocsPerOrdinalForOrdinalRangeEncoding, boolean enableOptimizedMerge) {
super(CODEC_NAME);
if (skipIndexIntervalSize < 2) {
throw new IllegalArgumentException("skipIndexIntervalSize must be > 1, got [" + skipIndexIntervalSize + "]");
}
this.skipIndexIntervalSize = skipIndexIntervalSize;
this.minDocsPerOrdinalForOrdinalRangeEncoding = minDocsPerOrdinalForOrdinalRangeEncoding;
this.enableOptimizedMerge = enableOptimizedMerge;
}

Expand All @@ -127,6 +129,7 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept
return new ES819TSDBDocValuesConsumer(
state,
skipIndexIntervalSize,
minDocsPerOrdinalForOrdinalRangeEncoding,
enableOptimizedMerge,
DATA_CODEC,
DATA_EXTENSION,
Expand Down
Loading