Skip to content

Commit 0c1653d

Browse files
committed
Use local segment fieldInfos to lookup tsdb merge stats
1 parent 5df6262 commit 0c1653d

File tree

4 files changed

+161
-3
lines changed

4 files changed

+161
-3
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ class DocValuesConsumerUtil {
2424

2525
record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {}
2626

27-
static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) {
27+
static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo mergedFieldInfo) {
2828
if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) {
2929
return UNSUPPORTED;
3030
}
@@ -42,6 +42,10 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
4242
int maxLength = 0;
4343

4444
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
45+
final FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergedFieldInfo.name);
46+
if (fieldInfo == null) {
47+
continue;
48+
}
4549
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
4650
if (docValuesProducer instanceof FilterDocValuesProducer filterDocValuesProducer) {
4751
docValuesProducer = filterDocValuesProducer.getIn();

server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -58,13 +58,13 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase {
5858
LogConfigurator.configureESLogging();
5959
}
6060

61-
static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat {
61+
public static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat {
6262

6363
TestES87TSDBDocValuesFormat() {
6464
super();
6565
}
6666

67-
TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) {
67+
public TestES87TSDBDocValuesFormat(int skipIndexIntervalSize) {
6868
super(skipIndexIntervalSize);
6969
}
7070

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 118 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.lucene.codecs.Codec;
1313
import org.apache.lucene.codecs.DocValuesFormat;
14+
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
1415
import org.apache.lucene.document.BinaryDocValuesField;
1516
import org.apache.lucene.document.Document;
1617
import org.apache.lucene.document.NumericDocValuesField;
@@ -21,17 +22,26 @@
2122
import org.apache.lucene.index.DocValues;
2223
import org.apache.lucene.index.IndexWriter;
2324
import org.apache.lucene.index.IndexWriterConfig;
25+
import org.apache.lucene.index.IndexableField;
26+
import org.apache.lucene.index.LeafReader;
2427
import org.apache.lucene.index.LogByteSizeMergePolicy;
28+
import org.apache.lucene.index.NumericDocValues;
29+
import org.apache.lucene.index.SortedNumericDocValues;
2530
import org.apache.lucene.search.Sort;
2631
import org.apache.lucene.search.SortField;
2732
import org.apache.lucene.search.SortedNumericSortField;
2833
import org.apache.lucene.util.BytesRef;
2934
import org.elasticsearch.cluster.metadata.DataStream;
35+
import org.elasticsearch.common.Randomness;
3036
import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
3137
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
3238

39+
import java.io.IOException;
40+
import java.util.ArrayList;
3341
import java.util.Arrays;
42+
import java.util.List;
3443
import java.util.Locale;
44+
import java.util.function.Supplier;
3545

3646
public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
3747

@@ -514,6 +524,114 @@ public void testWithNoValueMultiValue() throws Exception {
514524
}
515525
}
516526

527+
public void testAddIndices() throws IOException {
528+
String timestampField = "@timestamp";
529+
String hostnameField = "host.name";
530+
Supplier<IndexWriterConfig> indexConfigWithRandomDVFormat = () -> {
531+
IndexWriterConfig config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
532+
DocValuesFormat dvFormat = switch (random().nextInt(3)) {
533+
case 0 -> new ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat(random().nextInt(4, 16));
534+
case 1 -> new ES819TSDBDocValuesFormat();
535+
case 2 -> new Lucene90DocValuesFormat();
536+
default -> throw new AssertionError("unknown option");
537+
};
538+
config.setCodec(new Elasticsearch900Lucene101Codec() {
539+
@Override
540+
public DocValuesFormat getDocValuesFormatForField(String field) {
541+
return dvFormat;
542+
}
543+
});
544+
return config;
545+
};
546+
try (var source1 = newDirectory(); var source2 = newDirectory(); var singleDir = newDirectory(); var mergeDir = newDirectory()) {
547+
try (
548+
var writer1 = new IndexWriter(source1, indexConfigWithRandomDVFormat.get());
549+
var writer2 = new IndexWriter(source2, indexConfigWithRandomDVFormat.get());
550+
var singleWriter = new IndexWriter(singleDir, indexConfigWithRandomDVFormat.get())
551+
) {
552+
int numDocs = 1 + random().nextInt(1_000);
553+
long timestamp = random().nextLong(1000_000L);
554+
for (int i = 0; i < numDocs; i++) {
555+
List<IndexableField> fields = new ArrayList<>();
556+
String hostName = String.format(Locale.ROOT, "host-%d", random().nextInt(5));
557+
timestamp += 1 + random().nextInt(1_000);
558+
fields.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
559+
fields.add(new SortedNumericDocValuesField(timestampField, timestamp));
560+
final IndexWriter splitWriter = random().nextBoolean() ? writer1 : writer2;
561+
if (random().nextBoolean()) {
562+
fields.add(new SortedNumericDocValuesField("gets", random().nextLong(1000_000L)));
563+
} else {
564+
fields.add(new SortedNumericDocValuesField("posts", random().nextLong(1000_000L)));
565+
}
566+
fields.add(new NumericDocValuesField("memory", random().nextLong(1000_000L)));
567+
Randomness.shuffle(fields);
568+
splitWriter.addDocument(fields);
569+
if (random().nextInt(100) <= 5) {
570+
splitWriter.commit();
571+
}
572+
// add to the single writer
573+
singleWriter.addDocument(fields);
574+
if (random().nextInt(100) <= 5) {
575+
singleWriter.commit();
576+
}
577+
}
578+
if (random().nextBoolean()) {
579+
writer1.forceMerge(1);
580+
}
581+
if (random().nextBoolean()) {
582+
writer2.forceMerge(1);
583+
}
584+
singleWriter.commit();
585+
singleWriter.forceMerge(1);
586+
}
587+
try (var mergeWriter = new IndexWriter(mergeDir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField))) {
588+
mergeWriter.addIndexes(source1, source2);
589+
mergeWriter.forceMerge(1);
590+
}
591+
try (var reader1 = DirectoryReader.open(singleDir); var reader2 = DirectoryReader.open(mergeDir)) {
592+
assertEquals(reader1.maxDoc(), reader2.maxDoc());
593+
assertEquals(1, reader1.leaves().size());
594+
assertEquals(1, reader2.leaves().size());
595+
for (int i = 0; i < reader1.leaves().size(); i++) {
596+
LeafReader leaf1 = reader1.leaves().get(i).reader();
597+
LeafReader leaf2 = reader2.leaves().get(i).reader();
598+
duelAssertNumericField(leaf1, leaf2, "gets");
599+
duelAssertNumericField(leaf1, leaf2, "posts");
600+
duelAssertNumericField(leaf1, leaf2, "memory");
601+
duelAssertNumericField(leaf1, leaf2, "@timestamp");
602+
}
603+
}
604+
}
605+
}
606+
607+
static void duelAssertNumericField(LeafReader reader1, LeafReader reader2, String fieldName) throws IOException {
608+
SortedNumericDocValues sdv1 = reader1.getSortedNumericDocValues(fieldName);
609+
SortedNumericDocValues sdv2 = reader2.getSortedNumericDocValues(fieldName);
610+
NumericDocValues dv1;
611+
NumericDocValues dv2;
612+
if (sdv1 != null) {
613+
dv1 = DocValues.unwrapSingleton(sdv1);
614+
assertNotNull(sdv2);
615+
dv2 = DocValues.unwrapSingleton(sdv2);
616+
assertNotNull(dv1);
617+
assertNotNull(dv2);
618+
} else {
619+
assertNull(sdv2);
620+
dv1 = reader1.getNumericDocValues(fieldName);
621+
dv2 = reader2.getNumericDocValues(fieldName);
622+
if (dv1 == null) {
623+
assertNull(dv2);
624+
return;
625+
}
626+
}
627+
while (dv1.nextDoc() != NumericDocValues.NO_MORE_DOCS) {
628+
assertNotEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
629+
assertEquals(dv1.docID(), dv2.docID());
630+
assertEquals(dv1.longValue(), dv2.longValue());
631+
}
632+
assertEquals(NumericDocValues.NO_MORE_DOCS, dv2.nextDoc());
633+
}
634+
517635
private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
518636
var config = new IndexWriterConfig();
519637
config.setIndexSort(

x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,13 @@
1111
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1212
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
1313
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
14+
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
1415
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
1516
import org.elasticsearch.action.bulk.BulkRequest;
1617
import org.elasticsearch.action.get.GetRequest;
1718
import org.elasticsearch.action.index.IndexRequest;
1819
import org.elasticsearch.action.search.SearchRequest;
20+
import org.elasticsearch.action.support.WriteRequest;
1921
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2022
import org.elasticsearch.cluster.metadata.Template;
2123
import org.elasticsearch.common.compress.CompressedXContent;
@@ -35,6 +37,8 @@
3537
import java.util.List;
3638
import java.util.UUID;
3739

40+
import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
41+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
3842
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
3943
import static org.hamcrest.Matchers.equalTo;
4044
import static org.hamcrest.Matchers.is;
@@ -206,6 +210,38 @@ private void checkIndexSearchAndRetrieval(String dataStreamName, boolean routeOn
206210
});
207211
}
208212

