
Commit e9f1723

Commit message: review
1 parent e649ffb commit e9f1723

File tree

3 files changed, +7 -8 lines changed


x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AdaptiveAllocationsScaleFromZeroIT.java

Lines changed: 2 additions & 3 deletions

@@ -23,6 +23,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -93,9 +94,7 @@ public void onFailure(Exception exception) {
         }
 
         latch.await();
-        if (failures.isEmpty() == false) {
-            fail(failures.getFirst());
-        }
+        assertThat(failures, empty());
     }
 
     @SuppressWarnings("unchecked")
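
The switch to the Hamcrest matcher also improves failure reporting: assertThat(failures, empty()) prints every collected failure in the assertion message, whereas fail(failures.getFirst()) surfaced only the first one. Below is a standalone sketch of that behaviour (hypothetical class name and data, not part of this commit; assumes Hamcrest is on the classpath):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;

import java.util.List;

public class EmptyMatcherSketch {
    public static void main(String[] args) {
        // Pretend two failures were collected by concurrent listeners.
        List<String> failures = List.of("timeout waiting for allocation", "503 from node-1");

        try {
            // The matcher's mismatch description lists the whole collection, e.g.
            //   Expected: an empty collection
            //        but: <[timeout waiting for allocation, 503 from node-1]>
            assertThat(failures, empty());
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}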

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java

Lines changed: 3 additions & 3 deletions

@@ -71,7 +71,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
     private final XPackLicenseState licenseState;
     private final TrainedModelProvider trainedModelProvider;
     private final AdaptiveAllocationsScalerService adaptiveAllocationsScalerService;
-    private final InferenceWaitForAllocation scalingInference;
+    private final InferenceWaitForAllocation waitForAllocation;
     private final ThreadPool threadPool;
 
     TransportInternalInferModelAction(
@@ -94,7 +94,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
         this.licenseState = licenseState;
         this.trainedModelProvider = trainedModelProvider;
         this.adaptiveAllocationsScalerService = adaptiveAllocationsScalerService;
-        this.scalingInference = new InferenceWaitForAllocation(assignmentService, this::inferOnBlockedRequest);
+        this.waitForAllocation = new InferenceWaitForAllocation(assignmentService, this::inferOnBlockedRequest);
         this.threadPool = threadPool;
     }
 
@@ -280,7 +280,7 @@ private void inferAgainstAllocatedModel(
         if (starting) {
            message += "; starting deployment of one allocation";
            logger.info(message);
-           scalingInference.waitForAssignment(
+           waitForAllocation.waitForAssignment(
                new InferenceWaitForAllocation.WaitingRequest(request, responseBuilder, parentTaskId, listener)
            );
            return;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java

Lines changed: 2 additions & 2 deletions

@@ -85,7 +85,8 @@ public InferenceWaitForAllocation(
     */
    public synchronized void waitForAssignment(WaitingRequest request) {
        logger.info("waitForAssignment will wait for condition");
-       if (pendingRequestCount.get() > MAX_PENDING_REQUEST_COUNT) {
+       if (pendingRequestCount.incrementAndGet() >= MAX_PENDING_REQUEST_COUNT) {
+           pendingRequestCount.decrementAndGet();
            request.listener.onFailure(
                new ElasticsearchStatusException(
                    "Rejected inference request waiting for an allocation of deployment [{}]. Too many pending requests",
@@ -96,7 +97,6 @@ public synchronized void waitForAssignment(WaitingRequest request) {
            return;
        }
 
-       pendingRequestCount.incrementAndGet();
        var predicate = new DeploymentHasAtLeastOneAllocation(request.deploymentId());
 
        assignmentService.waitForAssignmentCondition(
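
The waitForAssignment change reserves a slot before checking the limit: the counter is incremented, compared against the maximum, and decremented again on the rejection path, so the reservation and the check happen as one step instead of a get() followed by a later incrementAndGet(). A minimal sketch of that reserve-then-release pattern (class and method names are hypothetical, not the Elasticsearch code):

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of "increment first, check, undo on rejection" with an AtomicInteger.
 * MAX_PENDING, tryAdmit and release are illustrative names only.
 */
public class PendingRequestLimiter {
    private static final int MAX_PENDING = 100;
    private final AtomicInteger pending = new AtomicInteger();

    /** Reserve a slot; returns false (leaving the counter unchanged) if the limit is hit. */
    public boolean tryAdmit() {
        // Incrementing before the comparison ties the reservation to the check,
        // so the counter can never silently drift past the limit.
        if (pending.incrementAndGet() >= MAX_PENDING) {
            pending.decrementAndGet(); // roll back the reservation before rejecting
            return false;
        }
        return true;
    }

    /** Call when an admitted request completes. */
    public void release() {
        pending.decrementAndGet();
    }
}

A caller would reject the request when tryAdmit() returns false and invoke release() once an admitted request finishes, mirroring how the diff decrements pendingRequestCount on the rejection path.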
