Skip to content

Commit dc0abfb

Browse files
committed
Add feature flag
1 parent 6843148 commit dc0abfb

File tree

11 files changed

+66
-105
lines changed

11 files changed

+66
-105
lines changed

test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,8 @@ public enum FeatureFlag {
2020
FAILURE_STORE_ENABLED("es.failure_store_feature_flag_enabled=true", Version.fromString("8.12.0"), null),
2121
SUB_OBJECTS_AUTO_ENABLED("es.sub_objects_auto_feature_flag_enabled=true", Version.fromString("8.16.0"), null),
2222
CHUNKING_SETTINGS_ENABLED("es.inference_chunking_settings_feature_flag_enabled=true", Version.fromString("8.16.0"), null),
23-
INFERENCE_DEFAULT_ELSER("es.inference_default_elser_feature_flag_enabled=true", Version.fromString("8.16.0"), null);
23+
INFERENCE_DEFAULT_ELSER("es.inference_default_elser_feature_flag_enabled=true", Version.fromString("8.16.0"), null),
24+
ML_SCALE_FROM_ZERO("es.ml_scale_from_zero_feature_flag_enabled=true", Version.fromString("8.16.0"), null);
2425

2526
public final String systemProperty;
2627
public final Version from;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/inference/assignment/TrainedModelAssignment.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -224,10 +224,7 @@ public boolean hasStartedRoutes() {
224224
return nodeRoutingTable.values().stream().anyMatch(routeInfo -> routeInfo.getState() == RoutingState.STARTED);
225225
}
226226

227-
public List<Tuple<String, Integer>> selectRandomNodesWeighedOnAllocationsForNRequestsAndState(
228-
int numberOfRequests,
229-
RoutingState... acceptableStates
230-
) {
227+
public List<Tuple<String, Integer>> selectRandomNodesWeighedOnAllocations(int numberOfRequests, RoutingState... acceptableStates) {
231228
List<String> nodeIds = new ArrayList<>(nodeRoutingTable.size());
232229
List<Integer> cumulativeAllocations = new ArrayList<>(nodeRoutingTable.size());
233230
int allocationSum = 0;

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/inference/assignment/TrainedModelAssignmentTests.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -195,15 +195,15 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenNoS
195195
builder.addRoutingEntry("node-2", new RoutingInfo(1, 1, RoutingState.STOPPED, ""));
196196
TrainedModelAssignment assignment = builder.build();
197197

198-
assertThat(assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STARTED).isEmpty(), is(true));
198+
assertThat(assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STARTED).isEmpty(), is(true));
199199
}
200200

201201
public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenSingleStartedNode() {
202202
TrainedModelAssignment.Builder builder = TrainedModelAssignment.Builder.empty(randomTaskParams(5), null);
203203
builder.addRoutingEntry("node-1", new RoutingInfo(4, 4, RoutingState.STARTED, ""));
204204
TrainedModelAssignment assignment = builder.build();
205205

206-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STARTED);
206+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STARTED);
207207

208208
assertThat(nodes, contains(new Tuple<>("node-1", 1)));
209209
}
@@ -213,7 +213,7 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenASh
213213
builder.addRoutingEntry("node-1", new RoutingInfo(4, 4, RoutingState.STARTED, ""));
214214
TrainedModelAssignment assignment = builder.build();
215215

216-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STOPPING);
216+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STOPPING);
217217

218218
assertThat(nodes, empty());
219219
}
@@ -223,7 +223,7 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenASh
223223
builder.addRoutingEntry("node-1", new RoutingInfo(4, 4, RoutingState.STOPPING, ""));
224224
TrainedModelAssignment assignment = builder.build();
225225

226-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STOPPING);
226+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STOPPING);
227227

228228
assertThat(nodes, contains(new Tuple<>("node-1", 1)));
229229
}
@@ -234,7 +234,7 @@ public void testSingleRequestWith2Nodes() {
234234
builder.addRoutingEntry("node-2", new RoutingInfo(1, 1, RoutingState.STARTED, ""));
235235
TrainedModelAssignment assignment = builder.build();
236236

237-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STARTED);
237+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STARTED);
238238
assertThat(nodes, hasSize(1));
239239
assertEquals(nodes.get(0).v2(), Integer.valueOf(1));
240240
}
@@ -248,7 +248,7 @@ public void testSelectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenMul
248248

249249
final int selectionCount = 10000;
250250
final CountAccumulator countsPerNodeAccumulator = new CountAccumulator();
251-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(selectionCount, RoutingState.STARTED);
251+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(selectionCount, RoutingState.STARTED);
252252

253253
assertThat(nodes, hasSize(3));
254254
assertThat(nodes.stream().mapToInt(Tuple::v2).sum(), equalTo(selectionCount));
@@ -269,7 +269,7 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenMul
269269
builder.addRoutingEntry("node-3", new RoutingInfo(0, 0, RoutingState.STARTED, ""));
270270
TrainedModelAssignment assignment = builder.build();
271271
final int selectionCount = 1000;
272-
var nodeCounts = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(selectionCount, RoutingState.STARTED);
272+
var nodeCounts = assignment.selectRandomNodesWeighedOnAllocations(selectionCount, RoutingState.STARTED);
273273
assertThat(nodeCounts, hasSize(3));
274274

