From 46f53961b4cbcea785124c4242a9d9b8a14d52bb Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Tue, 12 Aug 2025 10:12:52 +0100 Subject: [PATCH 1/2] Implement docIDRunEnd() on ES819TSDBDocValuesProducer This method allows consumers to quickly check if a DocIdSetIterator matches a large run of documents; it was missing from our custom Codec DocValues implementations. --- .../es819/ES819TSDBDocValuesProducer.java | 50 ++++++ .../es819/ES819TSDBDocValuesFormatTests.java | 152 ++++++++++++++++-- 2 files changed, 192 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java index 163e4c729bb95..b5cab3973bea5 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java @@ -294,6 +294,11 @@ public boolean advanceExact(int target) throws IOException { doc = target; return true; } + + @Override + public int docIDRunEnd() throws IOException { + return maxDoc; + } } private abstract static class SparseBinaryDocValues extends BinaryDocValues { @@ -328,6 +333,11 @@ public int advance(int target) throws IOException { public boolean advanceExact(int target) throws IOException { return disi.advanceExact(target); } + + @Override + public int docIDRunEnd() throws IOException { + return disi.docIDRunEnd(); + } } @Override @@ -369,6 +379,11 @@ public int advance(int target) throws IOException { public long cost() { return ords.cost(); } + + @Override + public int docIDRunEnd() throws IOException { + return ords.docIDRunEnd(); + } }; } @@ -750,6 +765,11 @@ public int advance(int target) throws IOException { public long cost() { return ords.cost(); } + + @Override + public int docIDRunEnd() throws IOException { + return ords.docIDRunEnd(); + } }; } @@ -1086,6 +1106,11 @@ public boolean advanceExact(int target) { public long cost() { return maxDoc; } + + @Override + public int docIDRunEnd() { + return maxDoc; + } }; } else { final IndexedDISI disi = new IndexedDISI( @@ -1127,6 +1152,11 @@ public long cost() { public long longValue() { return 0L; } + + @Override + public int docIDRunEnd() throws IOException { + return disi.docIDRunEnd(); + } }; } } @@ -1178,6 +1208,11 @@ public long cost() { return maxDoc; } + @Override + public int docIDRunEnd() { + return maxDoc; + } + @Override public long longValue() throws IOException { final int index = doc; @@ -1286,6 +1321,11 @@ public long cost() { return disi.cost(); } + @Override + public int docIDRunEnd() throws IOException { + return disi.docIDRunEnd(); + } + @Override public long longValue() throws IOException { final int index = disi.index(); @@ -1406,6 +1446,11 @@ public long nextValue() throws IOException { public int docValueCount() { return count; } + + @Override + public int docIDRunEnd() { + return maxDoc; + } }; } else { // sparse @@ -1463,6 +1508,11 @@ public int docValueCount() { return count; } + @Override + public int docIDRunEnd() throws IOException { + return disi.docIDRunEnd(); + } + private void set() { if (set == false) { final int index = disi.index(); diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java index 19118c7ac3270..9d7525a0567e4 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; @@ -49,6 +50,8 @@ import java.util.function.Supplier; import java.util.stream.IntStream; +import static org.elasticsearch.test.ESTestCase.randomFrom; + public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests { final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); @@ -67,9 +70,9 @@ public void testForceMergeDenseCase() throws Exception { try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { long counter1 = 0; long counter2 = 10_000_000; - long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; - long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; - String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; + long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16}; + String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; int numDocs = 256 + random().nextInt(1024); int numHosts = numDocs / 20; @@ -290,9 +293,9 @@ public void testForceMergeSparseCase() throws Exception { try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { long counter1 = 0; long counter2 = 10_000_000; - long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; - long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; - String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; + long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16}; + String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; int numDocs = 256 + random().nextInt(1024); int numHosts = numDocs / 20; @@ -442,8 +445,8 @@ public void testWithNoValueMultiValue() throws Exception { var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { - long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; - String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; + String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; { long timestamp = baseTimestamp; for (int i = 0; i < numRounds; i++) { @@ -709,7 +712,7 @@ public void testBulkLoading() throws Exception { var config = getTimeSeriesIndexWriterConfig(null, timestampField); try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { - long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; int numDocs = 256 + random().nextInt(8096); for (int i = 0; i < numDocs; i++) { @@ -740,7 +743,7 @@ public void testBulkLoading() throws Exception { var counterDV = getBulkNumericDocValues(leaf.reader(), counterField); var gaugeDV = getBulkNumericDocValues(leaf.reader(), gaugeField); int maxDoc = leaf.reader().maxDoc(); - for (int i = 0; i < maxDoc;) { + for (int i = 0; i < maxDoc; ) { int size = Math.max(1, random().nextInt(0, maxDoc - i)); var docs = TestBlock.docs(IntStream.range(i, i + size).toArray()); @@ -959,6 +962,135 @@ private static BulkNumericDocValues getBulkNumericDocValues(LeafReader leafReade return (BulkNumericDocValues) DocValues.unwrapSingleton(leafReader.getSortedNumericDocValues(counterField)); } + public void testDocIDEndRun() throws IOException { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long counter1 = 0; + + + long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16}; + String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; + + // IndexedDISI stores ids in blocks of 4096. To test sparse end runs, we want a mixture of + // dense and sparse blocks, so we need the gap frequency to be larger than + // this value, but smaller than two blocks, and to index at least three blocks + int gap_frequency = 4500 + random().nextInt(2048); + int numDocs = 10000 + random().nextInt(10000); + int numHosts = numDocs / 20; + + for (int i = 0; i < numDocs; i++) { + var d = new Document(); + + int batchIndex = i / numHosts; + String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + long timestamp = baseTimestamp + (1000L * i); + + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp)); + d.add(new NumericDocValuesField("counter", counter1++)); + if (i % gap_frequency != 0) { + d.add(new NumericDocValuesField("sparse_counter", counter1)); + } + + int numGauge2 = 1 + random().nextInt(8); + for (int j = 0; j < numGauge2; j++) { + d.add(new SortedNumericDocValuesField("gauge", gauge2Values[(i + j) % gauge2Values.length])); + if (i % gap_frequency != 0) { + d.add(new SortedNumericDocValuesField("sparse_gauge", gauge2Values[(i + j) % gauge2Values.length])); + } + } + + d.add(new SortedDocValuesField("tag", new BytesRef(randomFrom(tags)))); + if (i % gap_frequency != 0) { + d.add(new SortedDocValuesField("sparse_tag", new BytesRef(randomFrom(tags)))); + } + + int numTags = 1 + random().nextInt(8); + for (int j = 0; j < numTags; j++) { + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(i + j) % tags.length]))); + if (i % gap_frequency != 0) { + d.add(new SortedSetDocValuesField("sparse_tags", new BytesRef(tags[(i + j) % tags.length]))); + } + } + + d.add(new BinaryDocValuesField("tags_as_bytes", new BytesRef(tags[i % tags.length]))); + if (i % gap_frequency != 0) { + d.add(new BinaryDocValuesField("sparse_tags_as_bytes", new BytesRef(tags[i % tags.length]))); + } + + iw.addDocument(d); + if (i % 100 == 0) { + iw.commit(); + } + } + iw.commit(); + + iw.forceMerge(1); + + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + validateRunEnd(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + validateRunEnd(timestampDV); + var counterOneDV = leaf.getNumericDocValues("counter"); + assertNotNull(counterOneDV); + validateRunEnd(counterOneDV); + var sparseCounter = leaf.getNumericDocValues("sparse_counter"); + assertNotNull(sparseCounter); + validateRunEnd(sparseCounter); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge"); + assertNotNull(gaugeOneDV); + validateRunEnd(gaugeOneDV); + var sparseGaugeDV = leaf.getSortedNumericDocValues("sparse_gauge"); + assertNotNull(sparseGaugeDV); + validateRunEnd(sparseGaugeDV); + var tagDV = leaf.getSortedDocValues("tag"); + assertNotNull(tagDV); + validateRunEnd(tagDV); + var sparseTagDV = leaf.getSortedDocValues("sparse_tag"); + assertNotNull(sparseTagDV); + validateRunEnd(sparseTagDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + validateRunEnd(tagsDV); + var sparseTagsDV = leaf.getSortedSetDocValues("sparse_tags"); + assertNotNull(sparseTagsDV); + validateRunEnd(sparseTagsDV); + var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes"); + assertNotNull(tagBytesDV); + validateRunEnd(tagBytesDV); + var sparseTagBytesDV = leaf.getBinaryDocValues("sparse_tags_as_bytes"); + assertNotNull(sparseTagBytesDV); + validateRunEnd(sparseTagBytesDV); + } + } + } + + private void validateRunEnd(DocIdSetIterator iterator) throws IOException { + int runCount = 0; + while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + int runLength = iterator.docIDRunEnd() - iterator.docID() - 1; + if (runLength > 1) { + runCount++; + for (int i = 0; i < runLength; i++) { + int expected = iterator.docID() + 1; + assertEquals(expected, iterator.advance(expected)); + } + } + } + assertTrue("Expected docid runs of greater than 1", runCount > 0); + } + private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) { var config = new IndexWriterConfig(); if (hostnameField != null) { From 1264fd0829b6dbba891d04cc98ca51444e2e4364 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 14 Aug 2025 15:40:48 +0000 Subject: [PATCH 2/2] [CI] Auto commit changes from spotless --- .../es819/ES819TSDBDocValuesFormatTests.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java index 9d7525a0567e4..2ea3791164cfd 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java @@ -70,9 +70,9 @@ public void testForceMergeDenseCase() throws Exception { try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { long counter1 = 0; long counter2 = 10_000_000; - long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; - long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16}; - String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; int numDocs = 256 + random().nextInt(1024); int numHosts = numDocs / 20; @@ -293,9 +293,9 @@ public void testForceMergeSparseCase() throws Exception { try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { long counter1 = 0; long counter2 = 10_000_000; - long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; - long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16}; - String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; int numDocs = 256 + random().nextInt(1024); int numHosts = numDocs / 20; @@ -445,8 +445,8 @@ public void testWithNoValueMultiValue() throws Exception { var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { - long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; - String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; { long timestamp = baseTimestamp; for (int i = 0; i < numRounds; i++) { @@ -712,7 +712,7 @@ public void testBulkLoading() throws Exception { var config = getTimeSeriesIndexWriterConfig(null, timestampField); try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { - long[] gauge1Values = new long[]{2, 4, 6, 8, 10, 12, 14, 16}; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; int numDocs = 256 + random().nextInt(8096); for (int i = 0; i < numDocs; i++) { @@ -743,7 +743,7 @@ public void testBulkLoading() throws Exception { var counterDV = getBulkNumericDocValues(leaf.reader(), counterField); var gaugeDV = getBulkNumericDocValues(leaf.reader(), gaugeField); int maxDoc = leaf.reader().maxDoc(); - for (int i = 0; i < maxDoc; ) { + for (int i = 0; i < maxDoc;) { int size = Math.max(1, random().nextInt(0, maxDoc - i)); var docs = TestBlock.docs(IntStream.range(i, i + size).toArray()); @@ -971,9 +971,8 @@ public void testDocIDEndRun() throws IOException { try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { long counter1 = 0; - - long[] gauge2Values = new long[]{-2, -4, -6, -8, -10, -12, -14, -16}; - String[] tags = new String[]{"tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"}; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; // IndexedDISI stores ids in blocks of 4096. To test sparse end runs, we want a mixture of // dense and sparse blocks, so we need the gap frequency to be larger than