
Commit 66ebc1e

[ML] Make the "scale the processor count" setting updatable (#98305)
Makes the setting added in #98299 (MachineLearning.ALLOCATED_PROCESSORS_SCALE) dynamically updatable at runtime.
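For context, a setting can only be changed at runtime if it was declared dynamic and a component registers an update consumer for it. A minimal sketch of what such a declaration looks like — the key, default, and minimum below are illustrative assumptions, not copied from #98299; the real constant is MachineLearning.ALLOCATED_PROCESSORS_SCALE:

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;

public final class ExampleMlSettings {
    // Property.Dynamic is what lets the cluster settings API change the value
    // at runtime; the consumers registered in this commit react to that change.
    public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
        "xpack.ml.allocated_processors_scale", // assumed key, for illustration
        1, // assumed default
        1, // assumed minimum
        Property.Dynamic,
        Property.NodeScope
    );
}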
1 parent d0b8068 · commit 66ebc1e

12 files changed (+197 −114 lines)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

Lines changed: 18 additions & 3 deletions
@@ -20,6 +20,7 @@
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

@@ -43,6 +44,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
     private final MlProcessorAutoscalingDecider processorDecider;

     private volatile boolean isMaster;
+    private volatile int allocatedProcessorsScale;

     public MlAutoscalingDeciderService(
         MlMemoryTracker memoryTracker,

@@ -69,7 +71,15 @@ public MlAutoscalingDeciderService(
             scaleTimer
         );
         this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer);
+        this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
+
         clusterService.addLocalNodeMasterListener(this);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
+    }
+
+    void setAllocatedProcessorsScale(int scale) {
+        this.allocatedProcessorsScale = scale;
     }

     @Override

@@ -96,7 +106,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
         final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
         final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
             mlContext.mlNodes,
-            configuration
+            allocatedProcessorsScale
         );

         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)

@@ -123,7 +133,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
             return downscaleToZero(configuration, context, currentNativeMemoryCapacity, reasonBuilder);
         }

-        MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
+        MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext, allocatedProcessorsScale);
         if (memoryCapacity.isUndetermined()) {
             // If we cannot determine memory capacity we shouldn't make any autoscaling decision
             // as it could lead to undesired capacity. For example, it could be that the processor decider decides

@@ -134,7 +144,12 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
                 reasonBuilder.setSimpleReason(format("[memory_decider] %s", memoryCapacity.reason())).build()
             );
         }
-        MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
+        MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(
+            configuration,
+            context,
+            mlContext,
+            allocatedProcessorsScale
+        );
         reasonBuilder.setSimpleReason(
             format("[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
         );
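The hunks above follow the standard pattern for consuming a dynamic setting: read the initial value from Settings in the constructor, cache it in a field, and register a consumer for later updates. The field is volatile because the update consumer runs on the cluster applier thread while scale() reads the value from another thread. A stripped-down sketch of that pattern (the class name is illustrative; the real wiring is in the diff above):

class DynamicScaleHolder {
    // volatile: written by the cluster applier thread, read by decider threads
    private volatile int allocatedProcessorsScale;

    DynamicScaleHolder(Settings settings, ClusterService clusterService) {
        this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, scale -> this.allocatedProcessorsScale = scale);
    }
}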

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

Lines changed: 13 additions & 4 deletions
@@ -119,7 +119,12 @@ void setMaxMlNodeSize(ByteSizeValue maxMlNodeSize) {
         }
     }

-    public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
+    public MlMemoryAutoscalingCapacity scale(
+        Settings configuration,
+        AutoscalingDeciderContext context,
+        MlAutoscalingContext mlContext,
+        int allocatedProcessorsScale
+    ) {
         final ClusterState clusterState = context.state();

         scaleTimer.lastScaleToScaleIntervalMillis()

@@ -259,7 +264,11 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
         }
         // We should keep this check here as well as in the processor decider while cloud is not
         // reacting to processor autoscaling.
-        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
+        if (modelAssignmentsRequireMoreThanHalfCpu(
+            mlContext.modelAssignments.values(),
+            mlContext.mlNodes,
+            allocatedProcessorsScale
+        )) {
             logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
             return null;
         }

@@ -828,12 +837,12 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
     static boolean modelAssignmentsRequireMoreThanHalfCpu(
         Collection<TrainedModelAssignment> assignments,
         List<DiscoveryNode> mlNodes,
-        Settings settings
+        int allocatedProcessorsScale
     ) {
         int totalRequiredProcessors = assignments.stream()
             .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
             .sum();
-        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
+        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, allocatedProcessorsScale).roundUp()).sum();
         return totalRequiredProcessors * 2 > totalMlProcessors;
     }
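The arithmetic in modelAssignmentsRequireMoreThanHalfCpu is worth a worked example. A self-contained sketch with hypothetical numbers (the record and helper below are illustrative, not part of the Elasticsearch API):

import java.util.List;

public class HalfCpuCheckDemo {
    record Assignment(int allocations, int threadsPerAllocation) {}

    // Mirrors the check above: required processors are allocations × threads,
    // summed over all assignments; a scale-down is vetoed when doubling that
    // sum exceeds the total ML-tier processor count.
    static boolean requiresMoreThanHalfCpu(List<Assignment> assignments, List<Integer> nodeProcessors) {
        int required = assignments.stream().mapToInt(a -> a.allocations() * a.threadsPerAllocation()).sum();
        int total = nodeProcessors.stream().mapToInt(Integer::intValue).sum();
        return required * 2 > total;
    }

    public static void main(String[] args) {
        // 4×2 + 2×1 = 10 required processors; two 8-processor nodes give 16.
        // 10 × 2 = 20 > 16, so the method returns true and down-scaling stops.
        System.out.println(requiresMoreThanHalfCpu(List.of(new Assignment(4, 2), new Assignment(2, 1)), List.of(8, 8)));
    }
}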

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

Lines changed: 10 additions & 5 deletions
@@ -41,7 +41,12 @@ class MlProcessorAutoscalingDecider {
         this.scaleTimer = Objects.requireNonNull(scaleTimer);
     }

-    public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
+    public MlProcessorAutoscalingCapacity scale(
+        Settings configuration,
+        AutoscalingDeciderContext context,
+        MlAutoscalingContext mlContext,
+        int allocatedProcessorsScale
+    ) {
         TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());

         if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {

@@ -52,7 +57,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
             ).build();
         }

-        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
+        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, allocatedProcessorsScale);

         final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

@@ -65,7 +70,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
         if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
             trainedModelAssignmentMetadata.allAssignments().values(),
             mlContext.mlNodes,
-            configuration
+            allocatedProcessorsScale
         )) {
             return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
                 .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")

@@ -137,11 +142,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
         );
     }

-    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
+    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, int allocatedProcessorsScale) {
         Processors maxNodeProcessors = Processors.ZERO;
         Processors tierProcessors = Processors.ZERO;
         for (DiscoveryNode node : mlNodes) {
-            Processors nodeProcessors = MlProcessors.get(node, settings);
+            Processors nodeProcessors = MlProcessors.get(node, allocatedProcessorsScale);
             if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
                 maxNodeProcessors = nodeProcessors;
             }
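Note that every call site now passes the raw int scale instead of a Settings object, so the callee no longer has to look the setting up itself. As these deciders use it, the scale appears to act as a divisor on a node's allocated processor count; a sketch of that assumed semantics (the real logic lives in MlProcessors, which this diff does not show):

// Assumption inferred from the call sites above: a scale of N makes the ML
// deciders see 1/N of the node's allocated processors.
static double scaledProcessors(double nodeAllocatedProcessors, int allocatedProcessorsScale) {
    return nodeAllocatedProcessors / allocatedProcessorsScale; // e.g. 16 cores at scale 2 -> 8
}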

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 11 additions & 2 deletions
@@ -80,6 +80,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
     private volatile int maxOpenJobs;
     protected volatile int maxLazyMLNodes;
    protected volatile long maxMLNodeSize;
+    protected volatile int allocatedProcessorsScale;

     public TrainedModelAssignmentClusterService(
         Settings settings,

@@ -99,6 +100,7 @@ public TrainedModelAssignmentClusterService(
         this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
         this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
         this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
+        this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
         // Only nodes that can possibly be master nodes really need this service running
         if (DiscoveryNode.isMasterNode(settings)) {
             clusterService.addListener(this);

@@ -109,6 +111,8 @@ public TrainedModelAssignmentClusterService(
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
+            clusterService.getClusterSettings()
+                .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
         }
     }

@@ -132,6 +136,10 @@ private void setMaxMLNodeSize(ByteSizeValue value) {
         this.maxMLNodeSize = value.getBytes();
     }

+    private void setAllocatedProcessorsScale(int scale) {
+        this.allocatedProcessorsScale = scale;
+    }
+
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);

@@ -487,9 +495,10 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
             TrainedModelAssignmentMetadata.fromState(currentState),
             nodeLoads,
             nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
-            modelToAdd
+            modelToAdd,
+            allocatedProcessorsScale
         );
-        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
+        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
         if (modelToAdd.isPresent()) {
             checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
         }
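With consumers registered in both services, an operator-driven settings update now reaches them without a node restart. A hedged sketch of how that wiring can be exercised in isolation, e.g. in a unit test (the consumer body and the registered-settings set are illustrative; assumes the setting is declared dynamic):

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Set.<Setting<?>>of(MachineLearning.ALLOCATED_PROCESSORS_SCALE));
AtomicInteger observed = new AtomicInteger();
clusterSettings.addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, observed::set);
// Simulate what the cluster applier does when the setting is updated.
clusterSettings.applySettings(Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build());
assert observed.get() == 2;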

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

Lines changed: 18 additions & 18 deletions
@@ -13,7 +13,6 @@
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;

@@ -51,20 +50,23 @@ class TrainedModelAssignmentRebalancer {
     private final Map<DiscoveryNode, NodeLoad> nodeLoads;
     private final Map<List<String>, Collection<DiscoveryNode>> mlNodesByZone;
     private final Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd;
+    private final int allocatedProcessorsScale;

     TrainedModelAssignmentRebalancer(
         TrainedModelAssignmentMetadata currentMetadata,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
         Map<List<String>, Collection<DiscoveryNode>> mlNodesByZone,
-        Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd
+        Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd,
+        int allocatedProcessorsScale
     ) {
         this.currentMetadata = Objects.requireNonNull(currentMetadata);
         this.nodeLoads = Objects.requireNonNull(nodeLoads);
         this.mlNodesByZone = Objects.requireNonNull(mlNodesByZone);
         this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
+        this.allocatedProcessorsScale = allocatedProcessorsScale;
     }

-    TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
+    TrainedModelAssignmentMetadata.Builder rebalance() {
         if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
             throw new ResourceAlreadyExistsException(
                 "[{}] assignment for deployment with model [{}] already exists",

@@ -78,8 +80,8 @@ TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
             return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
         }

-        AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
-        return buildAssignmentsFromPlan(assignmentPlan, settings);
+        AssignmentPlan assignmentPlan = computeAssignmentPlan();
+        return buildAssignmentsFromPlan(assignmentPlan);
     }

     private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {

@@ -92,8 +94,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
         return true;
     }

-    AssignmentPlan computeAssignmentPlan(Settings settings) {
-        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
+    AssignmentPlan computeAssignmentPlan() {
+        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
         final Set<String> assignableNodeIds = nodesByZone.values()
             .stream()
             .flatMap(List::stream)

@@ -271,7 +273,7 @@ private Map<String, Integer> findFittingAssignments(
         return fittingAssignments;
     }

-    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
+    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
         return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
             Collection<DiscoveryNode> discoveryNodes = e.getValue();
             List<AssignmentPlan.Node> nodes = new ArrayList<>();

@@ -285,7 +287,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settin
                         // We subtract native inference memory as the planner expects available memory for
                         // native inference including current assignments.
                         getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                        MlProcessors.get(discoveryNode, settings).roundUp()
+                        MlProcessors.get(discoveryNode, allocatedProcessorsScale).roundUp()
                     )
                 );
             } else {

@@ -305,7 +307,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
         return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
     }

-    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
+    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
         TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
         for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
             TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());

@@ -343,7 +345,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
             }
             assignmentBuilder.calculateAndSetAssignmentState();

-            explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
+            explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
             builder.addNewAssignment(deployment.id(), assignmentBuilder);
         }
         return builder;

@@ -352,8 +354,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
     private Optional<String> explainAssignments(
         AssignmentPlan assignmentPlan,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
-        AssignmentPlan.Deployment deployment,
-        Settings settings
+        AssignmentPlan.Deployment deployment
     ) {
         if (assignmentPlan.satisfiesAllocations(deployment)) {
             return Optional.empty();

@@ -365,7 +366,7 @@ private Optional<String> explainAssignments(

         Map<String, String> nodeToReason = new TreeMap<>();
         for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
-            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
+            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
             reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
         }

@@ -384,8 +385,7 @@ private Optional<String> explainAssignment(
         AssignmentPlan assignmentPlan,
         DiscoveryNode node,
         NodeLoad load,
-        AssignmentPlan.Deployment deployment,
-        Settings settings
+        AssignmentPlan.Deployment deployment
     ) {
         if (Strings.isNullOrEmpty(load.getError()) == false) {
             return Optional.of(load.getError());

@@ -398,7 +398,7 @@ private Optional<String> explainAssignment(
         // But we should also check if we managed to assign a model during the rebalance for which
         // we check if the node has used up any of its allocated processors.
         boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-            || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
+            || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, allocatedProcessorsScale).roundUp();
         long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
             ? 0
             : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());

@@ -427,7 +427,7 @@ private Optional<String> explainAssignment(
                 "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                     + "processors required for each allocation of this model [{}]",
                 new Object[] {
-                    MlProcessors.get(node, settings).roundUp(),
+                    MlProcessors.get(node, allocatedProcessorsScale).roundUp(),
                     assignmentPlan.getRemainingNodeCores(node.getId()),
                     deployment.threadsPerAllocation() }
             )
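A design note on this file: rather than threading Settings (or the scale) through rebalance(), computeAssignmentPlan(), and each explain* helper, the value is now captured once in the constructor. That trims half a dozen signatures and guarantees a single consistent scale for the whole rebalance round even if an operator updates the setting mid-flight. The pattern in miniature (names illustrative):

// Snapshot a dynamic value at construction so a short-lived worker object
// sees the same value in every method, regardless of concurrent updates.
class RebalanceRound {
    private final int allocatedProcessorsScale; // fixed for this round

    RebalanceRound(int allocatedProcessorsScale) {
        this.allocatedProcessorsScale = allocatedProcessorsScale;
    }

    int usableCores(double nodeAllocatedProcessors) {
        return (int) Math.ceil(nodeAllocatedProcessors / allocatedProcessorsScale);
    }
}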
