Skip to content

Commit 2b8a310

Browse files
authored
Revert "Improving statsByShard performance when the number of shards is very large (#130857)" (#137973) (#137986)
This reverts commit 22c15bc.
1 parent c2a0be3 commit 2b8a310

File tree

11 files changed, with 95 additions and 233 deletions.

docs/changelog/130857.yaml

Lines changed: 0 additions & 6 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848
import org.elasticsearch.index.seqno.RetentionLeaseStats;
4949
import org.elasticsearch.index.seqno.SeqNoStats;
5050
import org.elasticsearch.index.shard.IndexShard;
51-
import org.elasticsearch.index.shard.ShardId;
52-
import org.elasticsearch.indices.IndicesQueryCache;
5351
import org.elasticsearch.indices.IndicesService;
5452
import org.elasticsearch.injection.guice.Inject;
5553
import org.elasticsearch.node.NodeService;
@@ -259,12 +257,9 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
259257
false,
260258
false
261259
);
262-
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
263260
List<ShardStats> shardsStats = new ArrayList<>();
264261
for (IndexService indexService : indicesService) {
265262
for (IndexShard indexShard : indexService) {
266-
// get the shared ram for this shard id (or zero if there's nothing in the map)
267-
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
268263
cancellableTask.ensureNotCancelled();
269264
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
270265
// only report on fully started shards
@@ -285,7 +280,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
285280
new ShardStats(
286281
indexShard.routingEntry(),
287282
indexShard.shardPath(),
288-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS, sharedRam),
283+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
289284
commitStats,
290285
seqNoStats,
291286
retentionLeaseStats,
@@ -316,7 +311,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
316311
clusterStatus,
317312
nodeInfo,
318313
nodeStats,
319-
shardsStats.toArray(new ShardStats[0]),
314+
shardsStats.toArray(new ShardStats[shardsStats.size()]),
320315
searchUsageStats,
321316
repositoryUsageStats,
322317
ccsTelemetry,
@@ -478,7 +473,7 @@ protected void sendItemRequest(String clusterAlias, ActionListener<RemoteCluster
478473
@Override
479474
protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) {
480475
if (response != null) {
481-
remoteClustersStats.computeIfPresent(clusterAlias, (ignored, v) -> v.acceptResponse(response));
476+
remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response));
482477
}
483478
}
484479

