Skip to content

Commit d4d069d

Browse files
committed
Ignore number of threads of models with zero allocations in autoscaling decisions.
1 parent e5c8da6 commit d4d069d

File tree

2 files changed

+144
-53
lines changed

2 files changed

+144
-53
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -242,72 +242,72 @@ static void getMemoryAndProcessors(
242242
final int numMissingProcessors = numMissingAllocations * numberOfThreadsPerAllocation;
243243
int numExistingProcessorsToBeUsed = Math.min(numMissingProcessors, numberOfAvailableProcessors);
244244

245+
if (numberOfRequestedAllocations == 0) {
246+
continue;
247+
}
245248
if (assignment.getNodeRoutingTable().isEmpty() == false
246249
&& assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
247250
// Ignore states that don't consume memory, for example all allocations are failed or stopped
248251
// if the node routing table is empty, then it will match the above condition, but it needs to be handled in the next branch
249252
continue;
253+
}
254+
255+
if (assignment.getNodeRoutingTable().isEmpty() == false) {
256+
// if the routing table is non-empty, this is an existing model
257+
existingModelMemoryBytes += estimatedMemoryUsage;
250258
} else {
259+
// only increase memory requirements for new models
260+
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
261+
extraModelMemoryInBytes += estimatedMemoryUsage;
262+
}
251263

252-
if (assignment.getNodeRoutingTable().isEmpty() == false) {
253-
// if the routing table is non-empty, this is an existing model
254-
existingModelMemoryBytes += estimatedMemoryUsage;
255-
} else {
256-
// only increase memory requirements for new models
257-
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
258-
extraModelMemoryInBytes += estimatedMemoryUsage;
264+
// if not low priority, check processor requirements.
265+
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
266+
if (numMissingProcessors > numberOfAvailableProcessors) {
267+
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
268+
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
269+
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
270+
// extraPerNodeProcessors
259271
}
260-
261-
// if not low priority, check processor requirements.
262-
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
263-
if (numMissingProcessors > numberOfAvailableProcessors) {
264-
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
265-
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
266-
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
267-
// extraPerNodeProcessors
268-
}
269-
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
270-
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
271-
}
272-
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
272+
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
273+
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
273274
}
275+
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
276+
}
277+
278+
if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
279+
logger.info(
280+
() -> format(
281+
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
282+
modelAssignment.getKey(),
283+
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
284+
numMissingAllocations
285+
)
286+
);
287+
}
274288

275-
if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
276-
logger.info(
277-
() -> format(
278-
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
279-
modelAssignment.getKey(),
280-
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
281-
numMissingAllocations
289+
for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
290+
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
291+
* numberOfThreadsPerAllocation;
292+
293+
jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
294+
.add(
295+
MlJobRequirements.of(
296+
estimatedMemoryUsage,
297+
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
298+
? 0
299+
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
300+
* numberOfThreadsPerAllocation
282301
)
283302
);
284-
}
285-
286-
for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
287-
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue()
288-
.getNodeRoutingTable()
289-
.get(node)
290-
.getTargetAllocations() * numberOfThreadsPerAllocation;
291-
292-
jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
293-
.add(
294-
MlJobRequirements.of(
295-
estimatedMemoryUsage,
296-
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
297-
? 0
298-
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
299-
* numberOfThreadsPerAllocation
300-
)
301-
);
302-
}
303-
304-
// min(3, max(number of allocations over all deployed models))
305-
// the minimum number of nodes is equal to the number of allocations, up to 3
306-
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
307-
// in theory this should help availability for 2-3 allocations
308-
// the planner should split over all available nodes
309-
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
310303
}
304+
305+
// min(3, max(number of allocations over all deployed models))
306+
// the minimum number of nodes is equal to the number of allocations, up to 3
307+
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
308+
// in theory this should help availability for 2-3 allocations
309+
// the planner should split over all available nodes
310+
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
311311
}
312312

313313
// dummy autoscaling entity

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,97 @@ public void testGetMemoryAndProcessorsScaleDownForModelWithZeroAllocations() thr
18761876
);
18771877
}
18781878

1879+
public void testGetMemoryAndProcessorsIgnoreThreadsOfModelWithZeroAllocations() throws InterruptedException {
1880+
long memory = 1000000000;
1881+
Map<String, String> nodeAttr = Map.of(
1882+
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
1883+
Long.toString(memory),
1884+
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
1885+
"400000000",
1886+
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
1887+
"7.2.0",
1888+
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
1889+
"2.0"
1890+
);
1891+
1892+
MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
1893+
List.of(),
1894+
List.of(),
1895+
List.of(),
1896+
Map.of(
1897+
"model-with-one-allocation",
1898+
TrainedModelAssignment.Builder.empty(
1899+
new StartTrainedModelDeploymentAction.TaskParams(
1900+
"model-with-one-allocation",
1901+
"model-with-one-allocation-deployment",
1902+
400,
1903+
1,
1904+
2,
1905+
100,
1906+
null,
1907+
Priority.NORMAL,
1908+
0L,
1909+
0L
1910+
),
1911+
null
1912+
).addRoutingEntry("ml-node-1", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build(),
1913+
"model-with-zero-allocations",
1914+
TrainedModelAssignment.Builder.empty(
1915+
new StartTrainedModelDeploymentAction.TaskParams(
1916+
"model-with-zero-allocations",
1917+
"model-with-zero-allocations-deployment",
1918+
400,
1919+
0,
1920+
4,
1921+
100,
1922+
null,
1923+
Priority.NORMAL,
1924+
0L,
1925+
0L
1926+
),
1927+
new AdaptiveAllocationsSettings(true, 0, 4)
1928+
).build()
1929+
),
1930+
List.of(
1931+
DiscoveryNodeUtils.builder("ml-node-1")
1932+
.name("ml-node-name-1")
1933+
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
1934+
.attributes(nodeAttr)
1935+
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
1936+
.build()
1937+
),
1938+
PersistentTasksCustomMetadata.builder().build()
1939+
);
1940+
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
1941+
1942+
this.<MlAutoscalingStats>assertAsync(
1943+
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
1944+
mlAutoscalingContext,
1945+
mockTracker,
1946+
Map.of("ml-node-1", memory),
1947+
600000000,
1948+
2,
1949+
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
1950+
MlDummyAutoscalingEntity.of(0, 0),
1951+
1,
1952+
listener
1953+
),
1954+
stats -> {
1955+
assertEquals(memory, stats.currentPerNodeMemoryBytes());
1956+
assertEquals(251659040, stats.currentTotalModelMemoryBytes());
1957+
assertEquals(2, stats.currentTotalProcessorsInUse());
1958+
assertEquals(1, stats.currentTotalNodes());
1959+
assertEquals(1, stats.wantedMinNodes());
1960+
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
1961+
assertEquals(0, stats.wantedExtraProcessors());
1962+
assertEquals(0, stats.wantedExtraModelMemoryBytes());
1963+
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
1964+
assertEquals(0, stats.unwantedNodeMemoryBytesToRemove());
1965+
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
1966+
}
1967+
);
1968+
}
1969+
18791970
private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
18801971
CountDownLatch latch = new CountDownLatch(1);
18811972
AtomicBoolean listenerCalled = new AtomicBoolean(false);

0 commit comments

Comments
 (0)