Commit 31a1b9a
[ML] Set Adaptive Allocations Scale to Zero to 24h
With this change, adaptive allocations waits 24 hours without requests before scaling down from one allocation to zero (previously 15 minutes). `xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time` is a dynamic setting that changes this wait, from the 24-hour default down to a minimum of 15 minutes. `xpack.ml.trained_models.adaptive_allocations.scale_up_cooldown_time` is a dynamic setting that controls how long after a scale-up the service waits before scaling down between one and the maximum number of allocations; it defaults to 5 minutes. `xpack.ml.trained_models.adaptive_allocations.trigger_time` controls how often adaptive allocations evaluates whether to scale; it defaults to 10 seconds and requires a node restart to change.
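For reference, a minimal sketch (not part of this commit) of how the new defaults resolve when nothing is configured; the setting constants come from MachineLearning.java below, while the wrapper class and main method are hypothetical:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

public class AdaptiveAllocationsDefaultsSketch {
    public static void main(String[] args) {
        // Reading the defaults from empty settings mirrors what a node with no overrides would see.
        TimeValue triggerTime = MachineLearning.DEFAULT_TIME_INTERVAL.get(Settings.EMPTY);                // 10s
        TimeValue scaleUpCooldown = MachineLearning.SCALE_UP_COOLDOWN_TIME.get(Settings.EMPTY);           // 5m
        TimeValue scaleToZero = MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(Settings.EMPTY); // 24h
        System.out.println(triggerTime + " " + scaleUpCooldown + " " + scaleToZero);
    }
}

Of the three, only the trigger interval is consumed once at startup; the two cooldown settings are declared Property.Dynamic, so they can be updated on a running cluster.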
1 parent 46e5f1c commit 31a1b9a

5 files changed: +109 / -76 lines

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 40 additions & 1 deletion
@@ -739,6 +739,44 @@ public void loadExtensions(ExtensionLoader loader) {
         Setting.Property.NodeScope
     );
 
+    /**
+     * The time interval between the adaptive allocations triggers.
+     * This setting requires a reboot to take effect, as it is only consumed during startup.
+     */
+    public static final Setting<TimeValue> DEFAULT_TIME_INTERVAL = Setting.timeSetting(
+        "xpack.ml.trained_models.adaptive_allocations.trigger_time",
+        TimeValue.timeValueSeconds(10),
+        TimeValue.timeValueSeconds(10),
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * The time that has to pass after scaling up, before scaling down is allowed.
+     * Note that the ML autoscaling has its own cooldown time to release the hardware.
+     */
+    public static final Setting<TimeValue> SCALE_UP_COOLDOWN_TIME = Setting.timeSetting(
+        "xpack.ml.trained_models.adaptive_allocations.scale_up_cooldown_time",
+        TimeValue.timeValueMinutes(5),
+        TimeValue.timeValueMinutes(5),
+        Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * The time interval without any requests that has to pass, before scaling down
+     * to zero allocations (in case min_allocations = 0). After this time interval
+     * without requests, the number of allocations is set to zero. When this time
+     * interval hasn't passed, the minimum number of allocations will always be
+     * larger than zero.
+     */
+    public static final Setting<TimeValue> SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME = Setting.timeSetting(
+        "xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time",
+        TimeValue.timeValueHours(24),
+        TimeValue.timeValueMinutes(15),
+        Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     /**
      * Each model deployment results in one or more entries in the cluster state
      * for the model allocations. In order to prevent the cluster state from
@@ -1300,7 +1338,8 @@ public Collection<?> createComponents(PluginServices services) {
             client,
             inferenceAuditor,
             telemetryProvider.getMeterRegistry(),
-            machineLearningExtension.get().isNlpEnabled()
+            machineLearningExtension.get().isNlpEnabled(),
+            settings
         );
 
         MlInitializationService mlInitializationService = new MlInitializationService(
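To illustrate the 15-minute floor on scale_to_zero_time, a hedged sketch assuming that the minimum passed to Setting.timeSetting is enforced when the value is read; the wrapper class is hypothetical:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

public class ScaleToZeroFloorSketch {
    public static void main(String[] args) {
        // An explicit value overrides the 24h default.
        Settings oneHour = Settings.builder()
            .put("xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time", "1h")
            .build();
        System.out.println(MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(oneHour)); // 1h

        // A value below the 15-minute minimum should be rejected (assumption: the minimum
        // given to Setting.timeSetting is validated when the value is parsed).
        Settings tooSmall = Settings.builder()
            .put("xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time", "1m")
            .build();
        try {
            TimeValue ignored = MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(tooSmall);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}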

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java

Lines changed: 5 additions & 3 deletions
@@ -11,6 +11,8 @@
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.Strings;
 
+import java.util.function.Supplier;
+
 /**
  * Processes measured requests counts and inference times and decides whether
  * the number of allocations should be scaled up or down.
@@ -33,7 +35,7 @@ public class AdaptiveAllocationsScaler {
     private final String deploymentId;
     private final KalmanFilter1d requestRateEstimator;
     private final KalmanFilter1d inferenceTimeEstimator;
-    private final long scaleToZeroAfterNoRequestsSeconds;
+    private final Supplier<Long> scaleToZeroAfterNoRequestsSeconds;
     private double timeWithoutRequestsSeconds;
 
     private int numberOfAllocations;
@@ -46,7 +48,7 @@ public class AdaptiveAllocationsScaler {
     private Double lastMeasuredInferenceTime;
     private Long lastMeasuredQueueSize;
 
-    AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations, long scaleToZeroAfterNoRequestsSeconds) {
+    AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations, Supplier<Long> scaleToZeroAfterNoRequestsSeconds) {
         this.deploymentId = deploymentId;
         this.scaleToZeroAfterNoRequestsSeconds = scaleToZeroAfterNoRequestsSeconds;
 
@@ -173,7 +175,7 @@ Integer scale() {
         }
 
         if ((minNumberOfAllocations == null || minNumberOfAllocations == 0)
-            && timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds) {
+            && timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds.get()) {
 
             if (oldNumberOfAllocations != 0) {
                 // avoid logging this message if there is no change
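A standalone illustration (not from this commit) of why the scaler now takes a Supplier<Long> rather than a long: the service hands over a method reference to a mutable holder, so a later dynamic settings update is visible on the scaler's next scale() evaluation without any further wiring. All names below are hypothetical:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class SupplierBackedThresholdSketch {
    public static void main(String[] args) {
        // Holder owned by the service; a settings-update consumer would call set() on it.
        AtomicLong scaleToZeroSeconds = new AtomicLong(24 * 60 * 60);

        // The scaler only sees the supplier, never the holder itself.
        Supplier<Long> threshold = scaleToZeroSeconds::get;

        double timeWithoutRequestsSeconds = 3_600;
        System.out.println(timeWithoutRequestsSeconds > threshold.get()); // false: still inside the 24h window

        // Simulate a dynamic update down to the 15-minute minimum.
        scaleToZeroSeconds.set(15 * 60);
        System.out.println(timeWithoutRequestsSeconds > threshold.get()); // true: scaling to zero is now allowed
    }
}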

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java

Lines changed: 27 additions & 33 deletions
@@ -18,6 +18,7 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
@@ -47,6 +48,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -179,28 +181,9 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
         }
     }
 
-    /**
-     * The time interval between the adaptive allocations triggers.
-     */
-    private static final int DEFAULT_TIME_INTERVAL_SECONDS = 10;
-    /**
-     * The time that has to pass after scaling up, before scaling down is allowed.
-     * Note that the ML autoscaling has its own cooldown time to release the hardware.
-     */
-    private static final long SCALE_UP_COOLDOWN_TIME_MILLIS = TimeValue.timeValueMinutes(5).getMillis();
-
-    /**
-     * The time interval without any requests that has to pass, before scaling down
-     * to zero allocations (in case min_allocations = 0). After this time interval
-     * without requests, the number of allocations is set to zero. When this time
-     * interval hasn't passed, the minimum number of allocations will always be
-     * larger than zero.
-     */
-    private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds();
-
     private static final Logger logger = LogManager.getLogger(AdaptiveAllocationsScalerService.class);
 
-    private final int timeIntervalSeconds;
+    private final long timeIntervalSeconds;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
     private final Client client;
@@ -214,8 +197,8 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
     private final Map<String, Long> lastScaleUpTimesMillis;
     private volatile Scheduler.Cancellable cancellable;
     private final AtomicBoolean busy;
-    private final long scaleToZeroAfterNoRequestsSeconds;
-    private final long scaleUpCooldownTimeMillis;
+    private final AtomicLong scaleToZeroAfterNoRequestsSeconds;
+    private final AtomicLong scaleUpCooldownTimeMillis;
     private final Set<String> deploymentIdsWithInFlightScaleFromZeroRequests = new ConcurrentSkipListSet<>();
     private final Map<String, String> lastWarningMessages = new ConcurrentHashMap<>();
 
@@ -225,7 +208,8 @@ public AdaptiveAllocationsScalerService(
         Client client,
         InferenceAuditor inferenceAuditor,
         MeterRegistry meterRegistry,
-        boolean isNlpEnabled
+        boolean isNlpEnabled,
+        Settings settings
     ) {
         this(
             threadPool,
@@ -234,10 +218,20 @@
             inferenceAuditor,
             meterRegistry,
             isNlpEnabled,
-            DEFAULT_TIME_INTERVAL_SECONDS,
-            SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS,
-            SCALE_UP_COOLDOWN_TIME_MILLIS
+            MachineLearning.DEFAULT_TIME_INTERVAL.get(settings).getSeconds(),
+            new AtomicLong(MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(settings).getSeconds()),
+            new AtomicLong(MachineLearning.SCALE_UP_COOLDOWN_TIME.get(settings).getMillis())
         );
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(
+                MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME,
+                timeInterval -> this.scaleToZeroAfterNoRequestsSeconds.set(timeInterval.getSeconds())
+            );
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(
+                MachineLearning.SCALE_UP_COOLDOWN_TIME,
+                timeInterval -> this.scaleUpCooldownTimeMillis.set(timeInterval.getMillis())
+            );
     }
 
     // visible for testing
@@ -248,9 +242,9 @@ public AdaptiveAllocationsScalerService(
         InferenceAuditor inferenceAuditor,
        MeterRegistry meterRegistry,
         boolean isNlpEnabled,
-        int timeIntervalSeconds,
-        long scaleToZeroAfterNoRequestsSeconds,
-        long scaleUpCooldownTimeMillis
+        long timeIntervalSeconds,
+        AtomicLong scaleToZeroAfterNoRequestsSeconds,
+        AtomicLong scaleUpCooldownTimeMillis
     ) {
         this.threadPool = threadPool;
         this.clusterService = clusterService;
@@ -314,7 +308,7 @@ private synchronized void updateAutoscalers(ClusterState state) {
                 key -> new AdaptiveAllocationsScaler(
                     assignment.getDeploymentId(),
                     assignment.totalTargetAllocations(),
-                    scaleToZeroAfterNoRequestsSeconds
+                    scaleToZeroAfterNoRequestsSeconds::get
                 )
             );
             adaptiveAllocationsScaler.setMinMaxNumberOfAllocations(
@@ -331,7 +325,7 @@ private synchronized void updateAutoscalers(ClusterState state) {
 
     private synchronized void startScheduling() {
         if (cancellable == null) {
-            logger.debug("Starting ML adaptive allocations scaler");
+            logger.debug("Starting ML adaptive allocations scaler at interval [{}].", timeIntervalSeconds);
             try {
                 cancellable = threadPool.scheduleWithFixedDelay(
                     this::trigger,
@@ -425,7 +419,7 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
                 if (nodeStats.getRoutingState() != null && nodeStats.getRoutingState().getState() == RoutingState.STARTING) {
                     hasRecentObservedScaleUp.add(deploymentId);
                 }
-                if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis) {
+                if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis.get()) {
                     hasRecentObservedScaleUp.add(deploymentId);
                 }
             }
@@ -446,7 +440,7 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
            // hasRecentScaleUp indicates whether this service has recently scaled up the deployment.
            // hasRecentObservedScaleUp indicates whether a deployment recently has started,
            // potentially triggered by another node.
-            boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis;
+            boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis.get();
            if (newNumberOfAllocations < numberOfAllocations.get(deploymentId)
                && (hasRecentScaleUp || hasRecentObservedScaleUp.contains(deploymentId))) {
                logger.debug("adaptive allocations scaler: skipping scaling down [{}] because of recent scaleup.", deploymentId);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java

Lines changed: 24 additions & 18 deletions
@@ -44,6 +44,7 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -58,6 +59,11 @@
 
 public class AdaptiveAllocationsScalerServiceTests extends ESTestCase {
 
+    private static final long ONE_SECOND = 1L;
+    private static final AtomicLong ATOMIC_SECOND = new AtomicLong(1);
+    private static final AtomicLong SIXTY_SECONDS = new AtomicLong(60);
+    private static final AtomicLong TWO_THOUSAND_MILLISECONDS = new AtomicLong(2_000);
+    private static final AtomicLong SIXTY_THOUSAND_MILLISECONDS = new AtomicLong(60_000);
     private TestThreadPool threadPool;
     private ClusterService clusterService;
     private Client client;
@@ -175,9 +181,9 @@ public void test_scaleUp() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            60,
-            60_000
+            ONE_SECOND,
+            SIXTY_SECONDS,
+            SIXTY_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -269,9 +275,9 @@ public void test_scaleDownToZero_whenNoRequests() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            1,
-            2_000
+            ONE_SECOND,
+            ATOMIC_SECOND,
+            TWO_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -336,9 +342,9 @@ public void test_dontScale_whenNotStarted() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            1,
-            2_000
+            ONE_SECOND,
+            ATOMIC_SECOND,
+            TWO_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -392,9 +398,9 @@ public void test_noScaleDownToZero_whenRecentlyScaledUpByOtherNode() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            1,
-            2_000
+            ONE_SECOND,
+            ATOMIC_SECOND,
+            TWO_THOUSAND_MILLISECONDS
         );
         service.start();
 
@@ -477,9 +483,9 @@ public void testMaybeStartAllocation() {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            60,
-            60_000
+            ONE_SECOND,
+            SIXTY_SECONDS,
+            TWO_THOUSAND_MILLISECONDS
         );
 
         when(client.threadPool()).thenReturn(threadPool);
@@ -512,9 +518,9 @@ public void testMaybeStartAllocation_BlocksMultipleRequests() throws Exception {
             inferenceAuditor,
             meterRegistry,
             true,
-            1,
-            60,
-            60_000
+            ONE_SECOND,
+            SIXTY_SECONDS,
+            SIXTY_THOUSAND_MILLISECONDS
         );
 
         var latch = new CountDownLatch(1);
