Skip to content

Commit d74334e

Browse files
authored
Improve stats performance when the number of shards is very large (elastic#138126)
1 parent e15effe commit d74334e

File tree

11 files changed

+294
-144
lines changed

11 files changed

+294
-144
lines changed

docs/changelog/138126.yaml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
pr: 138126
2+
summary: Improving performance of stats APIs when the number of shards is very large
3+
area: Stats
4+
type: bug
5+
issues:
6+
- 97222

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.cluster.service.ClusterService;
4141
import org.elasticsearch.common.io.stream.StreamInput;
4242
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.common.util.CachedSupplier;
4344
import org.elasticsearch.common.util.CancellableSingleObjectCache;
4445
import org.elasticsearch.common.util.concurrent.ThreadContext;
4546
import org.elasticsearch.core.FixForMultiProject;
@@ -48,6 +49,8 @@
4849
import org.elasticsearch.index.seqno.RetentionLeaseStats;
4950
import org.elasticsearch.index.seqno.SeqNoStats;
5051
import org.elasticsearch.index.shard.IndexShard;
52+
import org.elasticsearch.index.shard.ShardId;
53+
import org.elasticsearch.indices.IndicesQueryCache;
5154
import org.elasticsearch.indices.IndicesService;
5255
import org.elasticsearch.injection.guice.Inject;
5356
import org.elasticsearch.node.NodeService;
@@ -75,6 +78,7 @@
7578
import java.util.concurrent.Executor;
7679
import java.util.function.BiFunction;
7780
import java.util.function.BooleanSupplier;
81+
import java.util.function.Supplier;
7882
import java.util.stream.Collectors;
7983

8084
/**
@@ -260,9 +264,13 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
260264
false,
261265
false
262266
);
267+
Supplier<Map<ShardId, Long>> shardIdToSharedRam = CachedSupplier.wrap(
268+
() -> IndicesQueryCache.getSharedRamSizeForAllShards(indicesService)
269+
);
263270
List<ShardStats> shardsStats = new ArrayList<>();
264271
for (IndexService indexService : indicesService) {
265272
for (IndexShard indexShard : indexService) {
273+
// get the shared ram for this shard id (or zero if there's nothing in the map)
266274
cancellableTask.ensureNotCancelled();
267275
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
268276
// only report on fully started shards
@@ -283,7 +291,12 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
283291
new ShardStats(
284292
indexShard.routingEntry(),
285293
indexShard.shardPath(),
286-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
294+
CommonStats.getShardLevelStats(
295+
indicesService.getIndicesQueryCache(),
296+
indexShard,
297+
SHARD_STATS_FLAGS,
298+
() -> shardIdToSharedRam.get().getOrDefault(indexShard.shardId(), 0L)
299+
),
287300
commitStats,
288301
seqNoStats,
289302
retentionLeaseStats,
@@ -314,7 +327,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
314327
clusterStatus,
315328
nodeInfo,
316329
nodeStats,
317-
shardsStats.toArray(new ShardStats[shardsStats.size()]),
330+
shardsStats.toArray(new ShardStats[0]),
318331
searchUsageStats,
319332
repositoryUsageStats,
320333
ccsTelemetry,
@@ -476,7 +489,7 @@ protected void sendItemRequest(String clusterAlias, ActionListener<RemoteCluster
476489
@Override
477490
protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) {
478491
if (response != null) {
479-
remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response));
492+
remoteClustersStats.computeIfPresent(clusterAlias, (ignored, v) -> v.acceptResponse(response));
480493
}
481494
}
482495

server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@
4646

4747
import java.io.IOException;
4848
import java.util.Objects;
49+
import java.util.function.Supplier;
4950

5051
public class CommonStats implements Writeable, ToXContentFragment {
5152

@@ -154,7 +155,12 @@ public CommonStats(CommonStatsFlags flags) {
154155
/**
155156
* Filters the given flags for {@link CommonStatsFlags#SHARD_LEVEL} flags and calculates the corresponding statistics.
156157
*/
157-
public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
158+
public static CommonStats getShardLevelStats(
159+
IndicesQueryCache indicesQueryCache,
160+
IndexShard indexShard,
161+
CommonStatsFlags flags,
162+
Supplier<Long> precomputedSharedRam
163+
) {
158164
// Filter shard level flags
159165
CommonStatsFlags filteredFlags = flags.clone();
160166
for (CommonStatsFlags.Flag flag : filteredFlags.getFlags()) {
@@ -174,7 +180,7 @@ public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache
174180
case Refresh -> stats.refresh = indexShard.refreshStats();
175181
case Flush -> stats.flush = indexShard.flushStats();
176182
case Warmer -> stats.warmer = indexShard.warmerStats();
177-
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
183+
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId(), precomputedSharedRam);
178184
case FieldData -> stats.fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
179185
case Completion -> stats.completion = indexShard.completionStats(flags.completionDataFields());
180186
case Segments -> stats.segments = indexShard.segmentStats(

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 19 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,11 +22,13 @@
2222
import org.elasticsearch.cluster.routing.ShardsIterator;
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.common.util.CachedSupplier;
2526
import org.elasticsearch.index.IndexService;
2627
import org.elasticsearch.index.engine.CommitStats;
2728
import org.elasticsearch.index.seqno.RetentionLeaseStats;
2829
import org.elasticsearch.index.seqno.SeqNoStats;
2930
import org.elasticsearch.index.shard.IndexShard;
31+
import org.elasticsearch.indices.IndicesQueryCache;
3032
import org.elasticsearch.indices.IndicesService;
3133
import org.elasticsearch.injection.guice.Inject;
3234
import org.elasticsearch.tasks.CancellableTask;
@@ -35,12 +37,13 @@
3537
import org.elasticsearch.transport.TransportService;
3638

3739
import java.io.IOException;
40+
import java.util.function.Supplier;
3841

3942
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
4043
IndicesStatsRequest,
4144
IndicesStatsResponse,
4245
ShardStats,
43-
Void> {
46+
Supplier<IndicesQueryCache.CacheTotals>> {
4447

4548
private final IndicesService indicesService;
4649
private final ProjectResolver projectResolver;
@@ -112,19 +115,32 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException
112115
return new IndicesStatsRequest(in);
113116
}
114117

118+
@Override
119+
protected Supplier<IndicesQueryCache.CacheTotals> createNodeContext() {
120+
return CachedSupplier.wrap(() -> IndicesQueryCache.getCacheTotalsForAllShards(indicesService));
121+
}
122+
115123
@Override
116124
protected void shardOperation(
117125
IndicesStatsRequest request,
118126
ShardRouting shardRouting,
119127
Task task,
120-
Void nodeContext,
128+
Supplier<IndicesQueryCache.CacheTotals> context,
121129
ActionListener<ShardStats> listener
122130
) {
123131
ActionListener.completeWith(listener, () -> {
124132
assert task instanceof CancellableTask;
125133
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
126134
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
127-
CommonStats commonStats = CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
135+
CommonStats commonStats = CommonStats.getShardLevelStats(
136+
indicesService.getIndicesQueryCache(),
137+
indexShard,
138+
request.flags(),
139+
() -> {
140+
final IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
141+
return (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(indexShard.shardId(), context.get());
142+
}
143+
);
128144
CommitStats commitStats;
129145
SeqNoStats seqNoStats;
130146
RetentionLeaseStats retentionLeaseStats;

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 81 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -26,17 +26,21 @@
2626
import org.elasticsearch.common.unit.ByteSizeValue;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Predicates;
29+
import org.elasticsearch.index.IndexService;
2930
import org.elasticsearch.index.cache.query.QueryCacheStats;
31+
import org.elasticsearch.index.shard.IndexShard;
3032
import org.elasticsearch.index.shard.ShardId;
3133

3234
import java.io.Closeable;
3335
import java.io.IOException;
3436
import java.util.Collections;
37+
import java.util.HashMap;
3538
import java.util.IdentityHashMap;
3639
import java.util.Map;
3740
import java.util.Set;
3841
import java.util.concurrent.ConcurrentHashMap;
3942
import java.util.function.Predicate;
43+
import java.util.function.Supplier;
4044

4145
public class IndicesQueryCache implements QueryCache, Closeable {
4246

@@ -67,6 +71,40 @@ public class IndicesQueryCache implements QueryCache, Closeable {
6771
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
6872
private volatile long sharedRamBytesUsed;
6973

74+
/**
75+
* Calculates a map of {@link ShardId} to {@link Long} which contains the calculated share of the {@link IndicesQueryCache} shared ram
76+
* size for a given shard (that is, the sum of all the longs is the size of the indices query cache). Since many shards will not
77+
* participate in the cache, shards whose calculated share is zero will not be contained in the map at all. As a consequence, the
78+
* correct pattern for using the returned map will be via {@link Map#getOrDefault(Object, Object)} with a {@code defaultValue} of
79+
* {@code 0L}.
80+
* @return an unmodifiable map from {@link ShardId} to the calculated share of the query cache's shared RAM size for each shard,
81+
* omitting shards with a zero share
82+
*/
83+
public static Map<ShardId, Long> getSharedRamSizeForAllShards(IndicesService indicesService) {
84+
Map<ShardId, Long> shardIdToSharedRam = new HashMap<>();
85+
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
86+
for (IndexService indexService : indicesService) {
87+
for (IndexShard indexShard : indexService) {
88+
final var queryCache = indicesService.getIndicesQueryCache();
89+
long sharedRam = (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(indexShard.shardId(), cacheTotals);
90+
// as a size optimization, only store non-zero values in the map
91+
if (sharedRam > 0L) {
92+
shardIdToSharedRam.put(indexShard.shardId(), sharedRam);
93+
}
94+
}
95+
}
96+
return Collections.unmodifiableMap(shardIdToSharedRam);
97+
}
98+
99+
public long getCacheSizeForShard(ShardId shardId) {
100+
Stats stats = shardStats.get(shardId);
101+
return stats != null ? stats.cacheSize : 0L;
102+
}
103+
104+
public long getSharedRamBytesUsed() {
105+
return sharedRamBytesUsed;
106+
}
107+
70108
// This is a hack for the fact that the close listener for the
71109
// ShardCoreKeyMap will be called before onDocIdSetEviction
72110
// See onDocIdSetEviction for more info
@@ -89,40 +127,52 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
89127
return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
90128
}
91129

92-
private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
93-
if (sharedRamBytesUsed == 0L) {
94-
return 0L;
95-
}
96-
97-
/*
98-
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
99-
* shard.
100-
*/
101-
// TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
130+
/**
131+
* Computes the total cache size in bytes, and the total shard count in the cache for all shards.
132+
* @param indicesService the IndicesService instance to retrieve cache information from
133+
* @return A CacheTotals object containing the computed total number of items in the cache and the number of shards seen in the cache
134+
*/
135+
public static CacheTotals getCacheTotalsForAllShards(IndicesService indicesService) {
136+
IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
137+
boolean hasQueryCache = queryCache != null;
102138
long totalItemsInCache = 0L;
103139
int shardCount = 0;
104-
if (itemsInCacheForShard == 0L) {
105-
for (final var stats : shardStats.values()) {
106-
shardCount += 1;
107-
if (stats.cacheSize > 0L) {
108-
// some shard has nonzero cache footprint, so we apportion the shared size by cache footprint, and this shard has none
109-
return 0L;
110-
}
111-
}
112-
} else {
113-
// branchless loop for the common case
114-
for (final var stats : shardStats.values()) {
115-
shardCount += 1;
116-
totalItemsInCache += stats.cacheSize;
140+
for (final IndexService indexService : indicesService) {
141+
for (final IndexShard indexShard : indexService) {
142+
final var shardId = indexShard.shardId();
143+
long cacheSize = hasQueryCache ? queryCache.getCacheSizeForShard(shardId) : 0L;
144+
shardCount++;
145+
assert cacheSize >= 0 : "Unexpected cache size of " + cacheSize + " for shard " + shardId;
146+
totalItemsInCache += cacheSize;
117147
}
118148
}
149+
return new CacheTotals(totalItemsInCache, shardCount);
150+
}
151+
152+
/**
153+
* This method computes the shared RAM size in bytes for the given indexShard.
154+
* @param shardId The shard to compute the shared RAM size for.
155+
* @param cacheTotals Shard totals computed in {@link #getCacheTotalsForAllShards(IndicesService)}.
156+
* @return the shared RAM size in bytes allocated to the given shard, or 0 if unavailable
157+
*/
158+
public long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals) {
159+
long sharedRamBytesUsed = getSharedRamBytesUsed();
160+
if (sharedRamBytesUsed == 0L) {
161+
return 0L;
162+
}
119163

164+
int shardCount = cacheTotals.shardCount();
120165
if (shardCount == 0) {
121166
// Sometimes it's not possible to do this when there are no shard entries at all, which can happen as the shared ram usage can
122167
// extend beyond the closing of all shards.
123168
return 0L;
124169
}
125-
170+
/*
171+
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
172+
* shard.
173+
*/
174+
long totalItemsInCache = cacheTotals.totalItemsInCache();
175+
long itemsInCacheForShard = getCacheSizeForShard(shardId);
126176
final long additionalRamBytesUsed;
127177
if (totalItemsInCache == 0) {
128178
// all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
@@ -143,10 +193,12 @@ private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
143193
return additionalRamBytesUsed;
144194
}
145195

196+
public record CacheTotals(long totalItemsInCache, int shardCount) {}
197+
146198
/** Get usage statistics for the given shard. */
147-
public QueryCacheStats getStats(ShardId shard) {
199+
public QueryCacheStats getStats(ShardId shard, Supplier<Long> precomputedSharedRamBytesUsed) {
148200
final QueryCacheStats queryCacheStats = toQueryCacheStatsSafe(shardStats.get(shard));
149-
queryCacheStats.addRamBytesUsed(getShareOfAdditionalRamBytesUsed(queryCacheStats.getCacheSize()));
201+
queryCacheStats.addRamBytesUsed(precomputedSharedRamBytesUsed.get());
150202
return queryCacheStats;
151203
}
152204

@@ -243,7 +295,7 @@ QueryCacheStats toQueryCacheStats() {
243295
public String toString() {
244296
return "{shardId="
245297
+ shardId
246-
+ ", ramBytedUsed="
298+
+ ", ramBytesUsed="
247299
+ ramBytesUsed
248300
+ ", hitCount="
249301
+ hitCount
@@ -340,11 +392,7 @@ protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
340392
shardStats.cacheCount += 1;
341393
shardStats.ramBytesUsed += ramBytesUsed;
342394

343-
StatsAndCount statsAndCount = stats2.get(readerCoreKey);
344-
if (statsAndCount == null) {
345-
statsAndCount = new StatsAndCount(shardStats);
346-
stats2.put(readerCoreKey, statsAndCount);
347-
}
395+
StatsAndCount statsAndCount = stats2.computeIfAbsent(readerCoreKey, ignored -> new StatsAndCount(shardStats));
348396
statsAndCount.count += 1;
349397
}
350398

@@ -357,7 +405,7 @@ protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sum
357405
if (numEntries > 0) {
358406
// We can't use ShardCoreKeyMap here because its core closed
359407
// listener is called before the listener of the cache which
360-
// triggers this eviction. So instead we use use stats2 that
408+
// triggers this eviction. So instead we use stats2 that
361409
// we only evict when nothing is cached anymore on the segment
362410
// instead of relying on close listeners
363411
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);

0 commit comments

Comments
 (0)