Skip to content

Commit b51da1c

Browse files
committed
Implement docIDRunEnd() on ES819TSDBDocValuesProducer
This method allows consumers to quickly check if a DocIdSetIterator matches a large run of documents; it was missing from our custom Codec DocValues implementations.
1 parent 80ec7ab commit b51da1c

File tree

2 files changed

+182
-0
lines changed

2 files changed

+182
-0
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,11 @@ public boolean advanceExact(int target) throws IOException {
293293
doc = target;
294294
return true;
295295
}
296+
297+
// Reports maxDoc as the end of the current run of matching documents.
// The advanceExact directly above accepts every target unconditionally, so this
// iterator presumably has a value for every document — confirm against the enclosing class.
@Override
298+
public int docIDRunEnd() throws IOException {
299+
return maxDoc;
300+
}
296301
}
297302

298303
private abstract static class SparseBinaryDocValues extends BinaryDocValues {
@@ -327,6 +332,11 @@ public int advance(int target) throws IOException {
327332
public boolean advanceExact(int target) throws IOException {
328333
return disi.advanceExact(target);
329334
}
335+
336+
// Forwards the run-end query to disi, the same iterator that advanceExact
// directly above delegates to.
@Override
337+
public int docIDRunEnd() throws IOException {
338+
return disi.docIDRunEnd();
339+
}
330340
}
331341

332342
@Override
@@ -368,6 +378,11 @@ public int advance(int target) throws IOException {
368378
public long cost() {
369379
return ords.cost();
370380
}
381+
382+
// Forwards the run-end query to ords, mirroring how cost() directly above
// delegates to it.
@Override
383+
public int docIDRunEnd() throws IOException {
384+
return ords.docIDRunEnd();
385+
}
371386
};
372387
}
373388

@@ -749,6 +764,11 @@ public int advance(int target) throws IOException {
749764
public long cost() {
750765
return ords.cost();
751766
}
767+
768+
// Forwards the run-end query to ords, mirroring how cost() directly above
// delegates to it.
@Override
769+
public int docIDRunEnd() throws IOException {
770+
return ords.docIDRunEnd();
771+
}
752772
};
753773
}
754774

@@ -1085,6 +1105,11 @@ public boolean advanceExact(int target) {
10851105
public long cost() {
10861106
return maxDoc;
10871107
}
1108+
1109+
// Reports maxDoc as the end of the current run, the same bound cost() above returns.
// The else-branch that follows is the one that builds an IndexedDISI, so this branch
// is presumably the case where every document has a value — confirm.
@Override
1110+
public int docIDRunEnd() {
1111+
return maxDoc;
1112+
}
10881113
};
10891114
} else {
10901115
final IndexedDISI disi = new IndexedDISI(
@@ -1126,6 +1151,11 @@ public long cost() {
11261151
public long longValue() {
11271152
return 0L;
11281153
}
1154+
1155+
// Forwards the run-end query to disi.
// NOTE(review): disi is presumably the IndexedDISI backing this iterator — confirm.
@Override
1156+
public int docIDRunEnd() throws IOException {
1157+
return disi.docIDRunEnd();
1158+
}
11291159
};
11301160
}
11311161
}
@@ -1177,6 +1207,11 @@ public long cost() {
11771207
return maxDoc;
11781208
}
11791209

