From aa347a284813756975cdd6acda178b8bb926dc35 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 8 Aug 2025 18:00:51 -0700 Subject: [PATCH] Use local segment fieldInfos to lookup tsdb merge stats (#132597) Merging shrunk TSDB or LogsDB indices can fail in versions 8.19 or 9.1+. When shrinking an index to a single shard, we use addIndexes, which can add Lucene segments directly. In this case, FieldInfos can differ between shards and the new segment. We should use the FieldInfos from each segment to retrieve the merge stats, instead of the FieldInfos of the merged segment. Relates #125403 --- docs/changelog/132597.yaml | 5 + .../tsdb/es819/DocValuesConsumerUtil.java | 6 +- .../tsdb/ES87TSDBDocValuesFormatTests.java | 4 +- .../es819/ES819TSDBDocValuesFormatTests.java | 191 ++++++++++++++++++ .../xpack/logsdb/LogsIndexingIT.java | 36 ++++ 5 files changed, 239 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/132597.yaml diff --git a/docs/changelog/132597.yaml b/docs/changelog/132597.yaml new file mode 100644 index 0000000000000..46c52b7b40bba --- /dev/null +++ b/docs/changelog/132597.yaml @@ -0,0 +1,5 @@ +pr: 132597 +summary: Use local segment `fieldInfos` to lookup tsdb merge stats +area: Codec +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java index c29585f173316..9f9dbd11f52a3 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java @@ -24,7 +24,7 @@ class DocValuesConsumerUtil { record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {} - static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) { + static MergeStats 
compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo mergedFieldInfo) { if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) { return UNSUPPORTED; } @@ -42,6 +42,10 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me int maxLength = 0; for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + final FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergedFieldInfo.name); + if (fieldInfo == null) { + continue; + } DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; if (docValuesProducer instanceof FilterDocValuesProducer filterDocValuesProducer) { docValuesProducer = filterDocValuesProducer.getIn(); diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java index a219ebb3740cc..daedfce7b71e1 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java @@ -58,13 +58,13 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase { LogConfigurator.configureESLogging(); } - static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat { + public static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat { TestES87TSDBDocValuesFormat() { super(); } - TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) { + public TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) { super(skipIndexIntervalSize); } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java index 368d6f23d0fa1..46b46fda11d56 100644 --- 
a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java
@@ -11,6 +11,7 @@
 
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.DocValuesFormat;
+import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.NumericDocValuesField;
@@ -21,17 +22,29 @@
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
 import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
+import org.elasticsearch.test.ESTestCase;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
 public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
 
@@ -514,6 +527,184 @@ public void testWithNoValueMultiValue() throws Exception {
         }
     }
 
+    public void testAddIndices() throws IOException { // addIndexes + merge must yield the same doc values as direct indexing
+        String timestampField = "@timestamp";
+        String hostnameField = "host.name";
+        Supplier<IndexWriterConfig> indexConfigWithRandomDVFormat = () -> { // each writer gets its own randomly chosen format
+            IndexWriterConfig config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
+            DocValuesFormat dvFormat = switch (random().nextInt(3)) {
+                case 0 -> new ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat(random().nextInt(4, 16));
+                case 1 -> new ES819TSDBDocValuesFormat();
+                case 2 -> new Lucene90DocValuesFormat();
+                default -> throw new AssertionError("unknown option");
+            };
+            config.setCodec(new Elasticsearch900Lucene101Codec() {
+                @Override
+                public DocValuesFormat getDocValuesFormatForField(String field) {
+                    return dvFormat;
+                }
+            });
+            return config;
+        };
+        var allNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "numeric_" + n).toList();
+        var allSortedNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_numeric_" + n).toList();
+        var allSortedFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_" + n).toList();
+        var allSortedSetFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_set" + n).toList();
+        var allBinaryFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "binary_" + n).toList();
+        try (var source1 = newDirectory(); var source2 = newDirectory(); var singleDir = newDirectory(); var mergeDir = newDirectory()) {
+            try (
+                var writer1 = new IndexWriter(source1, indexConfigWithRandomDVFormat.get());
+                var writer2 = new IndexWriter(source2, indexConfigWithRandomDVFormat.get());
+                var singleWriter = new IndexWriter(singleDir, indexConfigWithRandomDVFormat.get())
+            ) {
+                int numDocs = 1 + random().nextInt(1_000);
+                long timestamp = random().nextLong(1000_000L);
+                for (int i = 0; i < numDocs; i++) {
+                    List<IndexableField> fields = new ArrayList<>();
+                    String hostName = String.format(Locale.ROOT, "host-%d", random().nextInt(5));
+                    timestamp += 1 + random().nextInt(1_000); // strictly increasing
+                    fields.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
+                    fields.add(new SortedNumericDocValuesField(timestampField, timestamp));
+                    var numericFields = ESTestCase.randomSubsetOf(allNumericFields);
+                    for (String f : numericFields) {
+                        fields.add(new NumericDocValuesField(f, random().nextLong(1000L)));
+                    }
+                    var sortedNumericFields = ESTestCase.randomSubsetOf(allSortedNumericFields);
+                    for (String field : sortedNumericFields) {
+                        int valueCount = 1 + random().nextInt(3);
+                        for (int v = 0; v < valueCount; v++) {
+                            fields.add(new SortedNumericDocValuesField(field, random().nextLong(1000L)));
+                        }
+                    }
+                    var sortedFields = ESTestCase.randomSubsetOf(allSortedFields);
+                    for (String field : sortedFields) {
+                        fields.add(new SortedDocValuesField(field, new BytesRef("s" + random().nextInt(100))));
+                    }
+                    var sortedSetFields = ESTestCase.randomSubsetOf(allSortedSetFields);
+                    for (String field : sortedSetFields) {
+                        int valueCount = 1 + random().nextInt(3);
+                        for (int v = 0; v < valueCount; v++) {
+                            fields.add(new SortedSetDocValuesField(field, new BytesRef("ss" + random().nextInt(100))));
+                        }
+                    }
+                    List<String> binaryFields = ESTestCase.randomSubsetOf(allBinaryFields);
+                    for (String field : binaryFields) {
+                        fields.add(new BinaryDocValuesField(field, new BytesRef("b" + random().nextInt(100))));
+                    }
+                    for (IndexWriter writer : List.of(ESTestCase.randomFrom(writer1, writer2), singleWriter)) { // doc goes to both sides
+                        Randomness.shuffle(fields);
+                        writer.addDocument(fields);
+                        if (random().nextInt(100) <= 5) {
+                            writer.commit();
+                        }
+                    }
+                }
+                if (random().nextBoolean()) {
+                    writer1.forceMerge(1);
+                }
+                if (random().nextBoolean()) {
+                    writer2.forceMerge(1);
+                }
+                singleWriter.commit();
+                singleWriter.forceMerge(1);
+            }
+            try (var mergeWriter = new IndexWriter(mergeDir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField))) {
+                mergeWriter.addIndexes(source1, source2); // adds the source segments directly; their FieldInfos can differ
+                mergeWriter.forceMerge(1);
+            }
+            try (var reader1 = DirectoryReader.open(singleDir); var reader2 = DirectoryReader.open(mergeDir)) {
+                assertEquals(reader1.maxDoc(), reader2.maxDoc());
+                assertEquals(1, reader1.leaves().size());
+                assertEquals(1, reader2.leaves().size());
+                for (int i = 0; i < reader1.leaves().size(); i++) {
+                    LeafReader leaf1 = reader1.leaves().get(i).reader();
+                    LeafReader leaf2 = reader2.leaves().get(i).reader();
+                    for (String f : CollectionUtils.appendToCopy(allNumericFields, timestampField)) { // NUMERIC doc values
+                        var dv1 = leaf1.getNumericDocValues(f);
+                        var dv2 = leaf2.getNumericDocValues(f);
+                        if (dv1 == null) {
+                            assertNull(dv2);
+                            continue;
+                        }
+                        assertNotNull(dv2);
+                        while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
+                            assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                            assertEquals(dv1.docID(), dv2.docID());
+                            assertEquals(dv1.longValue(), dv2.longValue());
+                        }
+                        assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                    }
+                    for (String f : CollectionUtils.appendToCopy(allSortedNumericFields, timestampField)) {
+                        var dv1 = leaf1.getSortedNumericDocValues(f);
+                        var dv2 = leaf2.getSortedNumericDocValues(f);
+                        if (dv1 == null) {
+                            assertNull(dv2);
+                            continue;
+                        }
+                        assertNotNull(dv2);
+                        while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
+                            assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                            assertEquals(dv1.docID(), dv2.docID());
+                            assertEquals(dv1.docValueCount(), dv2.docValueCount());
+                            for (int v = 0; v < dv1.docValueCount(); v++) {
+                                assertEquals(dv1.nextValue(), dv2.nextValue());
+                            }
+                        }
+                        assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                    }
+                    for (String f : CollectionUtils.appendToCopy(allSortedFields, hostnameField)) {
+                        var dv1 = leaf1.getSortedDocValues(f);
+                        var dv2 = leaf2.getSortedDocValues(f);
+                        if (dv1 == null) {
+                            assertNull(dv2);
+                            continue;
+                        }
+                        assertNotNull(dv2);
+                        while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
+                            assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                            assertEquals(dv1.docID(), dv2.docID());
+                            assertEquals(dv1.lookupOrd(dv1.ordValue()), dv2.lookupOrd(dv2.ordValue()));
+                        }
+                        assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                    }
+                    for (String f : allSortedSetFields) {
+                        var dv1 = leaf1.getSortedSetDocValues(f);
+                        var dv2 = leaf2.getSortedSetDocValues(f);
+                        if (dv1 == null) {
+                            assertNull(dv2);
+                            continue;
+                        }
+                        assertNotNull(dv2);
+                        while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
+                            assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                            assertEquals(dv1.docID(), dv2.docID());
+                            assertEquals(dv1.docValueCount(), dv2.docValueCount());
+                            for (int v = 0; v < dv1.docValueCount(); v++) {
+                                assertEquals(dv1.lookupOrd(dv1.nextOrd()), dv2.lookupOrd(dv2.nextOrd()));
+                            }
+                        }
+                        assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                    }
+                    for (String f : allBinaryFields) {
+                        var dv1 = leaf1.getBinaryDocValues(f);
+                        var dv2 = leaf2.getBinaryDocValues(f);
+                        if (dv1 == null) {
+                            assertNull(dv2);
+                            continue;
+                        }
+                        assertNotNull(dv2);
+                        while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
+                            assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                            assertEquals(dv1.docID(), dv2.docID());
+                            assertEquals(dv1.binaryValue(), dv2.binaryValue());
+                        }
+                        assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
+                    }
+                }
+            }
+        }
+    }
+
     private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
         var config = new IndexWriterConfig();
         config.setIndexSort(
diff --git a/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java b/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java
index 82c8d9f5243d5..bea3b7343de3b 100644
--- a/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java
+++ b/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java
@@ -11,11 +11,13 @@
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import 
org.elasticsearch.action.admin.indices.shrink.ResizeType;
 import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.common.compress.CompressedXContent;
@@ -35,6 +37,8 @@
 import java.util.List;
 import java.util.UUID;
 
+import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -206,6 +210,38 @@ private void checkIndexSearchAndRetrieval(String dataStreamName, boolean routeOn
         });
     }
 
