Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132597.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132597
summary: Use local segment `fieldInfos` to lookup tsdb merge stats
area: Codec
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DocValuesConsumerUtil {

record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {}

static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) {
static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo mergedFieldInfo) {
if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) {
return UNSUPPORTED;
}
Expand All @@ -42,6 +42,10 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
int maxLength = 0;

for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
final FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergedFieldInfo.name);
if (fieldInfo == null) {
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to the following if statement:

var wrapped = perFieldReader.getDocValuesProducer(fieldInfo);
if (wrapped == null) {
   continue;
}

and so it is safe to continue here? (and we don't have to fallback to UNSUPPORTED)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be safe to continue with the merge optimization. If the FieldInfo is null in some segments, it is safe to skip those and use the merge stats from segments that have values.

}
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
if (docValuesProducer instanceof FilterDocValuesProducer filterDocValuesProducer) {
docValuesProducer = filterDocValuesProducer.getIn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase {
LogConfigurator.configureESLogging();
}

static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat {
public static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat {

TestES87TSDBDocValuesFormat() {
super();
}

TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) {
public TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) {
super(skipIndexIntervalSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
Expand All @@ -21,17 +22,29 @@
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {

Expand Down Expand Up @@ -514,6 +527,184 @@ public void testWithNoValueMultiValue() throws Exception {
}
}

public void testAddIndices() throws IOException {
    // Verifies that IndexWriter#addIndexes over source indices using a random mix of doc
    // values formats (ES87 tsdb, ES819 tsdb, Lucene90) produces the same doc values as
    // writing the identical documents into a single index. Every doc values type is
    // compared field by field between the merged index and the reference index.
    String timestampField = "@timestamp";
    String hostnameField = "host.name";
    Supplier<IndexWriterConfig> indexConfigWithRandomDVFormat = () -> {
        IndexWriterConfig config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
        // Pick a random doc values format per index so addIndexes has to merge across
        // heterogeneous codecs.
        DocValuesFormat dvFormat = switch (random().nextInt(3)) {
            case 0 -> new ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat(random().nextInt(4, 16));
            case 1 -> new ES819TSDBDocValuesFormat();
            case 2 -> new Lucene90DocValuesFormat();
            default -> throw new AssertionError("unknown option");
        };
        config.setCodec(new Elasticsearch900Lucene101Codec() {
            @Override
            public DocValuesFormat getDocValuesFormatForField(String field) {
                return dvFormat;
            }
        });
        return config;
    };
    var allNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "numeric_" + n).toList();
    var allSortedNumericFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_numeric_" + n).toList();
    var allSortedFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_" + n).toList();
    var allSortedSetFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "sorted_set" + n).toList();
    var allBinaryFields = IntStream.range(0, ESTestCase.between(1, 10)).mapToObj(n -> "binary_" + n).toList();
    try (var source1 = newDirectory(); var source2 = newDirectory(); var singleDir = newDirectory(); var mergeDir = newDirectory()) {
        try (
            var writer1 = new IndexWriter(source1, indexConfigWithRandomDVFormat.get());
            var writer2 = new IndexWriter(source2, indexConfigWithRandomDVFormat.get());
            var singleWriter = new IndexWriter(singleDir, indexConfigWithRandomDVFormat.get())
        ) {
            int numDocs = 1 + random().nextInt(1_000);
            long timestamp = random().nextLong(1000_000L);
            for (int i = 0; i < numDocs; i++) {
                List<IndexableField> fields = new ArrayList<>();
                String hostName = String.format(Locale.ROOT, "host-%d", random().nextInt(5));
                timestamp += 1 + random().nextInt(1_000);
                fields.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
                fields.add(new SortedNumericDocValuesField(timestampField, timestamp));
                // Each doc gets a random subset of every field family so segments end up
                // with differing FieldInfos.
                var numericFields = ESTestCase.randomSubsetOf(allNumericFields);
                for (String f : numericFields) {
                    fields.add(new NumericDocValuesField(f, random().nextLong(1000L)));
                }
                var sortedNumericFields = ESTestCase.randomSubsetOf(allSortedNumericFields);
                for (String field : sortedNumericFields) {
                    int valueCount = 1 + random().nextInt(3);
                    for (int v = 0; v < valueCount; v++) {
                        fields.add(new SortedNumericDocValuesField(field, random().nextLong(1000L)));
                    }
                }
                var sortedFields = ESTestCase.randomSubsetOf(allSortedFields);
                for (String field : sortedFields) {
                    fields.add(new SortedDocValuesField(field, new BytesRef("s" + random().nextInt(100))));
                }
                var sortedSetFields = ESTestCase.randomSubsetOf(allSortedSetFields);
                for (String field : sortedSetFields) {
                    int valueCount = 1 + random().nextInt(3);
                    for (int v = 0; v < valueCount; v++) {
                        fields.add(new SortedSetDocValuesField(field, new BytesRef("ss" + random().nextInt(100))));
                    }
                }
                List<String> binaryFields = ESTestCase.randomSubsetOf(allBinaryFields);
                for (String field : binaryFields) {
                    fields.add(new BinaryDocValuesField(field, new BytesRef("b" + random().nextInt(100))));
                }
                // Write the same document both to one of the source indices and to the
                // reference index.
                for (IndexWriter writer : List.of(ESTestCase.randomFrom(writer1, writer2), singleWriter)) {
                    Randomness.shuffle(fields);
                    writer.addDocument(fields);
                    if (random().nextInt(100) <= 5) {
                        writer.commit();
                    }
                }
            }
            if (random().nextBoolean()) {
                writer1.forceMerge(1);
            }
            if (random().nextBoolean()) {
                writer2.forceMerge(1);
            }
            singleWriter.commit();
            singleWriter.forceMerge(1);
        }
        try (var mergeWriter = new IndexWriter(mergeDir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField))) {
            mergeWriter.addIndexes(source1, source2);
            mergeWriter.forceMerge(1);
        }
        try (var reader1 = DirectoryReader.open(singleDir); var reader2 = DirectoryReader.open(mergeDir)) {
            assertEquals(reader1.maxDoc(), reader2.maxDoc());
            assertEquals(1, reader1.leaves().size());
            assertEquals(1, reader2.leaves().size());
            for (int i = 0; i < reader1.leaves().size(); i++) {
                LeafReader leaf1 = reader1.leaves().get(i).reader();
                LeafReader leaf2 = reader2.leaves().get(i).reader();
                // NUMERIC: only the plain numeric fields — the timestamp field is indexed
                // as SORTED_NUMERIC and is verified in the next loop. (Previously this loop
                // iterated allSortedNumericFields, for which getNumericDocValues returns
                // null, so the numeric fields were never actually checked.)
                for (String f : allNumericFields) {
                    var dv1 = leaf1.getNumericDocValues(f);
                    var dv2 = leaf2.getNumericDocValues(f);
                    if (dv1 == null) {
                        assertNull(dv2);
                        continue;
                    }
                    assertNotNull(dv2);
                    while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
                        assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                        assertEquals(dv1.docID(), dv2.docID());
                        assertEquals(dv1.longValue(), dv2.longValue());
                    }
                    assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                }
                // SORTED_NUMERIC: includes the timestamp field.
                for (String f : CollectionUtils.appendToCopy(allSortedNumericFields, timestampField)) {
                    var dv1 = leaf1.getSortedNumericDocValues(f);
                    var dv2 = leaf2.getSortedNumericDocValues(f);
                    if (dv1 == null) {
                        assertNull(dv2);
                        continue;
                    }
                    assertNotNull(dv2);
                    while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
                        assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                        assertEquals(dv1.docID(), dv2.docID());
                        assertEquals(dv1.docValueCount(), dv2.docValueCount());
                        for (int v = 0; v < dv1.docValueCount(); v++) {
                            assertEquals(dv1.nextValue(), dv2.nextValue());
                        }
                    }
                    assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                }
                // SORTED: includes the hostname field.
                for (String f : CollectionUtils.appendToCopy(allSortedFields, hostnameField)) {
                    var dv1 = leaf1.getSortedDocValues(f);
                    var dv2 = leaf2.getSortedDocValues(f);
                    if (dv1 == null) {
                        assertNull(dv2);
                        continue;
                    }
                    assertNotNull(dv2);
                    while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
                        assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
                        assertEquals(dv1.docID(), dv2.docID());
                        assertEquals(dv1.lookupOrd(dv1.ordValue()), dv2.lookupOrd(dv2.ordValue()));
                    }
                    assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                }
                // SORTED_SET: ordinals may differ between indices, so compare resolved terms.
                for (String f : allSortedSetFields) {
                    var dv1 = leaf1.getSortedSetDocValues(f);
                    var dv2 = leaf2.getSortedSetDocValues(f);
                    if (dv1 == null) {
                        assertNull(dv2);
                        continue;
                    }
                    assertNotNull(dv2);
                    while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
                        assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
                        assertEquals(dv1.docID(), dv2.docID());
                        assertEquals(dv1.docValueCount(), dv2.docValueCount());
                        for (int v = 0; v < dv1.docValueCount(); v++) {
                            assertEquals(dv1.lookupOrd(dv1.nextOrd()), dv2.lookupOrd(dv2.nextOrd()));
                        }
                    }
                    assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                }
                // BINARY
                for (String f : allBinaryFields) {
                    var dv1 = leaf1.getBinaryDocValues(f);
                    var dv2 = leaf2.getBinaryDocValues(f);
                    if (dv1 == null) {
                        assertNull(dv2);
                        continue;
                    }
                    assertNotNull(dv2);
                    while (dv1.nextDoc() != SortedDocValues.NO_MORE_DOCS) {
                        assertNotEquals(SortedDocValues.NO_MORE_DOCS, dv2.nextDoc());
                        assertEquals(dv1.docID(), dv2.docID());
                        assertEquals(dv1.binaryValue(), dv2.binaryValue());
                    }
                    assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
                }
            }
        }
    }
}

