Skip to content

Commit 2f1c7c2

Browse files
davidkyleDonalEvans
authored andcommitted
[ML] Filter out shutting down nodes from the zone mapper (elastic#134899)
Avoids a warning message to be logged repeatedly.
1 parent 715f680 commit 2f1c7c2

File tree

7 files changed

+37
-67
lines changed

7 files changed

+37
-67
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.xcontent.ToXContentObject;
2828
import org.elasticsearch.xcontent.XContentBuilder;
2929
import org.elasticsearch.xcontent.XContentParser;
30-
import org.elasticsearch.xpack.core.ml.MlConfigVersion;
3130
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
3231
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
3332
import org.elasticsearch.xpack.core.ml.inference.assignment.AllocationStatus;
@@ -445,17 +444,10 @@ public String toString() {
445444

446445
public static class TaskParams implements MlTaskParams, Writeable, ToXContentObject {
447446

448-
// TODO add support for other roles? If so, it may have to be an instance method...
449-
// NOTE, whatever determines assignment should not be dynamically set on the node
450-
// Otherwise assignment logic might fail
451447
public static boolean mayAssignToNode(@Nullable DiscoveryNode node) {
452-
return node != null
453-
&& node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)
454-
&& MlConfigVersion.fromNode(node).onOrAfter(VERSION_INTRODUCED);
448+
return node != null && node.getRoles().contains(DiscoveryNodeRole.ML_ROLE);
455449
}
456450

457-
public static final MlConfigVersion VERSION_INTRODUCED = MlConfigVersion.V_8_0_0;
458-
459451
private static final ParseField MODEL_BYTES = new ParseField("model_bytes");
460452
public static final ParseField NUMBER_OF_ALLOCATIONS = new ParseField("number_of_allocations");
461453
public static final ParseField THREADS_PER_ALLOCATION = new ParseField("threads_per_allocation");
@@ -646,10 +638,6 @@ public long estimateMemoryUsageBytes() {
646638
);
647639
}
648640

649-
public MlConfigVersion getMinimalSupportedVersion() {
650-
return VERSION_INTRODUCED;
651-
}
652-
653641
@Override
654642
public void writeTo(StreamOutput out) throws IOException {
655643
out.writeString(modelId);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/AbstractNodeAvailabilityZoneMapper.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@
1111
import org.elasticsearch.cluster.ClusterStateListener;
1212
import org.elasticsearch.cluster.node.DiscoveryNode;
1313
import org.elasticsearch.cluster.node.DiscoveryNodes;
14-
import org.elasticsearch.common.settings.ClusterSettings;
15-
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.core.Nullable;
1615

1716
import java.util.Collection;
1817
import java.util.List;
@@ -24,14 +23,18 @@ public abstract class AbstractNodeAvailabilityZoneMapper implements ClusterState
2423
private volatile Map<List<String>, Collection<DiscoveryNode>> allNodesByAvailabilityZone;
2524
private volatile Map<List<String>, Collection<DiscoveryNode>> mlNodesByAvailabilityZone;
2625

27-
public AbstractNodeAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings) {
28-
this(settings, clusterSettings, null);
29-
}
30-
31-
public AbstractNodeAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings, DiscoveryNodes discoveryNodes) {
26+
public AbstractNodeAvailabilityZoneMapper(@Nullable DiscoveryNodes discoveryNodes) {
3227
lastDiscoveryNodes = discoveryNodes;
3328
}
3429

30+
/**
31+
* Build a node by zone map for all nodes in the cluster and
32+
* for just the ml nodes.
33+
* @param discoveryNodes All the nodes in the cluster
34+
* @return All nodes by zone and ml nodes by zone.
35+
*/
36+
protected abstract NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> discoveryNodes);
37+
3538
/**
3639
* @return A map whose keys are lists of attributes that together define an availability zone, and whose values are
3740
* collections of nodes that have that combination of attributes. If availability zones
@@ -86,7 +89,7 @@ synchronized void updateNodesByAvailabilityZone() {
8689
mlNodesByAvailabilityZone = allNodesByAvailabilityZone;
8790
return;
8891
}
89-
NodesByAvailabilityZone nodesByAvailabilityZone = buildNodesByAvailabilityZone(lastDiscoveryNodes);
92+
NodesByAvailabilityZone nodesByAvailabilityZone = buildNodesByAvailabilityZone(lastDiscoveryNodes.getAllNodes());
9093
this.allNodesByAvailabilityZone = nodesByAvailabilityZone.allNodes();
9194
this.mlNodesByAvailabilityZone = nodesByAvailabilityZone.mlNodes();
9295
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NodeAvailabilityZoneMapper.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77

88
package org.elasticsearch.xpack.ml.autoscaling;
99

10-
import org.elasticsearch.cluster.ClusterState;
1110
import org.elasticsearch.cluster.node.DiscoveryNode;
12-
import org.elasticsearch.cluster.node.DiscoveryNodes;
1311

1412
import java.util.Collection;
1513
import java.util.List;
@@ -18,17 +16,15 @@
1816

1917
public interface NodeAvailabilityZoneMapper {
2018
/**
21-
* @param clusterState The specific cluster state whose nodes will be used to detect ML nodes by availability zone.
19+
* @param mlNodes The nodes which will be used to detect ML nodes by availability zone.
2220
* @return A map whose keys are conceptually lists of availability zone attributes, and whose values are collections
2321
* of nodes corresponding to the availability zone attributes.
2422
* An empty map will be returned if there are no ML nodes in the cluster.
2523
*/
26-
Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(ClusterState clusterState);
24+
Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(List<DiscoveryNode> mlNodes);
2725

