diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
index 073000979918e..b9293b7666263 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -653,15 +653,26 @@ public static Version parseVersionLenient(String toParse, Version defaultValue)
      * If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
      */
     public static SegmentReader segmentReader(LeafReader reader) {
+        SegmentReader segmentReader = tryUnwrapSegmentReader(reader);
+        if (segmentReader == null) {
+            throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
+        }
+        return segmentReader;
+    }
+
+    /**
+     * Tries to extract a segment reader from the given index reader. Unlike {@link #segmentReader(LeafReader)}, this method
+     * returns {@code null} instead of throwing an exception if no SegmentReader can be unwrapped.
+     */
+    public static SegmentReader tryUnwrapSegmentReader(LeafReader reader) {
         if (reader instanceof SegmentReader) {
             return (SegmentReader) reader;
         } else if (reader instanceof final FilterLeafReader fReader) {
-            return segmentReader(FilterLeafReader.unwrap(fReader));
+            return tryUnwrapSegmentReader(FilterLeafReader.unwrap(fReader));
         } else if (reader instanceof final FilterCodecReader fReader) {
-            return segmentReader(FilterCodecReader.unwrap(fReader));
+            return tryUnwrapSegmentReader(FilterCodecReader.unwrap(fReader));
         }
-        // hard fail - we can't get a SegmentReader
-        throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
+        return null;
     }
 
     @SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
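The split gives callers a non-throwing variant of the old hard-fail unwrap. A minimal sketch of the intended usage pattern (the helper class and method names are invented for illustration, not part of this change):

    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.SegmentReader;
    import org.elasticsearch.common.lucene.Lucene;

    // Hypothetical helper: fetch a segment attribute, returning null for leaves
    // that do not unwrap to a SegmentReader instead of propagating an exception.
    public class SegmentAttributes {
        static String attributeOrNull(LeafReader leaf, String key) {
            SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf);
            // Lucene.segmentReader(leaf) would have thrown IllegalStateException here.
            return segmentReader == null ? null : segmentReader.getSegmentInfo().info.getAttribute(key);
        }
    }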
diff --git a/server/src/main/java/org/elasticsearch/index/codec/TrackingInMemoryPostingsBytesCodec.java b/server/src/main/java/org/elasticsearch/index/codec/TrackingInMemoryPostingsBytesCodec.java
new file mode 100644
index 0000000000000..a3f493cb566de
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/TrackingInMemoryPostingsBytesCodec.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec;
+
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.FieldsConsumer;
+import org.apache.lucene.codecs.FieldsProducer;
+import org.apache.lucene.codecs.FilterCodec;
+import org.apache.lucene.codecs.NormsProducer;
+import org.apache.lucene.codecs.PostingsFormat;
+import org.apache.lucene.codecs.SegmentInfoFormat;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.BytesRef;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.IntConsumer;
+
+/**
+ * A {@link FilterCodec} that estimates the heap used by postings: for every field it tracks the length of the longest
+ * term written and counts it twice, as an upper bound for the min and max terms that the terms index keeps in memory.
+ * The estimate is stored as a segment attribute under {@link #IN_MEMORY_POSTINGS_BYTES_KEY}.
+ */
+public final class TrackingInMemoryPostingsBytesCodec extends FilterCodec {
+    public static final String IN_MEMORY_POSTINGS_BYTES_KEY = "es.postings.in_memory_bytes";
+
+    public TrackingInMemoryPostingsBytesCodec(Codec delegate) {
+        super(delegate.getName(), delegate);
+    }
+
+    @Override
+    public SegmentInfoFormat segmentInfoFormat() {
+        SegmentInfoFormat format = super.segmentInfoFormat();
+        return new SegmentInfoFormat() {
+            @Override
+            public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
+                return format.read(directory, segmentName, segmentID, context);
+            }
+
+            @Override
+            public void write(Directory dir, SegmentInfo info, IOContext ioContext) throws IOException {
+                format.write(dir, info, ioContext);
+            }
+        };
+    }
+
+    @Override
+    public PostingsFormat postingsFormat() {
+        PostingsFormat format = super.postingsFormat();
+        return new PostingsFormat(format.getName()) {
+            @Override
+            public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
+                FieldsConsumer consumer = format.fieldsConsumer(state);
+                return new TrackingLengthFieldsConsumer(state, consumer);
+            }
+
+            @Override
+            public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
+                return format.fieldsProducer(state);
+            }
+        };
+    }
+
+    static final class TrackingLengthFieldsConsumer extends FieldsConsumer {
+        final SegmentWriteState state;
+        final FieldsConsumer in;
+        final Map<String, Integer> maxLengths = new HashMap<>();
+
+        TrackingLengthFieldsConsumer(SegmentWriteState state, FieldsConsumer in) {
+            this.state = state;
+            this.in = in;
+        }
+
+        @Override
+        public void write(Fields fields, NormsProducer norms) throws IOException {
+            in.write(new TrackingLengthFields(fields, maxLengths), norms);
+            long totalLength = 0;
+            for (int len : maxLengths.values()) {
+                totalLength += len; // minTerm
+                totalLength += len; // maxTerm
+            }
+            state.segmentInfo.putAttribute(IN_MEMORY_POSTINGS_BYTES_KEY, Long.toString(totalLength));
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+    }
+
+    static final class TrackingLengthFields extends FilterLeafReader.FilterFields {
+        final Map<String, Integer> maxLengths;
+
+        TrackingLengthFields(Fields in, Map<String, Integer> maxLengths) {
+            super(in);
+            this.maxLengths = maxLengths;
+        }
+
+        @Override
+        public Terms terms(String field) throws IOException {
+            Terms terms = super.terms(field);
+            if (terms == null) {
+                return null;
+            }
+            return new TrackingLengthTerms(terms, len -> maxLengths.compute(field, (k, v) -> v == null ? len : Math.max(v, len)));
+        }
+    }
+
+    static final class TrackingLengthTerms extends FilterLeafReader.FilterTerms {
+        final IntConsumer onFinish;
+
+        TrackingLengthTerms(Terms in, IntConsumer onFinish) {
+            super(in);
+            this.onFinish = onFinish;
+        }
+
+        @Override
+        public TermsEnum iterator() throws IOException {
+            return new TrackingLengthTermsEnum(super.iterator(), onFinish);
+        }
+    }
+
+    static final class TrackingLengthTermsEnum extends FilterLeafReader.FilterTermsEnum {
+        int maxTermLength = 0;
+        final IntConsumer onFinish;
+
+        TrackingLengthTermsEnum(TermsEnum in, IntConsumer onFinish) {
+            super(in);
+            this.onFinish = onFinish;
+        }
+
+        @Override
+        public BytesRef next() throws IOException {
+            final BytesRef term = super.next();
+            if (term != null) {
+                maxTermLength = Math.max(maxTermLength, term.length);
+            } else {
+                onFinish.accept(maxTermLength);
+            }
+            return term;
+        }
+    }
+}
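The arithmetic in TrackingLengthFieldsConsumer#write is deliberately coarse: the longest term observed for a field bounds both the min and the max term that the terms index keeps on heap, so each field contributes twice its maximum term length, and the sum is stored under es.postings.in_memory_bytes. A standalone sketch of the same computation (field names and lengths are invented):

    import java.util.Map;

    // Illustration of the codec's estimate, outside Lucene entirely.
    public class PostingsEstimateDemo {
        public static void main(String[] args) {
            // Suppose the longest term seen for "f1" is 10 bytes and for "f2" is 7 bytes.
            Map<String, Integer> maxLengths = Map.of("f1", 10, "f2", 7);
            long totalLength = 0;
            for (int len : maxLengths.values()) {
                totalLength += len; // minTerm
                totalLength += len; // maxTerm
            }
            // Prints 34: each field contributes twice its longest term length, an upper
            // bound for the min and max terms the terms index loads into memory.
            System.out.println(totalLength);
        }
    }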
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 7f6fe40dbaaf0..bbb2cefe9705d 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -79,6 +79,7 @@
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
+import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;
 import org.elasticsearch.index.mapper.DocumentParser;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.LuceneDocument;
@@ -2734,7 +2735,7 @@ private IndexWriterConfig getIndexWriterConfig() {
         iwc.setMaxFullFlushMergeWaitMillis(-1);
         iwc.setSimilarity(engineConfig.getSimilarity());
         iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
-        iwc.setCodec(engineConfig.getCodec());
+        iwc.setCodec(new TrackingInMemoryPostingsBytesCodec(engineConfig.getCodec()));
         boolean useCompoundFile = engineConfig.getUseCompoundFile();
         iwc.setUseCompoundFile(useCompoundFile);
         if (useCompoundFile == false) {
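A minimal sketch of the same wiring outside the engine (the class and method names are invented; it assumes any Codec instance for the delegate):

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.codecs.Codec;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;

    // Hypothetical wiring: wrap whatever codec the engine would have used.
    public class TrackingCodecWiring {
        public static IndexWriterConfig configWithTracking(Codec engineCodec) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
            // The wrapper reuses the delegate's name (super(delegate.getName(), delegate)),
            // so segments written this way remain readable with the unwrapped codec.
            iwc.setCodec(new TrackingInMemoryPostingsBytesCodec(engineCodec));
            return iwc;
        }
    }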
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index f199e8f202959..8ff827872fa99 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -20,6 +20,7 @@
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.Sort;
@@ -85,6 +86,7 @@
 import org.elasticsearch.index.cache.request.ShardRequestCache;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.codec.FieldInfosWithUsages;
+import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.Engine.GetResult;
@@ -4145,6 +4147,7 @@ public void afterRefresh(boolean didRefresh) {
                 int numSegments = 0;
                 int totalFields = 0;
                 long usages = 0;
+                long totalPostingBytes = 0;
                 for (LeafReaderContext leaf : searcher.getLeafContexts()) {
                     numSegments++;
                     var fieldInfos = leaf.reader().getFieldInfos();
@@ -4156,8 +4159,17 @@ public void afterRefresh(boolean didRefresh) {
                     } else {
                         usages = -1;
                     }
+                    SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
+                    if (segmentReader != null) {
+                        String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
+                            TrackingInMemoryPostingsBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
+                        );
+                        if (postingBytes != null) {
+                            totalPostingBytes += Long.parseLong(postingBytes);
+                        }
+                    }
                 }
-                shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages);
+                shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages, totalPostingBytes);
             } catch (AlreadyClosedException ignored) {
 
             }
diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java b/server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java
index 531df89116453..5b32fd2e747ab 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java
@@ -13,11 +13,12 @@
  * A per shard stats including the number of segments and total fields across those segments.
  * These stats should be recomputed whenever the shard is refreshed.
  *
- * @param numSegments the number of segments
- * @param totalFields the total number of fields across the segments
- * @param fieldUsages the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
- *                    -1 if unavailable
+ * @param numSegments           the number of segments
+ * @param totalFields           the total number of fields across the segments
+ * @param fieldUsages           the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
+ *                              -1 if unavailable
+ * @param postingsInMemoryBytes an estimate of the total bytes in memory used for postings across all fields
  */
-public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages) {
+public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes) {

 }
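A hypothetical consumer of the extended record (not part of this change), showing the -1 sentinel for fieldUsages and the new postingsInMemoryBytes component:

    import org.elasticsearch.index.shard.ShardFieldStats;

    // Invented reporting helper: formats the stats the refresh listener computed.
    public class ShardFieldStatsPrinter {
        static String describe(ShardFieldStats stats) {
            String usages = stats.fieldUsages() < 0 ? "unavailable" : Long.toString(stats.fieldUsages());
            return "segments=" + stats.numSegments()
                + ", totalFields=" + stats.totalFields()
                + ", fieldUsages=" + usages
                + ", postingsInMemoryBytes=" + stats.postingsInMemoryBytes();
        }
    }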
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 4549a329d499a..19b95c9fa6c25 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -1877,7 +1877,7 @@ public void testShardFieldStats() throws IOException {
         assertThat(stats.totalFields(), equalTo(0));
         assertThat(stats.fieldUsages(), equalTo(0L));
         // index some documents
-        int numDocs = between(1, 10);
+        int numDocs = between(2, 10);
         for (int i = 0; i < numDocs; i++) {
             indexDoc(shard, "_doc", "first_" + i, """
                 {
@@ -1895,6 +1895,7 @@ public void testShardFieldStats() throws IOException {
         // _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
         // f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv),
         assertThat(stats.fieldUsages(), equalTo(13L));
+        assertThat(stats.postingsInMemoryBytes(), equalTo(40L));
         // don't re-compute on refresh without change
         if (randomBoolean()) {
             shard.refresh("test");
@@ -1912,7 +1913,7 @@ public void testShardFieldStats() throws IOException {
         }
         assertThat(shard.getShardFieldStats(), sameInstance(stats));
         // index more docs
-        numDocs = between(1, 10);
+        numDocs = between(2, 10);
         for (int i = 0; i < numDocs; i++) {
             indexDoc(shard, "_doc", "first_" + i, """
                 {
@@ -1949,6 +1950,7 @@ public void testShardFieldStats() throws IOException {
         // _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
         // f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv), f3(postings,norms), f3.keyword(term,dv), __soft_deletes
         assertThat(stats.fieldUsages(), equalTo(18L));
+        assertThat(stats.postingsInMemoryBytes(), equalTo(64L));
         closeShards(shard);
     }
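For completeness, a self-contained sketch against plain Lucene showing where the attribute ends up; the class name and single-field setup are invented, and it assumes, as the afterRefresh() read path above does, that the attribute written at flush time is visible on the resulting segment:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.codecs.Codec;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.SegmentReader;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;

    // Flush one segment through the tracking codec and read the attribute back.
    public class PostingsBytesAttributeDemo {
        public static void main(String[] args) throws Exception {
            try (var dir = new ByteBuffersDirectory()) {
                IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
                iwc.setCodec(new TrackingInMemoryPostingsBytesCodec(Codec.getDefault()));
                try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                    Document doc = new Document();
                    // One postings field whose only term is "value" (5 bytes).
                    doc.add(new StringField("f1", "value", Field.Store.NO));
                    writer.addDocument(doc);
                    writer.commit();
                }
                try (DirectoryReader reader = DirectoryReader.open(dir)) {
                    SegmentReader leaf = (SegmentReader) reader.leaves().get(0).reader();
                    // Expect "10": one field, max term length 5, counted for minTerm + maxTerm.
                    System.out.println(leaf.getSegmentInfo().info.getAttribute(
                        TrackingInMemoryPostingsBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY));
                }
            }
        }
    }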