server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,7 @@ public CommonStats(CommonStatsFlags flags) {
154154
/**
155155
* Filters the given flags for {@link CommonStatsFlags#SHARD_LEVEL} flags and calculates the corresponding statistics.
156156
*/
157-
public static CommonStats getShardLevelStats(
158-
IndicesQueryCache indicesQueryCache,
159-
IndexShard indexShard,
160-
CommonStatsFlags flags,
161-
long precomputedSharedRam
162-
) {
157+
public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
163158
// Filter shard level flags
164159
CommonStatsFlags filteredFlags = flags.clone();
165160
for (CommonStatsFlags.Flag flag : filteredFlags.getFlags()) {
@@ -179,7 +174,7 @@ public static CommonStats getShardLevelStats(
179174
case Refresh -> stats.refresh = indexShard.refreshStats();
180175
case Flush -> stats.flush = indexShard.flushStats();
181176
case Warmer -> stats.warmer = indexShard.warmerStats();
182-
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId(), precomputedSharedRam);
177+
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
183178
case FieldData -> stats.fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
184179
case Completion -> stats.completion = indexShard.completionStats(flags.completionDataFields());
185180
case Segments -> stats.segments = indexShard.segmentStats(

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.index.seqno.RetentionLeaseStats;
2828
import org.elasticsearch.index.seqno.SeqNoStats;
2929
import org.elasticsearch.index.shard.IndexShard;
30-
import org.elasticsearch.indices.IndicesQueryCache;
3130
import org.elasticsearch.indices.IndicesService;
3231
import org.elasticsearch.injection.guice.Inject;
3332
import org.elasticsearch.tasks.CancellableTask;
@@ -115,13 +114,7 @@ protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRou
115114
assert task instanceof CancellableTask;
116115
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
117116
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
118-
long sharedRam = IndicesQueryCache.getSharedRamSizeForShard(indicesService, indexShard.shardId());
119-
CommonStats commonStats = CommonStats.getShardLevelStats(
120-
indicesService.getIndicesQueryCache(),
121-
indexShard,
122-
request.flags(),
123-
sharedRam
124-
);
117+
CommonStats commonStats = CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
125118
CommitStats commitStats;
126119
SeqNoStats seqNoStats;
127120
RetentionLeaseStats retentionLeaseStats;

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 33 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,12 @@
2626
import org.elasticsearch.common.unit.ByteSizeValue;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Predicates;
29-
import org.elasticsearch.index.IndexService;
3029
import org.elasticsearch.index.cache.query.QueryCacheStats;
31-
import org.elasticsearch.index.shard.IndexShard;
3230
import org.elasticsearch.index.shard.ShardId;
3331

3432
import java.io.Closeable;
3533
import java.io.IOException;
3634
import java.util.Collections;
37-
import java.util.HashMap;
3835
import java.util.IdentityHashMap;
3936
import java.util.Map;
4037
import java.util.Set;
@@ -70,38 +67,6 @@ public class IndicesQueryCache implements QueryCache, Closeable {
7067
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
7168
private volatile long sharedRamBytesUsed;
7269

73-
/**
74-
* Calculates a map of {@link ShardId} to {@link Long} which contains the calculated share of the {@link IndicesQueryCache} shared ram
75-
* size for a given shard (that is, the sum of all the longs is the size of the indices query cache). Since many shards will not
76-
* participate in the cache, shards whose calculated share is zero will not be contained in the map at all. As a consequence, the
77-
* correct pattern for using the returned map will be via {@link Map#getOrDefault(Object, Object)} with a {@code defaultValue} of
78-
* {@code 0L}.
79-
*/
80-
public static Map<ShardId, Long> getSharedRamSizeForAllShards(IndicesService indicesService) {
81-
Map<ShardId, Long> shardIdToSharedRam = new HashMap<>();
82-
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
83-
for (IndexService indexService : indicesService) {
84-
for (IndexShard indexShard : indexService) {
85-
final var queryCache = indicesService.getIndicesQueryCache();
86-
long sharedRam = (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(indexShard.shardId(), cacheTotals);
87-
// as a size optimization, only store non-zero values in the map
88-
if (sharedRam > 0L) {
89-
shardIdToSharedRam.put(indexShard.shardId(), sharedRam);
90-
}
91-
}
92-
}
93-
return Collections.unmodifiableMap(shardIdToSharedRam);
94-
}
95-
96-
public long getCacheSizeForShard(ShardId shardId) {
97-
Stats stats = shardStats.get(shardId);
98-
return stats != null ? stats.cacheSize : 0L;
99-
}
100-
101-
public long getSharedRamBytesUsed() {
102-
return sharedRamBytesUsed;
103-
}
104-
10570
// This is a hack for the fact that the close listener for the
10671
// ShardCoreKeyMap will be called before onDocIdSetEviction
10772
// See onDocIdSetEviction for more info
@@ -124,58 +89,40 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
12489
return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
12590
}
12691

127-
/**
128-
* This computes the total cache size in bytes, and the total shard count in the cache for all shards.
129-
* @param indicesService
130-
* @return A CacheTotals object containing the computed total number of items in the cache and the number of shards seen in the cache
131-
*/
132-
private static CacheTotals getCacheTotalsForAllShards(IndicesService indicesService) {
133-
IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
134-
boolean hasQueryCache = queryCache != null;
92+
private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
93+
if (sharedRamBytesUsed == 0L) {
94+
return 0L;
95+
}
96+
97+
/*
98+
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
99+
* shard.
100+
*/
101+
// TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
135102
long totalItemsInCache = 0L;
136103
int shardCount = 0;
137-
for (final IndexService indexService : indicesService) {
138-
for (final IndexShard indexShard : indexService) {
139-
final var shardId = indexShard.shardId();
140-
long cacheSize = hasQueryCache ? queryCache.getCacheSizeForShard(shardId) : 0L;
141-
shardCount++;
142-
assert cacheSize >= 0 : "Unexpected cache size of " + cacheSize + " for shard " + shardId;
143-
totalItemsInCache += cacheSize;
104+
if (itemsInCacheForShard == 0L) {
105+
for (final var stats : shardStats.values()) {
106+
shardCount += 1;
107+
if (stats.cacheSize > 0L) {
108+
// some shard has nonzero cache footprint, so we apportion the shared size by cache footprint, and this shard has none
109+
return 0L;
110+
}
111+
}
112+
} else {
113+
// branchless loop for the common case
114+
for (final var stats : shardStats.values()) {
115+
shardCount += 1;
116+
totalItemsInCache += stats.cacheSize;
144117
}
145-
}
146-
return new CacheTotals(totalItemsInCache, shardCount);
147-
}
148-
149-
public static long getSharedRamSizeForShard(IndicesService indicesService, ShardId shardId) {
150-
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
151-
final var queryCache = indicesService.getIndicesQueryCache();
152-
return (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(shardId, cacheTotals);
153-
}
154-
155-
/**
156-
* This method computes the shared RAM size in bytes for the given indexShard.
157-
* @param shardId The shard to compute the shared RAM size for
158-
* @param cacheTotals Shard totals computed in getCacheTotalsForAllShards()
159-
* @return the shared RAM size in bytes allocated to the given shard, or 0 if unavailable
160-
*/
161-
private long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals) {
162-
long sharedRamBytesUsed = getSharedRamBytesUsed();
163-
if (sharedRamBytesUsed == 0L) {
164-
return 0L;
165118
}
166119

167-
int shardCount = cacheTotals.shardCount();
168120
if (shardCount == 0) {
169121
// Sometimes it's not possible to do this when there are no shard entries at all, which can happen as the shared ram usage can
170122
// extend beyond the closing of all shards.
171123
return 0L;
172124
}
173-
/*
174-
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
175-
* shard.
176-
*/
177-
long totalItemsInCache = cacheTotals.totalItemsInCache();
178-
long itemsInCacheForShard = getCacheSizeForShard(shardId);
125+
179126
final long additionalRamBytesUsed;
180127
if (totalItemsInCache == 0) {
181128
// all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
@@ -196,12 +143,10 @@ private long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals)
196143
return additionalRamBytesUsed;
197144
}
198145

199-
private record CacheTotals(long totalItemsInCache, int shardCount) {}
200-
201146
/** Get usage statistics for the given shard. */
202-
public QueryCacheStats getStats(ShardId shard, long precomputedSharedRamBytesUsed) {
147+
public QueryCacheStats getStats(ShardId shard) {
203148
final QueryCacheStats queryCacheStats = toQueryCacheStatsSafe(shardStats.get(shard));
204-
queryCacheStats.addRamBytesUsed(precomputedSharedRamBytesUsed);
149+
queryCacheStats.addRamBytesUsed(getShareOfAdditionalRamBytesUsed(queryCacheStats.getCacheSize()));
205150
return queryCacheStats;
206151
}
207152

@@ -298,7 +243,7 @@ QueryCacheStats toQueryCacheStats() {
298243
public String toString() {
299244
return "{shardId="
300245
+ shardId
301-
+ ", ramBytesUsed="
246+
+ ", ramBytedUsed="
302247
+ ramBytesUsed
303248
+ ", hitCount="
304249
+ hitCount
@@ -395,7 +340,11 @@ protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
395340
shardStats.cacheCount += 1;
396341
shardStats.ramBytesUsed += ramBytesUsed;
397342

398-
StatsAndCount statsAndCount = stats2.computeIfAbsent(readerCoreKey, ignored -> new StatsAndCount(shardStats));
343+
StatsAndCount statsAndCount = stats2.get(readerCoreKey);
344+
if (statsAndCount == null) {
345+
statsAndCount = new StatsAndCount(shardStats);
346+
stats2.put(readerCoreKey, statsAndCount);
347+
}
399348
statsAndCount.count += 1;
400349
}
401350

@@ -408,7 +357,7 @@ protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sum
408357
if (numEntries > 0) {
409358
// We can't use ShardCoreKeyMap here because its core closed
410359
// listener is called before the listener of the cache which
411-
// triggers this eviction. So instead we use stats2 that
360+
// triggers this eviction. So instead we use use stats2 that
412361
// we only evict when nothing is cached anymore on the segment
413362
// instead of relying on close listeners
414363
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -519,36 +519,33 @@ static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService,
519519
}
520520

521521
static Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
522-
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
523522
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
523+
524524
for (final IndexService indexService : indicesService) {
525525
for (final IndexShard indexShard : indexService) {
526-
// get the shared ram for this shard id (or zero if there's nothing in the map)
527-
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
528526
try {
529-
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags, sharedRam);
527+
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
528+
530529
if (indexShardStats == null) {
531530
continue;
532531
}
532+
533533
if (statsByShard.containsKey(indexService.index()) == false) {
534534
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
535535
} else {
536536
statsByShard.get(indexService.index()).add(indexShardStats);
537537
}
538538
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
539+
// we can safely ignore illegal state on ones that are closing for example
539540
logger.trace(() -> format("%s ignoring shard stats", indexShard.shardId()), e);
540541
}
541542
}
542543
}
544+
543545
return statsByShard;
544546
}
545547

546-
IndexShardStats indexShardStats(
547-
final IndicesService indicesService,
548-
final IndexShard indexShard,
549-
final CommonStatsFlags flags,
550-
final long precomputedSharedRam
551-
) {
548+
IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
552549
if (indexShard.routingEntry() == null) {
553550
return null;
554551
}
@@ -573,7 +570,7 @@ IndexShardStats indexShardStats(
573570
new ShardStats(
574571
indexShard.routingEntry(),
575572
indexShard.shardPath(),
576-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags, precomputedSharedRam),
573+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags),
577574
commitStats,
578575
seqNoStats,
579576
retentionLeaseStats,

server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void testCreation() {
115115
ShardStats shardStats = new ShardStats(
116116
shardRouting,
117117
new ShardPath(false, path, path, shardRouting.shardId()),
118-
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store), 0L),
118+
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store)),
119119
null,
120120
null,
121121
null,

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,7 @@ public void testShardStats() throws IOException {
16261626
ShardStats stats = new ShardStats(
16271627
shard.routingEntry(),
16281628
shard.shardPath(),
1629-
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags(), 0L),
1629+
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()),
16301630
shard.commitStats(),
16311631
shard.seqNoStats(),
16321632
shard.getRetentionLeaseStats(),

0 commit comments

Comments
 (0)