Skip to content

Commit affa352

Browse files
committed
wip
1 parent c58ac45 commit affa352

File tree

3 files changed

+395
-7
lines changed

3 files changed

+395
-7
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import org.apache.lucene.codecs.DocValuesProducer;
1515
import org.apache.lucene.codecs.lucene90.IndexedDISI;
1616
import org.apache.lucene.index.BinaryDocValues;
17+
import org.apache.lucene.index.DocIDMerger;
1718
import org.apache.lucene.index.DocValues;
1819
import org.apache.lucene.index.DocValuesSkipIndexType;
20+
import org.apache.lucene.index.DocValuesType;
1921
import org.apache.lucene.index.EmptyDocValuesProducer;
2022
import org.apache.lucene.index.FieldInfo;
2123
import org.apache.lucene.index.IndexFileNames;
24+
import org.apache.lucene.index.MergeState;
2225
import org.apache.lucene.index.NumericDocValues;
2326
import org.apache.lucene.index.SegmentWriteState;
2427
import org.apache.lucene.index.SortedDocValues;
@@ -46,7 +49,9 @@
4649
import java.util.Arrays;
4750
import java.util.List;
4851

52+
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.BULK_MERGE_ENABLED;
4953
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
54+
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.OPTIMIZED_MERGE_ENABLED;
5055
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.SKIP_INDEX_LEVEL_SHIFT;
5156
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.SKIP_INDEX_MAX_LEVEL;
5257
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.SORTED_SET;
@@ -519,6 +524,247 @@ private void writeSortedNumericField(FieldInfo field, DocValuesProducer valuesPr
519524
}
520525
}
521526

