@@ -623,6 +623,9 @@ public String getDeploymentId() {
* @return the estimated memory (in bytes) required for the model deployment to run
*/
public long estimateMemoryUsageBytes() {
if (numberOfAllocations == 0) {
return 0;
}
// We already take into account 2x the model bytes. If the cache size is larger than the model bytes, then
// we need to take it into account when returning the estimate.
if (cacheSize != null && cacheSize.getBytes() > modelBytes) {
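For intuition, here is a minimal standalone sketch of the guarded estimate. The method shape and formula below are simplified assumptions reconstructed from the comments in this hunk, not the exact production code: with adaptive allocations a deployment can scale down to zero allocations, and in that state it should report zero required memory rather than the baseline load-time overhead.

    // Simplified sketch of the zero-allocation guard (hypothetical helper, not the real method body).
    static long estimateMemoryUsageBytesSketch(long modelBytes, long cacheSizeBytes, int numberOfAllocations) {
        if (numberOfAllocations == 0) {
            return 0; // scaled to zero: nothing is loaded, so nothing needs to be reserved
        }
        long estimate = 2 * modelBytes; // loading the model in the process needs twice the model size
        if (cacheSizeBytes > modelBytes) {
            estimate += cacheSizeBytes - modelBytes; // only the cache overflow beyond the model adds footprint
        }
        return estimate;
    }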
@@ -796,6 +799,9 @@ public static long estimateMemoryUsageBytes(
long perAllocationMemoryBytes,
int numberOfAllocations
) {
if (numberOfAllocations == 0) {
return 0;
}
// While loading the model in the process we need twice the model size.

// 1. If ELSER v1 or v2 then 2004MB
@@ -30,6 +30,10 @@

import static org.elasticsearch.core.Strings.format;

/**
 * This handles ML autoscaling just for classic cloud.
 * For serverless, see: {@link MlAutoscalingResourceTracker}.
 */
public final class MlAutoscalingDeciderService implements AutoscalingDeciderService, LocalNodeMasterListener {

private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class);
@@ -48,7 +48,8 @@
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;

/**
 * This handles ML autoscaling just for serverless.
 * For classic cloud, see: {@link MlAutoscalingDeciderService}.
 */
public final class MlAutoscalingResourceTracker {
private static final Logger logger = LogManager.getLogger(MlAutoscalingResourceTracker.class);
@@ -242,72 +243,72 @@ static void getMemoryAndProcessors(
final int numMissingProcessors = numMissingAllocations * numberOfThreadsPerAllocation;
int numExistingProcessorsToBeUsed = Math.min(numMissingProcessors, numberOfAvailableProcessors);

if (numberOfRequestedAllocations == 0) {
    continue;
}
if (assignment.getNodeRoutingTable().isEmpty() == false
    && assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
    // Ignore states that don't consume memory, for example all allocations are failed or stopped.
    // If the node routing table is empty, it matches the condition above, but it is handled in the next branch.
    continue;
}

if (assignment.getNodeRoutingTable().isEmpty() == false) {
    // if the routing table is non-empty, this is an existing model
    existingModelMemoryBytes += estimatedMemoryUsage;
} else {
    // only increase memory requirements for new models
    extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
    extraModelMemoryInBytes += estimatedMemoryUsage;
}

// if not low priority, check processor requirements.
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
    if (numMissingProcessors > numberOfAvailableProcessors) {
        // as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
        extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
        // if extra processors > 0, we need at least 1 extra per-node processor
        extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1);
    }
    if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
        extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
    }
    numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
}

if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
    logger.info(
        () -> format(
            "trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
            modelAssignment.getKey(),
            Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
            numMissingAllocations
        )
    );
}

for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
    sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
        * numberOfThreadsPerAllocation;

    jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
        .add(
            MlJobRequirements.of(
                estimatedMemoryUsage,
                Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
                    ? 0
                    : modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
                        * numberOfThreadsPerAllocation
            )
        );
}

// min(3, max(number of allocations over all deployed models))
// the minimum number of nodes is equal to the number of allocations, up to 3
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
// in theory this should help availability for 2-3 allocations
// the planner should split over all available nodes
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
}
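The interaction between the new early continue and the minNodes clamp is worth spelling out: a deployment scaled to zero allocations never reaches the clamp, so it no longer forces a minimum node count. A quick standalone check of the clamp itself, with hypothetical values:

    int minNodes = 0;
    // a zero-allocation deployment is skipped by the `continue` above and never updates minNodes
    for (int requestedAllocations : new int[] { 2, 5 }) {
        minNodes = Math.min(3, Math.max(minNodes, requestedAllocations));
    }
    // after 2 requested allocations: minNodes == 2; after 5: capped at 3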

// dummy autoscaling entity
@@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
@@ -1800,6 +1801,172 @@ public void testGetMemoryAndProcessorsScaleDownNotPreventedByDummyEntityAsMemory
);
}

public void testGetMemoryAndProcessorsScaleDownForModelWithZeroAllocations() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0",
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
"2.0"
);

MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(),
List.of(),
List.of(),
Map.of(
"model-with-zero-allocations",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-zero-allocations",
"model-with-zero-allocations-deployment",
400,
0,
2,
100,
null,
Priority.NORMAL,
0L,
0L
),
new AdaptiveAllocationsSettings(true, 0, 4)
).build()
),
List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);

this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-node-1", memory),
600000000,
2,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
MlDummyAutoscalingEntity.of(0, 0),
1,
listener
),
stats -> {
assertEquals(memory, stats.currentPerNodeMemoryBytes());
assertEquals(0, stats.currentTotalModelMemoryBytes());
assertEquals(0, stats.currentTotalProcessorsInUse());
assertEquals(1, stats.currentTotalNodes());
assertEquals(0, stats.wantedMinNodes());
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
assertEquals(0, stats.wantedExtraProcessors());
assertEquals(0, stats.wantedExtraModelMemoryBytes());
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
assertEquals(memory, stats.unwantedNodeMemoryBytesToRemove());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
}
);
}
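Why the test above expects these stats, in brief: with zero requested allocations the estimate guard returns 0 and the accounting loop skips the assignment entirely, so nothing anchors the single ML node and its whole memory becomes removable. A small sketch of that arithmetic, with values taken from the test and hypothetical variable names:

    long nodeMemoryBytes = 1000000000L;             // machine memory of ml-node-1 from nodeAttr
    long currentTotalModelMemory = 0L;              // estimateMemoryUsageBytes(...) == 0 for zero allocations
    int wantedMinNodes = 0;                         // the assignment never reaches the minNodes clamp
    long unwantedNodeMemoryBytes = nodeMemoryBytes; // the node carries nothing, so it can be scaled away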

public void testGetMemoryAndProcessorsIgnoreThreadsOfModelWithZeroAllocations() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0",
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
"2.0"
);

MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(),
List.of(),
List.of(),
Map.of(
"model-with-one-allocation",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-one-allocation",
"model-with-one-allocation-deployment",
400,
1,
2,
100,
null,
Priority.NORMAL,
0L,
0L
),
null
).addRoutingEntry("ml-node-1", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build(),
"model-with-zero-allocations",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-zero-allocations",
"model-with-zero-allocations-deployment",
400,
0,
4,
100,
null,
Priority.NORMAL,
0L,
0L
),
new AdaptiveAllocationsSettings(true, 0, 4)
).build()
),
List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);

this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-node-1", memory),
600000000,
2,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
MlDummyAutoscalingEntity.of(0, 0),
1,
listener
),
stats -> {
assertEquals(memory, stats.currentPerNodeMemoryBytes());
assertEquals(251659040, stats.currentTotalModelMemoryBytes());
assertEquals(2, stats.currentTotalProcessorsInUse());
assertEquals(1, stats.currentTotalNodes());
assertEquals(1, stats.wantedMinNodes());
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
assertEquals(0, stats.wantedExtraProcessors());
assertEquals(0, stats.wantedExtraModelMemoryBytes());
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
assertEquals(0, stats.unwantedNodeMemoryBytesToRemove());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
}
);
}
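The 251,659,040-byte expectation in the second test is consistent with a fixed native-process overhead of 240 MB plus twice the 400-byte model size passed to TaskParams; treat that breakdown as an inference from the numbers rather than a quote from the estimator. A quick arithmetic check:

    long assumedNativeOverheadBytes = 240L * 1024 * 1024;                 // 251658240, assumed fixed overhead
    long modelBytes = 400L;                                               // model size used in the test's TaskParams
    long expectedBytes = assumedNativeOverheadBytes + 2 * modelBytes;     // 251659040, matching the assertion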

private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean listenerCalled = new AtomicBoolean(false);