Skip to content

Commit 6931a3a

Browse files
committed
Clean up debug logging
# Conflicts:
#   test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java
1 parent d63605d commit 6931a3a

File tree

9 files changed: +37 additions, -100 deletions (lines changed)

9 files changed

+37
-100
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/inference/assignment/TrainedModelAssignment.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -224,10 +224,7 @@ public boolean hasStartedRoutes() {
224224
return nodeRoutingTable.values().stream().anyMatch(routeInfo -> routeInfo.getState() == RoutingState.STARTED);
225225
}
226226

227-
public List<Tuple<String, Integer>> selectRandomNodesWeighedOnAllocationsForNRequestsAndState(
228-
int numberOfRequests,
229-
RoutingState... acceptableStates
230-
) {
227+
public List<Tuple<String, Integer>> selectRandomNodesWeighedOnAllocations(int numberOfRequests, RoutingState... acceptableStates) {
231228
List<String> nodeIds = new ArrayList<>(nodeRoutingTable.size());
232229
List<Integer> cumulativeAllocations = new ArrayList<>(nodeRoutingTable.size());
233230
int allocationSum = 0;

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/inference/assignment/TrainedModelAssignmentTests.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -195,15 +195,15 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenNoS
195195
builder.addRoutingEntry("node-2", new RoutingInfo(1, 1, RoutingState.STOPPED, ""));
196196
TrainedModelAssignment assignment = builder.build();
197197

198-
assertThat(assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STARTED).isEmpty(), is(true));
198+
assertThat(assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STARTED).isEmpty(), is(true));
199199
}
200200

201201
public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenSingleStartedNode() {
202202
TrainedModelAssignment.Builder builder = TrainedModelAssignment.Builder.empty(randomTaskParams(5), null);
203203
builder.addRoutingEntry("node-1", new RoutingInfo(4, 4, RoutingState.STARTED, ""));
204204
TrainedModelAssignment assignment = builder.build();
205205

206-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STARTED);
206+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STARTED);
207207

208208
assertThat(nodes, contains(new Tuple<>("node-1", 1)));
209209
}
@@ -213,7 +213,7 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenASh
213213
builder.addRoutingEntry("node-1", new RoutingInfo(4, 4, RoutingState.STARTED, ""));
214214
TrainedModelAssignment assignment = builder.build();
215215

216-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STOPPING);
216+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STOPPING);
217217

218218
assertThat(nodes, empty());
219219
}
@@ -223,7 +223,7 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenASh
223223
builder.addRoutingEntry("node-1", new RoutingInfo(4, 4, RoutingState.STOPPING, ""));
224224
TrainedModelAssignment assignment = builder.build();
225225

226-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STOPPING);
226+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STOPPING);
227227

228228
assertThat(nodes, contains(new Tuple<>("node-1", 1)));
229229
}
@@ -234,7 +234,7 @@ public void testSingleRequestWith2Nodes() {
234234
builder.addRoutingEntry("node-2", new RoutingInfo(1, 1, RoutingState.STARTED, ""));
235235
TrainedModelAssignment assignment = builder.build();
236236

237-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(1, RoutingState.STARTED);
237+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(1, RoutingState.STARTED);
238238
assertThat(nodes, hasSize(1));
239239
assertEquals(nodes.get(0).v2(), Integer.valueOf(1));
240240
}
@@ -248,7 +248,7 @@ public void testSelectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenMul
248248

249249
final int selectionCount = 10000;
250250
final CountAccumulator countsPerNodeAccumulator = new CountAccumulator();
251-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(selectionCount, RoutingState.STARTED);
251+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(selectionCount, RoutingState.STARTED);
252252

253253
assertThat(nodes, hasSize(3));
254254
assertThat(nodes.stream().mapToInt(Tuple::v2).sum(), equalTo(selectionCount));
@@ -269,7 +269,7 @@ public void testselectRandomStartedNodeWeighedOnAllocationsForNRequests_GivenMul
269269
builder.addRoutingEntry("node-3", new RoutingInfo(0, 0, RoutingState.STARTED, ""));
270270
TrainedModelAssignment assignment = builder.build();
271271
final int selectionCount = 1000;
272-
var nodeCounts = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(selectionCount, RoutingState.STARTED);
272+
var nodeCounts = assignment.selectRandomNodesWeighedOnAllocations(selectionCount, RoutingState.STARTED);
273273
assertThat(nodeCounts, hasSize(3));
274274

275275
var selectedNodes = new HashSet<String>();

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AdaptiveAllocationsScaleFromZeroIT.java

Lines changed: 2 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -7,15 +7,13 @@
77

