diff --git a/docs/changelog/132597.yaml b/docs/changelog/132597.yaml new file mode 100644 index 0000000000000..46c52b7b40bba --- /dev/null +++ b/docs/changelog/132597.yaml @@ -0,0 +1,5 @@ +pr: 132597 +summary: Use local segment `fieldInfos` to lookup tsdb merge stats +area: Codec +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java index c29585f173316..9f9dbd11f52a3 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java @@ -24,7 +24,7 @@ class DocValuesConsumerUtil { record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {} - static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) { + static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo mergedFieldInfo) { if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) { return UNSUPPORTED; } @@ -42,6 +42,10 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me int maxLength = 0; for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + final FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergedFieldInfo.name); + if (fieldInfo == null) { + continue; + } DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; if (docValuesProducer instanceof FilterDocValuesProducer filterDocValuesProducer) { docValuesProducer = filterDocValuesProducer.getIn(); diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java index a219ebb3740cc..daedfce7b71e1 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java @@ -58,13 +58,13 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase { LogConfigurator.configureESLogging(); } - static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat { + public static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat { TestES87TSDBDocValuesFormat() { super(); } - TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) { + public TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) { super(skipIndexIntervalSize); } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java index 368d6f23d0fa1..46b46fda11d56 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; @@ -21,17 +22,29 @@ import org.apache.lucene.index.DocValues; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests; +import org.elasticsearch.test.ESTestCase; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.function.Supplier; +import java.util.stream.IntStream; public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests { @@ -514,6 +527,184 @@ public void testWithNoValueMultiValue() throws Exception { } } + public void testAddIndices() throws IOException { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + Supplier indexConfigWithRandomDVFormat = () -> { + IndexWriterConfig config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + DocValuesFormat dvFormat = switch (random().nextInt(3)) { + case 0 -> new ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat(random().nextInt(4, 16)); + case 1 -> new ES819TSDBDocValuesFormat(); + case 2 -> new Lucene90DocValuesFormat(); + default -> throw new AssertionError("unknown option"); + }; + config.setCodec(new Elasticsearch900Lucene101Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return dvFormat; + } + }); + return config; + }; + var allNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "numeric_" + n).toList(); + var allSortedNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_numeric_" + n).toList(); + var allSortedFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_" + n).toList(); + var allSortedSetFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_set" + n).toList(); + var allBinaryFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "binary_" + n).toList(); + try (var source1 = newDirectory(); var source2 = newDirectory(); var singleDir = newDirectory(); var mergeDir = newDirectory()) { + try ( + var writer1 = new IndexWriter(source1, indexConfigWithRandomDVFormat.get()); + var writer2 = new IndexWriter(source2, indexConfigWithRandomDVFormat.get()); + var singleWriter = new IndexWriter(singleDir, indexConfigWithRandomDVFormat.get()) + ) { + int numDocs = 1 + random().nextInt(1_000); + long timestamp = random().nextLong(1000_000L); + for (int i = 0; i < numDocs; i++) { + List fields = new ArrayList<>(); + String hostName = String.format(Locale.ROOT, "host-%d", random().nextInt(5)); + timestamp += 1 + random().nextInt(1_000); + fields.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + fields.add(new SortedNumericDocValuesField(timestampField, timestamp)); + var numericFields = ESTestCase.randomSubsetOf(allNumericFields); + for (String f : numericFields) { + fields.add(new NumericDocValuesField(f, random().nextLong(1000L))); + } + var sortedNumericFields = ESTestCase.randomSubsetOf(allSortedNumericFields); + for (String field : sortedNumericFields) { + int valueCount = 1 + random().nextInt(3); + for (int v = 0; v < valueCount; v++) { + fields.add(new SortedNumericDocValuesField(field, random().nextLong(1000L))); + } + } + var sortedFields = ESTestCase.randomSubsetOf(allSortedFields); + for (String field : sortedFields) { + fields.add(new SortedDocValuesField(field, new BytesRef("s" + random().nextInt(100)))); + } + var sortedSetFields = ESTestCase.randomSubsetOf(allSortedSetFields); + for (String field : sortedSetFields) { + int valueCount = 1 + random().nextInt(3); + for (int v = 0; v < valueCount; v++) { + fields.add(new SortedSetDocValuesField(field, new BytesRef("ss" + random().nextInt(100)))); + } + } + List binaryFields = ESTestCase.randomSubsetOf(allBinaryFields); + for (String field : binaryFields) { + fields.add(new BinaryDocValuesField(field, new BytesRef("b" + random().nextInt(100)))); + } + for (IndexWriter writer : List.of(ESTestCase.randomFrom(writer1, writer2), singleWriter)) { + Randomness.shuffle(fields); + writer.addDocument(fields); + if (random().nextInt(100) <= 5) { + writer.commit(); + } + } + } + if (random().nextBoolean()) { + writer1.forceMerge(1); + } + if (random().nextBoolean()) { + writer2.forceMerge(1); + } + singleWriter.commit(); + singleWriter.forceMerge(1); + } + try (var mergeWriter = new IndexWriter(mergeDir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField))) { + mergeWriter.addIndexes(source1, source2); + mergeWriter.forceMerge(1); + } + try (var reader1 = DirectoryReader.open(singleDir); var reader2 = DirectoryReader.open(mergeDir)) { + assertEquals(reader1.maxDoc(), reader2.maxDoc()); + assertEquals(1, reader1.leaves().size()); + assertEquals(1, reader2.leaves().size()); + for (int i = 0; i < reader1.leaves().size(); i++) { + LeafReader leaf1 = reader1.leaves().get(i).reader(); + LeafReader leaf2 = reader2.leaves().get(i).reader(); + for (String f : CollectionUtils.appendToCopy(allSortedNumericFields, timestampField)) { + var dv1 = leaf1.getNumericDocValues(f); + var dv2 = leaf2.getNumericDocValues(f); + if (dv1 == null) { + assertNull(dv2); + continue; + } + assertNotNull(dv2); + while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) { + assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + assertEquals(dv1.docID(), dv2.docID()); + assertEquals(dv1.longValue(), dv2.longValue()); + } + assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + } + for (String f : CollectionUtils.appendToCopy(allSortedNumericFields, timestampField)) { + var dv1 = leaf1.getSortedNumericDocValues(f); + var dv2 = leaf2.getSortedNumericDocValues(f); + if (dv1 == null) { + assertNull(dv2); + continue; + } + assertNotNull(dv2); + while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) { + assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + assertEquals(dv1.docID(), dv2.docID()); + assertEquals(dv1.docValueCount(), dv2.docValueCount()); + for (int v = 0; v < dv1.docValueCount(); v++) { + assertEquals(dv1.nextValue(), dv2.nextValue()); + } + } + assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + } + for (String f : CollectionUtils.appendToCopy(allSortedFields, hostnameField)) { + var dv1 = leaf1.getSortedDocValues(f); + var dv2 = leaf2.getSortedDocValues(f); + if (dv1 == null) { + assertNull(dv2); + continue; + } + assertNotNull(dv2); + while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) { + assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc()); + assertEquals(dv1.docID(), dv2.docID()); + assertEquals(dv1.lookupOrd(dv1.ordValue()), dv2.lookupOrd(dv2.ordValue())); + } + assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + } + for (String f : allSortedSetFields) { + var dv1 = leaf1.getSortedSetDocValues(f); + var dv2 = leaf2.getSortedSetDocValues(f); + if (dv1 == null) { + assertNull(dv2); + continue; + } + assertNotNull(dv2); + while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) { + assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc()); + assertEquals(dv1.docID(), dv2.docID()); + assertEquals(dv1.docValueCount(), dv2.docValueCount()); + for (int v = 0; v < dv1.docValueCount(); v++) { + assertEquals(dv1.lookupOrd(dv1.nextOrd()), dv2.lookupOrd(dv2.nextOrd())); + } + } + assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + } + for (String f : allBinaryFields) { + var dv1 = leaf1.getBinaryDocValues(f); + var dv2 = leaf2.getBinaryDocValues(f); + if (dv1 == null) { + assertNull(dv2); + continue; + } + assertNotNull(dv2); + while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) { + assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc()); + assertEquals(dv1.docID(), dv2.docID()); + assertEquals(dv1.binaryValue(), dv2.binaryValue()); + } + assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc()); + } + } + } + } + } + private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) { var config = new IndexWriterConfig(); config.setIndexSort( diff --git a/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java b/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java index 82c8d9f5243d5..bea3b7343de3b 100644 --- a/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java +++ b/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java @@ -11,11 +11,13 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.shrink.ResizeType; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; @@ -35,6 +37,8 @@ import java.util.List; import java.util.UUID; +import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -206,6 +210,38 @@ private void checkIndexSearchAndRetrieval(String dataStreamName, boolean routeOn }); } + public void testShrink() throws Exception { + client().admin() + .indices() + .prepareCreate("my-logs") + .setMapping("@timestamp", "type=date", "host.name", "type=keyword") + .setSettings(indexSettings(between(3, 5), 0).put("index.mode", "logsdb").put("index.sort.field", "host.name")) + .get(); + + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-08-08T00:00:00Z"); + BulkRequest bulkRequest = new BulkRequest("my-logs"); + int numDocs = randomIntBetween(100, 10_000); + for (int i = 0; i < numDocs; i++) { + timestamp += randomIntBetween(0, 1000); + String field = "field-" + randomIntBetween(1, 20); + bulkRequest.add( + new IndexRequest("my-logs").id(Integer.toString(i)) + .source("host.name", "host-" + between(1, 5), "@timestamp", timestamp, field, randomNonNegativeLong()) + ); + } + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().bulk(bulkRequest).actionGet(); + client().admin().indices().prepareFlush("my-logs").get(); + client().admin().indices().prepareUpdateSettings("my-logs").setSettings(Settings.builder().put("index.blocks.write", true)).get(); + client().admin() + .indices() + .prepareResizeIndex("my-logs", "shrink-my-logs") + .setResizeType(ResizeType.SHRINK) + .setSettings(indexSettings(1, 0).build()) + .get(); + assertNoFailures(client().admin().indices().prepareForceMerge("shrink-my-logs").setMaxNumSegments(1).setFlush(true).get()); + } + static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); }