Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set the write load threshold explicitly (and randomly) and calculate the above/below thresholds based on that below? We're relying on defaults as it stands and they might change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. IIUC, you're also proposing randomizing the node's reported thread pool utilization to somewhere above the random threshold? Updated as such. 5d3dd54

.build();
}

@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
final Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.getMockPlugins());
plugins.add(MockTransportService.TestPlugin.class);
return plugins;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe

return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done 👍 bb30938

}

/**
* Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the
* balancer, specifically the effect of the {@link WriteLoadConstraintDecider}.
*
* Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of
* Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2.
*/
public void testHighNodeWriteLoadPreventsNewShardAllocation() {
final String masterName = internalCluster().startMasterOnlyNode();
final var dataNodes = internalCluster().startDataOnlyNodes(3);
final String firstDataNodeName = dataNodes.get(0);
final String secondDataNodeName = dataNodes.get(1);
final String thirdDataNodeName = dataNodes.get(2);
final String firstDataNodeId = getNodeId(firstDataNodeName);
final String secondDataNodeId = getNodeId(secondDataNodeName);
final String thirdDataNodeId = getNodeId(thirdDataNodeName);
ensureStableCluster(4);

logger.info(
"---> first node name "
+ firstDataNodeName
+ " and ID "
+ firstDataNodeId
+ "; second node name "
+ secondDataNodeName
+ " and ID "
+ secondDataNodeId
+ "; third node name "
+ thirdDataNodeName
+ " and ID "
+ thirdDataNodeId
);

/**
* Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings.
* Then create an index with many shards, which will all be assigned to the first data node.
*/

logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes");
updateClusterSettings(
Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName)
);

String indexName = randomIdentifier();
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental.

var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
var indexRoutingTable = clusterState.routingTable().index(indexName);
if (indexRoutingTable == null) {
return false;
}
return checkShardAssignment(
clusterState.getRoutingNodes(),
indexRoutingTable.getIndex(),
firstDataNodeId,
secondDataNodeId,
thirdDataNodeId,
randomNumberOfShards,
0,
0
);
});

createIndex(
indexName,
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
index(indexName, Integer.toString(randomInt(10)), Collections.singletonMap("foo", "bar"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe

indexRandom(false, indexName, 1);

to make it clear the ID and doc is not important?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact is this even necessary? or does createIndex create all the shards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be necessary, good point. Removed and it works fine 👍

ensureGreen(indexName);

logger.info("---> Waiting for all shards to be assigned to node " + firstDataNodeName);
safeAwait(verifyAssignmentToFirstNodeListener);

/**
* Override the {@link TransportNodeUsageStatsForThreadPoolsAction} and {@link TransportIndicesStatsAction} actions on the data
* nodes to supply artificial write load stats. The stats will show the third node hot-spotting, and that all shards have non-empty
* write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
*/

final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName);
final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName);
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName);
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
firstDiscoveryNode,
2,
0.5f,
0
);
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
secondDiscoveryNode,
2,
0.5f,
0
);
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
thirdDiscoveryNode,
2,
1.00f,
0
);

MockTransportService.getInstance(firstDataNodeName).<NodeUsageStatsForThreadPoolsAction.NodeRequest>addRequestHandlingBehavior(
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
(handler, request, channel, task) -> channel.sendResponse(
new NodeUsageStatsForThreadPoolsAction.NodeResponse(firstDiscoveryNode, firstNodeNonHotSpottingNodeStats)
)
);
MockTransportService.getInstance(secondDataNodeName)
.addRequestHandlingBehavior(
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
(handler, request, channel, task) -> channel.sendResponse(
new NodeUsageStatsForThreadPoolsAction.NodeResponse(secondDiscoveryNode, secondNodeNonHotSpottingNodeStats)
)
);
MockTransportService.getInstance(thirdDataNodeName)
.addRequestHandlingBehavior(
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
(handler, request, channel, task) -> channel.sendResponse(
new NodeUsageStatsForThreadPoolsAction.NodeResponse(thirdDiscoveryNode, thirdNodeHotSpottingNodeStats)
)
);

IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.state()
.getMetadata()
.getProject()
.index(indexName);
double shardWriteLoadDefault = 0.2;
MockTransportService.getInstance(firstDataNodeName)
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {

List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId));
}
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()));
});
MockTransportService.getInstance(secondDataNodeName)
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
// Return no stats for the index because none are assigned to this node.
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be

internalCluster().getInstance(TransportIndicesStatsAction.class, secondDataNodeName);

Also below on the third one (maybe copy-paste error)? It probably doesn't matter because it doesn't look like NodeResponse references anything from the enclosing class when used this way, but it's a bit confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed it should be! Thanks!

