diff --git a/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java b/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java index cb98f9de31ff5..7df791bf11559 100644 --- a/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java +++ b/test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java @@ -18,7 +18,8 @@ public enum FeatureFlag { TIME_SERIES_MODE("es.index_mode_feature_flag_registered=true", Version.fromString("8.0.0"), null), FAILURE_STORE_ENABLED("es.failure_store_feature_flag_enabled=true", Version.fromString("8.12.0"), null), - CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null); + CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null), + INFERENCE_SCALE_TO_ZERO("es.inference_scale_to_zero_feature_flag_enabled=true", Version.fromString("8.16.0"), null); public final String systemProperty; public final Version from; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java index 7fca223b2ee7e..cb578fdb157de 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java @@ -161,7 +161,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public ActionRequestValidationException validate() { ActionRequestValidationException validationException = new ActionRequestValidationException(); if (numberOfAllocations != null) { - if (numberOfAllocations < 1) { + if (numberOfAllocations < 0 || (isInternal == false && numberOfAllocations == 0)) { validationException.addValidationError("[" + NUMBER_OF_ALLOCATIONS + "] must be a positive integer"); } if (isInternal == false diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 1bc867a849090..f8a590a23a2c1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -327,6 +327,7 @@ import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService; +import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentClusterService; import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentService; import org.elasticsearch.xpack.ml.inference.deployment.DeploymentManager; @@ -1285,13 +1286,21 @@ public Collection createComponents(PluginServices services) { new MlAutoscalingDeciderService(memoryTracker, settings, nodeAvailabilityZoneMapper, clusterService) ); - MlInitializationService mlInitializationService = new MlInitializationService( - settings, + AdaptiveAllocationsScalerService adaptiveAllocationsScalerService = new AdaptiveAllocationsScalerService( threadPool, clusterService, client, inferenceAuditor, telemetryProvider.getMeterRegistry(), + machineLearningExtension.get().isNlpEnabled() + ); + + MlInitializationService mlInitializationService = new MlInitializationService( + settings, + threadPool, + clusterService, + client, + adaptiveAllocationsScalerService, mlAssignmentNotifier, machineLearningExtension.get().isAnomalyDetectionEnabled(), machineLearningExtension.get().isDataFrameAnalyticsEnabled(), @@ -1317,6 +1326,7 @@ public Collection createComponents(PluginServices services) { jobManagerHolder, autodetectProcessManager, mlInitializationService, + adaptiveAllocationsScalerService, jobDataCountsPersister, datafeedRunner, datafeedManager, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 98dfb13d9e3e4..45a71a80de077 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -30,11 +30,9 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; -import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; import java.util.Collections; import java.util.Map; @@ -67,8 +65,7 @@ public final class MlInitializationService implements ClusterStateListener { ThreadPool threadPool, ClusterService clusterService, Client client, - InferenceAuditor inferenceAuditor, - MeterRegistry meterRegistry, + AdaptiveAllocationsScalerService adaptiveAllocationsScalerService, MlAssignmentNotifier mlAssignmentNotifier, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, @@ -88,7 +85,7 @@ public final class MlInitializationService implements ClusterStateListener { isDataFrameAnalyticsEnabled, isNlpEnabled ), - new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled), + adaptiveAllocationsScalerService, clusterService ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExternalInferModelAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExternalInferModelAction.java index 545dcfbefecec..5603e9c4dca8d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExternalInferModelAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExternalInferModelAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.InferModelAction; +import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; @@ -25,7 +26,8 @@ public TransportExternalInferModelAction( Client client, ClusterService clusterService, XPackLicenseState licenseState, - TrainedModelProvider trainedModelProvider + TrainedModelProvider trainedModelProvider, + AdaptiveAllocationsScalerService adaptiveAllocationsScalerService ) { super( InferModelAction.EXTERNAL_NAME, @@ -35,7 +37,8 @@ public TransportExternalInferModelAction( client, clusterService, licenseState, - trainedModelProvider + trainedModelProvider, + adaptiveAllocationsScalerService ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java index 0c4064348b3f6..b69f8c7d62eb2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.inference.results.ErrorInferenceResults; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; import org.elasticsearch.xpack.ml.inference.loadingservice.LocalModel; import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; @@ -66,6 +67,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction format("[%s] model deployment not allocated to any node", assignment.getDeploymentId())); - listener.onFailure( - ExceptionsHelper.conflictStatusException("Trained model deployment [" + request.getId() + "] is not allocated to any nodes") - ); + String message = "Trained model deployment [" + request.getId() + "] is not allocated to any nodes"; + boolean starting = adaptiveAllocationsScalerService.maybeStartAllocation(assignment); + if (starting) { + message += "; starting deployment of one allocation"; + } + logger.debug(message); + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java index 044556d1b30ac..05e7202b8efe9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Strings; +import org.elasticsearch.core.TimeValue; /** * Processes measured requests counts and inference times and decides whether @@ -21,6 +22,12 @@ public class AdaptiveAllocationsScaler { static final double SCALE_UP_THRESHOLD = 0.9; private static final double SCALE_DOWN_THRESHOLD = 0.85; + /** + * The time interval without any requests that has to pass, before scaling down + * to zero allocations (in case min_allocations = 0). + */ + private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds(); + /** * If the max_number_of_allocations is not set, use this value for now to prevent scaling up * to high numbers due to possible bugs or unexpected behaviour in the scaler. @@ -33,6 +40,7 @@ public class AdaptiveAllocationsScaler { private final String deploymentId; private final KalmanFilter1d requestRateEstimator; private final KalmanFilter1d inferenceTimeEstimator; + private double timeWithoutRequestsSeconds; private int numberOfAllocations; private int neededNumberOfAllocations; @@ -55,6 +63,7 @@ public class AdaptiveAllocationsScaler { // the number of allocations changes, which is passed explicitly to the estimator. requestRateEstimator = new KalmanFilter1d(deploymentId + ":rate", 100, true); inferenceTimeEstimator = new KalmanFilter1d(deploymentId + ":time", 100, false); + timeWithoutRequestsSeconds = 0.0; this.numberOfAllocations = numberOfAllocations; neededNumberOfAllocations = numberOfAllocations; minNumberOfAllocations = null; @@ -73,6 +82,11 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) { lastMeasuredQueueSize = stats.pendingCount(); + if (stats.requestCount() > 0) { + timeWithoutRequestsSeconds = 0.0; + } else { + timeWithoutRequestsSeconds += timeIntervalSeconds; + } // The request rate (per second) is the request count divided by the time. // Assuming a Poisson process for the requests, the variance in the request @@ -145,7 +159,7 @@ Integer scale() { numberOfAllocations--; } - this.neededNumberOfAllocations = numberOfAllocations; + neededNumberOfAllocations = numberOfAllocations; if (maxNumberOfAllocations == null) { numberOfAllocations = Math.min(numberOfAllocations, MAX_NUMBER_OF_ALLOCATIONS_SAFEGUARD); @@ -156,6 +170,13 @@ Integer scale() { if (maxNumberOfAllocations != null) { numberOfAllocations = Math.min(numberOfAllocations, maxNumberOfAllocations); } + if (ScaleToZeroFeatureFlag.isEnabled() + && (minNumberOfAllocations == null || minNumberOfAllocations == 0) + && timeWithoutRequestsSeconds > SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS) { + logger.debug("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId); + numberOfAllocations = 0; + neededNumberOfAllocations = 0; + } if (numberOfAllocations != oldNumberOfAllocations) { logger.debug( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java index bbe90f769818b..775279a6b2553 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java @@ -415,49 +415,60 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo if (newNumberOfAllocations > numberOfAllocations.get(deploymentId)) { lastScaleUpTimesMillis.put(deploymentId, now); } - UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId); - updateRequest.setNumberOfAllocations(newNumberOfAllocations); - updateRequest.setIsInternal(true); - ClientHelper.executeAsyncWithOrigin( - client, - ClientHelper.ML_ORIGIN, - UpdateTrainedModelDeploymentAction.INSTANCE, - updateRequest, - ActionListener.wrap(updateResponse -> { - logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, newNumberOfAllocations); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) - .execute( - () -> inferenceAuditor.info( - deploymentId, - Strings.format( - "adaptive allocations scaler: scaled [%s] to [%s] allocations.", - deploymentId, - newNumberOfAllocations - ) - ) - ); - }, e -> { - logger.atLevel(Level.WARN) - .withThrowable(e) - .log( - "adaptive allocations scaler: scaling [{}] to [{}] allocations failed.", - deploymentId, - newNumberOfAllocations - ); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) - .execute( - () -> inferenceAuditor.warning( - deploymentId, - Strings.format( - "adaptive allocations scaler: scaling [%s] to [%s] allocations failed.", - deploymentId, - newNumberOfAllocations - ) - ) - ); - }) - ); + updateNumberOfAllocations(deploymentId, newNumberOfAllocations); } } } + + public boolean maybeStartAllocation(TrainedModelAssignment assignment) { + if (ScaleToZeroFeatureFlag.isEnabled() + && assignment.getAdaptiveAllocationsSettings() != null + && assignment.getAdaptiveAllocationsSettings().getEnabled() == Boolean.TRUE) { + lastScaleUpTimesMillis.put(assignment.getDeploymentId(), System.currentTimeMillis()); + updateNumberOfAllocations(assignment.getDeploymentId(), 1); + return true; + } + return false; + } + + private void updateNumberOfAllocations(String deploymentId, int numberOfAllocations) { + UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId); + updateRequest.setNumberOfAllocations(numberOfAllocations); + updateRequest.setIsInternal(true); + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.ML_ORIGIN, + UpdateTrainedModelDeploymentAction.INSTANCE, + updateRequest, + ActionListener.wrap(updateResponse -> { + logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, numberOfAllocations); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) + .execute( + () -> inferenceAuditor.info( + deploymentId, + Strings.format( + "adaptive allocations scaler: scaled [%s] to [%s] allocations.", + deploymentId, + numberOfAllocations + ) + ) + ); + }, e -> { + logger.atLevel(Level.WARN) + .withThrowable(e) + .log("adaptive allocations scaler: scaling [{}] to [{}] allocations failed.", deploymentId, numberOfAllocations); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) + .execute( + () -> inferenceAuditor.warning( + deploymentId, + Strings.format( + "adaptive allocations scaler: scaling [%s] to [%s] allocations failed.", + deploymentId, + numberOfAllocations + ) + ) + ); + }) + ); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/ScaleToZeroFeatureFlag.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/ScaleToZeroFeatureFlag.java new file mode 100644 index 0000000000000..072b8c5593c93 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/ScaleToZeroFeatureFlag.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.inference.adaptiveallocations; + +import org.elasticsearch.common.util.FeatureFlag; + +public class ScaleToZeroFeatureFlag { + private ScaleToZeroFeatureFlag() {} + + private static final FeatureFlag FEATURE_FLAG = new FeatureFlag("inference_scale_to_zero"); + + public static boolean isEnabled() { + return FEATURE_FLAG.isEnabled(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index a5b9597886e15..80c957ecb7a09 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -17,11 +17,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; -import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; -import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; import org.junit.Before; import java.util.Map; @@ -40,8 +38,7 @@ public class MlInitializationServiceTests extends ESTestCase { private ThreadPool threadPool; private ClusterService clusterService; private Client client; - private InferenceAuditor inferenceAuditor; - private MeterRegistry meterRegistry; + private AdaptiveAllocationsScalerService adaptiveAllocationsScalerService; private MlAssignmentNotifier mlAssignmentNotifier; @Before @@ -50,8 +47,7 @@ public void setUpMocks() { threadPool = deterministicTaskQueue.getThreadPool(); clusterService = mock(ClusterService.class); client = mock(Client.class); - inferenceAuditor = mock(InferenceAuditor.class); - meterRegistry = mock(MeterRegistry.class); + adaptiveAllocationsScalerService = mock(AdaptiveAllocationsScalerService.class); mlAssignmentNotifier = mock(MlAssignmentNotifier.class); when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); @@ -77,8 +73,7 @@ public void testInitialize() { threadPool, clusterService, client, - inferenceAuditor, - meterRegistry, + adaptiveAllocationsScalerService, mlAssignmentNotifier, true, true, @@ -94,8 +89,7 @@ public void testInitialize_noMasterNode() { threadPool, clusterService, client, - inferenceAuditor, - meterRegistry, + adaptiveAllocationsScalerService, mlAssignmentNotifier, true, true, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerTests.java index 08097357725d0..7d98aaf67a7f3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerTests.java @@ -146,4 +146,49 @@ public void testAutoscaling_maxAllocationsSafeguard() { adaptiveAllocationsScaler.setMinMaxNumberOfAllocations(2, 77); assertThat(adaptiveAllocationsScaler.scale(), equalTo(77)); } + + public void testAutoscaling_scaleDownToZeroAllocations() { + assumeTrue("Should only run if adaptive allocations feature flag is enabled", ScaleToZeroFeatureFlag.isEnabled()); + + AdaptiveAllocationsScaler adaptiveAllocationsScaler = new AdaptiveAllocationsScaler("test-deployment", 1); + // 1 hour with 1 request per 1 seconds, so don't scale. + for (int i = 0; i < 3600; i++) { + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(1, 0, 0, 0.05), 1, 1); + assertThat(adaptiveAllocationsScaler.scale(), nullValue()); + } + // 15 minutes with no requests, so don't scale. + for (int i = 0; i < 900; i++) { + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(0, 0, 0, 0.05), 1, 1); + assertThat(adaptiveAllocationsScaler.scale(), nullValue()); + } + // 1 second with a request, so don't scale. + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(1, 0, 0, 0.05), 1, 1); + assertThat(adaptiveAllocationsScaler.scale(), nullValue()); + // 15 minutes with no requests, so don't scale. + for (int i = 0; i < 900; i++) { + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(0, 0, 0, 0.05), 1, 1); + assertThat(adaptiveAllocationsScaler.scale(), nullValue()); + } + // another second with no requests, so scale to zero allocations. + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(0, 0, 0, 0.05), 1, 1); + assertThat(adaptiveAllocationsScaler.scale(), equalTo(0)); + // 15 minutes with no requests, so don't scale. + for (int i = 0; i < 900; i++) { + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(0, 0, 0, 0.05), 1, 0); + assertThat(adaptiveAllocationsScaler.scale(), nullValue()); + } + } + + public void testAutoscaling_dontScaleDownToZeroAllocationsWhenMinAllocationsIsSet() { + assumeTrue("Should only run if adaptive allocations feature flag is enabled", ScaleToZeroFeatureFlag.isEnabled()); + + AdaptiveAllocationsScaler adaptiveAllocationsScaler = new AdaptiveAllocationsScaler("test-deployment", 1); + adaptiveAllocationsScaler.setMinMaxNumberOfAllocations(1, null); + + // 1 hour with no requests, + for (int i = 0; i < 3600; i++) { + adaptiveAllocationsScaler.process(new AdaptiveAllocationsScalerService.Stats(1, 0, 0, 0.05), 1, 1); + assertThat(adaptiveAllocationsScaler.scale(), nullValue()); + } + } }