275275
var selectedNodes = new HashSet<String>();

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AdaptiveAllocationsScaleFromZeroIT.java

Lines changed: 2 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -7,15 +7,13 @@
77

88
package org.elasticsearch.xpack.ml.integration;
99

10-
import org.elasticsearch.client.Request;
10+
import org.apache.lucene.tests.util.LuceneTestCase;
1111
import org.elasticsearch.client.Response;
1212
import org.elasticsearch.client.ResponseListener;
1313
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1414
import org.elasticsearch.core.TimeValue;
1515
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
16-
import org.junit.Before;
1716

18-
import java.io.IOException;
1917
import java.util.Arrays;
2018
import java.util.List;
2119
import java.util.Map;
@@ -29,22 +27,9 @@
2927
import static org.hamcrest.Matchers.not;
3028
import static org.hamcrest.Matchers.nullValue;
3129

30+
@LuceneTestCase.AwaitsFix(bugUrl = "Cannot test without setting the scale to zero period to a small value")
3231
public class AdaptiveAllocationsScaleFromZeroIT extends PyTorchModelRestTestCase {
3332

34-
@Before
35-
public void setShortScaleToZeroPeriod() throws IOException {
36-
logger.info("setting time");
37-
Request scaleToZeroTime = new Request("PUT", "_cluster/settings");
38-
scaleToZeroTime.setJsonEntity("""
39-
{
40-
"persistent": {
41-
"xpack.ml.adaptive_allocations_scale_to_zero_interval": "2s"
42-
}
43-
}""");
44-
45-
client().performRequest(scaleToZeroTime);
46-
}
47-
4833
@SuppressWarnings("unchecked")
4934
public void testScaleFromZero() throws Exception {
5035
String modelId = "test_scale_from_zero";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -758,18 +758,6 @@ public void loadExtensions(ExtensionLoader loader) {
758758
*/
759759
public static final int MAX_LOW_PRIORITY_MODELS_PER_NODE = 100;
760760

761-
/**
762-
* The time interval without any requests that has to pass, before scaling down
763-
* to zero allocations.
764-
*/
765-
public static final Setting<TimeValue> ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL = Setting.timeSetting(
766-
"xpack.ml.adaptive_allocations_scale_to_zero_interval",
767-
TimeValue.timeValueMinutes(15),
768-
TimeValue.timeValueSeconds(1),
769-
Property.Dynamic,
770-
Setting.Property.NodeScope
771-
);
772-
773761
private static final Logger logger = LogManager.getLogger(MachineLearning.class);
774762
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class);
775763

@@ -829,8 +817,7 @@ public List<Setting<?>> getSettings() {
829817
MAX_ML_NODE_SIZE,
830818
DELAYED_DATA_CHECK_FREQ,
831819
DUMMY_ENTITY_MEMORY,
832-
DUMMY_ENTITY_PROCESSORS,
833-
ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL
820+
DUMMY_ENTITY_PROCESSORS
834821
);
835822
}
836823

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java

Lines changed: 11 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.xpack.ml.MachineLearning;
4646
import org.elasticsearch.xpack.ml.inference.InferenceWaitForAllocation;
4747
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService;
48+
import org.elasticsearch.xpack.ml.inference.adaptiveallocations.ScaleFromZeroFeatureFlag;
4849
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentService;
4950
import org.elasticsearch.xpack.ml.inference.loadingservice.LocalModel;
5051
import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService;
@@ -264,26 +265,25 @@ private void inferAgainstAllocatedModel(
264265

265266
// Get a list of nodes to send the requests to and the number of
266267
// documents for each node.
267-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(request.numberOfDocuments(), RoutingState.STARTED);
268+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(request.numberOfDocuments(), RoutingState.STARTED);
268269

269270
// We couldn't find any nodes in the started state so let's look for ones that are stopping in case we're shutting down some nodes
270271
if (nodes.isEmpty()) {
271-
nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(
272-
request.numberOfDocuments(),
273-
RoutingState.STOPPING
274-
);
272+
nodes = assignment.selectRandomNodesWeighedOnAllocations(request.numberOfDocuments(), RoutingState.STOPPING);
275273
}
276274

277275
if (nodes.isEmpty()) {
278276
String message = "Trained model deployment [" + request.getId() + "] is not allocated to any nodes";
279277
boolean starting = adaptiveAllocationsScalerService.maybeStartAllocation(assignment);
280278
if (starting) {
281279
message += "; starting deployment of one allocation";
282-
logger.info(message);
283-
waitForAllocation.waitForAssignment(
284-
new InferenceWaitForAllocation.WaitingRequest(request, responseBuilder, parentTaskId, listener)
285-
);
286-
return;
280+
281+
if (ScaleFromZeroFeatureFlag.isEnabled()) {
282+
waitForAllocation.waitForAssignment(
283+
new InferenceWaitForAllocation.WaitingRequest(request, responseBuilder, parentTaskId, listener)
284+
);
285+
return;
286+
}
287287
}
288288

289289
logger.debug(message);
@@ -299,10 +299,7 @@ private void inferAgainstAllocatedModel(
299299
private void inferOnBlockedRequest(InferenceWaitForAllocation.WaitingRequest request, TrainedModelAssignment assignment) {
300300
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
301301

302-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(
303-
request.request().numberOfDocuments(),
304-
RoutingState.STARTED
305-
);
302+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(request.request().numberOfDocuments(), RoutingState.STARTED);
306303

307304
if (nodes.isEmpty()) {
308305
request.listener()

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java

Lines changed: 10 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,6 @@ public InferenceWaitForAllocation(
8484
* @param request The inference request details
8585
*/
8686
public synchronized void waitForAssignment(WaitingRequest request) {
87-
logger.info("waitForAssignment will wait for condition");
8887
if (pendingRequestCount.incrementAndGet() >= MAX_PENDING_REQUEST_COUNT) {
8988
pendingRequestCount.decrementAndGet();
9089
request.listener.onFailure(
@@ -103,7 +102,7 @@ public synchronized void waitForAssignment(WaitingRequest request) {
103102
request.deploymentId(),
104103
predicate,
105104
request.request().getInferenceTimeout(),
106-
new WaitingListener(request.deploymentId(), request, predicate)
105+
new WaitingListener(request, predicate)
107106
);
108107
}
109108

@@ -118,14 +117,20 @@ private static class DeploymentHasAtLeastOneAllocation implements Predicate<Clus
118117

119118
@Override
120119
public boolean test(ClusterState clusterState) {
121-
logger.info("predicate test");
122120
TrainedModelAssignment trainedModelAssignment = TrainedModelAssignmentMetadata.assignmentForDeploymentId(
123121
clusterState,
124122
deploymentId
125123
).orElse(null);
126124
if (trainedModelAssignment == null) {
127125
logger.info(() -> format("[%s] assignment was null while waiting to scale up", deploymentId));
128-
return false;
126+
exception.set(
127+
new ElasticsearchStatusException(
128+
"[{}] Error waiting for a model allocation, model assignment has been removed",
129+
RestStatus.CONFLICT,
130+
deploymentId
131+
)
132+
);
133+
return true; // don't try again
129134
}
130135

131136
Map<String, String> nodeFailuresAndReasons = new HashMap<>();
@@ -151,24 +156,16 @@ public boolean test(ClusterState clusterState) {
151156
}
152157

153158
var routable = trainedModelAssignment.getNodeRoutingTable().values().stream().filter(RoutingInfo::isRoutable).findFirst();
154-
if (routable.isPresent()) {
155-
logger.info("first route " + routable.get() + ", state" + trainedModelAssignment.calculateAllocationStatus());
156-
} else {
157-
logger.info("no routes");
158-
}
159-
160159
return routable.isPresent();
161160
}
162161
}
163162

164163
private class WaitingListener implements TrainedModelAssignmentService.WaitForAssignmentListener {
165164

166-
private final String deploymentId;
167165
private final WaitingRequest request;
168166
private final DeploymentHasAtLeastOneAllocation predicate;
169167

170-
private WaitingListener(String deploymentId, WaitingRequest request, DeploymentHasAtLeastOneAllocation predicate) {
171-
this.deploymentId = deploymentId;
168+
private WaitingListener(WaitingRequest request, DeploymentHasAtLeastOneAllocation predicate) {
172169
this.request = request;
173170
this.predicate = predicate;
174171
}
@@ -183,13 +180,11 @@ public void onResponse(TrainedModelAssignment assignment) {
183180
return;
184181
}
185182

186-
logger.info("sending waited request");
187183
queuedConsumer.accept(request, assignment);
188184
}
189185

190186
@Override
191187
public void onFailure(Exception e) {
192-
logger.info("failed waiting", e);
193188
pendingRequestCount.decrementAndGet();
194189
request.listener().onFailure(e);
195190
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -75,11 +75,6 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum
7575
this.maxNumberOfAllocations = maxNumberOfAllocations;
7676
}
7777

78-
void setScaleToZeroPeriod(long inactivitySeconds) {
79-
logger.info("setting scale to zero " + inactivitySeconds);
80-
this.scaleToZeroAfterNoRequestsSeconds = inactivitySeconds;
81-
}
82-
8378
void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) {
8479
lastMeasuredQueueSize = stats.pendingCount();
8580
if (stats.requestCount() > 0) {
@@ -144,9 +139,6 @@ Double getInferenceTimeEstimate() {
144139

145140
Integer scale() {
146141

147-
logger.info("[{}] checking scale.", deploymentId);
148-
logger.info("[{}] to zero", scaleToZeroAfterNoRequestsSeconds);
149-
150142
if (requestRateEstimator.hasValue() == false) {
151143
return null;
152144
}
@@ -180,7 +172,7 @@ Integer scale() {
180172

181173
if (oldNumberOfAllocations != 0) {
182174
// avoid logging this message if there is no change
183-
logger.info("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId);
175+
logger.debug("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId);
184176
}
185177
numberOfAllocations = 0;
186178
neededNumberOfAllocations = 0;

0 commit comments

Comments
 (0)