1210+
// Reports maxDoc as the end of the current run, the same bound cost() above returns.
@Override
1211+
public int docIDRunEnd() {
1212+
return maxDoc;
1213+
}
1214+
11801215
@Override
11811216
public long longValue() throws IOException {
11821217
final int index = doc;
@@ -1238,6 +1273,11 @@ public long cost() {
12381273
return disi.cost();
12391274
}
12401275

1276+
// Forwards the run-end query to disi, mirroring how cost() above delegates to it.
@Override
1277+
public int docIDRunEnd() throws IOException {
1278+
return disi.docIDRunEnd();
1279+
}
1280+
12411281
@Override
12421282
public long longValue() throws IOException {
12431283
final int index = disi.index();
@@ -1358,6 +1398,11 @@ public long nextValue() throws IOException {
13581398
public int docValueCount() {
13591399
return count;
13601400
}
1401+
1402+
// Reports maxDoc as the end of the current run.
// The else-branch that follows is labelled "sparse", so this is presumably the
// dense case in which every document has values — confirm.
@Override
1403+
public int docIDRunEnd() {
1404+
return maxDoc;
1405+
}
13611406
};
13621407
} else {
13631408
// sparse
@@ -1415,6 +1460,11 @@ public int docValueCount() {
14151460
return count;
14161461
}
14171462

1463+
// Forwards the run-end query to disi, the same iterator whose index() is read by set() below.
@Override
1464+
public int docIDRunEnd() throws IOException {
1465+
return disi.docIDRunEnd();
1466+
}
1467+
14181468
private void set() {
14191469
if (set == false) {
14201470
final int index = disi.index();

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.lucene.index.LogByteSizeMergePolicy;
2828
import org.apache.lucene.index.NumericDocValues;
2929
import org.apache.lucene.index.SortedDocValues;
30+
import org.apache.lucene.search.DocIdSetIterator;
3031
import org.apache.lucene.search.Sort;
3132
import org.apache.lucene.search.SortField;
3233
import org.apache.lucene.search.SortedNumericSortField;
@@ -47,6 +48,8 @@
4748
import java.util.function.Supplier;
4849
import java.util.stream.IntStream;
4950

51+
import static org.elasticsearch.test.ESTestCase.randomFrom;
52+
5053
public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
5154

5255
final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat());
@@ -698,6 +701,135 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
698701
}
699702
}
700703

704+
public void testDocIDEndRun() throws IOException {
705+
String timestampField = "@timestamp";
706+
String hostnameField = "host.name";
707+
long baseTimestamp = 1704067200000L;
708+
709+
var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
710+
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
711+
long counter1 = 0;
712+
713+
714+
long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16};
715+
String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"};
716+
717+
// IndexedDISI stores ids in blocks of 4096. To test sparse end runs, we want a mixture of
718+
// dense and sparse blocks, so we need the gap frequency to be larger than
719+
// this value, but smaller than two blocks, and to index at least three blocks
720+
int gap_frequency = 4500 + random().nextInt(2048);
721+
int numDocs = 10000 + random().nextInt(10000);
722+
int numHosts = numDocs / 20;
723+
724+
for (int i = 0; i < numDocs; i++) {
725+
var d = new Document();
726+
727+
int batchIndex = i / numHosts;
728+
String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
729+
long timestamp = baseTimestamp + (1000L * i);
730+
731+
d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
732+
// Index sorting doesn't work with NumericDocValuesField:
733+
d.add(new SortedNumericDocValuesField(timestampField, timestamp));
734+
d.add(new NumericDocValuesField("counter", counter1++));
735+
if (i % gap_frequency != 0) {
736+
d.add(new NumericDocValuesField("sparse_counter", counter1));
737+
}
738+
739+
int numGauge2 = 1 + random().nextInt(8);
740+
for (int j = 0; j < numGauge2; j++) {
741+
d.add(new SortedNumericDocValuesField("gauge", gauge2Values[(i + j) % gauge2Values.length]));
742+
if (i % gap_frequency != 0) {
743+
d.add(new SortedNumericDocValuesField("sparse_gauge", gauge2Values[(i + j) % gauge2Values.length]));
744+
}
745+
}
746+
747+
d.add(new SortedDocValuesField("tag", new BytesRef(randomFrom(tags))));
748+
if (i % gap_frequency != 0) {
749+
d.add(new SortedDocValuesField("sparse_tag", new BytesRef(randomFrom(tags))));
750+
}
751+
752+
int numTags = 1 + random().nextInt(8);
753+
for (int j = 0; j < numTags; j++) {
754+
d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(i + j) % tags.length])));
755+
if (i % gap_frequency != 0) {
756+
d.add(new SortedSetDocValuesField("sparse_tags", new BytesRef(tags[(i + j) % tags.length])));
757+
}
758+
}
759+
760+
d.add(new BinaryDocValuesField("tags_as_bytes", new BytesRef(tags[i % tags.length])));
761+
if (i % gap_frequency != 0) {
762+
d.add(new BinaryDocValuesField("sparse_tags_as_bytes", new BytesRef(tags[i % tags.length])));
763+
}
764+
765+
iw.addDocument(d);
766+
if (i % 100 == 0) {
767+
iw.commit();
768+
}
769+
}
770+
iw.commit();
771+
772+
iw.forceMerge(1);
773+
774+
try (var reader = DirectoryReader.open(iw)) {
775+
assertEquals(1, reader.leaves().size());
776+
assertEquals(numDocs, reader.maxDoc());
777+
var leaf = reader.leaves().get(0).reader();
778+
var hostNameDV = leaf.getSortedDocValues(hostnameField);
779+
assertNotNull(hostNameDV);
780+
validateRunEnd(hostNameDV);
781+
var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField));
782+
assertNotNull(timestampDV);
783+
validateRunEnd(timestampDV);
784+
var counterOneDV = leaf.getNumericDocValues("counter");
785+
assertNotNull(counterOneDV);
786+
validateRunEnd(counterOneDV);
787+
var sparseCounter = leaf.getNumericDocValues("sparse_counter");
788+
assertNotNull(sparseCounter);
789+
validateRunEnd(sparseCounter);
790+
var gaugeOneDV = leaf.getSortedNumericDocValues("gauge");
791+
assertNotNull(gaugeOneDV);
792+
validateRunEnd(gaugeOneDV);
793+
var sparseGaugeDV = leaf.getSortedNumericDocValues("sparse_gauge");
794+
assertNotNull(sparseGaugeDV);
795+
validateRunEnd(sparseGaugeDV);
796+
var tagDV = leaf.getSortedDocValues("tag");
797+
assertNotNull(tagDV);
798+
validateRunEnd(tagDV);
799+
var sparseTagDV = leaf.getSortedDocValues("sparse_tag");
800+
assertNotNull(sparseTagDV);
801+
validateRunEnd(sparseTagDV);
802+
var tagsDV = leaf.getSortedSetDocValues("tags");
803+
assertNotNull(tagsDV);
804+
validateRunEnd(tagsDV);
805+
var sparseTagsDV = leaf.getSortedSetDocValues("sparse_tags");
806+
assertNotNull(sparseTagsDV);
807+
validateRunEnd(sparseTagsDV);
808+
var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
809+
assertNotNull(tagBytesDV);
810+
validateRunEnd(tagBytesDV);
811+
var sparseTagBytesDV = leaf.getBinaryDocValues("sparse_tags_as_bytes");
812+
assertNotNull(sparseTagBytesDV);
813+
validateRunEnd(sparseTagBytesDV);
814+
}
815+
}
816+
}
817+
818+
private void validateRunEnd(DocIdSetIterator iterator) throws IOException {
819+
int runCount = 0;
820+
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
821+
int runLength = iterator.docIDRunEnd() - iterator.docID() - 1;
822+
if (runLength > 1) {
823+
runCount++;
824+
for (int i = 0; i < runLength; i++) {
825+
int expected = iterator.docID() + 1;
826+
assertEquals(expected, iterator.advance(expected));
827+
}
828+
}
829+
}
830+
assertTrue("Expected docid runs of greater than 1", runCount > 0);
831+
}
832+
701833
private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
702834
var config = new IndexWriterConfig();
703835
config.setIndexSort(

0 commit comments

Comments
 (0)