19 changes: 15 additions & 4 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -653,15 +653,26 @@ public static Version parseVersionLenient(String toParse, Version defaultValue)
      * If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
      */
     public static SegmentReader segmentReader(LeafReader reader) {
+        SegmentReader segmentReader = tryUnwrapSegmentReader(reader);
+        if (segmentReader == null) {
+            throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
+        }
+        return segmentReader;
+    }
+
+    /**
+     * Tries to extract a segment reader from the given index reader. Unlike {@link #segmentReader(LeafReader)} this method returns
+     * null if no SegmentReader can be unwrapped instead of throwing an exception.
+     */
+    public static SegmentReader tryUnwrapSegmentReader(LeafReader reader) {
         if (reader instanceof SegmentReader) {
             return (SegmentReader) reader;
         } else if (reader instanceof final FilterLeafReader fReader) {
-            return segmentReader(FilterLeafReader.unwrap(fReader));
+            return tryUnwrapSegmentReader(FilterLeafReader.unwrap(fReader));
         } else if (reader instanceof final FilterCodecReader fReader) {
-            return segmentReader(FilterCodecReader.unwrap(fReader));
+            return tryUnwrapSegmentReader(FilterCodecReader.unwrap(fReader));
         }
-        // hard fail - we can't get a SegmentReader
-        throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
+        return null;
     }
 
     @SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
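Illustrative only, not part of the change: a minimal sketch of how a caller can use the new lenient variant to skip leaves that have no backing segment, where Lucene.segmentReader(...) would have thrown IllegalStateException. The SegmentReaders class and its of(...) method are hypothetical names.

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.index.SegmentReader;
    import org.elasticsearch.common.lucene.Lucene;

    import java.util.ArrayList;
    import java.util.List;

    final class SegmentReaders {
        // Collects the SegmentReaders backing an IndexReader, silently skipping
        // leaves that cannot be unwrapped instead of failing the whole loop.
        static List<SegmentReader> of(IndexReader reader) {
            List<SegmentReader> segmentReaders = new ArrayList<>();
            for (LeafReaderContext leaf : reader.leaves()) {
                SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
                if (segmentReader != null) {
                    segmentReaders.add(segmentReader);
                }
            }
            return segmentReaders;
        }
    }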
154 changes: 154 additions & 0 deletions server/src/main/java/org/elasticsearch/index/codec/TrackingInMemoryPostingsBytesCodec.java
@@ -0,0 +1,154 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.codec;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.SegmentInfoFormat;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntConsumer;

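/**
 * A {@link FilterCodec} that estimates how many bytes of term data each segment keeps in memory
 * for postings and records the total under {@link #IN_MEMORY_POSTINGS_BYTES_KEY} as a segment-info
 * attribute. The wrapped postings format tracks, per field, the longest term written to the segment
 * and charges that length twice (once for the minTerm and once for the maxTerm); the delegate
 * formats are otherwise used unchanged.
 */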
public final class TrackingInMemoryPostingsBytesCodec extends FilterCodec {
    public static final String IN_MEMORY_POSTINGS_BYTES_KEY = "es.postings.in_memory_bytes";

    public TrackingInMemoryPostingsBytesCodec(Codec delegate) {
        super(delegate.getName(), delegate);
    }

    @Override
    public SegmentInfoFormat segmentInfoFormat() {
        SegmentInfoFormat format = super.segmentInfoFormat();
        return new SegmentInfoFormat() {
            @Override
            public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
                return format.read(directory, segmentName, segmentID, context);
            }

            @Override
            public void write(Directory dir, SegmentInfo info, IOContext ioContext) throws IOException {
                format.write(dir, info, ioContext);
            }
        };
    }

    @Override
    public PostingsFormat postingsFormat() {
        PostingsFormat format = super.postingsFormat();
        return new PostingsFormat(format.getName()) {
            @Override
            public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
                FieldsConsumer consumer = format.fieldsConsumer(state);
                return new TrackingLengthFieldsConsumer(state, consumer);
            }

            @Override
            public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
                return format.fieldsProducer(state);
            }
        };
    }

    static final class TrackingLengthFieldsConsumer extends FieldsConsumer {
        final SegmentWriteState state;
        final FieldsConsumer in;
        final Map<String, Integer> maxLengths = new HashMap<>();

        TrackingLengthFieldsConsumer(SegmentWriteState state, FieldsConsumer in) {
            this.state = state;
            this.in = in;
        }

        @Override
        public void write(Fields fields, NormsProducer norms) throws IOException {
            in.write(new TrackingLengthFields(fields, maxLengths), norms);
            long totalLength = 0;
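            // Each field contributes its longest observed term length twice, approximating the
            // minTerm and maxTerm that are kept in memory for that field.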
            for (int len : maxLengths.values()) {
                totalLength += len; // minTerm
                totalLength += len; // maxTerm
            }
            state.segmentInfo.putAttribute(IN_MEMORY_POSTINGS_BYTES_KEY, Long.toString(totalLength));
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }

    static final class TrackingLengthFields extends FilterLeafReader.FilterFields {
        final Map<String, Integer> maxLengths;

        TrackingLengthFields(Fields in, Map<String, Integer> maxLengths) {
            super(in);
            this.maxLengths = maxLengths;
        }

        @Override
        public Terms terms(String field) throws IOException {
            Terms terms = super.terms(field);
            if (terms == null) {
                return null;
            }
            return new TrackingLengthTerms(terms, len -> maxLengths.compute(field, (k, v) -> v == null ? len : Math.max(v, len)));
        }
    }

    static final class TrackingLengthTerms extends FilterLeafReader.FilterTerms {
        final IntConsumer onFinish;

        TrackingLengthTerms(Terms in, IntConsumer onFinish) {
            super(in);
            this.onFinish = onFinish;
        }

        @Override
        public TermsEnum iterator() throws IOException {
            return new TrackingLengthTermsEnum(super.iterator(), onFinish);
        }
    }

    static final class TrackingLengthTermsEnum extends FilterLeafReader.FilterTermsEnum {
        int maxTermLength = 0;
        final IntConsumer onFinish;

        TrackingLengthTermsEnum(TermsEnum in, IntConsumer onFinish) {
            super(in);
            this.onFinish = onFinish;
        }

        @Override
        public BytesRef next() throws IOException {
            final BytesRef term = super.next();
            if (term != null) {
                maxTermLength = Math.max(maxTermLength, term.length);
            } else {
                onFinish.accept(maxTermLength);
            }
            return term;
        }
    }
}
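Not part of the PR: a standalone sketch of how the attribute written by this codec could be read back after a flush, using plain Lucene against an in-memory directory. The demo class, directory, analyzer, and field names are arbitrary choices for the sketch, and it assumes it runs inside the Elasticsearch server codebase so Lucene.tryUnwrapSegmentReader and the tracking codec are available.

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.codecs.Codec;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.index.SegmentReader;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;
    import org.elasticsearch.common.lucene.Lucene;
    import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;

    public class TrackingCodecDemo {
        public static void main(String[] args) throws Exception {
            try (Directory dir = new ByteBuffersDirectory()) {
                IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
                // Wrap whatever codec would normally be used, as the engine change further down does.
                iwc.setCodec(new TrackingInMemoryPostingsBytesCodec(Codec.getDefault()));
                try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                    Document doc = new Document();
                    doc.add(new TextField("body", "tracking in-memory postings bytes", Field.Store.NO));
                    writer.addDocument(doc);
                } // closing the writer commits, flushing the segment and its segment-info attributes
                try (DirectoryReader reader = DirectoryReader.open(dir)) {
                    for (LeafReaderContext leaf : reader.leaves()) {
                        SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
                        if (segmentReader == null) {
                            continue;
                        }
                        String bytes = segmentReader.getSegmentInfo()
                            .info.getAttribute(TrackingInMemoryPostingsBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY);
                        System.out.println("segment " + segmentReader.getSegmentName() + " -> " + bytes + " bytes");
                    }
                }
            }
        }
    }

Note that the wrapper reuses the delegate codec's name, so segments written with it stay readable by the unwrapped codec.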
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -79,6 +79,7 @@
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
+import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;
 import org.elasticsearch.index.mapper.DocumentParser;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.LuceneDocument;
@@ -2734,7 +2735,7 @@ private IndexWriterConfig getIndexWriterConfig() {
         iwc.setMaxFullFlushMergeWaitMillis(-1);
         iwc.setSimilarity(engineConfig.getSimilarity());
         iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
-        iwc.setCodec(engineConfig.getCodec());
+        iwc.setCodec(new TrackingInMemoryPostingsBytesCodec(engineConfig.getCodec()));
         boolean useCompoundFile = engineConfig.getUseCompoundFile();
         iwc.setUseCompoundFile(useCompoundFile);
         if (useCompoundFile == false) {
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -12,6 +12,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
+import org.apache.lucene.codecs.FieldsProducer;
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FieldInfos;
@@ -20,6 +21,7 @@
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.Sort;
@@ -85,6 +87,7 @@
 import org.elasticsearch.index.cache.request.ShardRequestCache;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.codec.FieldInfosWithUsages;
+import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.Engine.GetResult;
@@ -4145,6 +4148,7 @@ public void afterRefresh(boolean didRefresh) {
             int numSegments = 0;
             int totalFields = 0;
             long usages = 0;
+            long totalPostingBytes = 0;
             for (LeafReaderContext leaf : searcher.getLeafContexts()) {
                 numSegments++;
                 var fieldInfos = leaf.reader().getFieldInfos();
@@ -4156,8 +4160,18 @@
                 } else {
                     usages = -1;
                 }
+                SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
+                if (segmentReader != null) {
+                    FieldsProducer postingsReader = segmentReader.getPostingsReader();
+                    String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
+                        TrackingInMemoryPostingsBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
+                    );
+                    if (postingBytes != null) {
+                        totalPostingBytes += Long.parseLong(postingBytes);
+                    }
+                }
             }
-            shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages);
+            shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages, totalPostingBytes);
         } catch (AlreadyClosedException ignored) {
 
         }
server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java
@@ -13,11 +13,12 @@
  * A per shard stats including the number of segments and total fields across those segments.
  * These stats should be recomputed whenever the shard is refreshed.
  *
- * @param numSegments the number of segments
- * @param totalFields the total number of fields across the segments
- * @param fieldUsages the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
- *                    -1 if unavailable
+ * @param numSegments           the number of segments
+ * @param totalFields           the total number of fields across the segments
+ * @param fieldUsages           the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
+ *                              -1 if unavailable
+ * @param postingsInMemoryBytes the total bytes in memory used for postings across all fields
  */
-public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages) {
+public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes) {
 
 }
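Purely illustrative, not part of the PR: a hypothetical helper showing how the enlarged record might be rolled up across several shards. ShardFieldStatsTotals and sum(...) are invented names; the rule that a single -1 makes the fieldUsages total unavailable simply mirrors the existing javadoc convention.

    import java.util.List;

    final class ShardFieldStatsTotals {
        static ShardFieldStats sum(List<ShardFieldStats> perShard) {
            int numSegments = 0;
            int totalFields = 0;
            long fieldUsages = 0;
            long postingsInMemoryBytes = 0;
            for (ShardFieldStats s : perShard) {
                numSegments += s.numSegments();
                totalFields += s.totalFields();
                // -1 means "unavailable"; once any shard reports -1 the total is unavailable too.
                fieldUsages = (fieldUsages == -1 || s.fieldUsages() == -1) ? -1 : fieldUsages + s.fieldUsages();
                postingsInMemoryBytes += s.postingsInMemoryBytes();
            }
            return new ShardFieldStats(numSegments, totalFields, fieldUsages, postingsInMemoryBytes);
        }
    }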
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -1877,7 +1877,7 @@ public void testShardFieldStats() throws IOException {
         assertThat(stats.totalFields(), equalTo(0));
         assertThat(stats.fieldUsages(), equalTo(0L));
         // index some documents
-        int numDocs = between(1, 10);
+        int numDocs = between(2, 10);
         for (int i = 0; i < numDocs; i++) {
             indexDoc(shard, "_doc", "first_" + i, """
                 {
@@ -1895,6 +1895,7 @@
         // _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
         // f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv),
         assertThat(stats.fieldUsages(), equalTo(13L));
+        assertThat(stats.postingsInMemoryBytes(), equalTo(40L));
         // don't re-compute on refresh without change
         if (randomBoolean()) {
             shard.refresh("test");
@@ -1912,7 +1913,7 @@
         }
         assertThat(shard.getShardFieldStats(), sameInstance(stats));
         // index more docs
-        numDocs = between(1, 10);
+        numDocs = between(2, 10);
         for (int i = 0; i < numDocs; i++) {
             indexDoc(shard, "_doc", "first_" + i, """
                 {
@@ -1949,6 +1950,7 @@
         // _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
         // f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv), f3(postings,norms), f3.keyword(term,dv), __soft_deletes
         assertThat(stats.fieldUsages(), equalTo(18L));
+        assertThat(stats.postingsInMemoryBytes(), equalTo(64L));
         closeShards(shard);

