Skip to content

Commit 168c6c5

Browse files
committed
First attempt at improving statsByShard performance
1 parent 8f0e1f7 commit 168c6c5

File tree

3 files changed

+153
-20
lines changed

3 files changed

+153
-20
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,18 @@ public CommonStats(CommonStatsFlags flags) {
155155
* Filters the given flags for {@link CommonStatsFlags#SHARD_LEVEL} flags and calculates the corresponding statistics.
156156
*/
157157
public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
158+
return getShardLevelStats(indicesQueryCache, indexShard, flags, null);
159+
}
160+
161+
/**
162+
* Overload that takes a precomputed shared RAM map for O(N) stats.
163+
*/
164+
public static CommonStats getShardLevelStats(
165+
IndicesQueryCache indicesQueryCache,
166+
IndexShard indexShard,
167+
CommonStatsFlags flags,
168+
java.util.Map<org.elasticsearch.index.shard.ShardId, Long> precomputedSharedRam
169+
) {
158170
// Filter shard level flags
159171
CommonStatsFlags filteredFlags = flags.clone();
160172
for (CommonStatsFlags.Flag flag : filteredFlags.getFlags()) {
@@ -174,7 +186,16 @@ public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache
174186
case Refresh -> stats.refresh = indexShard.refreshStats();
175187
case Flush -> stats.flush = indexShard.flushStats();
176188
case Warmer -> stats.warmer = indexShard.warmerStats();
177-
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
189+
case QueryCache -> {
190+
if (precomputedSharedRam != null && precomputedSharedRam.containsKey(indexShard.shardId())) {
191+
stats.queryCache = indicesQueryCache.getStats(
192+
indexShard.shardId(),
193+
precomputedSharedRam.get(indexShard.shardId())
194+
);
195+
} else {
196+
stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
197+
}
198+
}
178199
case FieldData -> stats.fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
179200
case Completion -> stats.completion = indexShard.completionStats(flags.completionDataFields());
180201
case Segments -> stats.segments = indexShard.segmentStats(

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.Closeable;
3333
import java.io.IOException;
3434
import java.util.Collections;
35+
import java.util.HashMap;
3536
import java.util.IdentityHashMap;
3637
import java.util.Map;
3738
import java.util.Set;
@@ -67,6 +68,17 @@ public class IndicesQueryCache implements QueryCache, Closeable {
6768
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
6869
private volatile long sharedRamBytesUsed;
6970

71+
// Package-private for IndicesService efficient stats collection
72+
long getCacheSizeForShard(ShardId shardId) {
73+
Stats stats = shardStats.get(shardId);
74+
return stats != null ? stats.cacheSize : 0L;
75+
}
76+
77+
// Package-private for IndicesService efficient stats collection
78+
long getSharedRamBytesUsed() {
79+
return sharedRamBytesUsed;
80+
}
81+
7082
// This is a hack for the fact that the close listener for the
7183
// ShardCoreKeyMap will be called before onDocIdSetEviction
7284
// See onDocIdSetEviction for more info
@@ -139,6 +151,57 @@ public QueryCacheStats getStats(ShardId shard) {
139151
return queryCacheStats;
140152
}
141153

154+
/**
155+
* Overload to allow passing in a precomputed shared RAM split for this shard.
156+
*/
157+
public QueryCacheStats getStats(ShardId shard, long precomputedSharedRamBytesUsed) {
158+
final QueryCacheStats queryCacheStats = toQueryCacheStatsSafe(shardStats.get(shard));
159+
queryCacheStats.addRamBytesUsed(precomputedSharedRamBytesUsed);
160+
return queryCacheStats;
161+
}
162+
163+
/**
164+
* Precompute the shared RAM split for all shards, returning a map of ShardId to the additional shared RAM bytes used.
165+
* This avoids O(N^2) when collecting stats for all shards.
166+
*/
167+
public Map<ShardId, Long> computeAllShardSharedRamBytesUsed() {
168+
Map<ShardId, Long> result = new HashMap<>();
169+
if (sharedRamBytesUsed == 0L) {
170+
for (ShardId shardId : shardStats.keySet()) {
171+
result.put(shardId, 0L);
172+
}
173+
return result;
174+
}
175+
long totalSize = 0L;
176+
int shardCount = 0;
177+
boolean anyNonZero = false;
178+
for (Stats stats : shardStats.values()) {
179+
shardCount += 1;
180+
if (stats.cacheSize > 0L) {
181+
anyNonZero = true;
182+
totalSize += stats.cacheSize;
183+
}
184+
}
185+
if (shardCount == 0) {
186+
return result;
187+
}
188+
if (anyNonZero == false) {
189+
// All shards have zero cache footprint, apportion equally
190+
long perShard = Math.round((double) sharedRamBytesUsed / shardCount);
191+
for (ShardId shardId : shardStats.keySet()) {
192+
result.put(shardId, perShard);
193+
}
194+
} else {
195+
// Apportion proportionally to cache footprint
196+
for (Map.Entry<ShardId, Stats> entry : shardStats.entrySet()) {
197+
long cacheSize = entry.getValue().cacheSize;
198+
long ram = (totalSize == 0) ? 0L : Math.round((double) sharedRamBytesUsed * cacheSize / totalSize);
199+
result.put(entry.getKey(), ram);
200+
}
201+
}
202+
return result;
203+
}
204+
142205
@Override
143206
public Weight doCache(Weight weight, QueryCachingPolicy policy) {
144207
while (weight instanceof CachingWeightWrapper) {

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@
167167
import java.util.ArrayList;
168168
import java.util.Arrays;
169169
import java.util.Collection;
170+
import java.util.Collections;
170171
import java.util.EnumMap;
171172
import java.util.HashMap;
172173
import java.util.Iterator;
@@ -519,33 +520,81 @@ static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService,
519520
}
520521

521522
static Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
522-
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
523-
523+
IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
524+
// First pass: gather all shards, cache sizes, and compute totals
525+
class ShardCacheInfo {
526+
final IndexService indexService;
527+
final IndexShard indexShard;
528+
final org.elasticsearch.index.shard.ShardId shardId;
529+
final long cacheSize;
530+
531+
ShardCacheInfo(IndexService is, IndexShard shard, org.elasticsearch.index.shard.ShardId id, long size) {
532+
this.indexService = is;
533+
this.indexShard = shard;
534+
this.shardId = id;
535+
this.cacheSize = size;
536+
}
537+
}
538+
List<ShardCacheInfo> shardInfos = new ArrayList<>();
539+
long totalSize = 0L;
540+
int shardCount = 0;
541+
boolean anyNonZero = false;
524542
for (final IndexService indexService : indicesService) {
525543
for (final IndexShard indexShard : indexService) {
526-
try {
527-
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
528-
529-
if (indexShardStats == null) {
530-
continue;
531-
}
532-
533-
if (statsByShard.containsKey(indexService.index()) == false) {
534-
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
535-
} else {
536-
statsByShard.get(indexService.index()).add(indexShardStats);
537-
}
538-
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
539-
// we can safely ignore illegal state on ones that are closing for example
540-
logger.trace(() -> format("%s ignoring shard stats", indexShard.shardId()), e);
544+
org.elasticsearch.index.shard.ShardId shardId = indexShard.shardId();
545+
long cacheSize = queryCache.getCacheSizeForShard(shardId);
546+
shardInfos.add(new ShardCacheInfo(indexService, indexShard, shardId, cacheSize));
547+
shardCount++;
548+
if (cacheSize > 0L) {
549+
anyNonZero = true;
550+
totalSize += cacheSize;
541551
}
542552
}
543553
}
544-
554+
long sharedRamBytesUsed = queryCache.getSharedRamBytesUsed();
555+
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
556+
// Second pass: build stats, compute shared RAM on the fly
557+
for (ShardCacheInfo info : shardInfos) {
558+
long sharedRam = 0L;
559+
if (sharedRamBytesUsed != 0L) {
560+
if (anyNonZero == false) {
561+
sharedRam = Math.round((double) sharedRamBytesUsed / shardCount);
562+
} else if (totalSize != 0) {
563+
sharedRam = Math.round((double) sharedRamBytesUsed * info.cacheSize / totalSize);
564+
}
565+
}
566+
try {
567+
final IndexShardStats indexShardStats = indicesService.indexShardStats(
568+
indicesService,
569+
info.indexShard,
570+
flags,
571+
Collections.singletonMap(info.shardId, sharedRam)
572+
);
573+
if (indexShardStats == null) {
574+
continue;
575+
}
576+
if (statsByShard.containsKey(info.indexService.index()) == false) {
577+
statsByShard.put(info.indexService.index(), arrayAsArrayList(indexShardStats));
578+
} else {
579+
statsByShard.get(info.indexService.index()).add(indexShardStats);
580+
}
581+
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
582+
logger.trace(() -> format("%s ignoring shard stats", info.shardId), e);
583+
}
584+
}
545585
return statsByShard;
546586
}
547587

548588
IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
589+
return indexShardStats(indicesService, indexShard, flags, null);
590+
}
591+
592+
IndexShardStats indexShardStats(
593+
final IndicesService indicesService,
594+
final IndexShard indexShard,
595+
final CommonStatsFlags flags,
596+
Map<org.elasticsearch.index.shard.ShardId, Long> precomputedSharedRam
597+
) {
549598
if (indexShard.routingEntry() == null) {
550599
return null;
551600
}
@@ -570,7 +619,7 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
570619
new ShardStats(
571620
indexShard.routingEntry(),
572621
indexShard.shardPath(),
573-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags),
622+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags, precomputedSharedRam),
574623
commitStats,
575624
seqNoStats,
576625
retentionLeaseStats,

0 commit comments

Comments
 (0)