+    public void testShrink() throws Exception { // shrink a multi-shard logsdb index to one shard, then force-merge it
+        client().admin()
+            .indices()
+            .prepareCreate("my-logs")
+            .setMapping("@timestamp", "type=date", "host.name", "type=keyword")
+            .setSettings(indexSettings(between(3, 5), 0).put("index.mode", "logsdb").put("index.sort.field", "host.name"))
+            .get();
+
+        long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-08-08T00:00:00Z");
+        BulkRequest bulkRequest = new BulkRequest("my-logs");
+        int numDocs = randomIntBetween(100, 10_000);
+        for (int i = 0; i < numDocs; i++) {
+            timestamp += randomIntBetween(0, 1000);
+            String field = "field-" + randomIntBetween(1, 20); // random extra field, so documents carry different field sets
+            bulkRequest.add(
+                new IndexRequest("my-logs").id(Integer.toString(i))
+                    .source("host.name", "host-" + between(1, 5), "@timestamp", timestamp, field, randomNonNegativeLong())
+            );
+        }
+        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        assertNoFailures(client().bulk(bulkRequest).actionGet()); // fail fast if any document was rejected
+        client().admin().indices().prepareFlush("my-logs").get();
+        client().admin().indices().prepareUpdateSettings("my-logs").setSettings(Settings.builder().put("index.blocks.write", true)).get();
+        client().admin()
+            .indices()
+            .prepareResizeIndex("my-logs", "shrink-my-logs")
+            .setResizeType(ResizeType.SHRINK)
+            .setSettings(indexSettings(1, 0).build())
+            .get();
+        assertNoFailures(client().admin().indices().prepareForceMerge("shrink-my-logs").setMaxNumSegments(1).setFlush(true).get());
+    }
+
     static String formatInstant(Instant instant) {
         return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
     }