Skip to content

Commit 6f1d9ec

Browse files
committed
Implement docIDRunEnd() on ES819TSDBDocValuesProducer
This method allows consumers to quickly check if a DocIdSetIterator matches a large run of documents; it was missing from our custom Codec DocValues implementations.
1 parent 9978916 commit 6f1d9ec

File tree

2 files changed

+183
-0
lines changed

2 files changed

+183
-0
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,11 @@ public boolean advanceExact(int target) throws IOException {
293293
doc = target;
294294
return true;
295295
}
296+
297+
@Override
298+
public int docIDRunEnd() throws IOException {
299+
return maxDoc;
300+
}
296301
}
297302

298303
private abstract static class SparseBinaryDocValues extends BinaryDocValues {
@@ -327,6 +332,11 @@ public int advance(int target) throws IOException {
327332
public boolean advanceExact(int target) throws IOException {
328333
return disi.advanceExact(target);
329334
}
335+
336+
@Override
337+
public int docIDRunEnd() throws IOException {
338+
return disi.docIDRunEnd();
339+
}
330340
}
331341

332342
@Override
@@ -368,6 +378,11 @@ public int advance(int target) throws IOException {
368378
public long cost() {
369379
return ords.cost();
370380
}
381+
382+
@Override
383+
public int docIDRunEnd() throws IOException {
384+
return ords.docIDRunEnd();
385+
}
371386
};
372387
}
373388

@@ -749,6 +764,11 @@ public int advance(int target) throws IOException {
749764
public long cost() {
750765
return ords.cost();
751766
}
767+
768+
@Override
769+
public int docIDRunEnd() throws IOException {
770+
return ords.docIDRunEnd();
771+
}
752772
};
753773
}
754774

@@ -1085,6 +1105,11 @@ public boolean advanceExact(int target) {
10851105
public long cost() {
10861106
return maxDoc;
10871107
}
1108+
1109+
@Override
1110+
public int docIDRunEnd() {
1111+
return maxDoc;
1112+
}
10881113
};
10891114
} else {
10901115
final IndexedDISI disi = new IndexedDISI(
@@ -1126,6 +1151,11 @@ public long cost() {
11261151
public long longValue() {
11271152
return 0L;
11281153
}
1154+
1155+
@Override
1156+
public int docIDRunEnd() throws IOException {
1157+
return disi.docIDRunEnd();
1158+
}
11291159
};
11301160
}
11311161
}
@@ -1177,6 +1207,11 @@ public long cost() {
11771207
return maxDoc;
11781208
}
11791209