527+
@Override
528+
public void mergeSortedNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
529+
// TODO: support skip index
530+
// TODO: add escape hatch
531+
boolean compatibleWithBulkMerge = OPTIMIZED_MERGE_ENABLED
532+
&& mergeState.needsIndexSort
533+
&& mergeFieldInfo.docValuesSkipIndexType() == DocValuesSkipIndexType.NONE;
534+
long sumNumValues = 0;
535+
int sumNumDocsWithField = 0;
536+
537+
if (compatibleWithBulkMerge) {
538+
for (DocValuesProducer docValuesProducer : mergeState.docValuesProducers) {
539+
if (docValuesProducer instanceof ES87TSDBDocValuesProducer tsdbProducer) {
540+
if (tsdbProducer.version != ES87TSDBDocValuesFormat.VERSION_CURRENT) {
541+
compatibleWithBulkMerge = false;
542+
break;
543+
}
544+
545+
ES87TSDBDocValuesProducer.SortedNumericEntry entry = tsdbProducer.sortedNumerics.get(mergeFieldInfo.name);
546+
assert entry != null;
547+
// TODO: support also fields with offsets
548+
if (entry.docsWithFieldOffset != -1) {
549+
compatibleWithBulkMerge = false;
550+
break;
551+
}
552+
sumNumValues += entry.numValues;
553+
sumNumDocsWithField += entry.numDocsWithField;
554+
} else {
555+
compatibleWithBulkMerge = false;
556+
break;
557+
}
558+
}
559+
}
560+
if (compatibleWithBulkMerge) {
561+
if (Math.toIntExact(sumNumValues) != sumNumDocsWithField) {
562+
compatibleWithBulkMerge = false;
563+
}
564+
}
565+
if (compatibleWithBulkMerge) {
566+
// Documents marked as deleted should be rare. Maybe in the case of noop operation?
567+
for (int i = 0; i < mergeState.liveDocs.length; i++) {
568+
if (mergeState.liveDocs[i] != null) {
569+
compatibleWithBulkMerge = false;
570+
break;
571+
}
572+
}
573+
}
574+
575+
if (compatibleWithBulkMerge == false) {
576+
super.mergeSortedNumericField(mergeFieldInfo, mergeState);
577+
return;
578+
}
579+
580+
meta.writeInt(mergeFieldInfo.number);
581+
meta.writeByte(ES87TSDBDocValuesFormat.SORTED_NUMERIC);
582+
583+
// meta[-1, 0]: All documents have values
584+
meta.writeLong(-1); // docsWithFieldOffset
585+
meta.writeLong(0L); // docsWithFieldLength
586+
meta.writeShort((short) -1); // jumpTableEntryCount
587+
meta.writeByte((byte) -1); // denseRankPower
588+
meta.writeLong(sumNumValues);
589+
// no maxOrd, just write
590+
meta.writeInt(-1);
591+
592+
List<NumericDocValuesSub> subs = new ArrayList<>();
593+
assert mergeState.docMaps.length == mergeState.docValuesProducers.length;
594+
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
595+
ES87TSDBDocValuesProducer.BulkSortedNumericDocValues values = null;
596+
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
597+
if (docValuesProducer != null) {
598+
FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name);
599+
if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.NUMERIC) {
600+
values = (ES87TSDBDocValuesProducer.BulkSortedNumericDocValues) docValuesProducer.getSortedNumeric(readerFieldInfo);
601+
}
602+
}
603+
if (values != null) {
604+
subs.add(new NumericDocValuesSub(mergeState.docMaps[i], values));
605+
}
606+
}
607+
608+
final SortedNumericDocValues values = mergeNumericValues(subs, mergeState.needsIndexSort);
609+
final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
610+
final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
611+
meta,
612+
new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
613+
1L + ((sumNumValues - 1) >>> ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
614+
ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
615+
);
616+
617+
final long[] buffer = new long[ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
618+
int bufferSize = 0;
619+
final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
620+
final long valuesDataOffset = data.getFilePointer();
621+
622+
if (ES87TSDBDocValuesFormat.BULK_MERGE_ENABLED) {
623+
final var docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort);
624+
var sub = docIDMerger.next();
625+
while (sub != null) {
626+
int fromDocID = sub.docID;
627+
int toDocID = fromDocID;
628+
final var current = sub;
629+
while ((sub = docIDMerger.next()) == current) {
630+
++toDocID;
631+
assert sub.docID == toDocID;
632+
}
633+
++toDocID; // exclusive bound
634+
635+
// Is there the opportunity to bulk merge?
636+
if (bufferSize == 0) {
637+
int bulkToDocId = current.values.canBulkEncode(fromDocID, toDocID);
638+
if (bulkToDocId != -1) {
639+
current.values.bulkEncode(data, indexWriter, fromDocID, bulkToDocId);
640+
}
641+
if (bulkToDocId == toDocID) {
642+
continue;
643+
} else {
644+
fromDocID = bulkToDocId;
645+
}
646+
}
647+
648+
// Fallback to doc by doc decoding and encoding:
649+
for (int i = fromDocID; i < toDocID; i++) {
650+
buffer[bufferSize++] = values.nextValue();
651+
if (bufferSize == ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
652+
indexWriter.add(data.getFilePointer() - valuesDataOffset);
653+
encoder.encode(buffer, data);
654+
bufferSize = 0;
655+
}
656+
}
657+
}
658+
} else {
659+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
660+
assert values.docValueCount() == 1 : "for now only single valued dense sorted numeric docvalues are supported";
661+
buffer[bufferSize++] = values.nextValue();
662+
if (bufferSize == ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
663+
indexWriter.add(data.getFilePointer() - valuesDataOffset);
664+
encoder.encode(buffer, data);
665+
bufferSize = 0;
666+
}
667+
}
668+
if (bufferSize > 0) {
669+
indexWriter.add(data.getFilePointer() - valuesDataOffset);
670+
// Fill unused slots in the block with zeroes rather than junk
671+
Arrays.fill(buffer, bufferSize, ES87TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE, 0L);
672+
encoder.encode(buffer, data);
673+
}
674+
}
675+
676+
final long valuesDataLength = data.getFilePointer() - valuesDataOffset;
677+
indexWriter.finish();
678+
final long indexDataOffset = data.getFilePointer();
679+
data.copyBytes(indexOut.toDataInput(), indexOut.size());
680+
meta.writeLong(indexDataOffset);
681+
meta.writeLong(data.getFilePointer() - indexDataOffset);
682+
683+
meta.writeLong(valuesDataOffset);
684+
meta.writeLong(valuesDataLength);
685+
686+
// only for sorted set numeric:
687+
meta.writeInt(sumNumDocsWithField);
688+
}
689+
690+
private static SortedNumericDocValues mergeNumericValues(
691+
List<NumericDocValuesSub> subs,
692+
boolean indexIsSorted
693+
) throws IOException {
694+
long cost = 0;
695+
for (NumericDocValuesSub sub : subs) {
696+
cost += sub.values.cost();
697+
}
698+
final long finalCost = cost;
699+
700+
final DocIDMerger<NumericDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted);
701+
702+
// DocValues.singleton(...)
703+
return new SortedNumericDocValues() {
704+
private int docID = -1;
705+
private NumericDocValuesSub current;
706+
707+
@Override
708+
public int docID() {
709+
return docID;
710+
}
711+
712+
@Override
713+
public int nextDoc() throws IOException {
714+
current = docIDMerger.next();
715+
if (current == null) {
716+
docID = NO_MORE_DOCS;
717+
} else {
718+
docID = current.mappedDocID;
719+
}
720+
return docID;
721+
}
722+
723+
@Override
724+
public int advance(int target) throws IOException {
725+
throw new UnsupportedOperationException();
726+
}
727+
728+
@Override
729+
public boolean advanceExact(int target) throws IOException {
730+
throw new UnsupportedOperationException();
731+
}
732+
733+
@Override
734+
public long cost() {
735+
return finalCost;
736+
}
737+
738+
@Override
739+
public long nextValue() throws IOException {
740+
return current.values.nextValue();
741+
}
742+
743+
@Override
744+
public int docValueCount() {
745+
return current.values.docValueCount();
746+
}
747+
748+
};
749+
}
750+
751+
static class NumericDocValuesSub extends DocIDMerger.Sub {
752+
753+
final ES87TSDBDocValuesProducer.BulkSortedNumericDocValues values;
754+
int docID = -1;
755+
756+
NumericDocValuesSub(MergeState.DocMap docMap, ES87TSDBDocValuesProducer.BulkSortedNumericDocValues values) {
757+
super(docMap);
758+
this.values = values;
759+
assert values.docID() == -1;
760+
}
761+
762+
@Override
763+
public int nextDoc() throws IOException {
764+
return docID = values.nextDoc();
765+
}
766+
}
767+
522768
private static boolean isSingleValued(SortedSetDocValues values) throws IOException {
523769
if (DocValues.unwrapSingleton(values) != null) {
524770
return true;

server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.codecs.DocValuesProducer;
1414
import org.apache.lucene.index.SegmentReadState;
1515
import org.apache.lucene.index.SegmentWriteState;
16+
import org.elasticsearch.common.util.FeatureFlag;
1617

1718
import java.io.IOException;
1819

@@ -75,6 +76,25 @@ public class ES87TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesF
7576
}
7677
}
7778

