Skip to content

Commit ce66ca2

Browse files
committed
Make things testable
1 parent 1c8dbfa commit ce66ca2

File tree

4 files changed

+90
-6
lines changed

4 files changed

+90
-6
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2850,7 +2850,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
28502850
@Override
28512851
protected long estimateMergeMemory(MergePolicy.OneMerge merge) {
28522852
try (Searcher searcher = acquireSearcher("merge_memory_estimation", SearcherScope.INTERNAL)) {
2853-
return SegmentMergeMemoryEstimator.estimateSegmentMemory(merge, searcher.getIndexReader());
2853+
return SegmentMergeMemoryEstimator.estimateMergeMemory(merge, searcher.getIndexReader());
28542854
} catch (AlreadyClosedException e) {
28552855
failOnTragicEvent(e);
28562856
return 0L;

server/src/main/java/org/elasticsearch/index/engine/SegmentMergeMemoryEstimator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,28 @@
2525

2626
public class SegmentMergeMemoryEstimator {
2727

28-
public static long estimateSegmentMemory(MergePolicy.OneMerge merge, IndexReader indexReader) {
28+
public static long estimateMergeMemory(MergePolicy.OneMerge merge, IndexReader indexReader) {
2929
long memoryNeeded = 0;
3030
for (SegmentCommitInfo mergedSegment : merge.segments) {
31-
memoryNeeded += estimateSegmentMemory(mergedSegment.info.name, indexReader);
31+
memoryNeeded += estimateMergeMemory(mergedSegment.info.name, indexReader);
3232
}
3333
return memoryNeeded;
3434
}
3535

36-
private static long estimateSegmentMemory(String segmentName, IndexReader indexReader) {
36+
private static long estimateMergeMemory(String segmentName, IndexReader indexReader) {
3737
List<LeafReaderContext> leaves = indexReader.leaves();
3838
for (LeafReaderContext leafReaderContext : leaves) {
3939
SegmentReader segmentReader = Lucene.segmentReader(leafReaderContext.reader());
4040
if (segmentReader.getSegmentName().equals(segmentName)) {
41-
return estimateSegmentMemory(segmentReader);
41+
return estimateMergeMemory(segmentReader);
4242
}
4343
}
4444

4545
// Segment not found, we can't estimate it
4646
return 0;
4747
}
4848

49-
private static long estimateSegmentMemory(SegmentReader reader) {
49+
private static long estimateMergeMemory(SegmentReader reader) {
5050
long maxMem = 0;
5151
for (FieldInfo fieldInfo : reader.getFieldInfos()) {
5252
maxMem = Math.max(maxMem, estimateFieldMemory(fieldInfo, reader));

server/src/main/java/org/elasticsearch/index/merge/OnGoingMerge.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,8 @@ public List<SegmentCommitInfo> getMergedSegments() {
5656
public MergePolicy.OneMerge getMerge() {
5757
return oneMerge;
5858
}
59+
60+
public long getMemoryBytesNeeded() {
61+
return memoryBytesNeeded;
62+
}
5963
}

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.elasticsearch.index.mapper.SourceFieldMapper;
115115
import org.elasticsearch.index.mapper.Uid;
116116
import org.elasticsearch.index.mapper.VersionFieldMapper;
117+
import org.elasticsearch.index.merge.OnGoingMerge;
117118
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
118119
import org.elasticsearch.index.seqno.ReplicationTracker;
119120
import org.elasticsearch.index.seqno.RetentionLease;
@@ -2639,6 +2640,85 @@ public void testMergeThreadLogging() throws Exception {
26392640
}
26402641
}
26412642

2643+
public void testMergeEstimatesMemorySize() throws Exception {
2644+
2645+
try (Store store = createStore()) {
2646+
LogMergePolicy lmp = newLogMergePolicy();
2647+
lmp.setMergeFactor(2);
2648+
2649+
EngineConfig config = config(defaultSettings, store, createTempDir(), lmp, null);
2650+
2651+
store.createEmpty();
2652+
final String translogUuid = Translog.createEmptyTranslog(
2653+
config.getTranslogConfig().getTranslogPath(),
2654+
SequenceNumbers.NO_OPS_PERFORMED,
2655+
shardId,
2656+
primaryTerm.get()
2657+
);
2658+
store.associateIndexWithNewTranslog(translogUuid);
2659+
2660+
AtomicInteger estimatedMerges = new AtomicInteger();
2661+
AtomicInteger actualMerges = new AtomicInteger();
2662+
Set<Long> estimatedMemorySizes = Collections.synchronizedSet(new HashSet<>());
2663+
2664+
InternalEngine engine = new InternalTestEngine(config) {
2665+
@Override
2666+
protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
2667+
return new IndexWriter(directory, iwc) {
2668+
@Override
2669+
public void merge(MergePolicy.OneMerge merge) throws IOException {
2670+
actualMerges.addAndGet(1);
2671+
super.merge(merge);
2672+
}
2673+
};
2674+
}
2675+
2676+
@Override
2677+
protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
2678+
return new ElasticsearchConcurrentMergeScheduler(shardId, indexSettings) {
2679+
@Override
2680+
protected long estimateMergeMemory(MergePolicy.OneMerge merge) {
2681+
// Estimate merge memory randomly, and record the estimated merges and memory sizes for later comparison
2682+
long estimation = randomLongBetween(1, 100000000L);
2683+
estimatedMerges.addAndGet(1);
2684+
estimatedMemorySizes.add(estimation);
2685+
return estimation;
2686+
}
2687+
2688+
@Override
2689+
protected void beforeMerge(OnGoingMerge merge) {
2690+
// Checks the estimation is available before merging, and that it has been done previously
2691+
assertThat(merge.getMemoryBytesNeeded(), Matchers.greaterThan(0L));
2692+
assertTrue(estimatedMemorySizes.remove(merge.getMemoryBytesNeeded()));
2693+
}
2694+
2695+
@Override
2696+
protected void afterMerge(OnGoingMerge merge) {}
2697+
};
2698+
}
2699+
};
2700+
recoverFromTranslog(engine, translogHandler, Long.MAX_VALUE);
2701+
2702+
int numDocs = randomIntBetween(10, 100);
2703+
for (int i = 0; i < numDocs; i++) {
2704+
engine.index(indexForDoc(testParsedDocument(randomIdentifier(), null, testDocument(), B_1, null)));
2705+
if (randomBoolean()) {
2706+
engine.flush();
2707+
engine.forceMerge(true, 1, false, UUIDs.randomBase64UUID());
2708+
}
2709+
}
2710+
2711+
assertBusy(() -> {
2712+
// All scheduled merges have been done
2713+
assertThat(estimatedMerges.get(), is(actualMerges.get()));
2714+
// All estimations have been accounted for
2715+
assertTrue(estimatedMemorySizes.isEmpty());
2716+
});
2717+
2718+
engine.close();
2719+
}
2720+
}
2721+
26422722
public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
26432723
final int opCount = randomIntBetween(1, 256);
26442724
long primarySeqNo = SequenceNumbers.NO_OPS_PERFORMED;

0 commit comments

Comments
 (0)