Skip to content

Commit 46f5396

Browse files
committed
Implement docIDRunEnd() on ES819TSDBDocValuesProducer
This method allows consumers to quickly check if a DocIdSetIterator matches a large run of documents; it was missing from our custom Codec DocValues implementations.
1 parent 8fecee3 commit 46f5396

File tree

2 files changed

+192
-10
lines changed

2 files changed

+192
-10
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ public boolean advanceExact(int target) throws IOException {
294294
doc = target;
295295
return true;
296296
}
297+
298+
@Override
299+
public int docIDRunEnd() throws IOException {
300+
return maxDoc;
301+
}
297302
}
298303

299304
private abstract static class SparseBinaryDocValues extends BinaryDocValues {
@@ -328,6 +333,11 @@ public int advance(int target) throws IOException {
328333
public boolean advanceExact(int target) throws IOException {
329334
return disi.advanceExact(target);
330335
}
336+
337+
@Override
338+
public int docIDRunEnd() throws IOException {
339+
return disi.docIDRunEnd();
340+
}
331341
}
332342

333343
@Override
@@ -369,6 +379,11 @@ public int advance(int target) throws IOException {
369379
public long cost() {
370380
return ords.cost();
371381
}
382+
383+
@Override
384+
public int docIDRunEnd() throws IOException {
385+
return ords.docIDRunEnd();
386+
}
372387
};
373388
}
374389

@@ -750,6 +765,11 @@ public int advance(int target) throws IOException {
750765
public long cost() {
751766
return ords.cost();
752767
}
768+
769+
@Override
770+
public int docIDRunEnd() throws IOException {
771+
return ords.docIDRunEnd();
772+
}
753773
};
754774
}
755775

@@ -1086,6 +1106,11 @@ public boolean advanceExact(int target) {
10861106
public long cost() {
10871107
return maxDoc;
10881108
}
1109+
1110+
@Override
1111+
public int docIDRunEnd() {
1112+
return maxDoc;
1113+
}
10891114
};
10901115
} else {
10911116
final IndexedDISI disi = new IndexedDISI(
@@ -1127,6 +1152,11 @@ public long cost() {
11271152
public long longValue() {
11281153
return 0L;
11291154
}
1155+
1156+
@Override
1157+
public int docIDRunEnd() throws IOException {
1158+
return disi.docIDRunEnd();
1159+
}
11301160
};
11311161
}
11321162
}
@@ -1178,6 +1208,11 @@ public long cost() {
11781208
return maxDoc;
11791209
}
11801210

