
Commit 9414544

making sure it works with changes from elastic#135298
1 parent 9f7c36e commit 9414544

File tree
2 files changed: +204 -10 lines


server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 13 additions & 6 deletions
@@ -140,16 +140,23 @@ public static long getSharedRamSizeForShard(IndicesQueryCache queryCache, IndexS
             // extend beyond the closing of all shards.
             return 0L;
         }
-
-        long totalSize = cacheTotals.totalSize();
-        long cacheSize = queryCache.getCacheSizeForShard(indexShard.shardId());
+        /*
+         * We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
+         * shard.
+         */
+        long totalItemsInCache = cacheTotals.totalSize();
+        long itemsInCacheForShard = queryCache.getCacheSizeForShard(indexShard.shardId());
         final long additionalRamBytesUsed;
-        if (totalSize == 0) {
+        if (totalItemsInCache == 0) {
             // all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
             additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed / shardCount);
         } else {
-            // some shards have nonzero cache footprint, so we apportion the size of the shared bytes proportionally to cache footprint
-            additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * cacheSize / totalSize);
+            /*
+             * some shards have nonzero cache footprint, so we apportion the size of the shared bytes proportionally to the number of
+             * segment-requests in the cache for this shard (the number and size of documents associated with those requests is irrelevant
+             * for this calculation).
+             */
+            additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * itemsInCacheForShard / totalItemsInCache);
         }
         assert additionalRamBytesUsed >= 0L : additionalRamBytesUsed;
         return additionalRamBytesUsed;
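
To make the proportional split above concrete, here is a minimal worked sketch of the same arithmetic; the numbers are purely illustrative and do not come from the commit itself:

    // Illustrative values: 1200 shared bytes; this shard owns 3 of the 12 cached segment-requests.
    long sharedRamBytesUsed = 1200L;
    long itemsInCacheForShard = 3L;
    long totalItemsInCache = 12L;
    long additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * itemsInCacheForShard / totalItemsInCache);
    // additionalRamBytesUsed == 300: the shard is charged a quarter of the shared bytes because it owns
    // a quarter of the cached segment-requests, regardless of how many documents those requests touched.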

server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java

Lines changed: 191 additions & 4 deletions
@@ -26,6 +26,7 @@
 import org.apache.lucene.search.ScorerSupplier;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.IOUtils;
@@ -36,31 +37,47 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;

+import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;

+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 public class IndicesQueryCacheTests extends ESTestCase {

-    private static class DummyQuery extends Query {
+    private static class DummyQuery extends Query implements Accountable {

-        private final int id;
+        private final String id;
+        private final long sizeInCache;

         DummyQuery(int id) {
+            this(Integer.toString(id), 10);
+        }
+
+        DummyQuery(String id) {
+            this(id, 10);
+        }
+
+        DummyQuery(String id, long sizeInCache) {
             this.id = id;
+            this.sizeInCache = sizeInCache;
         }

         @Override
         public boolean equals(Object obj) {
-            return sameClassAs(obj) && id == ((DummyQuery) obj).id;
+            return sameClassAs(obj) && id.equals(((DummyQuery) obj).id);
         }

         @Override
         public int hashCode() {
-            return 31 * classHash() + id;
+            return 31 * classHash() + id.hashCode();
         }

         @Override
@@ -89,6 +106,10 @@ public boolean isCacheable(LeafReaderContext ctx) {
             };
         }

+        @Override
+        public long ramBytesUsed() {
+            return sizeInCache;
+        }
     }

     public void testBasics() throws IOException {
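
With this change, DummyQuery implements Lucene's Accountable, so each cached entry reports a caller-chosen size to the query cache. A minimal usage sketch, assuming `searcher` is an IndexSearcher whose query cache has been set to the IndicesQueryCache under test (the id and size below are made up):

    // Hypothetical usage: this entry reports 512 bytes via Accountable.ramBytesUsed(),
    // on top of the cache's own per-entry overhead.
    DummyQuery accountedQuery = new DummyQuery("example-query", 512);
    searcher.count(accountedQuery); // running the query through the searcher caches it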
@@ -464,4 +485,170 @@ public void testGetSharedRamSizeForShard() {
         long sharedRamEq = IndicesQueryCache.getSharedRamSizeForShard(queryCache, shard1, zeroTotals);
         assertEquals(300L, sharedRamEq);
     }
+
+    public void testGetStatsMemory() throws Exception {
+        /*
+         * This test creates 2 shards, one with two segments and one with one. It makes unique queries against all 3 segments (so that each
+         * query will be cached, up to the max cache size), and then asserts various things about the cache memory. Most importantly, it
+         * asserts that the memory the cache attributes to each shard is proportional to the number of segment-queries for the shard in the
+         * cache (and not to the number of documents in the query).
+         */
+        String indexName = randomIdentifier();
+        String uuid = randomUUID();
+        ShardId shard1 = new ShardId(indexName, uuid, 0);
+        ShardId shard2 = new ShardId(indexName, uuid, 1);
+        List<Closeable> closeableList = new ArrayList<>();
+        // We're going to create 2 segments for shard1, and 1 segment for shard2:
+        int shard1Segment1Docs = randomIntBetween(11, 1000);
+        int shard1Segment2Docs = randomIntBetween(1, 10);
+        int shard2Segment1Docs = randomIntBetween(1, 10);
+        IndexSearcher shard1Segment1Searcher = initializeSegment(shard1, shard1Segment1Docs, closeableList);
+        IndexSearcher shard1Segment2Searcher = initializeSegment(shard1, shard1Segment2Docs, closeableList);
+        IndexSearcher shard2Searcher = initializeSegment(shard2, shard2Segment1Docs, closeableList);
+
+        final int maxCacheSize = 200;
+        Settings settings = Settings.builder()
+            .put(IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING.getKey(), maxCacheSize)
+            .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
+            .build();
+        IndicesQueryCache cache = new IndicesQueryCache(settings);
+        shard1Segment1Searcher.setQueryCache(cache);
+        shard1Segment2Searcher.setQueryCache(cache);
+        shard2Searcher.setQueryCache(cache);
+
+        assertEquals(0L, cache.getStats(shard1, 0).getMemorySizeInBytes());
+
+        final long largeQuerySize = randomIntBetween(100, 1000);
+        final long smallQuerySize = randomIntBetween(10, 50);
+
+        final int shard1Queries = randomIntBetween(20, 50);
+        final int shard2Queries = randomIntBetween(5, 10);
+
+        for (int i = 0; i < shard1Queries; ++i) {
+            shard1Segment1Searcher.count(new DummyQuery("ingest1-" + i, largeQuerySize));
+        }
+        IndicesQueryCache.CacheTotals cacheTotals = new IndicesQueryCache.CacheTotals(shard1Queries, 1);
+        IndexShard indexShard1 = mock(IndexShard.class);
+        when(indexShard1.shardId()).thenReturn(shard1);
+        IndexShard indexShard2 = mock(IndexShard.class);
+        when(indexShard2.shardId()).thenReturn(shard2);
+        long sharedRamSizeShard1 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard1, cacheTotals);
+        long sharedRamSizeShard2 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard2, cacheTotals);
+        long shard1Segment1CacheMemory = calculateActualCacheMemoryForSegment(shard1Queries, largeQuerySize, shard1Segment1Docs);
+        assertThat(cache.getStats(shard1, sharedRamSizeShard1).getMemorySizeInBytes(), equalTo(shard1Segment1CacheMemory));
+        assertThat(cache.getStats(shard2, sharedRamSizeShard2).getMemorySizeInBytes(), equalTo(0L));
+        for (int i = 0; i < shard2Queries; ++i) {
+            shard2Searcher.count(new DummyQuery("ingest2-" + i, smallQuerySize));
+        }
+        /*
+         * Now that we have cached some smaller things for shard2, the cache memory for shard1 has gone down. This is expected because we
+         * report cache memory proportional to the number of segments for each shard, ignoring the number of documents or the actual
+         * document sizes. Since the shard2 requests were smaller, the average cache memory size per segment has now gone down.
+         */
+        cacheTotals = new IndicesQueryCache.CacheTotals(shard1Queries + shard2Queries, 2);
+        sharedRamSizeShard1 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard1, cacheTotals);
+        sharedRamSizeShard2 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard2, cacheTotals);
+        assertThat(cache.getStats(shard1, sharedRamSizeShard1).getMemorySizeInBytes(), lessThan(shard1Segment1CacheMemory));
+        long shard1CacheBytes = cache.getStats(shard1, sharedRamSizeShard1).getMemorySizeInBytes();
+        long shard2CacheBytes = cache.getStats(shard2, sharedRamSizeShard2).getMemorySizeInBytes();
+        long shard2Segment1CacheMemory = calculateActualCacheMemoryForSegment(shard2Queries, smallQuerySize, shard2Segment1Docs);
+
+        long totalMemory = shard1Segment1CacheMemory + shard2Segment1CacheMemory;
+        // Each shard has some fixed overhead that we need to account for:
+        long shard1Overhead = calculateOverheadForSegment(shard1Queries, shard1Segment1Docs);
+        long shard2Overhead = calculateOverheadForSegment(shard2Queries, shard2Segment1Docs);
+        long totalMemoryMinusOverhead = totalMemory - (shard1Overhead + shard2Overhead);
+        /*
+         * Note that the expected amount of memory we're calculating is based on the proportion of the number of queries to each shard
+         * (since each shard currently only has queries to one segment)
+         */
+        double shard1Segment1CacheMemoryShare = ((double) shard1Queries / (shard1Queries + shard2Queries)) * (totalMemoryMinusOverhead)
+            + shard1Overhead;
+        double shard2Segment1CacheMemoryShare = ((double) shard2Queries / (shard1Queries + shard2Queries)) * (totalMemoryMinusOverhead)
+            + shard2Overhead;
+        assertThat((double) shard1CacheBytes, closeTo(shard1Segment1CacheMemoryShare, 1)); // accounting for rounding
+        assertThat((double) shard2CacheBytes, closeTo(shard2Segment1CacheMemoryShare, 1)); // accounting for rounding
+
+        // Now we cache just more "big" searches on shard1, but on a different segment:
+        for (int i = 0; i < shard1Queries; ++i) {
+            shard1Segment2Searcher.count(new DummyQuery("ingest3-" + i, largeQuerySize));
+        }
+        long shard1Segment2CacheMemory = calculateActualCacheMemoryForSegment(shard1Queries, largeQuerySize, shard1Segment2Docs);
+        totalMemory = shard1Segment1CacheMemory + shard2Segment1CacheMemory + shard1Segment2CacheMemory;
+        // Each shard has some fixed overhead that we need to account for:
+        shard1Overhead = shard1Overhead + calculateOverheadForSegment(shard1Queries, shard1Segment2Docs);
+        totalMemoryMinusOverhead = totalMemory - (shard1Overhead + shard2Overhead);
+        /*
+         * Note that the expected amount of memory we're calculating is based on the proportion of the number of queries to each segment.
+         * The number of documents and the size of documents is irrelevant (aside from computing the fixed overhead).
+         */
+        shard1Segment1CacheMemoryShare = ((double) (2 * shard1Queries) / ((2 * shard1Queries) + shard2Queries)) * (totalMemoryMinusOverhead)
+            + shard1Overhead;
+        cacheTotals = new IndicesQueryCache.CacheTotals(2 * shard1Queries + shard2Queries, 2);
+        sharedRamSizeShard1 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard1, cacheTotals);
+        shard1CacheBytes = cache.getStats(shard1, sharedRamSizeShard1).getMemorySizeInBytes();
+        assertThat((double) shard1CacheBytes, closeTo(shard1Segment1CacheMemoryShare, 1)); // accounting for rounding
+
+        // Now make sure the cache only has items for shard2:
+        for (int i = 0; i < (maxCacheSize * 2); ++i) {
+            shard2Searcher.count(new DummyQuery("ingest4-" + i, smallQuerySize));
+        }
+        cacheTotals = new IndicesQueryCache.CacheTotals(maxCacheSize, 1);
+        sharedRamSizeShard1 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard1, cacheTotals);
+        sharedRamSizeShard2 = IndicesQueryCache.getSharedRamSizeForShard(cache, indexShard2, cacheTotals);
+        assertThat(cache.getStats(shard1, sharedRamSizeShard1).getMemorySizeInBytes(), equalTo(0L));
+        assertThat(
+            cache.getStats(shard2, sharedRamSizeShard2).getMemorySizeInBytes(),
+            equalTo(calculateActualCacheMemoryForSegment(maxCacheSize, smallQuerySize, shard2Segment1Docs))
+        );
+
+        IOUtils.close(closeableList);
+        cache.onClose(shard1);
+        cache.onClose(shard2);
+        cache.close();
+    }
+
+    /*
+     * This calculates the memory that is actually used by a segment in the IndicesQueryCache. It assumes queryCount queries are made to
+     * the segment, and that each query is querySize bytes in size. It assumes that the shard contains numDocs documents.
+     */
+    private long calculateActualCacheMemoryForSegment(long queryCount, long querySize, long numDocs) {
+        return (queryCount * (querySize + 24)) + calculateOverheadForSegment(queryCount, numDocs);
+    }
+
+    /*
+     * This computes the part of the recorded IndicesQueryCache memory that is assigned to a segment and *not* divided up proportionally
+     * when the cache reports the memory usage of each shard.
+     */
+    private long calculateOverheadForSegment(long queryCount, long numDocs) {
+        return queryCount * (112 + (8 * ((numDocs - 1) / 64)));
+    }
+
+    /*
+     * This returns an IndexSearcher for a single new segment in the given shard.
+     */
+    private IndexSearcher initializeSegment(ShardId shard, int numDocs, List<Closeable> closeableList) throws Exception {
+        AtomicReference<IndexSearcher> indexSearcherReference = new AtomicReference<>();
+        /*
+         * Usually creating an IndexWriter like this results in a single segment getting created, but sometimes it results in more. For
+         * the sake of keeping the calculations in this test simple we want just a single segment. So we do this in an assertBusy.
+         */
+        assertBusy(() -> {
+            Directory dir = newDirectory();
+            IndexWriter indexWriter = new IndexWriter(dir, newIndexWriterConfig());
+            for (int i = 0; i < numDocs; i++) {
+                indexWriter.addDocument(new Document());
+            }
+            DirectoryReader directoryReader = DirectoryReader.open(indexWriter);
+            indexWriter.close();
+            directoryReader = ElasticsearchDirectoryReader.wrap(directoryReader, shard);
+            IndexSearcher indexSearcher = new IndexSearcher(directoryReader);
+            indexSearcherReference.set(indexSearcher);
+            indexSearcher.setQueryCachingPolicy(TrivialQueryCachingPolicy.ALWAYS);
+            closeableList.add(directoryReader);
+            closeableList.add(dir);
+            assertThat(indexSearcher.getLeafContexts().size(), equalTo(1));
+        });
+        return indexSearcherReference.get();
+    }
 }
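
As a rough sanity check of the helper arithmetic above (mirroring calculateActualCacheMemoryForSegment and calculateOverheadForSegment with hypothetical inputs; the constants 24, 112, and 8 are simply the ones the test uses):

    // Hypothetical inputs: 10 cached queries of 100 bytes each against a segment holding 200 documents.
    long queryCount = 10, querySize = 100, numDocs = 200;
    // Fixed per-segment overhead that is not redistributed across shards:
    long overhead = queryCount * (112 + (8 * ((numDocs - 1) / 64)));   // 10 * (112 + 8 * 3) = 1360
    // Total memory the cache records for the segment:
    long segmentMemory = (queryCount * (querySize + 24)) + overhead;   // 10 * 124 + 1360 = 2600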