private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
var config = new IndexWriterConfig();
config.setIndexSort(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
Expand All @@ -35,6 +37,8 @@
import java.util.List;
import java.util.UUID;

import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -206,6 +210,38 @@ private void checkIndexSearchAndRetrieval(String dataStreamName, boolean routeOn
});
}

/**
 * Creates a multi-shard logsdb index sorted on {@code host.name}, indexes random documents,
 * then shrinks it down to a single shard and force-merges the shrunken index, asserting the
 * force merge completes without failures.
 */
public void testShrink() throws Exception {
    // Multi-shard (3-5) logsdb index with an index sort on host.name.
    client().admin()
        .indices()
        .prepareCreate("my-logs")
        .setMapping("@timestamp", "type=date", "host.name", "type=keyword")
        .setSettings(indexSettings(between(3, 5), 0).put("index.mode", "logsdb").put("index.sort.field", "host.name"))
        .get();

    long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-08-08T00:00:00Z");
    BulkRequest bulkRequest = new BulkRequest("my-logs");
    int numDocs = randomIntBetween(100, 10_000);
    for (int i = 0; i < numDocs; i++) {
        // Monotonically increasing timestamps; a random sparse numeric field per doc so
        // segments end up with differing field sets.
        timestamp += randomIntBetween(0, 1000);
        String field = "field-" + randomIntBetween(1, 20);
        bulkRequest.add(
            new IndexRequest("my-logs").id(Integer.toString(i))
                .source("host.name", "host-" + between(1, 5), "@timestamp", timestamp, field, randomNonNegativeLong())
        );
    }
    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    client().bulk(bulkRequest).actionGet();
    client().admin().indices().prepareFlush("my-logs").get();
    // A shrink requires the source index to be write-blocked.
    client().admin().indices().prepareUpdateSettings("my-logs").setSettings(Settings.builder().put("index.blocks.write", true)).get();
    client().admin()
        .indices()
        .prepareResizeIndex("my-logs", "shrink-my-logs")
        .setResizeType(ResizeType.SHRINK)
        .setSettings(indexSettings(1, 0).build())
        .get();
    // NOTE(review): force-merging the shrunken index presumably exercises the optimized
    // tsdb doc-values merge across segments with differing FieldInfos — confirm intent.
    assertNoFailures(client().admin().indices().prepareForceMerge("shrink-my-logs").setMaxNumSegments(1).setFlush(true).get());
}

/** Renders the given instant using the strict_date_optional_time format. */
static String formatInstant(Instant instant) {
    var formatter = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName());
    return formatter.format(instant);
}
Expand Down