Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
public enum FeatureFlag {
TIME_SERIES_MODE("es.index_mode_feature_flag_registered=true", Version.fromString("8.0.0"), null),
FAILURE_STORE_ENABLED("es.failure_store_feature_flag_enabled=true", Version.fromString("8.12.0"), null),
CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null);
CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null),
INFERENCE_SCALE_TO_ZERO("es.inference_scale_to_zero_feature_flag_enabled=true", Version.fromString("8.16.0"), null);

public final String systemProperty;
public final Version from;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = new ActionRequestValidationException();
if (numberOfAllocations != null) {
if (numberOfAllocations < 1) {
if (numberOfAllocations < 0 || (isInternal == false && numberOfAllocations == 0)) {
validationException.addValidationError("[" + NUMBER_OF_ALLOCATIONS + "] must be a positive integer");
}
if (isInternal == false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;
import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService;
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentClusterService;
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentService;
import org.elasticsearch.xpack.ml.inference.deployment.DeploymentManager;
Expand Down Expand Up @@ -1285,13 +1286,21 @@ public Collection<?> createComponents(PluginServices services) {
new MlAutoscalingDeciderService(memoryTracker, settings, nodeAvailabilityZoneMapper, clusterService)
);

MlInitializationService mlInitializationService = new MlInitializationService(
settings,
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService = new AdaptiveAllocationsScalerService(
threadPool,
clusterService,
client,
inferenceAuditor,
telemetryProvider.getMeterRegistry(),
machineLearningExtension.get().isNlpEnabled()
);

MlInitializationService mlInitializationService = new MlInitializationService(
settings,
threadPool,
clusterService,
client,
adaptiveAllocationsScalerService,
mlAssignmentNotifier,
machineLearningExtension.get().isAnomalyDetectionEnabled(),
machineLearningExtension.get().isDataFrameAnalyticsEnabled(),
Expand All @@ -1317,6 +1326,7 @@ public Collection<?> createComponents(PluginServices services) {
jobManagerHolder,
autodetectProcessManager,
mlInitializationService,
adaptiveAllocationsScalerService,
jobDataCountsPersister,
datafeedRunner,
datafeedManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;

import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -67,8 +65,7 @@ public final class MlInitializationService implements ClusterStateListener {
ThreadPool threadPool,
ClusterService clusterService,
Client client,
InferenceAuditor inferenceAuditor,
MeterRegistry meterRegistry,
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService,
MlAssignmentNotifier mlAssignmentNotifier,
boolean isAnomalyDetectionEnabled,
boolean isDataFrameAnalyticsEnabled,
Expand All @@ -88,7 +85,7 @@ public final class MlInitializationService implements ClusterStateListener {
isDataFrameAnalyticsEnabled,
isNlpEnabled
),
new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled),
adaptiveAllocationsScalerService,
clusterService
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.InferModelAction;
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;

Expand All @@ -25,7 +26,8 @@ public TransportExternalInferModelAction(
Client client,
ClusterService clusterService,
XPackLicenseState licenseState,
TrainedModelProvider trainedModelProvider
TrainedModelProvider trainedModelProvider,
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService
) {
super(
InferModelAction.EXTERNAL_NAME,
Expand All @@ -35,7 +37,8 @@ public TransportExternalInferModelAction(
client,
clusterService,
licenseState,
trainedModelProvider
trainedModelProvider,
adaptiveAllocationsScalerService
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.xpack.core.ml.inference.results.ErrorInferenceResults;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
import org.elasticsearch.xpack.ml.inference.loadingservice.LocalModel;
import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
Expand All @@ -66,6 +67,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
private final ClusterService clusterService;
private final XPackLicenseState licenseState;
private final TrainedModelProvider trainedModelProvider;
private final AdaptiveAllocationsScalerService adaptiveAllocationsScalerService;

TransportInternalInferModelAction(
String actionName,
Expand All @@ -75,14 +77,16 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
Client client,
ClusterService clusterService,
XPackLicenseState licenseState,
TrainedModelProvider trainedModelProvider
TrainedModelProvider trainedModelProvider,
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService
) {
super(actionName, transportService, actionFilters, InferModelAction.Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.modelLoadingService = modelLoadingService;
this.client = client;
this.clusterService = clusterService;
this.licenseState = licenseState;
this.trainedModelProvider = trainedModelProvider;
this.adaptiveAllocationsScalerService = adaptiveAllocationsScalerService;
}

@Inject
Expand All @@ -93,7 +97,8 @@ public TransportInternalInferModelAction(
Client client,
ClusterService clusterService,
XPackLicenseState licenseState,
TrainedModelProvider trainedModelProvider
TrainedModelProvider trainedModelProvider,
AdaptiveAllocationsScalerService adaptiveAllocationsScalerService
) {
this(
InferModelAction.NAME,
Expand All @@ -103,7 +108,8 @@ public TransportInternalInferModelAction(
client,
clusterService,
licenseState,
trainedModelProvider
trainedModelProvider,
adaptiveAllocationsScalerService
);
}

Expand Down Expand Up @@ -253,10 +259,13 @@ private void inferAgainstAllocatedModel(
}

if (nodes.isEmpty()) {
logger.trace(() -> format("[%s] model deployment not allocated to any node", assignment.getDeploymentId()));
listener.onFailure(
ExceptionsHelper.conflictStatusException("Trained model deployment [" + request.getId() + "] is not allocated to any nodes")
);
String message = "Trained model deployment [" + request.getId() + "] is not allocated to any nodes";
boolean starting = adaptiveAllocationsScalerService.maybeStartAllocation(assignment);
if (starting) {
message += "; starting deployment of one allocation";
}
logger.debug(message);
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;

/**
* Processes measured requests counts and inference times and decides whether
Expand All @@ -21,6 +22,12 @@ public class AdaptiveAllocationsScaler {
static final double SCALE_UP_THRESHOLD = 0.9;
private static final double SCALE_DOWN_THRESHOLD = 0.85;

/**
* The time interval without any requests that has to pass, before scaling down
* to zero allocations (in case min_allocations = 0).
*/
private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds();

/**
* If the max_number_of_allocations is not set, use this value for now to prevent scaling up
* to high numbers due to possible bugs or unexpected behaviour in the scaler.
Expand All @@ -33,6 +40,7 @@ public class AdaptiveAllocationsScaler {
private final String deploymentId;
private final KalmanFilter1d requestRateEstimator;
private final KalmanFilter1d inferenceTimeEstimator;
private double timeWithoutRequestsSeconds;

private int numberOfAllocations;
private int neededNumberOfAllocations;
Expand All @@ -55,6 +63,7 @@ public class AdaptiveAllocationsScaler {
// the number of allocations changes, which is passed explicitly to the estimator.
requestRateEstimator = new KalmanFilter1d(deploymentId + ":rate", 100, true);
inferenceTimeEstimator = new KalmanFilter1d(deploymentId + ":time", 100, false);
timeWithoutRequestsSeconds = 0.0;
this.numberOfAllocations = numberOfAllocations;
neededNumberOfAllocations = numberOfAllocations;
minNumberOfAllocations = null;
Expand All @@ -73,6 +82,11 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum

void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) {
lastMeasuredQueueSize = stats.pendingCount();
if (stats.requestCount() > 0) {
timeWithoutRequestsSeconds = 0.0;
} else {
timeWithoutRequestsSeconds += timeIntervalSeconds;
}

// The request rate (per second) is the request count divided by the time.
// Assuming a Poisson process for the requests, the variance in the request
Expand Down Expand Up @@ -145,7 +159,7 @@ Integer scale() {
numberOfAllocations--;
}

this.neededNumberOfAllocations = numberOfAllocations;
neededNumberOfAllocations = numberOfAllocations;

if (maxNumberOfAllocations == null) {
numberOfAllocations = Math.min(numberOfAllocations, MAX_NUMBER_OF_ALLOCATIONS_SAFEGUARD);
Expand All @@ -156,6 +170,13 @@ Integer scale() {
if (maxNumberOfAllocations != null) {
numberOfAllocations = Math.min(numberOfAllocations, maxNumberOfAllocations);
}
if (ScaleToZeroFeatureFlag.isEnabled()
&& (minNumberOfAllocations == null || minNumberOfAllocations == 0)
&& timeWithoutRequestsSeconds > SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS) {
logger.debug("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId);
numberOfAllocations = 0;
neededNumberOfAllocations = 0;
}

if (numberOfAllocations != oldNumberOfAllocations) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,49 +415,60 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
if (newNumberOfAllocations > numberOfAllocations.get(deploymentId)) {
lastScaleUpTimesMillis.put(deploymentId, now);
}
UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId);
updateRequest.setNumberOfAllocations(newNumberOfAllocations);
updateRequest.setIsInternal(true);
ClientHelper.executeAsyncWithOrigin(
client,
ClientHelper.ML_ORIGIN,
UpdateTrainedModelDeploymentAction.INSTANCE,
updateRequest,
ActionListener.wrap(updateResponse -> {
logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, newNumberOfAllocations);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
.execute(
() -> inferenceAuditor.info(
deploymentId,
Strings.format(
"adaptive allocations scaler: scaled [%s] to [%s] allocations.",
deploymentId,
newNumberOfAllocations
)
)
);
}, e -> {
logger.atLevel(Level.WARN)
.withThrowable(e)
.log(
"adaptive allocations scaler: scaling [{}] to [{}] allocations failed.",
deploymentId,
newNumberOfAllocations
);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
.execute(
() -> inferenceAuditor.warning(
deploymentId,
Strings.format(
"adaptive allocations scaler: scaling [%s] to [%s] allocations failed.",
deploymentId,
newNumberOfAllocations
)
)
);
})
);
updateNumberOfAllocations(deploymentId, newNumberOfAllocations);
}
}
}

/**
 * Kicks off a deployment of one allocation for a model whose assignment currently
 * has zero allocations, provided the scale-to-zero feature flag is on and the
 * assignment has adaptive allocations enabled.
 *
 * @param assignment the trained model assignment that has no allocated nodes
 * @return {@code true} if a scale-up to one allocation was requested, {@code false} otherwise
 */
public boolean maybeStartAllocation(TrainedModelAssignment assignment) {
    if (ScaleToZeroFeatureFlag.isEnabled()
        && assignment.getAdaptiveAllocationsSettings() != null
        // Boolean.TRUE.equals(...) is null-safe and value-based; a reference
        // comparison (== Boolean.TRUE) would misbehave for a non-canonical Boolean.
        && Boolean.TRUE.equals(assignment.getAdaptiveAllocationsSettings().getEnabled())) {
        // Record the scale-up time; presumably consulted by the scale-down logic to
        // avoid immediately scaling back to zero — the read side is not in this hunk.
        lastScaleUpTimesMillis.put(assignment.getDeploymentId(), System.currentTimeMillis());
        updateNumberOfAllocations(assignment.getDeploymentId(), 1);
        return true;
    }
    return false;
}

/**
 * Asynchronously updates the number of allocations of a deployment, logging the
 * outcome and notifying the inference auditor on the utility thread pool.
 *
 * @param deploymentId        the deployment whose allocation count is changed
 * @param numberOfAllocations the target number of allocations
 */
private void updateNumberOfAllocations(String deploymentId, int numberOfAllocations) {
    var request = new UpdateTrainedModelDeploymentAction.Request(deploymentId);
    request.setNumberOfAllocations(numberOfAllocations);
    // Flag the request as internal so it is attributed to this service rather than a user.
    request.setIsInternal(true);

    ClientHelper.executeAsyncWithOrigin(
        client,
        ClientHelper.ML_ORIGIN,
        UpdateTrainedModelDeploymentAction.INSTANCE,
        request,
        ActionListener.wrap(response -> {
            logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, numberOfAllocations);
            String auditMessage = Strings.format(
                "adaptive allocations scaler: scaled [%s] to [%s] allocations.",
                deploymentId,
                numberOfAllocations
            );
            // Auditing writes to an index, so hand it off to the utility thread pool.
            threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
                .execute(() -> inferenceAuditor.info(deploymentId, auditMessage));
        }, exception -> {
            logger.atLevel(Level.WARN)
                .withThrowable(exception)
                .log("adaptive allocations scaler: scaling [{}] to [{}] allocations failed.", deploymentId, numberOfAllocations);
            String auditMessage = Strings.format(
                "adaptive allocations scaler: scaling [%s] to [%s] allocations failed.",
                deploymentId,
                numberOfAllocations
            );
            threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
                .execute(() -> inferenceAuditor.warning(deploymentId, auditMessage));
        })
    );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.ml.inference.adaptiveallocations;

import org.elasticsearch.common.util.FeatureFlag;

/**
 * Feature flag gating the adaptive allocations scaler's ability to scale a model
 * deployment down to zero allocations (and start one allocation again on demand).
 * Registered under the {@code inference_scale_to_zero} flag name, which corresponds
 * to the {@code es.inference_scale_to_zero_feature_flag_enabled} system property.
 */
public final class ScaleToZeroFeatureFlag {

    private static final FeatureFlag FEATURE_FLAG = new FeatureFlag("inference_scale_to_zero");

    // Utility holder: final class with a private constructor, never instantiated.
    private ScaleToZeroFeatureFlag() {}

    /** @return whether the scale-to-zero feature flag is enabled in this JVM */
    public static boolean isEnabled() {
        return FEATURE_FLAG.isEnabled();
    }
}
Loading
Loading