channel.sendResponse(instance.new NodeResponse(secondDataNodeId, 0, List.of(), List.of()));
});
MockTransportService.getInstance(thirdDataNodeName)
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
// Return no stats for the index because none are assigned to this node.
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
channel.sendResponse(instance.new NodeResponse(thirdDataNodeId, 0, List.of(), List.of()));
});

/**
* Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and
* initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the
* second node, since the third is reporting as hot-spotted and should not accept any shards.
*/

logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node");
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster().getInstance(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe

final InternalClusterInfoService clusterInfoService = asInstanceOf(InternalClusterService.class, internalCluster.getInstance(...

Just to make it fail with an assertion error if it's not what we expect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, done 👍

ClusterInfoService.class,
masterName
);
ClusterInfoServiceUtils.refresh(clusterInfoService);

logger.info(
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
);
// Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test.
updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", firstDataNodeName));

safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
Index index = clusterState.routingTable().index(indexName).getIndex();
return checkShardAssignment(
clusterState.getRoutingNodes(),
index,
firstDataNodeId,
secondDataNodeId,
thirdDataNodeId,
0,
randomNumberOfShards,
0
);
}));
}

/**
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
*/
private boolean checkShardAssignment(
RoutingNodes routingNodes,
Index index,
String firstDataNodeId,
String secondDataNodeId,
String thirdDataNodeId,
int firstDataNodeExpectedNumShards,
int secondDataNodeExpectedNumShards,
int thirdDataNodeExpectedNumShards
) {

int firstDataNodeRealNumberOfShards = routingNodes.node(firstDataNodeId).numberOfOwningShardsForIndex(index);
if (firstDataNodeRealNumberOfShards != firstDataNodeExpectedNumShards) {
return false;
}
int secondDataNodeRealNumberOfShards = routingNodes.node(secondDataNodeId).numberOfOwningShardsForIndex(index);
if (secondDataNodeRealNumberOfShards != secondDataNodeExpectedNumShards) {
return false;
}
int thirdDataNodeRealNumberOfShards = routingNodes.node(thirdDataNodeId).numberOfOwningShardsForIndex(index);
if (thirdDataNodeRealNumberOfShards != thirdDataNodeExpectedNumShards) {
return false;
}

return true;
}

private DiscoveryNode getDiscoveryNode(String nodeName) {
final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName);
assertNotNull(transportService);
return transportService.getLocalNode();
}

/**
* Helper to create a {@link NodeUsageStatsForThreadPools} for the given node with the given WRITE thread pool usage stats.
*/
private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
DiscoveryNode discoveryNode,
int totalWriteThreadPoolThreads,
float averageWriteThreadPoolUtilization,
long averageWriteThreadPoolQueueLatencyMillis
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it max write thread pool queue latency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy-paste takes another victim. Fixed. Thanks!

) {

// Create thread pool usage stats map for node1.
var writeThreadPoolUsageStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
totalWriteThreadPoolThreads,
averageWriteThreadPoolUtilization,
averageWriteThreadPoolQueueLatencyMillis
);
var threadPoolUsageMap = new HashMap<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats>();
threadPoolUsageMap.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe this is more succinct (and also results in an immutable singleton map)?

var threadPoolUsageMap = Map.of(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
            totalWriteThreadPoolThreads,
            averageWriteThreadPoolUtilization,
            averageWriteThreadPoolQueueLatencyMillis
        );

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, immutable sounds like a win too 👍


// Create the node's thread pool usage map
return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap);
}

/**
* Helper to create a dummy {@link ShardStats} for the given index shard with the supplied {@code peekWriteLoad} value.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: peak rather than peek?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed 👍

private static ShardStats createShardStats(IndexMetadata indexMeta, int shardIndex, double peekWriteLoad, String assignedShardNodeId) {
ShardId shardId = new ShardId(indexMeta.getIndex(), shardIndex);
Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(shardIndex));
ShardRouting shardRouting = ShardRouting.newUnassigned(
shardId,
true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null),
ShardRouting.Role.DEFAULT
);
shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
CommonStats stats = new CommonStats();
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
stats.store = new StoreStats();
stats.indexing = new IndexingStats(
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, 234, 234, 1000, 0.123, peekWriteLoad)
);
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable {

public NodeUsageStatsForThreadPools(StreamInput in) throws IOException {
this(in.readString(), in.readMap(ThreadPoolUsageStats::new));
this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new));
Copy link
Contributor Author

@DiannaHohensee DiannaHohensee Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change the NodeUsageStatsForThreadPool constructor to read into an immutable map because the ShardMovements*Simulator expects an immutable map.

}

@Override
Expand Down