79+
// Escape hatches for the merge optimizations. Both switches are resolved once, in the static initializer below.

/** Whether the optimized merge is enabled; read from a system property, defaulting to the feature flag. */
static final boolean OPTIMIZED_MERGE_ENABLED;
static final FeatureFlag TSDB_DOC_VALUES_OPTIMIZED_MERGE = new FeatureFlag("tsdb_doc_values_optimized_merge");
/** Name of the system property that overrides {@link #OPTIMIZED_MERGE_ENABLED}. */
static final String OPTIMIZED_MERGE_ENABLED_NAME = ES87TSDBDocValuesConsumer.class.getName() + ".enableOptimizedMerge";

/** Whether bulk merging is enabled; currently always {@code false}. */
static final boolean BULK_MERGE_ENABLED;
static final FeatureFlag TSDB_DOC_VALUES_BULK_MERGE = new FeatureFlag("tsdb_doc_values_bulk_merge");
/** Name of the system property intended to override {@link #BULK_MERGE_ENABLED}; not consulted at the moment. */
static final String BULK_MERGE_ENABLED_NAME = ES87TSDBDocValuesConsumer.class.getName() + ".enableBulkMerge";

static {
    // The feature flag supplies the default; an explicitly set system property takes precedence.
    final String optimizedMergeFallback = Boolean.toString(TSDB_DOC_VALUES_OPTIMIZED_MERGE.isEnabled());
    final String optimizedMergeSetting = System.getProperty(OPTIMIZED_MERGE_ENABLED_NAME, optimizedMergeFallback);
    OPTIMIZED_MERGE_ENABLED = Boolean.parseBoolean(optimizedMergeSetting);

    // Bulk merge is hard-wired off for now. The intended wiring mirrors the optimized merge switch above:
    // BULK_MERGE_ENABLED = Boolean.parseBoolean(System.getProperty(BULK_MERGE_ENABLED_NAME, Boolean.toString(bulkMergeDefault)));
    boolean bulkMergeDefault = TSDB_DOC_VALUES_BULK_MERGE.isEnabled();
    BULK_MERGE_ENABLED = false;
}
97+
7898
private final int skipIndexIntervalSize;
7999

80100
/** Default constructor. */

0 commit comments

Comments
 (0)