
Commit 038148a

Add merge estimation to OneMerge on ElasticsearchConcurrentMergeScheduler

1 parent 144ff0c

File tree

5 files changed: +181 −6 lines changed

server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java

Lines changed: 9 additions & 5 deletions
@@ -32,7 +32,7 @@
  * An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
  * and current merges.
  */
-public class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler implements ElasticsearchMergeScheduler {
+public abstract class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler implements ElasticsearchMergeScheduler {

     protected final Logger logger;
     private final Settings indexSettings;
@@ -96,7 +96,7 @@ protected void message(String message) {
     @Override
     protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
         long timeNS = System.nanoTime();
-        OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
+        OnGoingMerge onGoingMerge = new OnGoingMerge(merge, estimateMergeMemory(merge));
         mergeTracking.mergeStarted(onGoingMerge);
         try {
             beforeMerge(onGoingMerge);
@@ -107,18 +107,22 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {

             afterMerge(onGoingMerge);
         }
-
     }

     /**
      * A callback allowing for custom logic before an actual merge starts.
      */
-    protected void beforeMerge(OnGoingMerge merge) {}
+    protected abstract void beforeMerge(OnGoingMerge merge);

     /**
      * A callback allowing for custom logic before an actual merge starts.
      */
-    protected void afterMerge(OnGoingMerge merge) {}
+    protected abstract void afterMerge(OnGoingMerge merge);
+
+    /**
+     * Retrieves an estimation on how much memory is needed for the merge.
+     */
+    protected abstract long estimateMergeMemory(MergePolicy.OneMerge merge);

     @Override
     public MergeScheduler clone() {
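
With the three hooks now abstract, every concrete scheduler has to supply them. The sketch below is illustrative only and is not part of this commit: the subclass name, the constructor arguments passed to super, and the comments are assumptions about how a typical implementation might look.

    // Hypothetical subclass, for illustration only; the super constructor arguments are assumed.
    class ThrottlingMergeScheduler extends ElasticsearchConcurrentMergeScheduler {

        ThrottlingMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
            super(shardId, indexSettings);
        }

        @Override
        protected void beforeMerge(OnGoingMerge merge) {
            // e.g. reserve the estimated merge memory against a budget before the merge runs
        }

        @Override
        protected void afterMerge(OnGoingMerge merge) {
            // e.g. release whatever was reserved in beforeMerge
        }

        @Override
        protected long estimateMergeMemory(MergePolicy.OneMerge merge) {
            return 0; // a scheduler with no better signal can report "no extra memory needed"
        }
    }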

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 7 additions & 0 deletions
@@ -2847,6 +2847,13 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
         }
     }

+    @Override
+    protected long estimateMergeMemory(MergePolicy.OneMerge merge) {
+        try (Searcher searcher = acquireSearcher("merge_memory_estimation")) {
+            return SegmentMergeMemoryEstimator.estimateSegmentMemory(merge, searcher.getIndexReader());
+        }
+    }
+
     @Override
     public synchronized void afterMerge(OnGoingMerge merge) {
         int maxNumMerges = getMaxMergeCount();
server/src/main/java/org/elasticsearch/index/engine/SegmentMergeMemoryEstimator.java

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.codecs.KnnVectorsReader;
+import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader;
+import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat;
+import org.apache.lucene.index.CodecReader;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.VectorEncoding;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+import java.util.List;
+
+public class SegmentMergeMemoryEstimator {
+
+    public static long estimateSegmentMemory(MergePolicy.OneMerge merge, IndexReader indexReader) {
+        long memoryNeeded = 0;
+        for (SegmentCommitInfo mergedSegment : merge.segments) {
+            memoryNeeded += estimateSegmentMemory(mergedSegment.info.name, indexReader);
+        }
+        return memoryNeeded;
+    }
+
+    private static long estimateSegmentMemory(String segmentName, IndexReader indexReader) {
+        List<LeafReaderContext> leaves = indexReader.leaves();
+        for (LeafReaderContext leafReaderContext : leaves) {
+            SegmentReader segmentReader = Lucene.segmentReader(leafReaderContext.reader());
+            if (segmentReader.getSegmentName().equals(segmentName)) {
+                return estimateSegmentMemory(segmentReader);
+            }
+        }
+
+        throw new IllegalArgumentException("Segment not found: " + segmentName);
+    }
+
+    private static long estimateSegmentMemory(SegmentReader reader) {
+        long maxMem = 0;
+        for (FieldInfo fieldInfo : reader.getFieldInfos()) {
+            maxMem = Math.max(maxMem, estimateFieldMemory(fieldInfo, reader).getBytes());
+        }
+        return maxMem;
+    }
+
+    private static ByteSizeValue estimateFieldMemory(FieldInfo fieldInfo, SegmentReader segmentReader) {
+
+        long maxMem = 0;
+        if (fieldInfo.hasVectorValues()) {
+            maxMem = Math.max(maxMem, estimateVectorFieldMemory(fieldInfo, segmentReader));
+        }
+        // TODO Work on estimations on other field infos when / if needed
+
+        return ByteSizeValue.ofBytes(maxMem);
+    }
+
+    private static long estimateVectorFieldMemory(FieldInfo fieldInfo, SegmentReader segmentReader) {
+        long maxMem = 0;
+        for (LeafReaderContext ctx : segmentReader.leaves()) {
+            CodecReader codecReader = (CodecReader) FilterLeafReader.unwrap(ctx.reader());
+            KnnVectorsReader vectorsReader = codecReader.getVectorReader();
+            if (vectorsReader instanceof PerFieldKnnVectorsFormat.FieldsReader perFieldKnnVectorsFormat) {
+                vectorsReader = perFieldKnnVectorsFormat.getFieldReader(fieldInfo.getName());
+            }

+            final long estimation = getVectorFieldEstimation(fieldInfo, segmentReader, vectorsReader);
+            maxMem = Math.max(maxMem, estimation);
+        }
+        return maxMem;
+    }
+
+    private static long getVectorFieldEstimation(FieldInfo fieldInfo, SegmentReader segmentReader, KnnVectorsReader vectorsReader) {
+        int numDocs = segmentReader.numDocs();
+        if (vectorsReader instanceof Lucene99HnswVectorsReader) {
+            // Determined empirically from graph usage on merges, as it's complicated to estimate graph levels and size for non-zero levels
+            return numDocs * 348L;
+
+        } else {
+            // Dominated by the heap byte buffer size used to write each vector
+            if (fieldInfo.getVectorEncoding() == VectorEncoding.FLOAT32) {
+                return fieldInfo.getVectorDimension() * VectorEncoding.FLOAT32.byteSize;
+            }
+            // Byte does not use buffering for writing but the IndexOutput directly
+            return 0;
+        }
+    }
+}
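
To make the two heuristics in getVectorFieldEstimation concrete, here is a worked example; the document count and vector dimension are illustrative values, not from the commit.

    // Segment whose vector field is served by Lucene99HnswVectorsReader:
    //   1,000,000 live docs * 348 bytes/doc = 348,000,000 bytes, roughly 348 MB of estimated merge memory.
    long hnswEstimate = 1_000_000 * 348L;

    // Any other vectors reader with a 768-dimension float32 field: one heap write buffer per vector,
    //   768 dims * 4 bytes (VectorEncoding.FLOAT32.byteSize) = 3,072 bytes.
    long bufferedVectorEstimate = 768 * VectorEncoding.FLOAT32.byteSize;

Note that the per-segment estimate takes the maximum across the segment's fields, while the per-merge total in estimateSegmentMemory(OneMerge, IndexReader) sums the estimates of all segments being merged.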

server/src/main/java/org/elasticsearch/index/merge/OnGoingMerge.java

Lines changed: 3 additions & 1 deletion
@@ -21,11 +21,13 @@ public class OnGoingMerge {

     private final String id;
     private final MergePolicy.OneMerge oneMerge;
+    private final long memoryBytesNeeded;

-    public OnGoingMerge(MergePolicy.OneMerge merge) {
+    public OnGoingMerge(MergePolicy.OneMerge merge, long memoryBytesNeeded) {
         this.id = Integer.toString(System.identityHashCode(merge));
         this.oneMerge = merge;

+        this.memoryBytesNeeded = memoryBytesNeeded;
     }

     /**
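
The diff stores the estimate on OnGoingMerge but does not show how it is read back; presumably an accessor along these lines accompanies it elsewhere in the class (hypothetical, not visible in this excerpt).

    /**
     * Hypothetical accessor, not shown in the diff above: exposes the merge memory estimate to consumers.
     */
    public long getMemoryBytesNeeded() {
        return memoryBytesNeeded;
    }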
server/src/test/java/org/elasticsearch/index/engine/MemoryEstimationMergeTests.java

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.KnnFloatVectorField;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class MemoryEstimationMergeTests extends ESTestCase {
+
+    public void testMerge() throws IOException {
+        try (Directory dir = newDirectory()) {
+            IndexWriterConfig iwc = newIndexWriterConfig();
+            int numDocs = 1000;
+            int vectorDims = 100;
+
+            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
+                for (int i = 0; i < numDocs; i++) {
+                    if (rarely()) {
+                        writer.flush();
+                    }
+                    if (rarely()) {
+                        writer.forceMerge(1, false);
+                    }
+                    Document doc = new Document();
+                    doc.add(new StringField("id", "" + i, Field.Store.NO));
+                    doc.add(newTextField("text", "the quick brown fox", Field.Store.YES));
+                    doc.add(new NumericDocValuesField("sort", i));
+                    doc.add(new KnnFloatVectorField("floatVector", floatVector(vectorDims)));
+                    writer.addDocument(doc);
+                    if (i == numDocs / 2) {
+                        writer.flush();
+                    }
+                }

+                writer.forceMerge(1);

+            }

+        }
+    }

+    private float[] floatVector(int vectorDims) {
+        return new float[vectorDims];
+    }
+}
