Skip to content

Commit 017afc6

Browse files
authored
Improving statsByShard performance when the number of shards is very large (elastic#130857) (elastic#137768)
Co-authored-by: Joe Gallo <[email protected]> (cherry picked from commit 22c15bc) # Conflicts: # server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java
1 parent 7e81740 commit 017afc6

File tree

11 files changed

+231
-93
lines changed

11 files changed

+231
-93
lines changed

docs/changelog/130857.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 130857
2+
summary: Improving statsByShard performance when the number of shards is very large
3+
area: Stats
4+
type: bug
5+
issues:
6+
- 97222

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.elasticsearch.index.seqno.RetentionLeaseStats;
4747
import org.elasticsearch.index.seqno.SeqNoStats;
4848
import org.elasticsearch.index.shard.IndexShard;
49+
import org.elasticsearch.index.shard.ShardId;
50+
import org.elasticsearch.indices.IndicesQueryCache;
4951
import org.elasticsearch.indices.IndicesService;
5052
import org.elasticsearch.injection.guice.Inject;
5153
import org.elasticsearch.node.NodeService;
@@ -252,9 +254,12 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
252254
false,
253255
false
254256
);
257+
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
255258
List<ShardStats> shardsStats = new ArrayList<>();
256259
for (IndexService indexService : indicesService) {
257260
for (IndexShard indexShard : indexService) {
261+
// get the shared ram for this shard id (or zero if there's nothing in the map)
262+
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
258263
cancellableTask.ensureNotCancelled();
259264
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
260265
// only report on fully started shards
@@ -275,7 +280,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
275280
new ShardStats(
276281
indexShard.routingEntry(),
277282
indexShard.shardPath(),
278-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
283+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS, sharedRam),
279284
commitStats,
280285
seqNoStats,
281286
retentionLeaseStats,
@@ -303,7 +308,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
303308
clusterStatus,
304309
nodeInfo,
305310
nodeStats,
306-
shardsStats.toArray(new ShardStats[shardsStats.size()]),
311+
shardsStats.toArray(new ShardStats[0]),
307312
searchUsageStats,
308313
repositoryUsageStats,
309314
ccsTelemetry,
@@ -472,7 +477,7 @@ protected void sendItemRequest(String clusterAlias, ActionListener<RemoteCluster
472477
@Override
473478
protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) {
474479
if (response != null) {
475-
remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response));
480+
remoteClustersStats.computeIfPresent(clusterAlias, (ignored, v) -> v.acceptResponse(response));
476481
}
477482
}
478483

