 import org.elasticsearch.xpack.core.ml.action.GetDeploymentStatsAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentStats;
+import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignmentMetadata;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -213,6 +214,7 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
     private volatile Scheduler.Cancellable cancellable;
     private final AtomicBoolean busy;
     private final long scaleToZeroAfterNoRequestsSeconds;
+    private final long scaleUpCooldownTimeMillis;
     private final Set<String> deploymentIdsWithInFlightScaleFromZeroRequests = new ConcurrentSkipListSet<>();
     private final Map<String, String> lastWarningMessages = new ConcurrentHashMap<>();

@@ -224,7 +226,17 @@ public AdaptiveAllocationsScalerService(
         MeterRegistry meterRegistry,
         boolean isNlpEnabled
     ) {
-        this(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled, DEFAULT_TIME_INTERVAL_SECONDS);
+        this(
+            threadPool,
+            clusterService,
+            client,
+            inferenceAuditor,
+            meterRegistry,
+            isNlpEnabled,
+            DEFAULT_TIME_INTERVAL_SECONDS,
+            SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS,
+            SCALE_UP_COOLDOWN_TIME_MILLIS
+        );
     }

     // visible for testing
@@ -235,7 +247,9 @@ public AdaptiveAllocationsScalerService(
         InferenceAuditor inferenceAuditor,
         MeterRegistry meterRegistry,
         boolean isNlpEnabled,
-        int timeIntervalSeconds
+        int timeIntervalSeconds,
+        long scaleToZeroAfterNoRequestsSeconds,
+        long scaleUpCooldownTimeMillis
     ) {
         this.threadPool = threadPool;
         this.clusterService = clusterService;
@@ -244,14 +258,15 @@ public AdaptiveAllocationsScalerService(
         this.meterRegistry = meterRegistry;
         this.isNlpEnabled = isNlpEnabled;
         this.timeIntervalSeconds = timeIntervalSeconds;
+        this.scaleToZeroAfterNoRequestsSeconds = scaleToZeroAfterNoRequestsSeconds;
+        this.scaleUpCooldownTimeMillis = scaleUpCooldownTimeMillis;

         lastInferenceStatsByDeploymentAndNode = new HashMap<>();
         lastInferenceStatsTimestampMillis = null;
         lastScaleUpTimesMillis = new HashMap<>();
         scalers = new HashMap<>();
         metrics = new Metrics();
         busy = new AtomicBoolean(false);
-        scaleToZeroAfterNoRequestsSeconds = SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS;
     }

     public synchronized void start() {
@@ -375,6 +390,9 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo

         Map<String, Stats> recentStatsByDeployment = new HashMap<>();
         Map<String, Integer> numberOfAllocations = new HashMap<>();
+        // Check for recent scale-ups in the deployment stats, because a different node may have
+        // caused a scale-up when an inference request arrived while there were zero allocations.
+        Set<String> hasRecentObservedScaleUp = new HashSet<>();

         for (AssignmentStats assignmentStats : statsResponse.getStats().results()) {
             String deploymentId = assignmentStats.getDeploymentId();
@@ -401,6 +419,12 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
                         (key, value) -> value == null ? recentStats : value.add(recentStats)
                     );
                 }
+                if (nodeStats.getRoutingState() != null && nodeStats.getRoutingState().getState() == RoutingState.STARTING) {
+                    hasRecentObservedScaleUp.add(deploymentId);
+                }
+                if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis) {
+                    hasRecentObservedScaleUp.add(deploymentId);
+                }
             }
         }

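Between the two checks added above, a deployment counts as having a recent observed scale-up whenever one of its nodes is still in the STARTING routing state or started within the cooldown window. A condensed sketch of that predicate, using a simplified stand-in type rather than the real AssignmentStats.NodeStats API:

import java.time.Instant;

final class ObservedScaleUpCheck {

    // Simplified stand-in for the per-node stats inspected above; the field names are illustrative.
    record NodeSnapshot(String routingState, Instant startTime) {}

    // Mirrors the two added conditions: a node that is still STARTING, or a node whose start time
    // lies within the scale-up cooldown window, marks the deployment as recently scaled up.
    static boolean isRecentObservedScaleUp(NodeSnapshot node, long nowMillis, long cooldownMillis) {
        boolean starting = "STARTING".equals(node.routingState());
        boolean startedRecently = node.startTime() != null
            && nowMillis < node.startTime().toEpochMilli() + cooldownMillis;
        return starting || startedRecently;
    }

    public static void main(String[] args) {
        // A node that finished starting 30 seconds ago still counts, assuming a 5-minute cooldown.
        NodeSnapshot node = new NodeSnapshot("STARTED", Instant.now().minusSeconds(30));
        System.out.println(isRecentObservedScaleUp(node, System.currentTimeMillis(), 5 * 60 * 1000L));
    }
}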
@@ -416,9 +440,12 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
             Integer newNumberOfAllocations = adaptiveAllocationsScaler.scale();
             if (newNumberOfAllocations != null) {
                 Long lastScaleUpTimeMillis = lastScaleUpTimesMillis.get(deploymentId);
+                // hasRecentScaleUp indicates whether this service itself has recently scaled up the deployment.
+                // hasRecentObservedScaleUp indicates whether the deployment has recently started,
+                // potentially triggered by another node.
+                boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis;
                 if (newNumberOfAllocations < numberOfAllocations.get(deploymentId)
-                    && lastScaleUpTimeMillis != null
-                    && now < lastScaleUpTimeMillis + SCALE_UP_COOLDOWN_TIME_MILLIS) {
+                    && (hasRecentScaleUp || hasRecentObservedScaleUp.contains(deploymentId))) {
                     logger.debug("adaptive allocations scaler: skipping scaling down [{}] because of recent scaleup.", deploymentId);
                     continue;
                 }
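The net effect of this last hunk: a proposed scale-down is skipped while either cooldown signal is active, whether the scale-up was issued by this service or merely observed in the node stats. A minimal sketch of that decision with hypothetical names, assuming both signals have already been gathered as in the hunks above:

final class ScaleDownGate {

    // Returns true when a proposed scale-down should be skipped: the new allocation count is lower
    // than the current one, and either this service scaled the deployment up within the cooldown
    // window or a recent scale-up was observed in the deployment stats.
    static boolean skipScaleDown(
        int newNumberOfAllocations,
        int currentNumberOfAllocations,
        Long lastScaleUpTimeMillis,
        boolean observedRecentScaleUp,
        long nowMillis,
        long cooldownMillis
    ) {
        boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && nowMillis < lastScaleUpTimeMillis + cooldownMillis;
        return newNumberOfAllocations < currentNumberOfAllocations && (hasRecentScaleUp || observedRecentScaleUp);
    }

    public static void main(String[] args) {
        // A scale-down from 2 to 1 allocations is suppressed because another node recently started
        // the deployment, even though this service never recorded a scale-up itself.
        System.out.println(skipScaleDown(1, 2, null, true, System.currentTimeMillis(), 5 * 60 * 1000L));
    }
}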