
Commit d0b8068

[Backport][ML] Add setting to scale the processor count used in the model assignment planner (#98299)
* [ML] Add setting to scale the processor count used in the model assignment planner (#98296)

  Adds the ml.allocated_processors_scale setting, which is used to scale the value of ml.allocated_processors_double. This setting influences the number of model allocations that can fit on a node.

  # Conflicts:
  #   x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  #   x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
  #   x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java

* non operator
1 parent 2649050 commit d0b8068


11 files changed (+187, -53 lines)


docs/changelog/98296.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 98296
+summary: Add setting to scale the processor count used in the model assignment planner
+area: Machine Learning
+type: enhancement
+issues: []

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 22 additions & 0 deletions
@@ -575,6 +575,27 @@ public void loadExtensions(ExtensionLoader loader) {
     public static final String PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";
 
     public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";
+
+    /**
+     * For the NLP model assignment planner.
+     * The {@link #ALLOCATED_PROCESSORS_NODE_ATTR} attribute may be
+     * measured in hyper-threaded or virtual cores when the user
+     * would like the planner to consider logical cores.
+     *
+     * ALLOCATED_PROCESSORS_NODE_ATTR is divided by this setting,
+     * the default value of 1 means the attribute is unchanged, a value
+     * of 2 accounts for hyper-threaded cores with 2 threads per core.
+     * Increasing this setting above 1 reduces the number of model
+     * allocations that can be deployed on a node.
+     */
+    public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
+        "xpack.ml.allocated_processors_scale",
+        1,
+        1,
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
         "xpack.ml.node_concurrent_job_allocations",
         2,
@@ -752,6 +773,7 @@ public static boolean isMlNode(DiscoveryNode node) {
     @Override
     public List<Setting<?>> getSettings() {
         return List.of(
+            ALLOCATED_PROCESSORS_SCALE,
             MachineLearningField.AUTODETECT_PROCESS,
             PROCESS_CONNECT_TIMEOUT,
             CONCURRENT_JOB_ALLOCATIONS,
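For context, a minimal sketch (not part of this commit) of reading the new dynamic, node-scoped setting; the key, the default of 1, and the minimum of 1 come from the declaration above, while the class name and the value 2 below are purely illustrative:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MachineLearning;

class AllocatedProcessorsScaleExample {
    static int readScale() {
        // Default is 1 (attribute used unchanged); 2 accounts for hyper-threaded
        // cores with two threads per core. Values below the minimum of 1 are rejected.
        Settings settings = Settings.builder()
            .put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2)
            .build();
        return MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings); // -> 2
    }
}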

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

Lines changed: 4 additions & 1 deletion
@@ -94,7 +94,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
         final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
         final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
         final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
-        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);
+        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
+            mlContext.mlNodes,
+            configuration
+        );
 
         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
             .setCurrentMlCapacity(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

Lines changed: 7 additions & 3 deletions
@@ -259,7 +259,7 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
         }
         // We should keep this check here as well as in the processor decider while cloud is not
         // reacting to processor autoscaling.
-        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
+        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
             logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
             return null;
         }
@@ -825,11 +825,15 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
         return newCapacity;
     }
 
-    static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAssignment> assignments, List<DiscoveryNode> mlNodes) {
+    static boolean modelAssignmentsRequireMoreThanHalfCpu(
+        Collection<TrainedModelAssignment> assignments,
+        List<DiscoveryNode> mlNodes,
+        Settings settings
+    ) {
         int totalRequiredProcessors = assignments.stream()
             .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
             .sum();
-        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
+        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
         return totalRequiredProcessors * 2 > totalMlProcessors;
     }
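To make the check concrete, a hypothetical worked example (the figures are illustrative, not from this commit): three deployments that each need 2 allocations with 2 threads per allocation require 12 processors in total, so on a tier whose nodes expose 20 scaled processors the method returns true and down-scaling is blocked.

// Hypothetical figures illustrating modelAssignmentsRequireMoreThanHalfCpu.
int totalRequiredProcessors = 3 * (2 * 2); // three deployments, 2 allocations x 2 threads each = 12
int totalMlProcessors = 20;                // sum of MlProcessors.get(node, settings).roundUp() over the ML nodes
boolean requireMoreThanHalf = totalRequiredProcessors * 2 > totalMlProcessors; // 24 > 20 -> true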

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

Lines changed: 5 additions & 4 deletions
@@ -52,7 +52,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
             ).build();
         }
 
-        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);
+        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
 
         final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();
 
@@ -64,7 +64,8 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
 
         if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
             trainedModelAssignmentMetadata.allAssignments().values(),
-            mlContext.mlNodes
+            mlContext.mlNodes,
+            configuration
         )) {
             return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
                 .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -136,11 +137,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
         );
     }
 
-    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
+    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
         Processors maxNodeProcessors = Processors.ZERO;
         Processors tierProcessors = Processors.ZERO;
         for (DiscoveryNode node : mlNodes) {
-            Processors nodeProcessors = MlProcessors.get(node);
+            Processors nodeProcessors = MlProcessors.get(node, settings);
             if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
                 maxNodeProcessors = nodeProcessors;
             }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 1 addition & 1 deletion
@@ -489,7 +489,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
             nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
             modelToAdd
         );
-        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
+        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
         if (modelToAdd.isPresent()) {
             checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
         }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

Lines changed: 17 additions & 14 deletions
@@ -13,6 +13,7 @@
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
@@ -63,7 +64,7 @@ class TrainedModelAssignmentRebalancer {
         this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
     }
 
-    TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
+    TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
         if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
             throw new ResourceAlreadyExistsException(
                 "[{}] assignment for deployment with model [{}] already exists",
@@ -77,8 +78,8 @@ TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
             return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
         }
 
-        AssignmentPlan assignmentPlan = computeAssignmentPlan();
-        return buildAssignmentsFromPlan(assignmentPlan);
+        AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
+        return buildAssignmentsFromPlan(assignmentPlan, settings);
     }
 
     private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
@@ -91,8 +92,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
         return true;
     }
 
-    AssignmentPlan computeAssignmentPlan() {
-        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
+    AssignmentPlan computeAssignmentPlan(Settings settings) {
+        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
         final Set<String> assignableNodeIds = nodesByZone.values()
             .stream()
             .flatMap(List::stream)
@@ -270,7 +271,7 @@ private Map<String, Integer> findFittingAssignments(
         return fittingAssignments;
     }
 
-    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
+    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
         return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
             Collection<DiscoveryNode> discoveryNodes = e.getValue();
             List<AssignmentPlan.Node> nodes = new ArrayList<>();
@@ -284,7 +285,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
                             // We subtract native inference memory as the planner expects available memory for
                             // native inference including current assignments.
                             getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                            MlProcessors.get(discoveryNode).roundUp()
+                            MlProcessors.get(discoveryNode, settings).roundUp()
                         )
                     );
                 } else {
@@ -304,7 +305,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
         return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
     }
 
-    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
+    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
         TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
         for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
             TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
@@ -342,7 +343,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
             }
             assignmentBuilder.calculateAndSetAssignmentState();
 
-            explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
+            explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
             builder.addNewAssignment(deployment.id(), assignmentBuilder);
         }
         return builder;
@@ -351,7 +352,8 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
     private Optional<String> explainAssignments(
         AssignmentPlan assignmentPlan,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
-        AssignmentPlan.Deployment deployment
+        AssignmentPlan.Deployment deployment,
+        Settings settings
     ) {
         if (assignmentPlan.satisfiesAllocations(deployment)) {
             return Optional.empty();
@@ -363,7 +365,7 @@ private Optional<String> explainAssignments(
 
         Map<String, String> nodeToReason = new TreeMap<>();
         for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
-            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
+            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
             reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
         }
 
@@ -382,7 +384,8 @@ private Optional<String> explainAssignment(
         AssignmentPlan assignmentPlan,
         DiscoveryNode node,
         NodeLoad load,
-        AssignmentPlan.Deployment deployment
+        AssignmentPlan.Deployment deployment,
+        Settings settings
     ) {
         if (Strings.isNullOrEmpty(load.getError()) == false) {
             return Optional.of(load.getError());
@@ -395,7 +398,7 @@ private Optional<String> explainAssignment(
         // But we should also check if we managed to assign a model during the rebalance for which
         // we check if the node has used up any of its allocated processors.
         boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-            || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
+            || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
         long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
             ? 0
             : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -424,7 +427,7 @@ private Optional<String> explainAssignment(
                     "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                         + "processors required for each allocation of this model [{}]",
                     new Object[] {
-                        MlProcessors.get(node).roundUp(),
+                        MlProcessors.get(node, settings).roundUp(),
                         assignmentPlan.getRemainingNodeCores(node.getId()),
                         deployment.threadsPerAllocation() }
                 )
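The practical effect of threading the setting into the rebalancer is that each planner node is built with a smaller core budget, so fewer allocations fit. A hypothetical illustration (values are not from this commit):

// Hypothetical: a node advertising 16 allocated processors, planned with
// xpack.ml.allocated_processors_scale = 2 and a model using 2 threads per allocation.
int advertisedProcessors = 16;
int allocatedProcessorsScale = 2;
int threadsPerAllocation = 2;
int plannerCores = advertisedProcessors / allocatedProcessorsScale; // 8 cores seen by the planner
int allocationsThatFit = plannerCores / threadsPerAllocation;       // 4 allocations instead of 8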

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java

Lines changed: 15 additions & 2 deletions
@@ -9,14 +9,15 @@
 
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
 public final class MlProcessors {
 
     private MlProcessors() {}
 
-    public static Processors get(DiscoveryNode node) {
+    public static Processors get(DiscoveryNode node, Settings settings) {
         String allocatedProcessorsString = node.getVersion().onOrAfter(Version.V_8_5_0)
             ? node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR)
             : node.getAttributes().get(MachineLearning.PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR);
@@ -25,7 +26,19 @@ public static Processors get(DiscoveryNode node) {
         }
         try {
             double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
-            return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
+            if (processorsAsDouble <= 0) {
+                return Processors.ZERO;
+            }
+
+            Integer scale = null;
+            if (settings != null) {
+                scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
+            }
+            if (scale != null) {
+                processorsAsDouble = processorsAsDouble / scale;
+            }
+            return Processors.of(processorsAsDouble);
+
         } catch (NumberFormatException e) {
             assert e == null
                 : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
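In short, the node attribute is parsed and then divided by the configured scale before being handed back to callers. A quick sketch of the arithmetic with illustrative values (not taken from this commit):

// Illustrative values: ml.allocated_processors_double = "8.0" on the node,
// xpack.ml.allocated_processors_scale = 2 in the settings.
double advertised = 8.0;
int scale = 2;
double effective = advertised / scale; // 4.0
// MlProcessors.get(node, settings) would return Processors.of(4.0),
// so roundUp() yields 4 cores for the callers shown in this commit.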

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java

Lines changed: 6 additions & 3 deletions
@@ -1079,7 +1079,8 @@ public void testCpuModelAssignmentRequirements() {
                         )
                     ).build()
                 ),
-                withMlNodes("ml_node_1", "ml_node_2")
+                withMlNodes("ml_node_1", "ml_node_2"),
+                Settings.EMPTY
             )
         );
         assertTrue(
@@ -1110,7 +1111,8 @@ public void testCpuModelAssignmentRequirements() {
                         )
                     ).build()
                 ),
-                withMlNodes("ml_node_1", "ml_node_2")
+                withMlNodes("ml_node_1", "ml_node_2"),
+                Settings.EMPTY
             )
         );
         assertFalse(
@@ -1141,7 +1143,8 @@ public void testCpuModelAssignmentRequirements() {
                         )
                     ).build()
                 ),
-                withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4")
+                withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"),
+                Settings.EMPTY
             )
         );
     }
