Skip to content

Commit 9b08639

Browse files
pxsalehi authored and omricohenn committed
Move MergeMemoryEstimator (elastic#125686)
Relates ES-10961
1 parent 49ea9a1 commit 9b08639

File tree

2 files changed

+127
-0
lines changed

2 files changed

+127
-0
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3550,4 +3550,13 @@ <T> T performActionWithDirectoryReader(SearcherScope scope, CheckedFunction<Dire
35503550
store.decRef();
35513551
}
35523552
}
3553+
3554+
protected long estimateMergeBytes(MergePolicy.OneMerge merge) {
3555+
try (Searcher searcher = acquireSearcher("merge_memory_estimation", SearcherScope.INTERNAL)) {
3556+
return MergeMemoryEstimator.estimateMergeMemory(merge, searcher.getIndexReader());
3557+
} catch (AlreadyClosedException e) {
3558+
// Can't estimate if the searcher is closed
3559+
return 0L;
3560+
}
3561+
}
35533562
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.codecs.KnnVectorsReader;
13+
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader;
14+
import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat;
15+
import org.apache.lucene.index.FieldInfo;
16+
import org.apache.lucene.index.IndexReader;
17+
import org.apache.lucene.index.LeafReaderContext;
18+
import org.apache.lucene.index.MergePolicy;
19+
import org.apache.lucene.index.SegmentCommitInfo;
20+
import org.apache.lucene.index.SegmentReader;
21+
import org.apache.lucene.index.VectorEncoding;
22+
import org.elasticsearch.common.lucene.Lucene;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.stream.Collectors;
27+
28+
/**
29+
* Provides an estimation of the memory needed to merge segments.
30+
*
31+
* This class is a temporary solution until we have a better way to estimate the memory needed for merges in Lucene
32+
* (see the corresponding <a href="https://github.com/apache/lucene/issues/14225">Lucene issue</a>)
33+
* We can work iteratively in providing estimations for different types of fields and vector encodings.
34+
*/
35+
public class MergeMemoryEstimator {
36+
37+
// Determined empirically by using Accountable.ramBytesUsed() during merges on Lucene using an instrumented build of Lucene.
38+
// Didn't adapted the ramBytesUsed() code for this as it depends on graph levels and size for non-zero levels, which are difficult
39+
// to estimate without actually building the graph.
40+
public static final long HNSW_PER_DOC_ESTIMATION = 348L;
41+
42+
/**
43+
* Estimates the memory, in bytes, needed to merge the segments of the given merge.
44+
*/
45+
public static long estimateMergeMemory(MergePolicy.OneMerge merge, IndexReader indexReader) {
46+
assert merge.segments.isEmpty() == false;
47+
48+
long memoryNeeded = 0;
49+
Map<String, SegmentCommitInfo> segments = merge.segments.stream().collect(Collectors.toMap(s -> s.info.name, s -> s));
50+
List<LeafReaderContext> leaves = indexReader.leaves();
51+
SegmentReader segmentReader = null;
52+
for (LeafReaderContext leafReaderContext : leaves) {
53+
segmentReader = Lucene.segmentReader(leafReaderContext.reader());
54+
String segmentName = segmentReader.getSegmentName();
55+
SegmentCommitInfo segmentCommitInfo = segments.get(segmentName);
56+
if (segmentCommitInfo != null) {
57+
memoryNeeded += estimateMergeMemory(segmentCommitInfo, segmentReader);
58+
segments.remove(segmentName);
59+
if (segments.isEmpty()) {
60+
break;
61+
}
62+
}
63+
}
64+
65+
// Estimate segments without readers - the searcher may not have been refreshed yet, so estimate them with the field info from
66+
// the last segment reader
67+
if (segmentReader != null) {
68+
for (SegmentCommitInfo segmentCommitInfo : segments.values()) {
69+
memoryNeeded += estimateMergeMemory(segmentCommitInfo, segmentReader);
70+
}
71+
}
72+
73+
return memoryNeeded;
74+
}
75+
76+
private static long estimateMergeMemory(SegmentCommitInfo segmentCommitInfo, SegmentReader reader) {
77+
long maxMem = 0;
78+
for (FieldInfo fieldInfo : reader.getFieldInfos()) {
79+
maxMem = Math.max(maxMem, estimateFieldMemory(fieldInfo, segmentCommitInfo, reader));
80+
}
81+
return maxMem;
82+
}
83+
84+
private static long estimateFieldMemory(FieldInfo fieldInfo, SegmentCommitInfo segmentCommitInfo, SegmentReader segmentReader) {
85+
86+
long maxMem = 0;
87+
if (fieldInfo.hasVectorValues()) {
88+
maxMem = Math.max(maxMem, estimateVectorFieldMemory(fieldInfo, segmentCommitInfo, segmentReader));
89+
}
90+
// TODO Work on estimations on other field infos when / if needed
91+
92+
return maxMem;
93+
}
94+
95+
private static long estimateVectorFieldMemory(FieldInfo fieldInfo, SegmentCommitInfo segmentCommitInfo, SegmentReader segmentReader) {
96+
KnnVectorsReader vectorsReader = segmentReader.getVectorReader();
97+
if (vectorsReader instanceof PerFieldKnnVectorsFormat.FieldsReader perFieldKnnVectorsFormat) {
98+
vectorsReader = perFieldKnnVectorsFormat.getFieldReader(fieldInfo.getName());
99+
}
100+
101+
return getVectorFieldEstimation(fieldInfo, segmentCommitInfo, vectorsReader);
102+
}
103+
104+
private static long getVectorFieldEstimation(FieldInfo fieldInfo, SegmentCommitInfo segmentCommitInfo, KnnVectorsReader vectorsReader) {
105+
int numDocs = segmentCommitInfo.info.maxDoc() - segmentCommitInfo.getDelCount();
106+
if (vectorsReader instanceof Lucene99HnswVectorsReader) {
107+
return numDocs * HNSW_PER_DOC_ESTIMATION;
108+
109+
} else {
110+
// Dominated by the heap byte buffer size used to write each vector
111+
if (fieldInfo.getVectorEncoding() == VectorEncoding.FLOAT32) {
112+
return fieldInfo.getVectorDimension() * VectorEncoding.FLOAT32.byteSize;
113+
}
114+
// Byte does not use buffering for writing but the IndexOutput directly
115+
return 0;
116+
}
117+
}
118+
}

0 commit comments

Comments
 (0)