
Commit 0a5dfc0

[ML] Set Adaptive Allocations Scale to Zero to 24h (#128914)
Adaptive allocations now wait 24 hours without any requests before scaling down from one allocation to zero. `xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time` changes this interval; the default is 24 hours and the minimum is 1 minute. `xpack.ml.trained_models.adaptive_allocations.scale_up_cooldown_time` controls how long after a scale-up adaptive allocations must wait before scaling down again (between one and the maximum number of allocations); it defaults to 5 minutes.
1 parent 9cfd293 commit 0a5dfc0
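
Both settings are registered as dynamic node-scope settings (see the MachineLearning.java diff below), so they can be set in node configuration or updated at runtime. The following is a minimal sketch, not code from this commit, showing how the values resolve through the new `MachineLearning` setting constants; the 12h override is purely illustrative.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

public class AdaptiveAllocationsSettingsSketch {
    public static void main(String[] args) {
        // Hypothetical node settings with an illustrative override of the scale-to-zero time.
        Settings nodeSettings = Settings.builder()
            .put("xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time", "12h")
            .build();

        // Resolves to 12h here; without the override the default is 24h (minimum 1m).
        TimeValue scaleToZero = MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(nodeSettings);

        // Not overridden above, so this falls back to the 5m default (minimum 1m).
        TimeValue scaleUpCooldown = MachineLearning.SCALE_UP_COOLDOWN_TIME.get(nodeSettings);

        System.out.println(scaleToZero + " / " + scaleUpCooldown);
    }
}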

6 files changed: +106 -72 lines changed

docs/changelog/128914.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 128914
+summary: Make Adaptive Allocations Scale to Zero configurable and set default to 24h
+area: Machine Learning
+type: enhancement
+issues: []

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 32 additions & 2 deletions
@@ -739,6 +739,33 @@ public void loadExtensions(ExtensionLoader loader) {
         Setting.Property.NodeScope
     );
 
+    /**
+     * The time that has to pass after scaling up, before scaling down is allowed.
+     * Note that the ML autoscaling has its own cooldown time to release the hardware.
+     */
+    public static final Setting<TimeValue> SCALE_UP_COOLDOWN_TIME = Setting.timeSetting(
+        "xpack.ml.trained_models.adaptive_allocations.scale_up_cooldown_time",
+        TimeValue.timeValueMinutes(5),
+        TimeValue.timeValueMinutes(1),
+        Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * The time interval without any requests that has to pass, before scaling down
+     * to zero allocations (in case min_allocations = 0). After this time interval
+     * without requests, the number of allocations is set to zero. When this time
+     * interval hasn't passed, the minimum number of allocations will always be
+     * larger than zero.
+     */
+    public static final Setting<TimeValue> SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME = Setting.timeSetting(
+        "xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time",
+        TimeValue.timeValueHours(24),
+        TimeValue.timeValueMinutes(1),
+        Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     /**
      * Each model deployment results in one or more entries in the cluster state
      * for the model allocations. In order to prevent the cluster state from
@@ -814,7 +841,9 @@ public List<Setting<?>> getSettings() {
             MAX_ML_NODE_SIZE,
             DELAYED_DATA_CHECK_FREQ,
             DUMMY_ENTITY_MEMORY,
-            DUMMY_ENTITY_PROCESSORS
+            DUMMY_ENTITY_PROCESSORS,
+            SCALE_UP_COOLDOWN_TIME,
+            SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME
         );
     }
 
@@ -1300,7 +1329,8 @@ public Collection<?> createComponents(PluginServices services) {
             client,
             inferenceAuditor,
             telemetryProvider.getMeterRegistry(),
-            machineLearningExtension.get().isNlpEnabled()
+            machineLearningExtension.get().isNlpEnabled(),
+            settings
         );
 
         MlInitializationService mlInitializationService = new MlInitializationService(
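
Both settings use the `Setting.timeSetting` overload that takes a default and a minimum, so values below the 1-minute floor are rejected when the setting is read. A small sketch of that validation, under the assumption that the minimum is enforced by the setting's parser with an `IllegalArgumentException`; the "30s" value is illustrative only.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MachineLearning;

public class ScaleToZeroMinimumSketch {
    public static void main(String[] args) {
        Settings tooLow = Settings.builder()
            .put("xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time", "30s")
            .build();
        try {
            // 30s is below the 1m minimum declared in Setting.timeSetting(...), so this should throw.
            MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(tooLow);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}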

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java

Lines changed: 5 additions & 3 deletions
@@ -11,6 +11,8 @@
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.Strings;
 
+import java.util.function.Supplier;
+
 /**
  * Processes measured requests counts and inference times and decides whether
  * the number of allocations should be scaled up or down.
@@ -33,7 +35,7 @@ public class AdaptiveAllocationsScaler {
     private final String deploymentId;
     private final KalmanFilter1d requestRateEstimator;
     private final KalmanFilter1d inferenceTimeEstimator;
-    private final long scaleToZeroAfterNoRequestsSeconds;
+    private final Supplier<Long> scaleToZeroAfterNoRequestsSeconds;
     private double timeWithoutRequestsSeconds;
 
     private int numberOfAllocations;
@@ -46,7 +48,7 @@ public class AdaptiveAllocationsScaler {
     private Double lastMeasuredInferenceTime;
     private Long lastMeasuredQueueSize;
 
-    AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations, long scaleToZeroAfterNoRequestsSeconds) {
+    AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations, Supplier<Long> scaleToZeroAfterNoRequestsSeconds) {
         this.deploymentId = deploymentId;
         this.scaleToZeroAfterNoRequestsSeconds = scaleToZeroAfterNoRequestsSeconds;
 
@@ -173,7 +175,7 @@ Integer scale() {
         }
 
         if ((minNumberOfAllocations == null || minNumberOfAllocations == 0)
-            && timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds) {
+            && timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds.get()) {
 
             if (oldNumberOfAllocations != 0) {
                 // avoid logging this message if there is no change
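
The switch from a fixed `long` to a `Supplier<Long>` means the scaler re-reads the threshold on every `scale()` evaluation instead of capturing it at construction time. A minimal sketch of the wiring, assuming it lives in the same package as `AdaptiveAllocationsScaler` (the constructor is package-private); "my-deployment" is a made-up id.

package org.elasticsearch.xpack.ml.inference.adaptiveallocations;

import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.core.TimeValue;

class SupplierWiringSketch {
    static AdaptiveAllocationsScaler buildScaler() {
        // Backing value that a dynamic setting update can change at any time.
        AtomicLong scaleToZeroSeconds = new AtomicLong(TimeValue.timeValueHours(24).getSeconds());

        // The scaler keeps the Supplier rather than a snapshot of the value, so a later
        // scaleToZeroSeconds.set(...) is observed on the next scale() evaluation
        // without recreating the scaler.
        return new AdaptiveAllocationsScaler("my-deployment", 1, scaleToZeroSeconds::get);
    }
}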

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java

Lines changed: 27 additions & 28 deletions
@@ -18,6 +18,7 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
@@ -47,6 +48,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -182,25 +184,11 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
     /**
      * The time interval between the adaptive allocations triggers.
      */
-    private static final int DEFAULT_TIME_INTERVAL_SECONDS = 10;
-    /**
-     * The time that has to pass after scaling up, before scaling down is allowed.
-     * Note that the ML autoscaling has its own cooldown time to release the hardware.
-     */
-    private static final long SCALE_UP_COOLDOWN_TIME_MILLIS = TimeValue.timeValueMinutes(5).getMillis();
-
-    /**
-     * The time interval without any requests that has to pass, before scaling down
-     * to zero allocations (in case min_allocations = 0). After this time interval
-     * without requests, the number of allocations is set to zero. When this time
-     * interval hasn't passed, the minimum number of allocations will always be
-     * larger than zero.
-     */
-    private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds();
+    private static final long DEFAULT_TIME_INTERVAL_SECONDS = 10;
 
     private static final Logger logger = LogManager.getLogger(AdaptiveAllocationsScalerService.class);
 
-    private final int timeIntervalSeconds;
+    private final long timeIntervalSeconds;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
     private final Client client;
@@ -214,8 +202,8 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
     private final Map<String, Long> lastScaleUpTimesMillis;
     private volatile Scheduler.Cancellable cancellable;
     private final AtomicBoolean busy;
-    private final long scaleToZeroAfterNoRequestsSeconds;
-    private final long scaleUpCooldownTimeMillis;
+    private final AtomicLong scaleToZeroAfterNoRequestsSeconds;
+    private final AtomicLong scaleUpCooldownTimeMillis;
     private final Set<String> deploymentIdsWithInFlightScaleFromZeroRequests = new ConcurrentSkipListSet<>();
     private final Map<String, String> lastWarningMessages = new ConcurrentHashMap<>();
 
@@ -225,7 +213,8 @@ public AdaptiveAllocationsScalerService(
         Client client,
         InferenceAuditor inferenceAuditor,
         MeterRegistry meterRegistry,
-        boolean isNlpEnabled
+        boolean isNlpEnabled,
+        Settings settings
     ) {
         this(
             threadPool,
@@ -235,9 +224,19 @@ public AdaptiveAllocationsScalerService(
             meterRegistry,
             isNlpEnabled,
             DEFAULT_TIME_INTERVAL_SECONDS,
-            SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS,
-            SCALE_UP_COOLDOWN_TIME_MILLIS
+            new AtomicLong(MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(settings).getSeconds()),
+            new AtomicLong(MachineLearning.SCALE_UP_COOLDOWN_TIME.get(settings).getMillis())
         );
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(
+                MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME,
+                timeInterval -> this.scaleToZeroAfterNoRequestsSeconds.set(timeInterval.getSeconds())
+            );
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(
+                MachineLearning.SCALE_UP_COOLDOWN_TIME,
+                timeInterval -> this.scaleUpCooldownTimeMillis.set(timeInterval.getMillis())
+            );
     }
 
     // visible for testing
@@ -248,9 +247,9 @@ public AdaptiveAllocationsScalerService(
         InferenceAuditor inferenceAuditor,
         MeterRegistry meterRegistry,
         boolean isNlpEnabled,
-        int timeIntervalSeconds,
-        long scaleToZeroAfterNoRequestsSeconds,
-        long scaleUpCooldownTimeMillis
+        long timeIntervalSeconds,
+        AtomicLong scaleToZeroAfterNoRequestsSeconds,
+        AtomicLong scaleUpCooldownTimeMillis
     ) {
         this.threadPool = threadPool;
         this.clusterService = clusterService;
@@ -314,7 +313,7 @@ private synchronized void updateAutoscalers(ClusterState state) {
                 key -> new AdaptiveAllocationsScaler(
                     assignment.getDeploymentId(),
                     assignment.totalTargetAllocations(),
-                    scaleToZeroAfterNoRequestsSeconds
+                    scaleToZeroAfterNoRequestsSeconds::get
                 )
             );
             adaptiveAllocationsScaler.setMinMaxNumberOfAllocations(
@@ -331,7 +330,7 @@ private synchronized void updateAutoscalers(ClusterState state) {
 
     private synchronized void startScheduling() {
         if (cancellable == null) {
-            logger.debug("Starting ML adaptive allocations scaler");
+            logger.debug("Starting ML adaptive allocations scaler at interval [{}].", timeIntervalSeconds);
             try {
                 cancellable = threadPool.scheduleWithFixedDelay(
                     this::trigger,
@@ -425,7 +424,7 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
                 if (nodeStats.getRoutingState() != null && nodeStats.getRoutingState().getState() == RoutingState.STARTING) {
                     hasRecentObservedScaleUp.add(deploymentId);
                 }
-                if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis) {
+                if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis.get()) {
                     hasRecentObservedScaleUp.add(deploymentId);
                 }
             }
@@ -446,7 +445,7 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
             // hasRecentScaleUp indicates whether this service has recently scaled up the deployment.
             // hasRecentObservedScaleUp indicates whether a deployment recently has started,
             // potentially triggered by another node.
-            boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis;
+            boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis.get();
             if (newNumberOfAllocations < numberOfAllocations.get(deploymentId)
                 && (hasRecentScaleUp || hasRecentObservedScaleUp.contains(deploymentId))) {
                 logger.debug("adaptive allocations scaler: skipping scaling down [{}] because of recent scaleup.", deploymentId);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java

Lines changed: 24 additions & 18 deletions
@@ -44,6 +44,7 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -58,6 +59,11 @@
 
 public class AdaptiveAllocationsScalerServiceTests extends ESTestCase {
 
+    private static final long ONE_SECOND = 1L;
+    private static final AtomicLong ATOMIC_SECOND = new AtomicLong(1);
+    private static final AtomicLong SIXTY_SECONDS = new AtomicLong(60);
+    private static final AtomicLong TWO_THOUSAND_MILLISECONDS = new AtomicLong(2_000);
+    private static final AtomicLong SIXTY_THOUSAND_MILLISECONDS = new AtomicLong(60_000);
     private TestThreadPool threadPool;
     private ClusterService clusterService;
     private Client client;
@@ -175,9 +181,9 @@ public void test_scaleUp() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            60,
-            60_000
+            ONE_SECOND,
+            SIXTY_SECONDS,
+            SIXTY_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -269,9 +275,9 @@ public void test_scaleDownToZero_whenNoRequests() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            1,
-            2_000
+            ONE_SECOND,
+            ATOMIC_SECOND,
+            TWO_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -336,9 +342,9 @@ public void test_dontScale_whenNotStarted() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            1,
-            2_000
+            ONE_SECOND,
+            ATOMIC_SECOND,
+            TWO_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -392,9 +398,9 @@ public void test_noScaleDownToZero_whenRecentlyScaledUpByOtherNode() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            1,
-            2_000
+            ONE_SECOND,
+            ATOMIC_SECOND,
+            TWO_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -477,9 +483,9 @@ public void testMaybeStartAllocation() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            60,
-            60_000
+            ONE_SECOND,
+            SIXTY_SECONDS,
+            TWO_THOUSAND_MILLISECONDS
         );
 
         when(client.threadPool()).thenReturn(threadPool);
@@ -512,9 +518,9 @@ public void testMaybeStartAllocation_BlocksMultipleRequests() throws Exception {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            60,
-            60_000
+            ONE_SECOND,
+            SIXTY_SECONDS,
+            SIXTY_THOUSAND_MILLISECONDS
        );
 
         var latch = new CountDownLatch(1);
