Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -280,6 +282,7 @@ protected static ShardFieldStats shardFieldStats(List<LeafReaderContext> leaves)
int totalFields = 0;
long usages = 0;
long totalPostingBytes = 0;
long liveDocsBytes = 0;
for (LeafReaderContext leaf : leaves) {
numSegments++;
var fieldInfos = leaf.reader().getFieldInfos();
Expand All @@ -291,19 +294,44 @@ protected static ShardFieldStats shardFieldStats(List<LeafReaderContext> leaves)
} else {
usages = -1;
}
if (TrackingPostingsInMemoryBytesCodec.TRACK_POSTINGS_IN_MEMORY_BYTES.isEnabled()) {
boolean trackPostingsMemoryEnabled = TrackingPostingsInMemoryBytesCodec.TRACK_POSTINGS_IN_MEMORY_BYTES.isEnabled();
boolean trackLiveDocsMemoryEnabled = ShardFieldStats.TRACK_LIVE_DOCS_IN_MEMORY_BYTES.isEnabled();
if (trackLiveDocsMemoryEnabled || trackPostingsMemoryEnabled) {
SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
if (segmentReader != null) {
String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
);
if (postingBytes != null) {
totalPostingBytes += Long.parseLong(postingBytes);
if (trackPostingsMemoryEnabled) {
String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
);
if (postingBytes != null) {
totalPostingBytes += Long.parseLong(postingBytes);
}
}
if (trackLiveDocsMemoryEnabled) {
var liveDocs = segmentReader.getLiveDocs();
if (liveDocs != null) {
assert validateLiveDocsClass(liveDocs);
// Would prefer to use FixedBitSet#ramBytesUsed() however FixedBits / Bits interface don't expose that.
// This approximates what FixedBitSet#ramBytesUsed() does: liveDocs.length() returns the number of bits (not the
// length of the backing long array), so dividing it by 8 gives the approximate number of bytes used by the bits.
liveDocsBytes += RamUsageEstimator.alignObjectSize(
(long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (liveDocs.length() / 8L)
);
}
}
}
}
}
return new ShardFieldStats(numSegments, totalFields, usages, totalPostingBytes);
return new ShardFieldStats(numSegments, totalFields, usages, totalPostingBytes, liveDocsBytes);
}

/**
 * Assertion helper that checks the runtime class of the given live docs.
 * <p>
 * The accepted implementations are package-private in Lucene, so they cannot be referenced with
 * {@code instanceof}; the fully qualified class name is compared as a string instead.
 *
 * @param liveDocs the live docs instance to check, must not be {@code null}
 * @return always {@code true}, so that the method can be invoked as {@code assert validateLiveDocsClass(liveDocs)};
 *         an unexpected class trips the inner assertion when assertions are enabled
 */
private static boolean validateLiveDocsClass(Bits liveDocs) {
    final String actualClassName = liveDocs.getClass().getName();
    final boolean isExpectedImplementation = "org.apache.lucene.util.FixedBits".equals(actualClassName)
        || "org.apache.lucene.tests.codecs.asserting.AssertingLiveDocsFormat$AssertingBits".equals(actualClassName);
    assert isExpectedImplementation : "unexpected class [" + actualClassName + "]";
    return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@

package org.elasticsearch.index.shard;

import org.elasticsearch.common.util.FeatureFlag;

/**
* A per shard stats including the number of segments and total fields across those segments.
* These stats should be recomputed whenever the shard is refreshed.
*
* @param numSegments the number of segments
* @param totalFields the total number of fields across the segments
* @param fieldUsages the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
* -1 if unavailable
* @param numSegments the number of segments
* @param totalFields the total number of fields across the segments
* @param fieldUsages the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
* -1 if unavailable
* @param postingsInMemoryBytes the total bytes in memory used for postings across all fields
* @param liveDocsBytes the total bytes in memory used for live docs
*/
public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes) {
public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes, long liveDocsBytes) {

/**
 * Feature flag named {@code track_live_docs_in_memory_bytes}.
 * NOTE(review): presumably {@code liveDocsBytes} is only computed while this flag is enabled and stays 0 otherwise;
 * confirm against the code that builds these stats.
 */
public static final FeatureFlag TRACK_LIVE_DOCS_IN_MEMORY_BYTES = new FeatureFlag("track_live_docs_in_memory_bytes");

}
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,54 @@ public void testShardFieldStats() throws IOException {
closeShards(shard);
}

/**
 * Verifies that {@code liveDocsBytes} in the shard field stats is zero while no document has been deleted and
 * grows after each delete followed by a refresh (only when live docs tracking is enabled).
 */
public void testShardFieldStatsWithDeletes() throws IOException {
    // Refresh interval of -1: every refresh in this test is triggered explicitly.
    final Settings indexSettings = Settings.builder()
        .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
        .build();
    final IndexShard shard = newShard(true, indexSettings);
    assertNull(shard.getShardFieldStats());
    recoverShardFromStore(shard);
    final boolean trackingLiveDocs = ShardFieldStats.TRACK_LIVE_DOCS_IN_MEMORY_BYTES.isEnabled();

    // Index a first batch of documents; they all end up in a single segment without deletes.
    final int docCount = 10;
    for (int docIndex = 0; docIndex < docCount; docIndex++) {
        indexDoc(shard, "_doc", "first_" + docIndex, """
            {
            "f1": "foo",
            "f2": "bar"
            }
            """);
    }
    shard.refresh("test");
    var fieldStats = shard.getShardFieldStats();
    assertThat(fieldStats.numSegments(), equalTo(1));
    assertThat(fieldStats.liveDocsBytes(), equalTo(0L));

    // First delete, then refresh and re-read the stats.
    deleteDoc(shard, "first_0");
    shard.refresh("test");
    fieldStats = shard.getShardFieldStats();
    // The delete operation is stored in a new segment for replication purposes, hence one more segment.
    assertThat(fieldStats.numSegments(), equalTo(2));
    // The stored delete op is itself marked as deleted, so all segments now have live docs.
    final long expectedBytesAfterFirstDelete = trackingLiveDocs ? 40L : 0L;
    assertThat(fieldStats.liveDocsBytes(), equalTo(expectedBytesAfterFirstDelete));

    // Second delete.
    deleteDoc(shard, "first_1");
    shard.getMinRetainedSeqNo();

    // Refresh and re-read the stats once more.
    shard.refresh("test");
    fieldStats = shard.getShardFieldStats();
    // Again the delete operation lands in a new segment.
    assertThat(fieldStats.numSegments(), equalTo(3));
    // And again every segment has live docs, so the tracked bytes grow.
    final long expectedBytesAfterSecondDelete = trackingLiveDocs ? 56L : 0L;
    assertThat(fieldStats.liveDocsBytes(), equalTo(expectedBytesAfterSecondDelete));

    closeShards(shard);
}

public void testIndexingOperationsListeners() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "_doc", "0", "{\"foo\" : \"bar\"}");
Expand Down