1210+
@Override
1211+
public int docIDRunEnd() {
1212+
return maxDoc;
1213+
}
1214+
11801215
@Override
11811216
public long longValue() throws IOException {
11821217
final int index = doc;
@@ -1238,6 +1273,11 @@ public long cost() {
12381273
return disi.cost();
12391274
}
12401275

1276+
@Override
1277+
public int docIDRunEnd() throws IOException {
1278+
return disi.docIDRunEnd();
1279+
}
1280+
12411281
@Override
12421282
public long longValue() throws IOException {
12431283
final int index = disi.index();
@@ -1358,6 +1398,11 @@ public long nextValue() throws IOException {
13581398
public int docValueCount() {
13591399
return count;
13601400
}
1401+
1402+
@Override
1403+
public int docIDRunEnd() {
1404+
return maxDoc;
1405+
}
13611406
};
13621407
} else {
13631408
// sparse
@@ -1415,6 +1460,11 @@ public int docValueCount() {
14151460
return count;
14161461
}
14171462

1463+
@Override
1464+
public int docIDRunEnd() throws IOException {
1465+
return disi.docIDRunEnd();
1466+
}
1467+
14181468
private void set() {
14191469
if (set == false) {
14201470
final int index = disi.index();

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.lucene.index.IndexWriter;
2222
import org.apache.lucene.index.IndexWriterConfig;
2323
import org.apache.lucene.index.LogByteSizeMergePolicy;
24+
import org.apache.lucene.search.DocIdSetIterator;
2425
import org.apache.lucene.search.Sort;
2526
import org.apache.lucene.search.SortField;
2627
import org.apache.lucene.search.SortedNumericSortField;
@@ -29,9 +30,12 @@
2930
import org.elasticsearch.cluster.metadata.DataStream;
3031
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
3132

33+
import java.io.IOException;
3234
import java.util.Arrays;
3335
import java.util.Locale;
3436

37+
import static org.elasticsearch.test.ESTestCase.randomFrom;
38+
3539
public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
3640

3741
final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat());
@@ -505,6 +509,135 @@ public void testWithNoValueMultiValue() throws Exception {
505509
}
506510
}
507511

512+
public void testDocIDEndRun() throws IOException {
513+
String timestampField = "@timestamp";
514+
String hostnameField = "host.name";
515+
long baseTimestamp = 1704067200000L;
516+
517+
var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
518+
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
519+
long counter1 = 0;
520+
521+
522+
long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16};
523+
String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"};
524+
525+
// IndexedDISI stores ids in blocks of 4096. To test sparse end runs, we want a mixture of
526+
// dense and sparse blocks, so we need the gap frequency to be larger than
527+
// this value, but smaller than two blocks, and to index at least three blocks
528+
int gap_frequency = 4500 + random().nextInt(2048);
529+
int numDocs = 10000 + random().nextInt(10000);
530+
int numHosts = numDocs / 20;
531+
532+
for (int i = 0; i < numDocs; i++) {
533+
var d = new Document();
534+
535+
int batchIndex = i / numHosts;
536+
String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
537+
long timestamp = baseTimestamp + (1000L * i);
538+
539+
d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
540+
// Index sorting doesn't work with NumericDocValuesField:
541+
d.add(new SortedNumericDocValuesField(timestampField, timestamp));
542+
d.add(new NumericDocValuesField("counter", counter1++));
543+
if (i % gap_frequency != 0) {
544+
d.add(new NumericDocValuesField("sparse_counter", counter1));
545+
}
546+
547+
int numGauge2 = 1 + random().nextInt(8);
548+
for (int j = 0; j < numGauge2; j++) {
549+
d.add(new SortedNumericDocValuesField("gauge", gauge2Values[(i + j) % gauge2Values.length]));
550+
if (i % gap_frequency != 0) {
551+
d.add(new SortedNumericDocValuesField("sparse_gauge", gauge2Values[(i + j) % gauge2Values.length]));
552+
}
553+
}
554+
555+
d.add(new SortedDocValuesField("tag", new BytesRef(randomFrom(tags))));
556+
if (i % gap_frequency != 0) {
557+
d.add(new SortedDocValuesField("sparse_tag", new BytesRef(randomFrom(tags))));
558+
}
559+
560+
int numTags = 1 + random().nextInt(8);
561+
for (int j = 0; j < numTags; j++) {
562+
d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(i + j) % tags.length])));
563+
if (i % gap_frequency != 0) {
564+
d.add(new SortedSetDocValuesField("sparse_tags", new BytesRef(tags[(i + j) % tags.length])));
565+
}
566+
}
567+
568+
d.add(new BinaryDocValuesField("tags_as_bytes", new BytesRef(tags[i % tags.length])));
569+
if (i % gap_frequency != 0) {
570+
d.add(new BinaryDocValuesField("sparse_tags_as_bytes", new BytesRef(tags[i % tags.length])));
571+
}
572+
573+
iw.addDocument(d);
574+
if (i % 100 == 0) {
575+
iw.commit();
576+
}
577+
}
578+
iw.commit();
579+
580+
iw.forceMerge(1);
581+
582+
try (var reader = DirectoryReader.open(iw)) {
583+
assertEquals(1, reader.leaves().size());
584+
assertEquals(numDocs, reader.maxDoc());
585+
var leaf = reader.leaves().get(0).reader();
586+
var hostNameDV = leaf.getSortedDocValues(hostnameField);
587+
assertNotNull(hostNameDV);
588+
validateRunEnd(hostNameDV);
589+
var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField));
590+
assertNotNull(timestampDV);
591+
validateRunEnd(timestampDV);
592+
var counterOneDV = leaf.getNumericDocValues("counter");
593+
assertNotNull(counterOneDV);
594+
validateRunEnd(counterOneDV);
595+
var sparseCounter = leaf.getNumericDocValues("sparse_counter");
596+
assertNotNull(sparseCounter);
597+
validateRunEnd(sparseCounter);
598+
var gaugeOneDV = leaf.getSortedNumericDocValues("gauge");
599+
assertNotNull(gaugeOneDV);
600+
validateRunEnd(gaugeOneDV);
601+
var sparseGaugeDV = leaf.getSortedNumericDocValues("sparse_gauge");
602+
assertNotNull(sparseGaugeDV);
603+
validateRunEnd(sparseGaugeDV);
604+
var tagDV = leaf.getSortedDocValues("tag");
605+
assertNotNull(tagDV);
606+
validateRunEnd(tagDV);
607+
var sparseTagDV = leaf.getSortedDocValues("sparse_tag");
608+
assertNotNull(sparseTagDV);
609+
validateRunEnd(sparseTagDV);
610+
var tagsDV = leaf.getSortedSetDocValues("tags");
611+
assertNotNull(tagsDV);
612+
validateRunEnd(tagsDV);
613+
var sparseTagsDV = leaf.getSortedSetDocValues("sparse_tags");
614+
assertNotNull(sparseTagsDV);
615+
validateRunEnd(sparseTagsDV);
616+
var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
617+
assertNotNull(tagBytesDV);
618+
validateRunEnd(tagBytesDV);
619+
var sparseTagBytesDV = leaf.getBinaryDocValues("sparse_tags_as_bytes");
620+
assertNotNull(sparseTagBytesDV);
621+
validateRunEnd(sparseTagBytesDV);
622+
}
623+
}
624+
}
625+
626+
private void validateRunEnd(DocIdSetIterator iterator) throws IOException {
627+
int runCount = 0;
628+
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
629+
int runLength = iterator.docIDRunEnd() - iterator.docID() - 1;
630+
if (runLength > 1) {
631+
runCount++;
632+
for (int i = 0; i < runLength; i++) {
633+
int expected = iterator.docID() + 1;
634+
assertEquals(expected, iterator.advance(expected));
635+
}
636+
}
637+
}
638+
assertTrue("Expected docid runs of greater than 1", runCount > 0);
639+
}
640+
508641
private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
509642
var config = new IndexWriterConfig();
510643
config.setIndexSort(

0 commit comments

Comments
 (0)