213+
public void testShrink() throws Exception {
214+
client().admin()
215+
.indices()
216+
.prepareCreate("my-logs")
217+
.setMapping("@timestamp", "type=date", "host.name", "type=keyword")
218+
.setSettings(indexSettings(between(3, 5), 0).put("index.mode", "logsdb").put("index.sort.field", "host.name"))
219+
.get();
220+
221+
long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-08-08T00:00:00Z");
222+
BulkRequest bulkRequest = new BulkRequest("my-logs");
223+
int numDocs = randomIntBetween(100, 10_000);
224+
for (int i = 0; i < numDocs; i++) {
225+
timestamp += randomIntBetween(0, 1000);
226+
String field = "field-" + randomIntBetween(1, 20);
227+
bulkRequest.add(
228+
new IndexRequest("my-logs").id(Integer.toString(i))
229+
.source("host.name", "host-" + between(1, 5), "@timestamp", timestamp, field, randomNonNegativeLong())
230+
);
231+
}
232+
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
233+
client().bulk(bulkRequest).actionGet();
234+
client().admin().indices().prepareFlush("my-logs").get();
235+
client().admin().indices().prepareUpdateSettings("my-logs").setSettings(Settings.builder().put("index.blocks.write", true)).get();
236+
client().admin()
237+
.indices()
238+
.prepareResizeIndex("my-logs", "shrink-my-logs")
239+
.setResizeType(ResizeType.SHRINK)
240+
.setSettings(indexSettings(1, 0).build())
241+
.get();
242+
assertNoFailures(client().admin().indices().prepareForceMerge("shrink-my-logs").setMaxNumSegments(1).setFlush(true).get());
243+
}
244+
209245
static String formatInstant(Instant instant) {
210246
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
211247
}

0 commit comments

Comments
 (0)