Skip to content

Commit f0339ed

Browse files
authored
Adaptive allocations: scale to zero allocations (#113455)
1 parent 6e73c14 commit f0339ed

File tree

11 files changed

+180
-71
lines changed

11 files changed

+180
-71
lines changed

test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
public enum FeatureFlag {
1919
TIME_SERIES_MODE("es.index_mode_feature_flag_registered=true", Version.fromString("8.0.0"), null),
2020
FAILURE_STORE_ENABLED("es.failure_store_feature_flag_enabled=true", Version.fromString("8.12.0"), null),
21-
CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null);
21+
CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null),
22+
INFERENCE_SCALE_TO_ZERO("es.inference_scale_to_zero_feature_flag_enabled=true", Version.fromString("8.16.0"), null);
2223

2324
public final String systemProperty;
2425
public final Version from;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
161161
public ActionRequestValidationException validate() {
162162
ActionRequestValidationException validationException = new ActionRequestValidationException();
163163
if (numberOfAllocations != null) {
164-
if (numberOfAllocations < 1) {
164+
if (numberOfAllocations < 0 || (isInternal == false && numberOfAllocations == 0)) {
165165
validationException.addValidationError("[" + NUMBER_OF_ALLOCATIONS + "] must be a positive integer");
166166
}
167167
if (isInternal == false

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@
327327
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
328328
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;
329329
import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService;
330+
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
330331
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentClusterService;
331332
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentService;
332333
import org.elasticsearch.xpack.ml.inference.deployment.DeploymentManager;
@@ -1285,13 +1286,21 @@ public Collection<?> createComponents(PluginServices services) {
12851286
new MlAutoscalingDeciderService(memoryTracker, settings, nodeAvailabilityZoneMapper, clusterService)
12861287
);
12871288

1288-
MlInitializationService mlInitializationService = new MlInitializationService(
1289-
settings,
1289+
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService = new AdaptiveAllocationsScalerService(
12901290
threadPool,
12911291
clusterService,
12921292
client,
12931293
inferenceAuditor,
12941294
telemetryProvider.getMeterRegistry(),
1295+
machineLearningExtension.get().isNlpEnabled()
1296+
);
1297+
1298+
MlInitializationService mlInitializationService = new MlInitializationService(
1299+
settings,
1300+
threadPool,
1301+
clusterService,
1302+
client,
1303+
adaptiveAllocationsScalerService,
12951304
mlAssignmentNotifier,
12961305
machineLearningExtension.get().isAnomalyDetectionEnabled(),
12971306
machineLearningExtension.get().isDataFrameAnalyticsEnabled(),
@@ -1317,6 +1326,7 @@ public Collection<?> createComponents(PluginServices services) {
13171326
jobManagerHolder,
13181327
autodetectProcessManager,
13191328
mlInitializationService,
1329+
adaptiveAllocationsScalerService,
13201330
jobDataCountsPersister,
13211331
datafeedRunner,
13221332
datafeedManager,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@
3030
import org.elasticsearch.common.component.LifecycleListener;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.gateway.GatewayService;
33-
import org.elasticsearch.telemetry.metric.MeterRegistry;
3433
import org.elasticsearch.threadpool.ThreadPool;
3534
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
3635
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
37-
import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
3836

3937
import java.util.Collections;
4038
import java.util.Map;
@@ -67,8 +65,7 @@ public final class MlInitializationService implements ClusterStateListener {
6765
ThreadPool threadPool,
6866
ClusterService clusterService,
6967
Client client,
70-
InferenceAuditor inferenceAuditor,
71-
MeterRegistry meterRegistry,
68+
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService,
7269
MlAssignmentNotifier mlAssignmentNotifier,
7370
boolean isAnomalyDetectionEnabled,
7471
boolean isDataFrameAnalyticsEnabled,
@@ -88,7 +85,7 @@ public final class MlInitializationService implements ClusterStateListener {
8885
isDataFrameAnalyticsEnabled,
8986
isNlpEnabled
9087
),
91-
new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled),
88+
adaptiveAllocationsScalerService,
9289
clusterService
9390
);
9491
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExternalInferModelAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.license.XPackLicenseState;
1414
import org.elasticsearch.transport.TransportService;
1515
import org.elasticsearch.xpack.core.ml.action.InferModelAction;
16+
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
1617
import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService;
1718
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
1819

@@ -25,7 +26,8 @@ public TransportExternalInferModelAction(
2526
Client client,
2627
ClusterService clusterService,
2728
XPackLicenseState licenseState,
28-
TrainedModelProvider trainedModelProvider
29+
TrainedModelProvider trainedModelProvider,
30+
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService
2931
) {
3032
super(
3133
InferModelAction.EXTERNAL_NAME,
@@ -35,7 +37,8 @@ public TransportExternalInferModelAction(
3537
client,
3638
clusterService,
3739
licenseState,
38-
trainedModelProvider
40+
trainedModelProvider,
41+
adaptiveAllocationsScalerService
3942
);
4043
}
4144
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.xpack.core.ml.inference.results.ErrorInferenceResults;
4343
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4444
import org.elasticsearch.xpack.ml.MachineLearning;
45+
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
4546
import org.elasticsearch.xpack.ml.inference.loadingservice.LocalModel;
4647
import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService;
4748
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
@@ -66,6 +67,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
6667
private final ClusterService clusterService;
6768
private final XPackLicenseState licenseState;
6869
private final TrainedModelProvider trainedModelProvider;
70+
private final AdaptiveAllocationsScalerService adaptiveAllocationsScalerService;
6971

7072
TransportInternalInferModelAction(
7173
String actionName,
@@ -75,14 +77,16 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
7577
Client client,
7678
ClusterService clusterService,
7779
XPackLicenseState licenseState,
78-
TrainedModelProvider trainedModelProvider
80+
TrainedModelProvider trainedModelProvider,
81+
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService
7982
) {
8083
super(actionName, transportService, actionFilters, InferModelAction.Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
8184
this.modelLoadingService = modelLoadingService;
8285
this.client = client;
8386
this.clusterService = clusterService;
8487
this.licenseState = licenseState;
8588
this.trainedModelProvider = trainedModelProvider;
89+
this.adaptiveAllocationsScalerService = adaptiveAllocationsScalerService;
8690
}
8791

8892
@Inject
@@ -93,7 +97,8 @@ public TransportInternalInferModelAction(
9397
Client client,
9498
ClusterService clusterService,
9599
XPackLicenseState licenseState,
96-
TrainedModelProvider trainedModelProvider
100+
TrainedModelProvider trainedModelProvider,
101+
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService
97102
) {
98103
this(
99104
InferModelAction.NAME,
@@ -103,7 +108,8 @@ public TransportInternalInferModelAction(
103108
client,
104109
clusterService,
105110
licenseState,
106-
trainedModelProvider
111+
trainedModelProvider,
112+
adaptiveAllocationsScalerService
107113
);
108114
}
109115

@@ -253,10 +259,13 @@ private void inferAgainstAllocatedModel(
253259
}
254260

255261
if (nodes.isEmpty()) {
256-
logger.trace(() -> format("[%s] model deployment not allocated to any node", assignment.getDeploymentId()));
257-
listener.onFailure(
258-
ExceptionsHelper.conflictStatusException("Trained model deployment [" + request.getId() + "] is not allocated to any nodes")
259-
);
262+
String message = "Trained model deployment [" + request.getId() + "] is not allocated to any nodes";
263+
boolean starting = adaptiveAllocationsScalerService.maybeStartAllocation(assignment);
264+
if (starting) {
265+
message += "; starting deployment of one allocation";
266+
}
267+
logger.debug(message);
268+
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
260269
return;
261270
}
262271

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.common.Strings;
13+
import org.elasticsearch.core.TimeValue;
1314

1415
/**
1516
* Processes measured requests counts and inference times and decides whether
@@ -21,6 +22,12 @@ public class AdaptiveAllocationsScaler {
2122
static final double SCALE_UP_THRESHOLD = 0.9;
2223
private static final double SCALE_DOWN_THRESHOLD = 0.85;
2324

25+
/**
26+
* The time interval without any requests that has to pass before scaling down
27+
* to zero allocations (only applies when the minimum number of allocations is unset or 0).
28+
*/
29+
private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds();
30+
2431
/**
2532
* If the max_number_of_allocations is not set, use this value for now to prevent scaling up
2633
* to high numbers due to possible bugs or unexpected behaviour in the scaler.
@@ -33,6 +40,7 @@ public class AdaptiveAllocationsScaler {
3340
private final String deploymentId;
3441
private final KalmanFilter1d requestRateEstimator;
3542
private final KalmanFilter1d inferenceTimeEstimator;
43+
private double timeWithoutRequestsSeconds;
3644

3745
private int numberOfAllocations;
3846
private int neededNumberOfAllocations;
@@ -55,6 +63,7 @@ public class AdaptiveAllocationsScaler {
5563
// the number of allocations changes, which is passed explicitly to the estimator.
5664
requestRateEstimator = new KalmanFilter1d(deploymentId + ":rate", 100, true);
5765
inferenceTimeEstimator = new KalmanFilter1d(deploymentId + ":time", 100, false);
66+
timeWithoutRequestsSeconds = 0.0;
5867
this.numberOfAllocations = numberOfAllocations;
5968
neededNumberOfAllocations = numberOfAllocations;
6069
minNumberOfAllocations = null;
@@ -73,6 +82,11 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum
7382

7483
void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) {
7584
lastMeasuredQueueSize = stats.pendingCount();
85+
if (stats.requestCount() > 0) {
86+
timeWithoutRequestsSeconds = 0.0;
87+
} else {
88+
timeWithoutRequestsSeconds += timeIntervalSeconds;
89+
}
7690

7791
// The request rate (per second) is the request count divided by the time.
7892
// Assuming a Poisson process for the requests, the variance in the request
@@ -145,7 +159,7 @@ Integer scale() {
145159
numberOfAllocations--;
146160
}
147161

148-
this.neededNumberOfAllocations = numberOfAllocations;
162+
neededNumberOfAllocations = numberOfAllocations;
149163

150164
if (maxNumberOfAllocations == null) {
151165
numberOfAllocations = Math.min(numberOfAllocations, MAX_NUMBER_OF_ALLOCATIONS_SAFEGUARD);
@@ -156,6 +170,13 @@ Integer scale() {
156170
if (maxNumberOfAllocations != null) {
157171
numberOfAllocations = Math.min(numberOfAllocations, maxNumberOfAllocations);
158172
}
173+
if (ScaleToZeroFeatureFlag.isEnabled()
174+
&& (minNumberOfAllocations == null || minNumberOfAllocations == 0)
175+
&& timeWithoutRequestsSeconds > SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS) {
176+
logger.debug("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId);
177+
numberOfAllocations = 0;
178+
neededNumberOfAllocations = 0;
179+
}
159180

160181
if (numberOfAllocations != oldNumberOfAllocations) {
161182
logger.debug(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -415,49 +415,60 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
415415
if (newNumberOfAllocations > numberOfAllocations.get(deploymentId)) {
416416
lastScaleUpTimesMillis.put(deploymentId, now);
417417
}
418-
UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId);
419-
updateRequest.setNumberOfAllocations(newNumberOfAllocations);
420-
updateRequest.setIsInternal(true);
421-
ClientHelper.executeAsyncWithOrigin(
422-
client,
423-
ClientHelper.ML_ORIGIN,
424-
UpdateTrainedModelDeploymentAction.INSTANCE,
425-
updateRequest,
426-
ActionListener.wrap(updateResponse -> {
427-
logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, newNumberOfAllocations);
428-
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
429-
.execute(
430-
() -> inferenceAuditor.info(
431-
deploymentId,
432-
Strings.format(
433-
"adaptive allocations scaler: scaled [%s] to [%s] allocations.",
434-
deploymentId,
435-
newNumberOfAllocations
436-
)
437-
)
438-
);
439-
}, e -> {
440-
logger.atLevel(Level.WARN)
441-
.withThrowable(e)
442-
.log(
443-
"adaptive allocations scaler: scaling [{}] to [{}] allocations failed.",
444-
deploymentId,
445-
newNumberOfAllocations
446-
);
447-
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
448-
.execute(
449-
() -> inferenceAuditor.warning(
450-
deploymentId,
451-
Strings.format(
452-
"adaptive allocations scaler: scaling [%s] to [%s] allocations failed.",
453-
deploymentId,
454-
newNumberOfAllocations
455-
)
456-
)
457-
);
458-
})
459-
);
418+
updateNumberOfAllocations(deploymentId, newNumberOfAllocations);
460419
}
461420
}
462421
}
422+
423+
public boolean maybeStartAllocation(TrainedModelAssignment assignment) {
424+
if (ScaleToZeroFeatureFlag.isEnabled()
425+
&& assignment.getAdaptiveAllocationsSettings() != null
426+
&& assignment.getAdaptiveAllocationsSettings().getEnabled() == Boolean.TRUE) {
427+
lastScaleUpTimesMillis.put(assignment.getDeploymentId(), System.currentTimeMillis());
428+
updateNumberOfAllocations(assignment.getDeploymentId(), 1);
429+
return true;
430+
}
431+
return false;
432+
}
433+
434+
private void updateNumberOfAllocations(String deploymentId, int numberOfAllocations) {
435+
UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId);
436+
updateRequest.setNumberOfAllocations(numberOfAllocations);
437+
updateRequest.setIsInternal(true);
438+
ClientHelper.executeAsyncWithOrigin(
439+
client,
440+
ClientHelper.ML_ORIGIN,
441+
UpdateTrainedModelDeploymentAction.INSTANCE,
442+
updateRequest,
443+
ActionListener.wrap(updateResponse -> {
444+
logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, numberOfAllocations);
445+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
446+
.execute(
447+
() -> inferenceAuditor.info(
448+
deploymentId,
449+
Strings.format(
450+
"adaptive allocations scaler: scaled [%s] to [%s] allocations.",
451+
deploymentId,
452+
numberOfAllocations
453+
)
454+
)
455+
);
456+
}, e -> {
457+
logger.atLevel(Level.WARN)
458+
.withThrowable(e)
459+
.log("adaptive allocations scaler: scaling [{}] to [{}] allocations failed.", deploymentId, numberOfAllocations);
460+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
461+
.execute(
462+
() -> inferenceAuditor.warning(
463+
deploymentId,
464+
Strings.format(
465+
"adaptive allocations scaler: scaling [%s] to [%s] allocations failed.",
466+
deploymentId,
467+
numberOfAllocations
468+
)
469+
)
470+
);
471+
})
472+
);
473+
}
463474
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.inference.adaptiveallocations;
9+
10+
import org.elasticsearch.common.util.FeatureFlag;
11+
12+
public class ScaleToZeroFeatureFlag {
13+
private ScaleToZeroFeatureFlag() {}
14+
15+
private static final FeatureFlag FEATURE_FLAG = new FeatureFlag("inference_scale_to_zero");
16+
17+
public static boolean isEnabled() {
18+
return FEATURE_FLAG.isEnabled();
19+
}
20+
}

0 commit comments

Comments
 (0)