2826
OptionalInt getNumMlAvailabilityZones();
2927

30-
NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes);
31-
3228
record NodesByAvailabilityZone(
3329
Map<List<String>, Collection<DiscoveryNode>> allNodes,
3430
Map<List<String>, Collection<DiscoveryNode>> mlNodes

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NodeFakeAvailabilityZoneMapper.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.ml.autoscaling;
99

10-
import org.elasticsearch.cluster.ClusterState;
1110
import org.elasticsearch.cluster.node.DiscoveryNode;
1211
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
1312
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -31,7 +30,7 @@ public NodeFakeAvailabilityZoneMapper(Settings settings, ClusterSettings cluster
3130

3231
@SuppressWarnings("this-escape")
3332
public NodeFakeAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings, DiscoveryNodes discoveryNodes) {
34-
super(settings, clusterSettings, discoveryNodes);
33+
super(discoveryNodes);
3534
updateNodesByAvailabilityZone();
3635
}
3736

@@ -41,12 +40,11 @@ public NodeFakeAvailabilityZoneMapper(Settings settings, ClusterSettings cluster
4140
* of nodes corresponding to the node ids. An empty map will be returned if there are no ML nodes in the
4241
* cluster.
4342
*/
44-
public NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes) {
45-
Collection<DiscoveryNode> nodes = discoveryNodes.getNodes().values();
46-
43+
@Override
44+
protected NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> discoveryNodes) {
4745
Map<List<String>, Collection<DiscoveryNode>> allNodesByAvailabilityZone = new HashMap<>();
4846
Map<List<String>, Collection<DiscoveryNode>> mlNodesByAvailabilityZone = new HashMap<>();
49-
for (DiscoveryNode node : nodes) {
47+
for (DiscoveryNode node : discoveryNodes) {
5048
List<String> nodeIdValues = List.of(node.getId());
5149
List<DiscoveryNode> nodeList = List.of(node);
5250
allNodesByAvailabilityZone.put(nodeIdValues, nodeList);
@@ -61,13 +59,13 @@ public NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes disco
6159
* This is different to {@link #getMlNodesByAvailabilityZone()} in that the latter returns the ML nodes by (fake) availability zone
6260
* of the latest cluster state, while this method does the same for a specific cluster state.
6361
*
64-
* @param clusterState The cluster state whose nodes will be used to detect ML nodes by fake availability zone.
62+
* @param discoveryNodes The nodes used to detect ML nodes by fake availability zone.
6563
* @return A map whose keys are single item lists of node id values, and whose values are single item collections
6664
* of nodes corresponding to the node ids. An empty map will be returned if there are no ML nodes in the
6765
* cluster.
6866
*/
6967
@Override
70-
public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(ClusterState clusterState) {
71-
return buildNodesByAvailabilityZone(clusterState.nodes()).mlNodes();
68+
public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(List<DiscoveryNode> discoveryNodes) {
69+
return buildNodesByAvailabilityZone(discoveryNodes).mlNodes();
7270
}
7371
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NodeRealAvailabilityZoneMapper.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.cluster.ClusterState;
1312
import org.elasticsearch.cluster.node.DiscoveryNode;
1413
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
1514
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -40,7 +39,7 @@ public NodeRealAvailabilityZoneMapper(Settings settings, ClusterSettings cluster
4039

4140
@SuppressWarnings("this-escape")
4241
public NodeRealAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings, DiscoveryNodes discoveryNodes) {
43-
super(settings, clusterSettings, discoveryNodes);
42+
super(discoveryNodes);
4443
awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
4544
updateNodesByAvailabilityZone();
4645
clusterSettings.addSettingsUpdateConsumer(
@@ -58,13 +57,12 @@ public List<String> getAwarenessAttributes() {
5857
return awarenessAttributes;
5958
}
6059

61-
public NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes) {
60+
@Override
61+
protected NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> discoveryNodes) {
6262
return buildNodesByAvailabilityZone(discoveryNodes, awarenessAttributes);
6363
}
6464

65-
private static NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes, List<String> awarenessAttributes) {
66-
Collection<DiscoveryNode> nodes = discoveryNodes.getNodes().values();
67-
65+
private static NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> nodes, List<String> awarenessAttributes) {
6866
if (awarenessAttributes.isEmpty()) {
6967
return new NodesByAvailabilityZone(
7068
Map.of(List.of(), nodes),
@@ -110,7 +108,7 @@ private static NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNod
110108
* This is different to {@link #getMlNodesByAvailabilityZone()} in that the latter returns the ML nodes by availability zone
111109
* of the latest cluster state, while this method does the same for a specific cluster state.
112110
*
113-
* @param clusterState The cluster state whose nodes will be used to detect ML nodes by availability zone.
111+
* @param nodes The nodes which will be used to detect ML nodes by availability zone.
114112
* @return A map whose keys are lists of awareness attribute values in the same order as the configured awareness attribute
115113
* names, and whose values are collections of nodes that have that combination of attributes. If availability zones
116114
* are not configured then the map will contain one entry mapping an empty list to a collection of all nodes. If
@@ -119,7 +117,7 @@ private static NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNod
119117
* distinguished by calling one of the other methods.)
120118
*/
121119
@Override
122-
public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(ClusterState clusterState) {
123-
return buildNodesByAvailabilityZone(clusterState.nodes(), awarenessAttributes).mlNodes();
120+
public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(List<DiscoveryNode> nodes) {
121+
return buildNodesByAvailabilityZone(nodes, awarenessAttributes).mlNodes();
124122
}
125123
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -613,21 +613,21 @@ && detectNodeLoads(sourceNodes, source).equals(detectNodeLoads(targetNodes, targ
613613
private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
614614
ClusterState currentState,
615615
Optional<CreateTrainedModelAssignmentAction.Request> createAssignmentRequest
616-
) throws Exception {
617-
List<DiscoveryNode> nodes = getAssignableNodes(currentState);
618-
logger.debug(() -> format("assignable nodes are %s", nodes.stream().map(DiscoveryNode::getId).toList()));
619-
Map<DiscoveryNode, NodeLoad> nodeLoads = detectNodeLoads(nodes, currentState);
616+
) {
617+
List<DiscoveryNode> assignableNodes = getAssignableNodes(currentState);
618+
logger.debug(() -> format("assignable nodes are %s", assignableNodes.stream().map(DiscoveryNode::getId).toList()));
619+
Map<DiscoveryNode, NodeLoad> nodeLoads = detectNodeLoads(assignableNodes, currentState);
620620
TrainedModelAssignmentMetadata currentMetadata = TrainedModelAssignmentMetadata.fromState(currentState);
621621

622622
TrainedModelAssignmentRebalancer rebalancer = new TrainedModelAssignmentRebalancer(
623623
currentMetadata,
624624
nodeLoads,
625-
nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
625+
nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(assignableNodes),
626626
createAssignmentRequest,
627627
allocatedProcessorsScale
628628
);
629629

630-
Set<String> shuttingDownNodeIds = currentState.metadata().nodeShutdowns().getAllNodeIds();
630+
Set<String> shuttingDownNodeIds = nodesShuttingDown(currentState);
631631
/*
632632
* To signal that we should gracefully stop the deployments routed to a particular node we set the routing state to stopping.
633633
* The TrainedModelAssignmentNodeService will see that the route is in stopping for a shutting down node and gracefully shut down
@@ -643,7 +643,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
643643
checkModelIsFullyAllocatedIfScalingIsNotPossible(
644644
createAssignmentRequest.get().getTaskParams().getDeploymentId(),
645645
rebalanced,
646-
nodes
646+
assignableNodes
647647
);
648648
}
649649

@@ -968,8 +968,9 @@ private void decreaseNumberOfAllocations(
968968
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
969969
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
970970
) {
971+
List<DiscoveryNode> assignableNodes = getAssignableNodes(clusterState);
971972
TrainedModelAssignment.Builder updatedAssignment = numberOfAllocations < assignment.totalTargetAllocations()
972-
? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(clusterState)).reduceTo(
973+
? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(assignableNodes)).reduceTo(
973974
numberOfAllocations
974975
)
975976
: TrainedModelAssignment.Builder.fromAssignment(assignment).setNumberOfAllocations(numberOfAllocations);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.ElasticsearchStatusException;
1616
import org.elasticsearch.ResourceAlreadyExistsException;
1717
import org.elasticsearch.ResourceNotFoundException;
18-
import org.elasticsearch.Version;
1918
import org.elasticsearch.action.ActionListener;
2019
import org.elasticsearch.action.LatchedActionListener;
2120
import org.elasticsearch.client.internal.Client;
@@ -439,7 +438,6 @@ public void testCreateAssignment_GivenModelCannotByFullyAllocated_AndScalingIsPo
439438
.add(buildNode("ml-node-without-room", true, 1000L, 2))
440439
.add(buildNode("not-ml-node", false, ByteSizeValue.ofGb(4).getBytes(), 2))
441440
.add(buildNode("ml-node-shutting-down", true, ByteSizeValue.ofGb(4).getBytes(), 2))
442-
.add(buildOldNode("old-ml-node-with-room", true, ByteSizeValue.ofGb(4).getBytes(), 2))
443441
.build();
444442
nodeAvailabilityZoneMapper = randomFrom(
445443
new NodeRealAvailabilityZoneMapper(settings, clusterSettings, discoveryNodes),
@@ -489,7 +487,6 @@ public void testCreateAssignment_GivenModelCannotByFullyAllocated_AndScalingIsNo
489487
.add(buildNode("ml-node-without-room", true, 1000L, 2))
490488
.add(buildNode("not-ml-node", false, ByteSizeValue.ofGb(4).getBytes(), 2))
491489
.add(buildNode("ml-node-shutting-down", true, ByteSizeValue.ofGb(4).getBytes(), 2))
492-
.add(buildOldNode("old-ml-node-with-room", true, ByteSizeValue.ofGb(4).getBytes(), 2))
493490
.build();
494491
nodeAvailabilityZoneMapper = randomFrom(
495492
new NodeRealAvailabilityZoneMapper(settings, clusterSettings, discoveryNodes),
@@ -2139,17 +2136,6 @@ private static RoutingInfoUpdate started() {
21392136
return RoutingInfoUpdate.updateStateAndReason(new RoutingStateAndReason(RoutingState.STARTED, ""));
21402137
}
21412138

2142-
private static DiscoveryNode buildOldNode(String name, boolean isML, long nativeMemory, int allocatedProcessors) {
2143-
return buildNode(
2144-
name,
2145-
isML,
2146-
nativeMemory,
2147-
allocatedProcessors,
2148-
VersionInformation.inferVersions(Version.V_7_15_0),
2149-
MlConfigVersion.V_7_15_0
2150-
);
2151-
}
2152-
21532139
private static StartTrainedModelDeploymentAction.TaskParams newParams(String modelId, long modelSize) {
21542140
return newParams(modelId, modelSize, 1, 1);
21552141
}

0 commit comments

Comments
 (0)