Skip to content

Commit d21840c

Browse files
authored
Use local segment fieldInfos to lookup tsdb merge stats (#132597) (#132612)
Merging shrink TSDB or LogsDB indices can fail in versions 8.19 or 9.1+. When shrinking an index to a single shard, we use addIndexes, which can add Lucene segments directly. In this case, FieldInfos can differ between shards and the new segment. We should use the FieldInfos from each segment to retrieve the merge stats, instead of the FieldInfos of the merged segment. Relates #125403
1 parent 0dc5a23 commit d21840c

File tree

5 files changed

+239
-3
lines changed

5 files changed

+239
-3
lines changed

docs/changelog/132597.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132597
2+
summary: Use local segment `fieldInfos` to lookup tsdb merge stats
3+
area: Codec
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class DocValuesConsumerUtil {
2424

2525
record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {}
2626

27-
static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) {
27+
static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo mergedFieldInfo) {
2828
if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) {
2929
return UNSUPPORTED;
3030
}
@@ -42,6 +42,10 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
4242
int maxLength = 0;
4343

4444
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
45+
final FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergedFieldInfo.name);
46+
if (fieldInfo == null) {
47+
continue;
48+
}
4549
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
4650
if (docValuesProducer instanceof FilterDocValuesProducer filterDocValuesProducer) {
4751
docValuesProducer = filterDocValuesProducer.getIn();

server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase {
5858
LogConfigurator.configureESLogging();
5959
}
6060

61-
static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat {
61+
public static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat {
6262

6363
TestES87TSDBDocValuesFormat() {
6464
super();
6565
}
6666

67-
TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) {
67+
public TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) {
6868
super(skipIndexIntervalSize);
6969
}
7070

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.lucene.codecs.Codec;
1313
import org.apache.lucene.codecs.DocValuesFormat;
14+
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
1415
import org.apache.lucene.document.BinaryDocValuesField;
1516
import org.apache.lucene.document.Document;
1617
import org.apache.lucene.document.NumericDocValuesField;
@@ -21,17 +22,29 @@
2122
import org.apache.lucene.index.DocValues;
2223
import org.apache.lucene.index.IndexWriter;
2324
import org.apache.lucene.index.IndexWriterConfig;
25+
import org.apache.lucene.index.IndexableField;
26+
import org.apache.lucene.index.LeafReader;
2427
import org.apache.lucene.index.LogByteSizeMergePolicy;
28+
import org.apache.lucene.index.NumericDocValues;
29+
import org.apache.lucene.index.SortedDocValues;
2530
import org.apache.lucene.search.Sort;
2631
import org.apache.lucene.search.SortField;
2732
import org.apache.lucene.search.SortedNumericSortField;
2833
import org.apache.lucene.util.BytesRef;
2934
import org.elasticsearch.cluster.metadata.DataStream;
35+
import org.elasticsearch.common.Randomness;
36+
import org.elasticsearch.common.util.CollectionUtils;
3037
import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
3138
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
39+
import org.elasticsearch.test.ESTestCase;
3240

41+
import java.io.IOException;
42+
import java.util.ArrayList;
3343
import java.util.Arrays;
44+
import java.util.List;
3445
import java.util.Locale;
46+
import java.util.function.Supplier;
47+
import java.util.stream.IntStream;
3548

3649
public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
3750

@@ -514,6 +527,184 @@ public void testWithNoValueMultiValue() throws Exception {
514527
}
515528
}
516529

530+
public void testAddIndices() throws IOException {
531+
String timestampField = "@timestamp";
532+
String hostnameField = "host.name";
533+
Supplier<IndexWriterConfig> indexConfigWithRandomDVFormat = () -> {
534+
IndexWriterConfig config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
535+
DocValuesFormat dvFormat = switch (random().nextInt(3)) {
536+
case 0 -> new ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat(random().nextInt(4, 16));
537+
case 1 -> new ES819TSDBDocValuesFormat();
538+
case 2 -> new Lucene90DocValuesFormat();
539+
default -> throw new AssertionError("unknown option");
540+
};
541+
config.setCodec(new Elasticsearch900Lucene101Codec() {
542+
@Override
543+
public DocValuesFormat getDocValuesFormatForField(String field) {
544+
return dvFormat;
545+
}
546+
});
547+
return config;
548+
};
549+
var allNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "numeric_" + n).toList();
550+
var allSortedNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_numeric_" + n).toList();
551+
var allSortedFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_" + n).toList();
552+
var allSortedSetFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_set" + n).toList();
553+
var allBinaryFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "binary_" + n).toList();
554+
try (var source1 = newDirectory(); var source2 = newDirectory(); var singleDir = newDirectory(); var mergeDir = newDirectory()) {
555+
try (
556+
var writer1 = new IndexWriter(source1, indexConfigWithRandomDVFormat.get());
557+
var writer2 = new IndexWriter(source2, indexConfigWithRandomDVFormat.get());
558+
var singleWriter = new IndexWriter(singleDir, indexConfigWithRandomDVFormat.get())
559+
) {
560+
int numDocs = 1 + random().nextInt(1_000);
561+
long timestamp = random().nextLong(1000_000L);
562+
for (int i = 0; i < numDocs; i++) {
563+
List<IndexableField> fields = new ArrayList<>();
564+
String hostName = String.format(Locale.ROOT, "host-%d", random().nextInt(5));
565+
timestamp += 1 + random().nextInt(1_000);
566+
fields.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
567+
fields.add(new SortedNumericDocValuesField(timestampField, timestamp));
568+
var numericFields = ESTestCase.randomSubsetOf(allNumericFields);
569+
for (String f : numericFields) {
570+
fields.add(new NumericDocValuesField(f, random().nextLong(1000L)));
571+
}
572+
var sortedNumericFields = ESTestCase.randomSubsetOf(allSortedNumericFields);
573+
for (String field : sortedNumericFields) {
574+
int valueCount = 1 + random().nextInt(3);
575+
for (int v = 0; v < valueCount; v++) {
576+
fields.add(new SortedNumericDocValuesField(field, random().nextLong(1000L)));
577+
}
578+
}
579+
var sortedFields = ESTestCase.randomSubsetOf(allSortedFields);
580+
for (String field : sortedFields) {
581+
fields.add(new SortedDocValuesField(field, new BytesRef("s" + random().nextInt(100))));
582+
}
583+
var sortedSetFields = ESTestCase.randomSubsetOf(allSortedSetFields);
584+
for (String field : sortedSetFields) {
585+
int valueCount = 1 + random().nextInt(3);
586+
for (int v = 0; v < valueCount; v++) {
587+
fields.add(new SortedSetDocValuesField(field, new BytesRef("ss" + random().nextInt(100))));
588+
}
589+
}
590+
List<String> binaryFields = ESTestCase.randomSubsetOf(allBinaryFields);
591+
for (String field : binaryFields) {
592+
fields.add(new BinaryDocValuesField(field, new BytesRef("b" + random().nextInt(100))));
593+
}
594+
for (IndexWriter writer : List.of(ESTestCase.randomFrom(writer1, writer2), singleWriter)) {
595+
Randomness.shuffle(fields);
596+
writer.addDocument(fields);
597+
if (random().nextInt(100) <= 5) {
598+
writer.commit();
599+
}
600+
}
601+
}
602+
if (random().nextBoolean()) {
603+
writer1.forceMerge(1);
604+
}
605+
if (random().nextBoolean()) {
606+
writer2.forceMerge(1);
607+
}
608+
singleWriter.commit();
609+
singleWriter.forceMerge(1);
610+
}
611+
try (var mergeWriter = new IndexWriter(mergeDir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField))) {
612+
mergeWriter.addIndexes(source1, source2);
613+
mergeWriter.forceMerge(1);
614+
}
615+
try (var reader1 = DirectoryReader.open(singleDir); var reader2 = DirectoryReader.open(mergeDir)) {
616+
assertEquals(reader1.maxDoc(), reader2.maxDoc());
617+
assertEquals(1, reader1.leaves().size());
618+
assertEquals(1, reader2.leaves().size());
619+
for (int i = 0; i < reader1.leaves().size(); i++) {
620+
LeafReader leaf1 = reader1.leaves().get(i).reader();
621+
LeafReader leaf2 = reader2.leaves().get(i).reader();
622+
for (String f : CollectionUtils.appendToCopy(allSortedNumericFields, timestampField)) {
623+
var dv1 = leaf1.getNumericDocValues(f);
624+
var dv2 = leaf2.getNumericDocValues(f);
625+
if (dv1 == null) {
626+
assertNull(dv2);
627+
continue;
628+
}
629+
assertNotNull(dv2);
630+
while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
631+
assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
632+
assertEquals(dv1.docID(), dv2.docID());
633+
assertEquals(dv1.longValue(), dv2.longValue());
634+
}
635+
assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
636+
}
637+
for (String f : CollectionUtils.appendToCopy(allSortedNumericFields, timestampField)) {
638+
var dv1 = leaf1.getSortedNumericDocValues(f);
639+
var dv2 = leaf2.getSortedNumericDocValues(f);
640+
if (dv1 == null) {
641+
assertNull(dv2);
642+
continue;
643+
}
644+
assertNotNull(dv2);
645+
while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
646+
assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
647+
assertEquals(dv1.docID(), dv2.docID());
648+
assertEquals(dv1.docValueCount(), dv2.docValueCount());
649+
for (int v = 0; v < dv1.docValueCount(); v++) {
650+
assertEquals(dv1.nextValue(), dv2.nextValue());
651+
}
652+
}
653+
assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
654+
}
655+
for (String f : CollectionUtils.appendToCopy(allSortedFields, hostnameField)) {
656+
var dv1 = leaf1.getSortedDocValues(f);
657+
var dv2 = leaf2.getSortedDocValues(f);
658+
if (dv1 == null) {
659+
assertNull(dv2);
660+
continue;
661+
}
662+
assertNotNull(dv2);
663+
while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
664+
assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
665+
assertEquals(dv1.docID(), dv2.docID());
666+
assertEquals(dv1.lookupOrd(dv1.ordValue()), dv2.lookupOrd(dv2.ordValue()));
667+
}
668+
assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
669+
}
670+
for (String f : allSortedSetFields) {
671+
var dv1 = leaf1.getSortedSetDocValues(f);
672+
var dv2 = leaf2.getSortedSetDocValues(f);
673+
if (dv1 == null) {
674+
assertNull(dv2);
675+
continue;
676+
}
677+
assertNotNull(dv2);
678+
while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
679+
assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
680+
assertEquals(dv1.docID(), dv2.docID());
681+
assertEquals(dv1.docValueCount(), dv2.docValueCount());
682+
for (int v = 0; v < dv1.docValueCount(); v++) {
683+
assertEquals(dv1.lookupOrd(dv1.nextOrd()), dv2.lookupOrd(dv2.nextOrd()));
684+
}
685+
}
686+
assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
687+
}
688+
for (String f : allBinaryFields) {
689+
var dv1 = leaf1.getBinaryDocValues(f);
690+
var dv2 = leaf2.getBinaryDocValues(f);
691+
if (dv1 == null) {
692+
assertNull(dv2);
693+
continue;
694+
}
695+
assertNotNull(dv2);
696+
while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
697+
assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
698+
assertEquals(dv1.docID(), dv2.docID());
699+
assertEquals(dv1.binaryValue(), dv2.binaryValue());
700+
}
701+
assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
702+
}
703+
}
704+
}
705+
}
706+
}
707+
517708
private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
518709
var config = new IndexWriterConfig();
519710
config.setIndexSort(

x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1212
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
1313
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
14+
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
1415
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
1516
import org.elasticsearch.action.bulk.BulkRequest;
1617
import org.elasticsearch.action.get.GetRequest;
1718
import org.elasticsearch.action.index.IndexRequest;
1819
import org.elasticsearch.action.search.SearchRequest;
20+
import org.elasticsearch.action.support.WriteRequest;
1921
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2022
import org.elasticsearch.cluster.metadata.Template;
2123
import org.elasticsearch.common.compress.CompressedXContent;
@@ -35,6 +37,8 @@
3537
import java.util.List;
3638
import java.util.UUID;
3739

40+
import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
41+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
3842
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
3943
import static org.hamcrest.Matchers.equalTo;
4044
import static org.hamcrest.Matchers.is;
@@ -206,6 +210,38 @@ private void checkIndexSearchAndRetrieval(String dataStreamName, boolean routeOn
206210
});
207211
}
208212