88
package org.elasticsearch.xpack.ml.integration;
99

10-
import org.elasticsearch.client.Request;
10+
import org.apache.lucene.tests.util.LuceneTestCase;
1111
import org.elasticsearch.client.Response;
1212
import org.elasticsearch.client.ResponseListener;
1313
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1414
import org.elasticsearch.core.TimeValue;
1515
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
16-
import org.junit.Before;
1716

18-
import java.io.IOException;
1917
import java.util.Arrays;
2018
import java.util.List;
2119
import java.util.Map;
@@ -29,22 +27,9 @@
2927
import static org.hamcrest.Matchers.not;
3028
import static org.hamcrest.Matchers.nullValue;
3129

30+
@LuceneTestCase.AwaitsFix(bugUrl = "Cannot test without setting the scale to zero period to a small value")
3231
public class AdaptiveAllocationsScaleFromZeroIT extends PyTorchModelRestTestCase {
3332

34-
@Before
35-
public void setShortScaleToZeroPeriod() throws IOException {
36-
logger.info("setting time");
37-
Request scaleToZeroTime = new Request("PUT", "_cluster/settings");
38-
scaleToZeroTime.setJsonEntity("""
39-
{
40-
"persistent": {
41-
"xpack.ml.adaptive_allocations_scale_to_zero_interval": "2s"
42-
}
43-
}""");
44-
45-
client().performRequest(scaleToZeroTime);
46-
}
47-
4833
@SuppressWarnings("unchecked")
4934
public void testScaleFromZero() throws Exception {
5035
String modelId = "test_scale_from_zero";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -758,18 +758,6 @@ public void loadExtensions(ExtensionLoader loader) {
758758
*/
759759
public static final int MAX_LOW_PRIORITY_MODELS_PER_NODE = 100;
760760

761-
/**
762-
* The time interval without any requests that has to pass, before scaling down
763-
* to zero allocations.
764-
*/
765-
public static final Setting<TimeValue> ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL = Setting.timeSetting(
766-
"xpack.ml.adaptive_allocations_scale_to_zero_interval",
767-
TimeValue.timeValueMinutes(15),
768-
TimeValue.timeValueSeconds(1),
769-
Property.Dynamic,
770-
Setting.Property.NodeScope
771-
);
772-
773761
private static final Logger logger = LogManager.getLogger(MachineLearning.class);
774762
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class);
775763

@@ -829,8 +817,7 @@ public List<Setting<?>> getSettings() {
829817
MAX_ML_NODE_SIZE,
830818
DELAYED_DATA_CHECK_FREQ,
831819
DUMMY_ENTITY_MEMORY,
832-
DUMMY_ENTITY_PROCESSORS,
833-
ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL
820+
DUMMY_ENTITY_PROCESSORS
834821
);
835822
}
836823

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java

Lines changed: 4 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -264,22 +264,19 @@ private void inferAgainstAllocatedModel(
264264

265265
// Get a list of nodes to send the requests to and the number of
266266
// documents for each node.
267-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(request.numberOfDocuments(), RoutingState.STARTED);
267+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(request.numberOfDocuments(), RoutingState.STARTED);
268268

269269
// We couldn't find any nodes in the started state so let's look for ones that are stopping in case we're shutting down some nodes
270270
if (nodes.isEmpty()) {
271-
nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(
272-
request.numberOfDocuments(),
273-
RoutingState.STOPPING
274-
);
271+
nodes = assignment.selectRandomNodesWeighedOnAllocations(request.numberOfDocuments(), RoutingState.STOPPING);
275272
}
276273

277274
if (nodes.isEmpty()) {
278275
String message = "Trained model deployment [" + request.getId() + "] is not allocated to any nodes";
279276
boolean starting = adaptiveAllocationsScalerService.maybeStartAllocation(assignment);
280277
if (starting) {
281278
message += "; starting deployment of one allocation";
282-
logger.info(message);
279+
logger.debug(message);
283280
waitForAllocation.waitForAssignment(
284281
new InferenceWaitForAllocation.WaitingRequest(request, responseBuilder, parentTaskId, listener)
285282
);
@@ -299,10 +296,7 @@ private void inferAgainstAllocatedModel(
299296
private void inferOnBlockedRequest(InferenceWaitForAllocation.WaitingRequest request, TrainedModelAssignment assignment) {
300297
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
301298

302-
var nodes = assignment.selectRandomNodesWeighedOnAllocationsForNRequestsAndState(
303-
request.request().numberOfDocuments(),
304-
RoutingState.STARTED
305-
);
299+
var nodes = assignment.selectRandomNodesWeighedOnAllocations(request.request().numberOfDocuments(), RoutingState.STARTED);
306300

307301
if (nodes.isEmpty()) {
308302
request.listener()

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java

Lines changed: 10 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,6 @@ public InferenceWaitForAllocation(
8484
* @param request The inference request details
8585
*/
8686
public synchronized void waitForAssignment(WaitingRequest request) {
87-
logger.info("waitForAssignment will wait for condition");
8887
if (pendingRequestCount.incrementAndGet() >= MAX_PENDING_REQUEST_COUNT) {
8988
pendingRequestCount.decrementAndGet();
9089
request.listener.onFailure(
@@ -103,7 +102,7 @@ public synchronized void waitForAssignment(WaitingRequest request) {
103102
request.deploymentId(),
104103
predicate,
105104
request.request().getInferenceTimeout(),
106-
new WaitingListener(request.deploymentId(), request, predicate)
105+
new WaitingListener(request, predicate)
107106
);
108107
}
109108

@@ -118,14 +117,20 @@ private static class DeploymentHasAtLeastOneAllocation implements Predicate<Clus
118117

119118
@Override
120119
public boolean test(ClusterState clusterState) {
121-
logger.info("predicate test");
122120
TrainedModelAssignment trainedModelAssignment = TrainedModelAssignmentMetadata.assignmentForDeploymentId(
123121
clusterState,
124122
deploymentId
125123
).orElse(null);
126124
if (trainedModelAssignment == null) {
127125
logger.info(() -> format("[%s] assignment was null while waiting to scale up", deploymentId));
128-
return false;
126+
exception.set(
127+
new ElasticsearchStatusException(
128+
"[{}] Error waiting for a model allocation, model assignment has been removed",
129+
RestStatus.CONFLICT,
130+
deploymentId
131+
)
132+
);
133+
return true; // don't try again
129134
}
130135

131136
Map<String, String> nodeFailuresAndReasons = new HashMap<>();
@@ -151,24 +156,16 @@ public boolean test(ClusterState clusterState) {
151156
}
152157

153158
var routable = trainedModelAssignment.getNodeRoutingTable().values().stream().filter(RoutingInfo::isRoutable).findFirst();
154-
if (routable.isPresent()) {
155-
logger.info("first route " + routable.get() + ", state" + trainedModelAssignment.calculateAllocationStatus());
156-
} else {
157-
logger.info("no routes");
158-
}
159-
160159
return routable.isPresent();
161160
}
162161
}
163162

164163
private class WaitingListener implements TrainedModelAssignmentService.WaitForAssignmentListener {
165164

166-
private final String deploymentId;
167165
private final WaitingRequest request;
168166
private final DeploymentHasAtLeastOneAllocation predicate;
169167

170-
private WaitingListener(String deploymentId, WaitingRequest request, DeploymentHasAtLeastOneAllocation predicate) {
171-
this.deploymentId = deploymentId;
168+
private WaitingListener(WaitingRequest request, DeploymentHasAtLeastOneAllocation predicate) {
172169
this.request = request;
173170
this.predicate = predicate;
174171
}
@@ -183,13 +180,11 @@ public void onResponse(TrainedModelAssignment assignment) {
183180
return;
184181
}
185182

186-
logger.info("sending waited request");
187183
queuedConsumer.accept(request, assignment);
188184
}
189185

190186
@Override
191187
public void onFailure(Exception e) {
192-
logger.info("failed waiting", e);
193188
pendingRequestCount.decrementAndGet();
194189
request.listener().onFailure(e);
195190
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -75,11 +75,6 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum
7575
this.maxNumberOfAllocations = maxNumberOfAllocations;
7676
}
7777

78-
void setScaleToZeroPeriod(long inactivitySeconds) {
79-
logger.info("setting scale to zero " + inactivitySeconds);
80-
this.scaleToZeroAfterNoRequestsSeconds = inactivitySeconds;
81-
}
82-
8378
void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) {
8479
lastMeasuredQueueSize = stats.pendingCount();
8580
if (stats.requestCount() > 0) {
@@ -144,9 +139,6 @@ Double getInferenceTimeEstimate() {
144139

145140
Integer scale() {
146141

147-
logger.info("[{}] checking scale.", deploymentId);
148-
logger.info("[{}] to zero", scaleToZeroAfterNoRequestsSeconds);
149-
150142
if (requestRateEstimator.hasValue() == false) {
151143
return null;
152144
}
@@ -180,7 +172,7 @@ Integer scale() {
180172

181173
if (oldNumberOfAllocations != 0) {
182174
// avoid logging this message if there is no change
183-
logger.info("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId);
175+
logger.debug("[{}] adaptive allocations scaler: scaling down to zero, because of no requests.", deploymentId);
184176
}
185177
numberOfAllocations = 0;
186178
neededNumberOfAllocations = 0;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java

Lines changed: 10 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -43,11 +43,8 @@
4343
import java.util.Set;
4444
import java.util.concurrent.ConcurrentSkipListSet;
4545
import java.util.concurrent.atomic.AtomicBoolean;
46-
import java.util.concurrent.atomic.AtomicLong;
4746
import java.util.function.Function;
4847

49-
import static org.elasticsearch.xpack.ml.MachineLearning.ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL;
50-
5148
/**
5249
* Periodically schedules adaptive allocations scaling. This process consists
5350
* of calling the trained model stats API, processing the results, determining
@@ -188,6 +185,12 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
188185
*/
189186
private static final long SCALE_UP_COOLDOWN_TIME_MILLIS = TimeValue.timeValueMinutes(5).getMillis();
190187

188+
/**
189+
* The time interval without any requests that has to pass, before scaling down
190+
* to zero allocations (in case min_allocations = 0).
191+
*/
192+
private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds();
193+
191194
private static final Logger logger = LogManager.getLogger(AdaptiveAllocationsScalerService.class);
192195

193196
private final int timeIntervalSeconds;
@@ -206,7 +209,7 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
206209
private volatile Scheduler.Cancellable cancellable;
207210
private final AtomicBoolean busy;
208211

209-
private final AtomicLong scaleToZeroAfterNoRequestsSeconds = new AtomicLong();
212+
private final long scaleToZeroAfterNoRequestsSeconds;
210213

211214
private final Set<String> deploymentIdsWithInFlightScaleFromZeroRequests = new ConcurrentSkipListSet<>();
212215

@@ -245,10 +248,7 @@ public AdaptiveAllocationsScalerService(
245248
scalers = new HashMap<>();
246249
metrics = new Metrics();
247250
busy = new AtomicBoolean(false);
248-
249-
setScaleToZeroPeriod(ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL.get(clusterService.getSettings()));
250-
clusterService.getClusterSettings()
251-
.addSettingsUpdateConsumer(ADAPTIVE_ALLOCATIONS_SCALE_TO_ZERO_INTERVAL, this::setScaleToZeroPeriod);
251+
scaleToZeroAfterNoRequestsSeconds = SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS;
252252
}
253253

254254
public synchronized void start() {
@@ -295,7 +295,7 @@ private synchronized void updateAutoscalers(ClusterState state) {
295295
key -> new AdaptiveAllocationsScaler(
296296
assignment.getDeploymentId(),
297297
assignment.totalTargetAllocations(),
298-
scaleToZeroAfterNoRequestsSeconds.get()
298+
scaleToZeroAfterNoRequestsSeconds
299299
)
300300
);
301301
adaptiveAllocationsScaler.setMinMaxNumberOfAllocations(
@@ -336,8 +336,6 @@ private synchronized void stopScheduling() {
336336
}
337337

338338
private void trigger() {
339-
logger.info("trigger adaptive");
340-
341339
if (busy.getAndSet(true)) {
342340
logger.debug("Skipping inference adaptive allocations scaling, because it's still busy.");
343341
return;
@@ -350,7 +348,6 @@ private void trigger() {
350348
}
351349

352350
private void getDeploymentStats(ActionListener<GetDeploymentStatsAction.Response> processDeploymentStats) {
353-
logger.info("get deployment stats");
354351
String deploymentIds = String.join(",", scalers.keySet());
355352
ClientHelper.executeAsyncWithOrigin(
356353
client,
@@ -472,17 +469,12 @@ private void updateNumberOfAllocations(
472469
);
473470
}
474471

475-
private void setScaleToZeroPeriod(TimeValue timeValue) {
476-
logger.info("setting scaler service to zero " + timeValue);
477-
scaleToZeroAfterNoRequestsSeconds.set(timeValue.seconds());
478-
}
479-
480472
private ActionListener<CreateTrainedModelAssignmentAction.Response> updateAssigmentListener(
481473
String deploymentId,
482474
int numberOfAllocations
483475
) {
484476
return ActionListener.wrap(updateResponse -> {
485-
logger.info("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, numberOfAllocations);
477+
logger.debug("adaptive allocations scaler: scaled [{}] to [{}] allocations.", deploymentId, numberOfAllocations);
486478
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
487479
.execute(
488480
() -> inferenceAuditor.info(

0 commit comments

Comments
 (0)