server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ public CommonStats(CommonStatsFlags flags) {
154154
/**
155155
* Filters the given flags for {@link CommonStatsFlags#SHARD_LEVEL} flags and calculates the corresponding statistics.
156156
*/
157-
public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
157+
public static CommonStats getShardLevelStats(
158+
IndicesQueryCache indicesQueryCache,
159+
IndexShard indexShard,
160+
CommonStatsFlags flags,
161+
long precomputedSharedRam
162+
) {
158163
// Filter shard level flags
159164
CommonStatsFlags filteredFlags = flags.clone();
160165
for (CommonStatsFlags.Flag flag : filteredFlags.getFlags()) {
@@ -174,7 +179,7 @@ public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache
174179
case Refresh -> stats.refresh = indexShard.refreshStats();
175180
case Flush -> stats.flush = indexShard.flushStats();
176181
case Warmer -> stats.warmer = indexShard.warmerStats();
177-
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
182+
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId(), precomputedSharedRam);
178183
case FieldData -> stats.fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
179184
case Completion -> stats.completion = indexShard.completionStats(flags.completionDataFields());
180185
case Segments -> stats.segments = indexShard.segmentStats(

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.index.seqno.RetentionLeaseStats;
2727
import org.elasticsearch.index.seqno.SeqNoStats;
2828
import org.elasticsearch.index.shard.IndexShard;
29+
import org.elasticsearch.indices.IndicesQueryCache;
2930
import org.elasticsearch.indices.IndicesService;
3031
import org.elasticsearch.injection.guice.Inject;
3132
import org.elasticsearch.tasks.CancellableTask;
@@ -110,7 +111,13 @@ protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRou
110111
assert task instanceof CancellableTask;
111112
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
112113
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
113-
CommonStats commonStats = CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
114+
long sharedRam = IndicesQueryCache.getSharedRamSizeForShard(indicesService, indexShard.shardId());
115+
CommonStats commonStats = CommonStats.getShardLevelStats(
116+
indicesService.getIndicesQueryCache(),
117+
indexShard,
118+
request.flags(),
119+
sharedRam
120+
);
114121
CommitStats commitStats;
115122
SeqNoStats seqNoStats;
116123
RetentionLeaseStats retentionLeaseStats;

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 84 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
import org.elasticsearch.common.unit.ByteSizeValue;
2929
import org.elasticsearch.core.Nullable;
3030
import org.elasticsearch.core.Predicates;
31+
import org.elasticsearch.index.IndexService;
3132
import org.elasticsearch.index.cache.query.QueryCacheStats;
33+
import org.elasticsearch.index.shard.IndexShard;
3234
import org.elasticsearch.index.shard.ShardId;
3335

3436
import java.io.Closeable;
3537
import java.io.IOException;
3638
import java.util.Collections;
39+
import java.util.HashMap;
3740
import java.util.IdentityHashMap;
3841
import java.util.Map;
3942
import java.util.Set;
@@ -69,6 +72,38 @@ public class IndicesQueryCache implements QueryCache, Closeable {
6972
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
7073
private volatile long sharedRamBytesUsed;
7174

75+
/**
76+
* Calculates a map of {@link ShardId} to {@link Long} which contains the calculated share of the {@link IndicesQueryCache} shared ram
77+
* size for a given shard (that is, the sum of all the longs is the size of the indices query cache). Since many shards will not
78+
* participate in the cache, shards whose calculated share is zero will not be contained in the map at all. As a consequence, the
79+
* correct pattern for using the returned map will be via {@link Map#getOrDefault(Object, Object)} with a {@code defaultValue} of
80+
* {@code 0L}.
81+
*/
82+
public static Map<ShardId, Long> getSharedRamSizeForAllShards(IndicesService indicesService) {
83+
Map<ShardId, Long> shardIdToSharedRam = new HashMap<>();
84+
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
85+
for (IndexService indexService : indicesService) {
86+
for (IndexShard indexShard : indexService) {
87+
final var queryCache = indicesService.getIndicesQueryCache();
88+
long sharedRam = (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(indexShard.shardId(), cacheTotals);
89+
// as a size optimization, only store non-zero values in the map
90+
if (sharedRam > 0L) {
91+
shardIdToSharedRam.put(indexShard.shardId(), sharedRam);
92+
}
93+
}
94+
}
95+
return Collections.unmodifiableMap(shardIdToSharedRam);
96+
}
97+
98+
public long getCacheSizeForShard(ShardId shardId) {
99+
Stats stats = shardStats.get(shardId);
100+
return stats != null ? stats.cacheSize : 0L;
101+
}
102+
103+
public long getSharedRamBytesUsed() {
104+
return sharedRamBytesUsed;
105+
}
106+
72107
// This is a hack for the fact that the close listener for the
73108
// ShardCoreKeyMap will be called before onDocIdSetEviction
74109
// See onDocIdSetEviction for more info
@@ -91,40 +126,58 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
91126
return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
92127
}
93128

94-
private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
95-
if (sharedRamBytesUsed == 0L) {
96-
return 0L;
97-
}
98-
99-
/*
100-
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
101-
* shard.
102-
*/
103-
// TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
129+
/**
130+
* This computes the total number of items in the cache and the total number of shards, across every shard of every index.
131+
* @param indicesService the indices service whose shards are iterated and whose query cache (if any) is consulted
132+
* @return A CacheTotals object containing the computed total number of items in the cache and the number of shards visited
133+
*/
134+
private static CacheTotals getCacheTotalsForAllShards(IndicesService indicesService) {
135+
IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
136+
boolean hasQueryCache = queryCache != null;
104137
long totalItemsInCache = 0L;
105138
int shardCount = 0;
106-
if (itemsInCacheForShard == 0L) {
107-
for (final var stats : shardStats.values()) {
108-
shardCount += 1;
109-
if (stats.cacheSize > 0L) {
110-
// some shard has nonzero cache footprint, so we apportion the shared size by cache footprint, and this shard has none
111-
return 0L;
112-
}
113-
}
114-
} else {
115-
// branchless loop for the common case
116-
for (final var stats : shardStats.values()) {
117-
shardCount += 1;
118-
totalItemsInCache += stats.cacheSize;
139+
for (final IndexService indexService : indicesService) {
140+
for (final IndexShard indexShard : indexService) {
141+
final var shardId = indexShard.shardId();
142+
long cacheSize = hasQueryCache ? queryCache.getCacheSizeForShard(shardId) : 0L;
143+
shardCount++;
144+
assert cacheSize >= 0 : "Unexpected cache size of " + cacheSize + " for shard " + shardId;
145+
totalItemsInCache += cacheSize;
119146
}
120147
}
148+
return new CacheTotals(totalItemsInCache, shardCount);
149+
}
121150

151+
public static long getSharedRamSizeForShard(IndicesService indicesService, ShardId shardId) {
152+
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
153+
final var queryCache = indicesService.getIndicesQueryCache();
154+
return (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(shardId, cacheTotals);
155+
}
156+
157+
/**
158+
* This method computes the shared RAM size in bytes for the shard with the given shardId.
159+
* @param shardId The shard to compute the shared RAM size for
160+
* @param cacheTotals Shard totals computed in getCacheTotalsForAllShards()
161+
* @return the shared RAM size in bytes allocated to the given shard, or 0 if unavailable
162+
*/
163+
private long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals) {
164+
long sharedRamBytesUsed = getSharedRamBytesUsed();
165+
if (sharedRamBytesUsed == 0L) {
166+
return 0L;
167+
}
168+
169+
int shardCount = cacheTotals.shardCount();
122170
if (shardCount == 0) {
123171
// Sometimes it's not possible to do this when there are no shard entries at all, which can happen as the shared ram usage can
124172
// extend beyond the closing of all shards.
125173
return 0L;
126174
}
127-
175+
/*
176+
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
177+
* shard.
178+
*/
179+
long totalItemsInCache = cacheTotals.totalItemsInCache();
180+
long itemsInCacheForShard = getCacheSizeForShard(shardId);
128181
final long additionalRamBytesUsed;
129182
if (totalItemsInCache == 0) {
130183
// all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
@@ -145,10 +198,12 @@ private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
145198
return additionalRamBytesUsed;
146199
}
147200

201+
private record CacheTotals(long totalItemsInCache, int shardCount) {}
202+
148203
/** Get usage statistics for the given shard. */
149-
public QueryCacheStats getStats(ShardId shard) {
204+
public QueryCacheStats getStats(ShardId shard, long precomputedSharedRamBytesUsed) {
150205
final QueryCacheStats queryCacheStats = toQueryCacheStatsSafe(shardStats.get(shard));
151-
queryCacheStats.addRamBytesUsed(getShareOfAdditionalRamBytesUsed(queryCacheStats.getCacheSize()));
206+
queryCacheStats.addRamBytesUsed(precomputedSharedRamBytesUsed);
152207
return queryCacheStats;
153208
}
154209

@@ -257,7 +312,7 @@ QueryCacheStats toQueryCacheStats() {
257312
public String toString() {
258313
return "{shardId="
259314
+ shardId
260-
+ ", ramBytedUsed="
315+
+ ", ramBytesUsed="
261316
+ ramBytesUsed
262317
+ ", hitCount="
263318
+ hitCount
@@ -354,11 +409,7 @@ protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
354409
shardStats.cacheCount += 1;
355410
shardStats.ramBytesUsed += ramBytesUsed;
356411

357-
StatsAndCount statsAndCount = stats2.get(readerCoreKey);
358-
if (statsAndCount == null) {
359-
statsAndCount = new StatsAndCount(shardStats);
360-
stats2.put(readerCoreKey, statsAndCount);
361-
}
412+
StatsAndCount statsAndCount = stats2.computeIfAbsent(readerCoreKey, ignored -> new StatsAndCount(shardStats));
362413
statsAndCount.count += 1;
363414
}
364415

@@ -371,7 +422,7 @@ protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sum
371422
if (numEntries > 0) {
372423
// We can't use ShardCoreKeyMap here because its core closed
373424
// listener is called before the listener of the cache which
374-
// triggers this eviction. So instead we use use stats2 that
425+
// triggers this eviction. So instead we use stats2 that
375426
// we only evict when nothing is cached anymore on the segment
376427
// instead of relying on close listeners
377428
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -497,33 +497,36 @@ static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService,
497497
}
498498

499499
static Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
500+
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
500501
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
501-
502502
for (final IndexService indexService : indicesService) {
503503
for (final IndexShard indexShard : indexService) {
504+
// get the shared ram for this shard id (or zero if there's nothing in the map)
505+
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
504506
try {
505-
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
506-
507+
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags, sharedRam);
507508
if (indexShardStats == null) {
508509
continue;
509510
}
510-
511511
if (statsByShard.containsKey(indexService.index()) == false) {
512512
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
513513
} else {
514514
statsByShard.get(indexService.index()).add(indexShardStats);
515515
}
516516
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
517-
// we can safely ignore illegal state on ones that are closing for example
518517
logger.trace(() -> format("%s ignoring shard stats", indexShard.shardId()), e);
519518
}
520519
}
521520
}
522-
523521
return statsByShard;
524522
}
525523

526-
IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
524+
IndexShardStats indexShardStats(
525+
final IndicesService indicesService,
526+
final IndexShard indexShard,
527+
final CommonStatsFlags flags,
528+
final long precomputedSharedRam
529+
) {
527530
if (indexShard.routingEntry() == null) {
528531
return null;
529532
}
@@ -548,7 +551,7 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
548551
new ShardStats(
549552
indexShard.routingEntry(),
550553
indexShard.shardPath(),
551-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags),
554+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags, precomputedSharedRam),
552555
commitStats,
553556
seqNoStats,
554557
retentionLeaseStats,

server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void testCreation() {
115115
ShardStats shardStats = new ShardStats(
116116
shardRouting,
117117
new ShardPath(false, path, path, shardRouting.shardId()),
118-
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store)),
118+
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store), 0L),
119119
null,
120120
null,
121121
null,

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1620,7 +1620,7 @@ public void testShardStats() throws IOException {
16201620
ShardStats stats = new ShardStats(
16211621
shard.routingEntry(),
16221622
shard.shardPath(),
1623-
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()),
1623+
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags(), 0L),
16241624
shard.commitStats(),
16251625
shard.seqNoStats(),
16261626
shard.getRetentionLeaseStats(),

0 commit comments

Comments
 (0)