Commit 0755992
Reapply "Track shardStarted events for simulation in DesiredBalanceComputer" (#135597)
This PR reapplies both #133630 and #135052 with a performance bug fix. The original PR #133630 had a severe impact on throughput for index creation and was reverted with #135369. The flamegraph suggests the system spent a lot of time computing shard assignments at ClusterInfo instantiation time. That work is unnecessary because the assignments do not change within a single polling interval, so this PR reuses the last computed value and avoids the recomputation.

Copying the original commit message here:

If a shard starts on the target node before the next ClusterInfo polling, today we don't include it in the simulation. With this PR, we track shards that can potentially start within one ClusterInfo polling cycle so that they are always included in the simulation. The tracking is reset when a new ClusterInfo arrives.

Resolves: ES-12723
1 parent c9f440e commit 0755992
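To make the description above concrete, here is a minimal, hypothetical sketch of the two ideas in the commit message: compute the derived shard-to-node view once per ClusterInfo polling cycle and reuse it instead of recomputing it on every read, and track shardStarted events seen since the last poll so the simulation always includes them, resetting the tracking when a new ClusterInfo arrives. The ShardStartTracker class and its methods are invented for illustration (shard and node IDs are plain strings); the real implementation lives in ClusterInfo, ClusterInfoSimulator and DesiredBalanceComputer as shown in the diff below.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified illustration only -- not the Elasticsearch classes.
final class ShardStartTracker {

    // Derived shard -> node-ids view, computed once per ClusterInfo poll and then reused,
    // instead of being recomputed on every read (the performance fix described above).
    private Map<String, Set<String>> shardToNodeIds = Map.of();

    // shardStarted events observed since the last poll; always included in the simulation.
    private final Map<String, Set<String>> startedSinceLastPoll = new HashMap<>();

    // A new ClusterInfo arrived: rebuild the derived view once and reset the tracking.
    void onNewClusterInfo(Map<String, Set<String>> freshShardToNodeIds) {
        this.shardToNodeIds = Map.copyOf(freshShardToNodeIds);
        this.startedSinceLastPoll.clear();
    }

    // A shard started on a node within the current polling cycle.
    void onShardStarted(String shardId, String nodeId) {
        startedSinceLastPoll.computeIfAbsent(shardId, k -> new HashSet<>()).add(nodeId);
    }

    // View used by the simulation: the cached data from the last poll overlaid with
    // everything that started since then.
    Set<String> nodeIdsForShard(String shardId) {
        Set<String> nodes = new HashSet<>(shardToNodeIds.getOrDefault(shardId, Set.of()));
        nodes.addAll(startedSinceLastPoll.getOrDefault(shardId, Set.of()));
        return nodes;
    }
}

In this sketch, callers would invoke onShardStarted for each shard-started event and onNewClusterInfo when a fresh ClusterInfo is published; both names are assumptions for illustration, not the methods added by this commit.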

File tree

8 files changed: +710, -67 lines


docs/changelog/135597.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 135597
+summary: Track `shardStarted` events for simulation in `DesiredBalanceComputer`
+area: Allocation
+type: enhancement
+issues: []

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 83 additions & 0 deletions
@@ -21,12 +21,15 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;

 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -69,6 +72,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS
     final Map<ShardId, Double> shardWriteLoads;
     // max heap size per node ID
     final Map<String, ByteSizeValue> maxHeapSizePerNode;
+    private final Map<ShardId, Set<String>> shardToNodeIds;

     protected ClusterInfo() {
         this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
@@ -99,6 +103,34 @@ public ClusterInfo(
         Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
         Map<ShardId, Double> shardWriteLoads,
         Map<String, ByteSizeValue> maxHeapSizePerNode
+    ) {
+        this(
+            leastAvailableSpaceUsage,
+            mostAvailableSpaceUsage,
+            shardSizes,
+            shardDataSetSizes,
+            dataPath,
+            reservedSpace,
+            estimatedHeapUsages,
+            nodeUsageStatsForThreadPools,
+            shardWriteLoads,
+            maxHeapSizePerNode,
+            computeShardToNodeIds(dataPath)
+        );
+    }
+
+    private ClusterInfo(
+        Map<String, DiskUsage> leastAvailableSpaceUsage,
+        Map<String, DiskUsage> mostAvailableSpaceUsage,
+        Map<String, Long> shardSizes,
+        Map<ShardId, Long> shardDataSetSizes,
+        Map<NodeAndShard, String> dataPath,
+        Map<NodeAndPath, ReservedSpace> reservedSpace,
+        Map<String, EstimatedHeapUsage> estimatedHeapUsages,
+        Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
+        Map<ShardId, Double> shardWriteLoads,
+        Map<String, ByteSizeValue> maxHeapSizePerNode,
+        Map<ShardId, Set<String>> shardToNodeIds
     ) {
         this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
         this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -110,6 +142,7 @@ public ClusterInfo(
         this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
         this.shardWriteLoads = Map.copyOf(shardWriteLoads);
         this.maxHeapSizePerNode = Map.copyOf(maxHeapSizePerNode);
+        this.shardToNodeIds = shardToNodeIds;
    }

     public ClusterInfo(StreamInput in) throws IOException {
@@ -139,6 +172,46 @@ public ClusterInfo(StreamInput in) throws IOException {
         } else {
             this.maxHeapSizePerNode = Map.of();
         }
+        this.shardToNodeIds = computeShardToNodeIds(dataPath);
+    }
+
+    ClusterInfo updateWith(
+        Map<String, DiskUsage> leastAvailableSpaceUsage,
+        Map<String, DiskUsage> mostAvailableSpaceUsage,
+        Map<String, Long> shardSizes,
+        Map<NodeAndPath, ReservedSpace> reservedSpace,
+        Map<String, EstimatedHeapUsage> estimatedHeapUsages,
+        Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
+    ) {
+        return new ClusterInfo(
+            leastAvailableSpaceUsage,
+            mostAvailableSpaceUsage,
+            shardSizes,
+            shardDataSetSizes,
+            dataPath,
+            reservedSpace,
+            estimatedHeapUsages,
+            nodeUsageStatsForThreadPools,
+            shardWriteLoads,
+            maxHeapSizePerNode,
+            shardToNodeIds
+        );
+    }
+
+    private static Map<ShardId, Set<String>> computeShardToNodeIds(Map<NodeAndShard, String> dataPath) {
+        if (dataPath.isEmpty()) {
+            return Map.of();
+        }
+        final var shardToNodeIds = new HashMap<ShardId, Set<String>>();
+        for (NodeAndShard nodeAndShard : dataPath.keySet()) {
+            shardToNodeIds.computeIfAbsent(nodeAndShard.shardId, ignore -> new HashSet<>()).add(nodeAndShard.nodeId);
+        }
+        return Collections.unmodifiableMap(Maps.transformValues(shardToNodeIds, Collections::unmodifiableSet));
+    }
+
+    public Set<String> getNodeIdsForShard(ShardId shardId) {
+        assert shardToNodeIds != null : "shardToNodeIds not computed for simulations, make sure this ClusterInfo is from polling";
+        return shardToNodeIds.getOrDefault(shardId, Set.of());
     }

     @Override
@@ -325,6 +398,16 @@ public Map<String, ByteSizeValue> getMaxHeapSizePerNode() {
         return this.maxHeapSizePerNode;
     }

+    /**
+     * Return true if the shard has moved since the time ClusterInfo was created.
+     */
+    public boolean hasShardMoved(ShardRouting shardRouting) {
+        // We use dataPath to find out whether a shard is allocated on a node.
+        // TODO: DataPath is sent with disk usages but thread pool usage is sent separately so that local shard allocation
+        // may change between the two calls.
+        return getDataPath(shardRouting) == null;
+    }
+
     /**
      * Method that incorporates the ShardId for the shard into a string that
      * includes a 'p' or 'r' depending on whether the shard is a primary.

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 47 additions & 21 deletions
@@ -9,12 +9,14 @@

 package org.elasticsearch.cluster;

-import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CopyOnFirstWriteMap;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.shard.ShardId;

 import java.util.HashMap;
@@ -25,29 +27,26 @@
 import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
 import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.shouldReserveSpaceForInitializingShard;
 import static org.elasticsearch.cluster.routing.ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE;
+import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED;

 public class ClusterInfoSimulator {

+    private static final Logger logger = LogManager.getLogger(ClusterInfoSimulator.class);
+
     private final RoutingAllocation allocation;

     private final Map<String, DiskUsage> leastAvailableSpaceUsage;
     private final Map<String, DiskUsage> mostAvailableSpaceUsage;
     private final CopyOnFirstWriteMap<String, Long> shardSizes;
-    private final Map<ShardId, Long> shardDataSetSizes;
-    private final Map<NodeAndShard, String> dataPath;
     private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
-    private final Map<String, ByteSizeValue> maxHeapSizePerNode;
     private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;

     public ClusterInfoSimulator(RoutingAllocation allocation) {
         this.allocation = allocation;
         this.leastAvailableSpaceUsage = getAdjustedDiskSpace(allocation, allocation.clusterInfo().getNodeLeastAvailableDiskUsages());
         this.mostAvailableSpaceUsage = getAdjustedDiskSpace(allocation, allocation.clusterInfo().getNodeMostAvailableDiskUsages());
         this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes);
-        this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
-        this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
         this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
-        this.maxHeapSizePerNode = Map.copyOf(allocation.clusterInfo().maxHeapSizePerNode);
         this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
     }

@@ -95,7 +94,7 @@ private static Map<String, DiskUsage> getAdjustedDiskSpace(RoutingAllocation all
      * Balance is later recalculated with a refreshed cluster info containing actual shards placement.
      */
     public void simulateShardStarted(ShardRouting shard) {
-        assert shard.initializing();
+        assert shard.initializing() : "expected an initializing shard, but got: " + shard;

         var project = allocation.metadata().projectFor(shard.index());
         var size = getExpectedShardSize(
@@ -122,6 +121,36 @@ public void simulateShardStarted(ShardRouting shard) {
         shardMovementWriteLoadSimulator.simulateShardStarted(shard);
     }

+    /**
+     * This method simulates starting an already started shard with an optional {@code sourceNodeId} in case of a relocation.
+     * @param startedShard The shard to simulate. Must be started already.
+     * @param sourceNodeId The source node ID if the shard started as a result of relocation. {@code null} otherwise.
+     */
+    public void simulateAlreadyStartedShard(ShardRouting startedShard, @Nullable String sourceNodeId) {
+        assert startedShard.started() : "expected an already started shard, but got: " + startedShard;
+        if (logger.isDebugEnabled()) {
+            logger.debug(
+                "simulated started shard {} on node [{}] as a {}",
+                startedShard.shardId(),
+                startedShard.currentNodeId(),
+                sourceNodeId != null ? "relocating shard from node [" + sourceNodeId + "]" : "new shard"
+            );
+        }
+        final long expectedShardSize = startedShard.getExpectedShardSize();
+        if (sourceNodeId != null) {
+            final var relocatingShard = startedShard.moveToUnassigned(new UnassignedInfo(REINITIALIZED, "simulation"))
+                .initialize(sourceNodeId, null, expectedShardSize)
+                .moveToStarted(expectedShardSize)
+                .relocate(startedShard.currentNodeId(), expectedShardSize)
+                .getTargetRelocatingShard();
+            simulateShardStarted(relocatingShard);
+        } else {
+            final var initializingShard = startedShard.moveToUnassigned(new UnassignedInfo(REINITIALIZED, "simulation"))
+                .initialize(startedShard.currentNodeId(), null, expectedShardSize);
+            simulateShardStarted(initializingShard);
+        }
+    }
+
     private void modifyDiskUsage(String nodeId, long freeDelta) {
         if (freeDelta == 0) {
             return;
@@ -156,17 +185,14 @@ private static long withinRange(long min, long max, long value) {
     }

     public ClusterInfo getClusterInfo() {
-        return new ClusterInfo(
-            leastAvailableSpaceUsage,
-            mostAvailableSpaceUsage,
-            shardSizes.toImmutableMap(),
-            shardDataSetSizes,
-            dataPath,
-            Map.of(),
-            estimatedHeapUsages,
-            shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(),
-            allocation.clusterInfo().getShardWriteLoads(),
-            maxHeapSizePerNode
-        );
+        return allocation.clusterInfo()
+            .updateWith(
+                leastAvailableSpaceUsage,
+                mostAvailableSpaceUsage,
+                shardSizes.toImmutableMap(),
+                Map.of(),
+                estimatedHeapUsages,
+                shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools()
+            );
     }
 }

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 35 additions & 35 deletions
@@ -100,13 +100,6 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
     private volatile TimeValue updateFrequency;
     private volatile TimeValue fetchTimeout;

-    private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
-    private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
-    private volatile Map<String, ByteSizeValue> maxHeapPerNode;
-    private volatile Map<String, Long> estimatedHeapUsagePerNode;
-    private volatile Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStatsPerNode;
-    private volatile IndicesStatsSummary indicesStatsSummary;
-
     private final ThreadPool threadPool;
     private final Client client;
     private final Supplier<ClusterState> clusterStateSupplier;
@@ -120,6 +113,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
     private AsyncRefresh currentRefresh;
     private RefreshScheduler refreshScheduler;

+    private volatile ClusterInfo currentClusterInfo = ClusterInfo.EMPTY;
+
     @SuppressWarnings("this-escape")
     public InternalClusterInfoService(
         Settings settings,
@@ -129,12 +124,6 @@ public InternalClusterInfoService(
         EstimatedHeapUsageCollector estimatedHeapUsageCollector,
         NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector
     ) {
-        this.leastAvailableSpaceUsages = Map.of();
-        this.mostAvailableSpaceUsages = Map.of();
-        this.maxHeapPerNode = Map.of();
-        this.estimatedHeapUsagePerNode = Map.of();
-        this.nodeThreadPoolUsageStatsPerNode = Map.of();
-        this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
         this.threadPool = threadPool;
         this.client = client;
         this.estimatedHeapUsageCollector = estimatedHeapUsageCollector;
@@ -208,6 +197,13 @@ public void clusterChanged(ClusterChangedEvent event) {

     private class AsyncRefresh {

+        private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
+        private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
+        private volatile Map<String, ByteSizeValue> maxHeapPerNode;
+        private volatile Map<String, Long> estimatedHeapUsagePerNode;
+        private volatile Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStatsPerNode;
+        private volatile IndicesStatsSummary indicesStatsSummary;
+
         private final List<ActionListener<ClusterInfo>> thisRefreshListeners;
         private final RefCountingRunnable fetchRefs = new RefCountingRunnable(this::callListeners);

@@ -453,7 +449,7 @@ public void onFailure(Exception e) {
         private void callListeners() {
             try {
                 logger.trace("stats all received, computing cluster info and notifying listeners");
-                final ClusterInfo clusterInfo = getClusterInfo();
+                final ClusterInfo clusterInfo = updateAndGetCurrentClusterInfo();
                 boolean anyListeners = false;
                 for (final Consumer<ClusterInfo> listener : listeners) {
                     anyListeners = true;
@@ -473,6 +469,30 @@ private void callListeners() {
                 onRefreshComplete(this);
             }
         }
+
+        private ClusterInfo updateAndGetCurrentClusterInfo() {
+            final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
+            maxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
+                final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
+                if (estimatedHeapUsage != null) {
+                    estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
+                }
+            });
+            final var newClusterInfo = new ClusterInfo(
+                leastAvailableSpaceUsages,
+                mostAvailableSpaceUsages,
+                indicesStatsSummary.shardSizes,
+                indicesStatsSummary.shardDataSetSizes,
+                indicesStatsSummary.dataPath,
+                indicesStatsSummary.reservedSpace,
+                estimatedHeapUsages,
+                nodeThreadPoolUsageStatsPerNode,
+                indicesStatsSummary.shardWriteLoads(),
+                maxHeapPerNode
+            );
+            currentClusterInfo = newClusterInfo;
+            return newClusterInfo;
+        }
     }

     private void onRefreshComplete(AsyncRefresh completedRefresh) {
@@ -537,27 +557,7 @@ private boolean shouldRefresh() {

     @Override
     public ClusterInfo getClusterInfo() {
-        final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
-        final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
-        final var currentMaxHeapPerNode = this.maxHeapPerNode; // Make sure we use a consistent view
-        currentMaxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
-            final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
-            if (estimatedHeapUsage != null) {
-                estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
-            }
-        });
-        return new ClusterInfo(
-            leastAvailableSpaceUsages,
-            mostAvailableSpaceUsages,
-            indicesStatsSummary.shardSizes,
-            indicesStatsSummary.shardDataSetSizes,
-            indicesStatsSummary.dataPath,
-            indicesStatsSummary.reservedSpace,
-            estimatedHeapUsages,
-            nodeThreadPoolUsageStatsPerNode,
-            indicesStatsSummary.shardWriteLoads(),
-            currentMaxHeapPerNode
-        );
+        return currentClusterInfo;
     }

     // allow tests to adjust the node stats on receipt