1211+
@Override
1212+
public int docIDRunEnd() {
1213+
return maxDoc;
1214+
}
1215+
11811216
@Override
11821217
public long longValue() throws IOException {
11831218
final int index = doc;
@@ -1286,6 +1321,11 @@ public long cost() {
12861321
return disi.cost();
12871322
}
12881323

1324+
@Override
1325+
public int docIDRunEnd() throws IOException {
1326+
return disi.docIDRunEnd();
1327+
}
1328+
12891329
@Override
12901330
public long longValue() throws IOException {
12911331
final int index = disi.index();
@@ -1406,6 +1446,11 @@ public long nextValue() throws IOException {
14061446
public int docValueCount() {
14071447
return count;
14081448
}
1449+
1450+
@Override
1451+
public int docIDRunEnd() {
1452+
return maxDoc;
1453+
}
14091454
};
14101455
} else {
14111456
// sparse
@@ -1463,6 +1508,11 @@ public int docValueCount() {
14631508
return count;
14641509
}
14651510

1511+
@Override
1512+
public int docIDRunEnd() throws IOException {
1513+
return disi.docIDRunEnd();
1514+
}
1515+
14661516
private void set() {
14671517
if (set == false) {
14681518
final int index = disi.index();

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 142 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.lucene.index.LogByteSizeMergePolicy;
2828
import org.apache.lucene.index.NumericDocValues;
2929
import org.apache.lucene.index.SortedDocValues;
30+
import org.apache.lucene.search.DocIdSetIterator;
3031
import org.apache.lucene.search.IndexSearcher;
3132
import org.apache.lucene.search.Sort;
3233
import org.apache.lucene.search.SortField;
@@ -49,6 +50,8 @@
4950
import java.util.function.Supplier;
5051
import java.util.stream.IntStream;
5152

53+
import static org.elasticsearch.test.ESTestCase.randomFrom;
54+
5255
public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
5356

5457
final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat());
@@ -67,9 +70,9 @@ public void testForceMergeDenseCase() throws Exception {
6770
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
6871
long counter1 = 0;
6972
long counter2 = 10_000_000;
70-
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
71-
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
72-
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
73+
long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16};
74+
long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16};
75+
String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"};
7376

7477
int numDocs = 256 + random().nextInt(1024);
7578
int numHosts = numDocs / 20;
@@ -290,9 +293,9 @@ public void testForceMergeSparseCase() throws Exception {
290293
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
291294
long counter1 = 0;
292295
long counter2 = 10_000_000;
293-
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
294-
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
295-
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
296+
long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16};
297+
long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16};
298+
String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"};
296299

297300
int numDocs = 256 + random().nextInt(1024);
298301
int numHosts = numDocs / 20;
@@ -442,8 +445,8 @@ public void testWithNoValueMultiValue() throws Exception {
442445

443446
var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
444447
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
445-
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
446-
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
448+
long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16};
449+
String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"};
447450
{
448451
long timestamp = baseTimestamp;
449452
for (int i = 0; i < numRounds; i++) {
@@ -709,7 +712,7 @@ public void testBulkLoading() throws Exception {
709712

710713
var config = getTimeSeriesIndexWriterConfig(null, timestampField);
711714
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
712-
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
715+
long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16};
713716
int numDocs = 256 + random().nextInt(8096);
714717

715718
for (int i = 0; i < numDocs; i++) {
@@ -740,7 +743,7 @@ public void testBulkLoading() throws Exception {
740743
var counterDV = getBulkNumericDocValues(leaf.reader(), counterField);
741744
var gaugeDV = getBulkNumericDocValues(leaf.reader(), gaugeField);
742745
int maxDoc = leaf.reader().maxDoc();
743-
for (int i = 0; i < maxDoc;) {
746+
for (int i = 0; i < maxDoc; ) {
744747
int size = Math.max(1, random().nextInt(0, maxDoc - i));
745748
var docs = TestBlock.docs(IntStream.range(i, i + size).toArray());
746749

@@ -959,6 +962,135 @@ private static BulkNumericDocValues getBulkNumericDocValues(LeafReader leafReade
959962
return (BulkNumericDocValues) DocValues.unwrapSingleton(leafReader.getSortedNumericDocValues(counterField));
960963
}
961964

965+
public void testDocIDEndRun() throws IOException {
966+
String timestampField = "@timestamp";
967+
String hostnameField = "host.name";
968+
long baseTimestamp = 1704067200000L;
969+
970+
var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
971+
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
972+
long counter1 = 0;
973+
974+
975+
long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16};
976+
String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"};
977+
978+
// IndexedDISI stores ids in blocks of 4096. To test sparse end runs, we want a mixture of
979+
// dense and sparse blocks, so we need the gap frequency to be larger than
980+
// this value, but smaller than two blocks, and to index at least three blocks
981+
int gap_frequency = 4500 + random().nextInt(2048);
982+
int numDocs = 10000 + random().nextInt(10000);
983+
int numHosts = numDocs / 20;
984+
985+
for (int i = 0; i < numDocs; i++) {
986+
var d = new Document();
987+
988+
int batchIndex = i / numHosts;
989+
String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
990+
long timestamp = baseTimestamp + (1000L * i);
991+
992+
d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
993+
// Index sorting doesn't work with NumericDocValuesField:
994+
d.add(new SortedNumericDocValuesField(timestampField, timestamp));
995+
d.add(new NumericDocValuesField("counter", counter1++));
996+
if (i % gap_frequency != 0) {
997+
d.add(new NumericDocValuesField("sparse_counter", counter1));
998+
}
999+
1000+
int numGauge2 = 1 + random().nextInt(8);
1001+
for (int j = 0; j < numGauge2; j++) {
1002+
d.add(new SortedNumericDocValuesField("gauge", gauge2Values[(i + j) % gauge2Values.length]));
1003+
if (i % gap_frequency != 0) {
1004+
d.add(new SortedNumericDocValuesField("sparse_gauge", gauge2Values[(i + j) % gauge2Values.length]));
1005+
}
1006+
}
1007+
1008+
d.add(new SortedDocValuesField("tag", new BytesRef(randomFrom(tags))));
1009+
if (i % gap_frequency != 0) {
1010+
d.add(new SortedDocValuesField("sparse_tag", new BytesRef(randomFrom(tags))));
1011+
}
1012+
1013+
int numTags = 1 + random().nextInt(8);
1014+
for (int j = 0; j < numTags; j++) {
1015+
d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(i + j) % tags.length])));
1016+
if (i % gap_frequency != 0) {
1017+
d.add(new SortedSetDocValuesField("sparse_tags", new BytesRef(tags[(i + j) % tags.length])));
1018+
}
1019+
}
1020+
1021+
d.add(new BinaryDocValuesField("tags_as_bytes", new BytesRef(tags[i % tags.length])));
1022+
if (i % gap_frequency != 0) {
1023+
d.add(new BinaryDocValuesField("sparse_tags_as_bytes", new BytesRef(tags[i % tags.length])));
1024+
}
1025+
1026+
iw.addDocument(d);
1027+
if (i % 100 == 0) {
1028+
iw.commit();
1029+
}
1030+
}
1031+
iw.commit();
1032+
1033+
iw.forceMerge(1);
1034+
1035+
try (var reader = DirectoryReader.open(iw)) {
1036+
assertEquals(1, reader.leaves().size());
1037+
assertEquals(numDocs, reader.maxDoc());
1038+
var leaf = reader.leaves().get(0).reader();
1039+
var hostNameDV = leaf.getSortedDocValues(hostnameField);
1040+
assertNotNull(hostNameDV);
1041+
validateRunEnd(hostNameDV);
1042+
var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField));
1043+
assertNotNull(timestampDV);
1044+
validateRunEnd(timestampDV);
1045+
var counterOneDV = leaf.getNumericDocValues("counter");
1046+
assertNotNull(counterOneDV);
1047+
validateRunEnd(counterOneDV);
1048+
var sparseCounter = leaf.getNumericDocValues("sparse_counter");
1049+
assertNotNull(sparseCounter);
1050+
validateRunEnd(sparseCounter);
1051+
var gaugeOneDV = leaf.getSortedNumericDocValues("gauge");
1052+
assertNotNull(gaugeOneDV);
1053+
validateRunEnd(gaugeOneDV);
1054+
var sparseGaugeDV = leaf.getSortedNumericDocValues("sparse_gauge");
1055+
assertNotNull(sparseGaugeDV);
1056+
validateRunEnd(sparseGaugeDV);
1057+
var tagDV = leaf.getSortedDocValues("tag");
1058+
assertNotNull(tagDV);
1059+
validateRunEnd(tagDV);
1060+
var sparseTagDV = leaf.getSortedDocValues("sparse_tag");
1061+
assertNotNull(sparseTagDV);
1062+
validateRunEnd(sparseTagDV);
1063+
var tagsDV = leaf.getSortedSetDocValues("tags");
1064+
assertNotNull(tagsDV);
1065+
validateRunEnd(tagsDV);
1066+
var sparseTagsDV = leaf.getSortedSetDocValues("sparse_tags");
1067+
assertNotNull(sparseTagsDV);
1068+
validateRunEnd(sparseTagsDV);
1069+
var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
1070+
assertNotNull(tagBytesDV);
1071+
validateRunEnd(tagBytesDV);
1072+
var sparseTagBytesDV = leaf.getBinaryDocValues("sparse_tags_as_bytes");
1073+
assertNotNull(sparseTagBytesDV);
1074+
validateRunEnd(sparseTagBytesDV);
1075+
}
1076+
}
1077+
}
1078+
1079+
private void validateRunEnd(DocIdSetIterator iterator) throws IOException {
1080+
int runCount = 0;
1081+
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
1082+
int runLength = iterator.docIDRunEnd() - iterator.docID() - 1;
1083+
if (runLength > 1) {
1084+
runCount++;
1085+
for (int i = 0; i < runLength; i++) {
1086+
int expected = iterator.docID() + 1;
1087+
assertEquals(expected, iterator.advance(expected));
1088+
}
1089+
}
1090+
}
1091+
assertTrue("Expected docid runs of greater than 1", runCount > 0);
1092+
}
1093+
9621094
private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
9631095
var config = new IndexWriterConfig();
9641096
if (hostnameField != null) {

0 commit comments

Comments
 (0)