213+
public void testShrink() throws Exception {
214+
client().admin()
215+
.indices()
216+
.prepareCreate("my-logs")
217+
.setMapping("@timestamp", "type=date", "host.name", "type=keyword")
218+
.setSettings(indexSettings(between(3, 5), 0).put("index.mode", "logsdb").put("index.sort.field", "host.name"))
219+
.get();
220+
221+
long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-08-08T00:00:00Z");
222+
BulkRequest bulkRequest = new BulkRequest("my-logs");
223+
int numDocs = randomIntBetween(100, 10_000);
224+
for (int i = 0; i < numDocs; i++) {
225+
timestamp += randomIntBetween(0, 1000);
226+
String field = "field-" + randomIntBetween(1, 20);
227+
bulkRequest.add(
228+
new IndexRequest("my-logs").id(Integer.toString(i))
229+
.source("host.name", "host-" + between(1, 5), "@timestamp", timestamp, field, randomNonNegativeLong())
230+
);
231+
}
232+
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
233+
client().bulk(bulkRequest).actionGet();
234+
client().admin().indices().prepareFlush("my-logs").get();
235+
client().admin().indices().prepareUpdateSettings("my-logs").setSettings(Settings.builder().put("index.blocks.write", true)).get();
236+
client().admin()
237+
.indices()
238+
.prepareResizeIndex("my-logs", "shrink-my-logs")
239+
.setResizeType(ResizeType.SHRINK)
240+
.setSettings(indexSettings(1, 0).build())
241+
.get();
242+
assertNoFailures(client().admin().indices().prepareForceMerge("shrink-my-logs").setMaxNumSegments(1).setFlush(true).get());
243+
}
244+
209245
static String formatInstant(Instant instant) {
210246
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
211247
}

0 commit comments

Comments
 (0)