Skip to content

Commit e76d333

Browse files
pxsalehielasticsearchmachinerjernst
authored
* Revert "Move individual stats fields to AsyncRefresh (#135052)" This reverts commit 2b0153b. * Revert "Track shardStarted events for simulation in DesiredBalanceComputer (#133630)" This reverts commit f248596. * [CI] Update transport version definitions * Revert "[CI] Update transport version definitions" This reverts commit 90f38b0. * Don't reset upper bounds (#135226) Transport version upper bounds are currently set to their values from upstream main whenever no new definition is detected. However, this is like a partial merge of upstream, and produces broken state files. This commit temporarily comments out resetting until a more robust solution is built. * Revert "Don't reset upper bounds (#135226)" This reverts commit ddbac68. --------- Co-authored-by: elasticsearchmachine <[email protected]> Co-authored-by: Ryan Ernst <[email protected]>
1 parent 4d79a59 commit e76d333

File tree

8 files changed

+47
-652
lines changed

8 files changed

+47
-652
lines changed

docs/changelog/133630.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,12 @@
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.common.io.stream.Writeable;
2222
import org.elasticsearch.common.unit.ByteSizeValue;
23-
import org.elasticsearch.common.util.Maps;
2423
import org.elasticsearch.common.xcontent.ChunkedToXContent;
2524
import org.elasticsearch.index.shard.ShardId;
2625
import org.elasticsearch.xcontent.ToXContent;
2726
import org.elasticsearch.xcontent.XContentBuilder;
2827

2928
import java.io.IOException;
30-
import java.util.Collections;
31-
import java.util.HashMap;
3229
import java.util.HashSet;
3330
import java.util.Iterator;
3431
import java.util.Map;
@@ -68,7 +65,6 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
6865
final Map<ShardId, Double> shardWriteLoads;
6966
// max heap size per node ID
7067
final Map<String, ByteSizeValue> maxHeapSizePerNode;
71-
private final Map<ShardId, Set<String>> shardToNodeIds;
7268

7369
protected ClusterInfo() {
7470
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
@@ -110,7 +106,6 @@ public ClusterInfo(
110106
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
111107
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
112108
this.maxHeapSizePerNode = Map.copyOf(maxHeapSizePerNode);
113-
this.shardToNodeIds = computeShardToNodeIds(dataPath);
114109
}
115110

116111
public ClusterInfo(StreamInput in) throws IOException {
@@ -140,23 +135,6 @@ public ClusterInfo(StreamInput in) throws IOException {
140135
} else {
141136
this.maxHeapSizePerNode = Map.of();
142137
}
143-
this.shardToNodeIds = computeShardToNodeIds(dataPath);
144-
}
145-
146-
private static Map<ShardId, Set<String>> computeShardToNodeIds(Map<NodeAndShard, String> dataPath) {
147-
if (dataPath.isEmpty()) {
148-
return Map.of();
149-
}
150-
final var shardToNodeIds = new HashMap<ShardId, Set<String>>();
151-
for (NodeAndShard nodeAndShard : dataPath.keySet()) {
152-
shardToNodeIds.computeIfAbsent(nodeAndShard.shardId, ignore -> new HashSet<>()).add(nodeAndShard.nodeId);
153-
}
154-
Maps.transformValues(shardToNodeIds, nodeIds -> Collections.unmodifiableSet(nodeIds));
155-
return shardToNodeIds;
156-
}
157-
158-
public Set<String> getNodeIdsForShard(ShardId shardId) {
159-
return shardToNodeIds.getOrDefault(shardId, Set.of());
160138
}
161139

162140
@Override
@@ -365,16 +343,6 @@ public Map<String, ByteSizeValue> getMaxHeapSizePerNode() {
365343
return this.maxHeapSizePerNode;
366344
}
367345

368-
/**
369-
* Return true if the shard has moved since the time ClusterInfo was created.
370-
*/
371-
public boolean hasShardMoved(ShardRouting shardRouting) {
372-
// We use dataPath to find out whether a shard is allocated on a node.
373-
// TODO: DataPath is sent with disk usages but thread pool usage is sent separately so that local shard allocation
374-
// may change between the two calls.
375-
return getDataPath(shardRouting) == null;
376-
}
377-
378346
/**
379347
* Method that incorporates the ShardId for the shard into a string that
380348
* includes a 'p' or 'r' depending on whether the shard is a primary.

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,12 @@
99

1010
package org.elasticsearch.cluster;
1111

12-
import org.apache.logging.log4j.LogManager;
13-
import org.apache.logging.log4j.Logger;
1412
import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
1513
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
1614
import org.elasticsearch.cluster.routing.ShardRouting;
17-
import org.elasticsearch.cluster.routing.UnassignedInfo;
1815
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1916
import org.elasticsearch.common.unit.ByteSizeValue;
2017
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
21-
import org.elasticsearch.core.Nullable;
2218
import org.elasticsearch.index.shard.ShardId;
2319

2420
import java.util.HashMap;
@@ -29,12 +25,9 @@
2925
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
3026
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.shouldReserveSpaceForInitializingShard;
3127
import static org.elasticsearch.cluster.routing.ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE;
32-
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED;
3328

3429
public class ClusterInfoSimulator {
3530

36-
private static final Logger logger = LogManager.getLogger(ClusterInfoSimulator.class);
37-
3831
private final RoutingAllocation allocation;
3932

4033
private final Map<String, DiskUsage> leastAvailableSpaceUsage;
@@ -102,7 +95,7 @@ private static Map<String, DiskUsage> getAdjustedDiskSpace(RoutingAllocation all
10295
* Balance is later recalculated with a refreshed cluster info containing actual shards placement.
10396
*/
10497
public void simulateShardStarted(ShardRouting shard) {
105-
assert shard.initializing() : "expected an initializing shard, but got: " + shard;
98+
assert shard.initializing();
10699

107100
var project = allocation.metadata().projectFor(shard.index());
108101
var size = getExpectedShardSize(
@@ -129,36 +122,6 @@ public void simulateShardStarted(ShardRouting shard) {
129122
shardMovementWriteLoadSimulator.simulateShardStarted(shard);
130123
}
131124

132-
/**
133-
* This method simulates starting an already started shard with an optional {@code sourceNodeId} in case of a relocation.
134-
* @param startedShard The shard to simulate. Must be started already.
135-
* @param sourceNodeId The source node ID if the shard started as a result of relocation. {@code null} otherwise.
136-
*/
137-
public void simulateAlreadyStartedShard(ShardRouting startedShard, @Nullable String sourceNodeId) {
138-
assert startedShard.started() : "expected an already started shard, but got: " + startedShard;
139-
if (logger.isDebugEnabled()) {
140-
logger.debug(
141-
"simulated started shard {} on node [{}] as a {}",
142-
startedShard.shardId(),
143-
startedShard.currentNodeId(),
144-
sourceNodeId != null ? "relocating shard from node [" + sourceNodeId + "]" : "new shard"
145-
);
146-
}
147-
final long expectedShardSize = startedShard.getExpectedShardSize();
148-
if (sourceNodeId != null) {
149-
final var relocatingShard = startedShard.moveToUnassigned(new UnassignedInfo(REINITIALIZED, "simulation"))
150-
.initialize(sourceNodeId, null, expectedShardSize)
151-
.moveToStarted(expectedShardSize)
152-
.relocate(startedShard.currentNodeId(), expectedShardSize)
153-
.getTargetRelocatingShard();
154-
simulateShardStarted(relocatingShard);
155-
} else {
156-
final var initializingShard = startedShard.moveToUnassigned(new UnassignedInfo(REINITIALIZED, "simulation"))
157-
.initialize(startedShard.currentNodeId(), null, expectedShardSize);
158-
simulateShardStarted(initializingShard);
159-
}
160-
}
161-
162125
private void modifyDiskUsage(String nodeId, long freeDelta) {
163126
if (freeDelta == 0) {
164127
return;

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
100100
private volatile TimeValue updateFrequency;
101101
private volatile TimeValue fetchTimeout;
102102

103+
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
104+
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
105+
private volatile Map<String, ByteSizeValue> maxHeapPerNode;
106+
private volatile Map<String, Long> estimatedHeapUsagePerNode;
107+
private volatile Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStatsPerNode;
108+
private volatile IndicesStatsSummary indicesStatsSummary;
109+
103110
private final ThreadPool threadPool;
104111
private final Client client;
105112
private final Supplier<ClusterState> clusterStateSupplier;
@@ -113,8 +120,6 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
113120
private AsyncRefresh currentRefresh;
114121
private RefreshScheduler refreshScheduler;
115122

116-
private volatile ClusterInfo currentClusterInfo = ClusterInfo.EMPTY;
117-
118123
@SuppressWarnings("this-escape")
119124
public InternalClusterInfoService(
120125
Settings settings,
@@ -124,6 +129,12 @@ public InternalClusterInfoService(
124129
EstimatedHeapUsageCollector estimatedHeapUsageCollector,
125130
NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector
126131
) {
132+
this.leastAvailableSpaceUsages = Map.of();
133+
this.mostAvailableSpaceUsages = Map.of();
134+
this.maxHeapPerNode = Map.of();
135+
this.estimatedHeapUsagePerNode = Map.of();
136+
this.nodeThreadPoolUsageStatsPerNode = Map.of();
137+
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
127138
this.threadPool = threadPool;
128139
this.client = client;
129140
this.estimatedHeapUsageCollector = estimatedHeapUsageCollector;
@@ -197,13 +208,6 @@ public void clusterChanged(ClusterChangedEvent event) {
197208

198209
private class AsyncRefresh {
199210

200-
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
201-
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
202-
private volatile Map<String, ByteSizeValue> maxHeapPerNode;
203-
private volatile Map<String, Long> estimatedHeapUsagePerNode;
204-
private volatile Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStatsPerNode;
205-
private volatile IndicesStatsSummary indicesStatsSummary;
206-
207211
private final List<ActionListener<ClusterInfo>> thisRefreshListeners;
208212
private final RefCountingRunnable fetchRefs = new RefCountingRunnable(this::callListeners);
209213

@@ -449,7 +453,7 @@ public void onFailure(Exception e) {
449453
private void callListeners() {
450454
try {
451455
logger.trace("stats all received, computing cluster info and notifying listeners");
452-
final ClusterInfo clusterInfo = updateAndGetCurrentClusterInfo();
456+
final ClusterInfo clusterInfo = getClusterInfo();
453457
boolean anyListeners = false;
454458
for (final Consumer<ClusterInfo> listener : listeners) {
455459
anyListeners = true;
@@ -469,32 +473,6 @@ private void callListeners() {
469473
onRefreshComplete(this);
470474
}
471475
}
472-
473-
private ClusterInfo updateAndGetCurrentClusterInfo() {
474-
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
475-
final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
476-
final var currentMaxHeapPerNode = this.maxHeapPerNode; // Make sure we use a consistent view
477-
currentMaxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
478-
final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
479-
if (estimatedHeapUsage != null) {
480-
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
481-
}
482-
});
483-
final var newClusterInfo = new ClusterInfo(
484-
leastAvailableSpaceUsages,
485-
mostAvailableSpaceUsages,
486-
indicesStatsSummary.shardSizes,
487-
indicesStatsSummary.shardDataSetSizes,
488-
indicesStatsSummary.dataPath,
489-
indicesStatsSummary.reservedSpace,
490-
estimatedHeapUsages,
491-
nodeThreadPoolUsageStatsPerNode,
492-
indicesStatsSummary.shardWriteLoads(),
493-
currentMaxHeapPerNode
494-
);
495-
currentClusterInfo = newClusterInfo;
496-
return newClusterInfo;
497-
}
498476
}
499477

500478
private void onRefreshComplete(AsyncRefresh completedRefresh) {
@@ -559,7 +537,27 @@ private boolean shouldRefresh() {
559537

560538
@Override
561539
public ClusterInfo getClusterInfo() {
562-
return currentClusterInfo;
540+
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
541+
final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
542+
final var currentMaxHeapPerNode = this.maxHeapPerNode; // Make sure we use a consistent view
543+
currentMaxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
544+
final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
545+
if (estimatedHeapUsage != null) {
546+
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
547+
}
548+
});
549+
return new ClusterInfo(
550+
leastAvailableSpaceUsages,
551+
mostAvailableSpaceUsages,
552+
indicesStatsSummary.shardSizes,
553+
indicesStatsSummary.shardDataSetSizes,
554+
indicesStatsSummary.dataPath,
555+
indicesStatsSummary.reservedSpace,
556+
estimatedHeapUsages,
557+
nodeThreadPoolUsageStatsPerNode,
558+
indicesStatsSummary.shardWriteLoads(),
559+
currentMaxHeapPerNode
560+
);
563561
}
564562

565563
// allow tests to adjust the node stats on receipt

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@
1212
import org.apache.logging.log4j.Level;
1313
import org.apache.logging.log4j.LogManager;
1414
import org.apache.logging.log4j.Logger;
15-
import org.elasticsearch.cluster.ClusterInfo;
1615
import org.elasticsearch.cluster.ClusterInfoSimulator;
17-
import org.elasticsearch.cluster.node.DiscoveryNode;
18-
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
19-
import org.elasticsearch.cluster.routing.RoutingNode;
2016
import org.elasticsearch.cluster.routing.RoutingNodes;
2117
import org.elasticsearch.cluster.routing.ShardRouting;
2218
import org.elasticsearch.cluster.routing.UnassignedInfo;
@@ -144,8 +140,6 @@ public DesiredBalance compute(
144140
return new DesiredBalance(desiredBalanceInput.index(), Map.of(), Map.of(), finishReason);
145141
}
146142

147-
maybeSimulateAlreadyStartedShards(desiredBalanceInput.routingAllocation().clusterInfo(), routingNodes, clusterInfoSimulator);
148-
149143
// we assume that all ongoing recoveries will complete
150144
for (final var routingNode : routingNodes) {
151145
for (final var shardRouting : routingNode) {
@@ -489,73 +483,6 @@ public DesiredBalance compute(
489483
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
490484
}
491485

492-
/**
493-
* For shards started after initial polling of the ClusterInfo but before the next polling, we need to
494-
* account for their impacts by simulating the events, either relocation or new shard start. This is done
495-
* by comparing the current RoutingNodes against the shard allocation information from the ClusterInfo to
496-
* find out the shard allocation changes. Note this approach is approximate in some edge cases:
497-
* <ol>
498-
* <li> If a shard is relocated twice from node A to B to C. It is considered as relocating from A to C directly
499-
* for simulation purpose.</li>
500-
* <li>If a shard has 2 replicas and they both relocate, replica 1 from A to X and replica 2 from B to Y. The
501-
* simulation may see them as relocations A->X and B->Y. But it may also see them as A->Y and B->X. </li>
502-
* </ol>
503-
* In both cases, it should not really matter for simulation to account for resource changes.
504-
*/
505-
static void maybeSimulateAlreadyStartedShards(
506-
ClusterInfo clusterInfo,
507-
RoutingNodes routingNodes,
508-
ClusterInfoSimulator clusterInfoSimulator
509-
) {
510-
// Find all shards that are started in RoutingNodes but have no data on corresponding node in ClusterInfo
511-
final var startedShards = new ArrayList<ShardRouting>();
512-
for (var routingNode : routingNodes) {
513-
for (var shardRouting : routingNode.started()) {
514-
if (clusterInfo.hasShardMoved(shardRouting)) {
515-
startedShards.add(shardRouting);
516-
}
517-
}
518-
}
519-
if (startedShards.isEmpty()) {
520-
return;
521-
}
522-
logger.debug(
523-
"Found [{}] started shards not accounted in ClusterInfo. The first one is {}",
524-
startedShards.size(),
525-
startedShards.getFirst()
526-
);
527-
528-
// For started shards, attempt to find its source node. If found, it is a relocation, otherwise it is a new shard.
529-
// The same shard on the same source node cannot be relocated twice to different nodes. So we exclude it once used.
530-
final Map<ShardId, Set<String>> alreadySeenSourceNodes = new HashMap<>();
531-
for (var startedShard : startedShards) {
532-
// The source node is found by checking whether the ClusterInfo has a node hosting a shard with the same ShardId
533-
// and has compatible node role. If multiple nodes are found, simply pick the first one.
534-
final var sourceNodeId = clusterInfo.getNodeIdsForShard(startedShard.shardId())
535-
.stream()
536-
// Do not use the same source node twice for the same shard
537-
.filter(nodeId -> alreadySeenSourceNodes.getOrDefault(startedShard.shardId(), Set.of()).contains(nodeId) == false)
538-
.map(routingNodes::node)
539-
// The source node must not currently host the shard
540-
.filter(routingNode -> routingNode != null && routingNode.getByShardId(startedShard.shardId()) == null)
541-
.map(RoutingNode::node)
542-
// The source node must have compatible node roles
543-
.filter(node -> node != null && switch (startedShard.role()) {
544-
case DEFAULT -> node.canContainData();
545-
case INDEX_ONLY -> node.getRoles().contains(DiscoveryNodeRole.INDEX_ROLE);
546-
case SEARCH_ONLY -> node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE);
547-
})
548-
.map(DiscoveryNode::getId)
549-
.findFirst()
550-
.orElse(null);
551-
552-
if (sourceNodeId != null) {
553-
alreadySeenSourceNodes.computeIfAbsent(startedShard.shardId(), k -> new HashSet<>()).add(sourceNodeId);
554-
}
555-
clusterInfoSimulator.simulateAlreadyStartedShard(startedShard, sourceNodeId);
556-
}
557-
}
558-
559486
private void maybeLogAllocationExplainForUnassigned(
560487
DesiredBalance.ComputationFinishReason finishReason,
561488
RoutingNodes routingNodes,

0 commit comments

Comments
 (0)