Skip to content

Commit bf70b46

Browse files
authored
Fix ml autoscaling for zero allocations (#114982) (#114993) (#115125)
* Fix estimated memory usage for a model with zero allocations.
* Ignore the number of threads of models with zero allocations in autoscaling decisions.
* Add some long-overdue comments.
* Another estimateMemoryUsageBytes fix.
1 parent d096b58 commit bf70b46

File tree

4 files changed

+232
-54
lines changed

4 files changed

+232
-54
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,9 @@ public String getDeploymentId() {
623623
* @return the estimated memory (in bytes) required for the model deployment to run
624624
*/
625625
public long estimateMemoryUsageBytes() {
626+
if (numberOfAllocations == 0) {
627+
return 0;
628+
}
626629
// We already take into account 2x the model bytes. If the cache size is larger than the model bytes, then
627630
// we need to take it into account when returning the estimate.
628631
if (cacheSize != null && cacheSize.getBytes() > modelBytes) {
@@ -796,6 +799,9 @@ public static long estimateMemoryUsageBytes(
796799
long perAllocationMemoryBytes,
797800
int numberOfAllocations
798801
) {
802+
if (numberOfAllocations == 0) {
803+
return 0;
804+
}
799805
// While loading the model in the process we need twice the model size.
800806

801807
// 1. If ELSER v1 or v2 then 2004MB

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030

3131
import static org.elasticsearch.core.Strings.format;
3232

33+
/**
34+
* This handles ML autoscaling just for classic cloud.
35+
* For serverless, see: {@link MlAutoscalingResourceTracker}.
36+
*/
3337
public final class MlAutoscalingDeciderService implements AutoscalingDeciderService, LocalNodeMasterListener {
3438

3539
private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
4949

5050
/**
51-
* backend for new kubernetes based autoscaler.
51+
* This handles ML autoscaling just for serverless.
52+
* For classic cloud, see: {@link MlAutoscalingDeciderService}.
5253
*/
5354
public final class MlAutoscalingResourceTracker {
5455
private static final Logger logger = LogManager.getLogger(MlAutoscalingResourceTracker.class);
@@ -242,72 +243,72 @@ static void getMemoryAndProcessors(
242243
final int numMissingProcessors = numMissingAllocations * numberOfThreadsPerAllocation;
243244
int numExistingProcessorsToBeUsed = Math.min(numMissingProcessors, numberOfAvailableProcessors);
244245

246+
if (numberOfRequestedAllocations == 0) {
247+
continue;
248+
}
245249
if (assignment.getNodeRoutingTable().isEmpty() == false
246250
&& assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
247251
// Ignore states that don't consume memory, for example all allocations are failed or stopped
248252
// if the node routing table is empty, then it will match the above condition, but it needs to be handled in the next branch
249253
continue;
254+
}
255+
256+
if (assignment.getNodeRoutingTable().isEmpty() == false) {
257+
// if the routing table is non-empty, this is an existing model
258+
existingModelMemoryBytes += estimatedMemoryUsage;
250259
} else {
260+
// only increase memory requirements for new models
261+
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
262+
extraModelMemoryInBytes += estimatedMemoryUsage;
263+
}
251264

252-
if (assignment.getNodeRoutingTable().isEmpty() == false) {
253-
// if the routing table is non-empty, this is an existing model
254-
existingModelMemoryBytes += estimatedMemoryUsage;
255-
} else {
256-
// only increase memory requirements for new models
257-
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
258-
extraModelMemoryInBytes += estimatedMemoryUsage;
265+
// if not low priority, check processor requirements.
266+
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
267+
if (numMissingProcessors > numberOfAvailableProcessors) {
268+
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
269+
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
270+
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
271+
// extraPerNodeProcessors
259272
}
260-
261-
// if not low priority, check processor requirements.
262-
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
263-
if (numMissingProcessors > numberOfAvailableProcessors) {
264-
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
265-
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
266-
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
267-
// extraPerNodeProcessors
268-
}
269-
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
270-
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
271-
}
272-
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
273+
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
274+
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
273275
}
276+
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
277+
}
278+
279+
if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
280+
logger.info(
281+
() -> format(
282+
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
283+
modelAssignment.getKey(),
284+
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
285+
numMissingAllocations
286+
)
287+
);
288+
}
274289

275-
if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
276-
logger.info(
277-
() -> format(
278-
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
279-
modelAssignment.getKey(),
280-
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
281-
numMissingAllocations
290+
for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
291+
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
292+
* numberOfThreadsPerAllocation;
293+
294+
jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
295+
.add(
296+
MlJobRequirements.of(
297+
estimatedMemoryUsage,
298+
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
299+
? 0
300+
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
301+
* numberOfThreadsPerAllocation
282302
)
283303
);
284-
}
285-
286-
for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
287-
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue()
288-
.getNodeRoutingTable()
289-
.get(node)
290-
.getTargetAllocations() * numberOfThreadsPerAllocation;
291-
292-
jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
293-
.add(
294-
MlJobRequirements.of(
295-
estimatedMemoryUsage,
296-
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
297-
? 0
298-
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
299-
* numberOfThreadsPerAllocation
300-
)
301-
);
302-
}
303-
304-
// min(3, max(number of allocations over all deployed models)
305-
// the minimum number of nodes is equal to the number of allocations, up to 3
306-
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
307-
// in theory this should help availability for 2-3 allocations
308-
// the planner should split over all available nodes
309-
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
310304
}
305+
306+
// min(3, max(number of allocations over all deployed models)
307+
// the minimum number of nodes is equal to the number of allocations, up to 3
308+
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
309+
// in theory this should help availability for 2-3 allocations
310+
// the planner should split over all available nodes
311+
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
311312
}
312313

313314
// dummy autoscaling entity

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
2323
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
2424
import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
25+
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
2526
import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
2627
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
2728
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
@@ -1800,6 +1801,172 @@ public void testGetMemoryAndProcessorsScaleDownNotPreventedByDummyEntityAsMemory
18001801
);
18011802
}
18021803

1804+
public void testGetMemoryAndProcessorsScaleDownForModelWithZeroAllocations() throws InterruptedException {
1805+
long memory = 1000000000;
1806+
Map<String, String> nodeAttr = Map.of(
1807+
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
1808+
Long.toString(memory),
1809+
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
1810+
"400000000",
1811+
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
1812+
"7.2.0",
1813+
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
1814+
"2.0"
1815+
);
1816+
1817+
MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
1818+
List.of(),
1819+
List.of(),
1820+
List.of(),
1821+
Map.of(
1822+
"model-with-zero-allocations",
1823+
TrainedModelAssignment.Builder.empty(
1824+
new StartTrainedModelDeploymentAction.TaskParams(
1825+
"model-with-zero-allocations",
1826+
"model-with-zero-allocations-deployment",
1827+
400,
1828+
0,
1829+
2,
1830+
100,
1831+
null,
1832+
Priority.NORMAL,
1833+
0L,
1834+
0L
1835+
),
1836+
new AdaptiveAllocationsSettings(true, 0, 4)
1837+
).build()
1838+
),
1839+
List.of(
1840+
DiscoveryNodeUtils.builder("ml-node-1")
1841+
.name("ml-node-name-1")
1842+
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
1843+
.attributes(nodeAttr)
1844+
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
1845+
.build()
1846+
),
1847+
PersistentTasksCustomMetadata.builder().build()
1848+
);
1849+
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
1850+
1851+
this.<MlAutoscalingStats>assertAsync(
1852+
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
1853+
mlAutoscalingContext,
1854+
mockTracker,
1855+
Map.of("ml-node-1", memory),
1856+
600000000,
1857+
2,
1858+
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
1859+
MlDummyAutoscalingEntity.of(0, 0),
1860+
1,
1861+
listener
1862+
),
1863+
stats -> {
1864+
assertEquals(memory, stats.currentPerNodeMemoryBytes());
1865+
assertEquals(0, stats.currentTotalModelMemoryBytes());
1866+
assertEquals(0, stats.currentTotalProcessorsInUse());
1867+
assertEquals(1, stats.currentTotalNodes());
1868+
assertEquals(0, stats.wantedMinNodes());
1869+
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
1870+
assertEquals(0, stats.wantedExtraProcessors());
1871+
assertEquals(0, stats.wantedExtraModelMemoryBytes());
1872+
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
1873+
assertEquals(memory, stats.unwantedNodeMemoryBytesToRemove());
1874+
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
1875+
}
1876+
);
1877+
}
1878+
1879+
public void testGetMemoryAndProcessorsIgnoreThreadsOfModelWithZeroAllocations() throws InterruptedException {
1880+
long memory = 1000000000;
1881+
Map<String, String> nodeAttr = Map.of(
1882+
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
1883+
Long.toString(memory),
1884+
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
1885+
"400000000",
1886+
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
1887+
"7.2.0",
1888+
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
1889+
"2.0"
1890+
);
1891+
1892+
MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
1893+
List.of(),
1894+
List.of(),
1895+
List.of(),
1896+
Map.of(
1897+
"model-with-one-allocation",
1898+
TrainedModelAssignment.Builder.empty(
1899+
new StartTrainedModelDeploymentAction.TaskParams(
1900+
"model-with-one-allocation",
1901+
"model-with-one-allocation-deployment",
1902+
400,
1903+
1,
1904+
2,
1905+
100,
1906+
null,
1907+
Priority.NORMAL,
1908+
0L,
1909+
0L
1910+
),
1911+
null
1912+
).addRoutingEntry("ml-node-1", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build(),
1913+
"model-with-zero-allocations",
1914+
TrainedModelAssignment.Builder.empty(
1915+
new StartTrainedModelDeploymentAction.TaskParams(
1916+
"model-with-zero-allocations",
1917+
"model-with-zero-allocations-deployment",
1918+
400,
1919+
0,
1920+
4,
1921+
100,
1922+
null,
1923+
Priority.NORMAL,
1924+
0L,
1925+
0L
1926+
),
1927+
new AdaptiveAllocationsSettings(true, 0, 4)
1928+
).build()
1929+
),
1930+
List.of(
1931+
DiscoveryNodeUtils.builder("ml-node-1")
1932+
.name("ml-node-name-1")
1933+
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
1934+
.attributes(nodeAttr)
1935+
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
1936+
.build()
1937+
),
1938+
PersistentTasksCustomMetadata.builder().build()
1939+
);
1940+
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
1941+
1942+
this.<MlAutoscalingStats>assertAsync(
1943+
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
1944+
mlAutoscalingContext,
1945+
mockTracker,
1946+
Map.of("ml-node-1", memory),
1947+
600000000,
1948+
2,
1949+
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
1950+
MlDummyAutoscalingEntity.of(0, 0),
1951+
1,
1952+
listener
1953+
),
1954+
stats -> {
1955+
assertEquals(memory, stats.currentPerNodeMemoryBytes());
1956+
assertEquals(251659040, stats.currentTotalModelMemoryBytes());
1957+
assertEquals(2, stats.currentTotalProcessorsInUse());
1958+
assertEquals(1, stats.currentTotalNodes());
1959+
assertEquals(1, stats.wantedMinNodes());
1960+
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
1961+
assertEquals(0, stats.wantedExtraProcessors());
1962+
assertEquals(0, stats.wantedExtraModelMemoryBytes());
1963+
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
1964+
assertEquals(0, stats.unwantedNodeMemoryBytesToRemove());
1965+
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
1966+
}
1967+
);
1968+
}
1969+
18031970
private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
18041971
CountDownLatch latch = new CountDownLatch(1);
18051972
AtomicBoolean listenerCalled = new AtomicBoolean(false);

0 commit comments

Comments (0)