|
6 | 6 | */ |
7 | 7 | package org.elasticsearch.xpack.ml; |
8 | 8 |
|
9 | | -import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; |
10 | | -import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; |
11 | | -import org.elasticsearch.client.internal.OriginSettingClient; |
12 | | -import org.elasticsearch.common.unit.ByteSizeUnit; |
13 | | -import org.elasticsearch.common.unit.ByteSizeValue; |
14 | | -import org.elasticsearch.index.IndexNotFoundException; |
15 | | -import org.elasticsearch.logging.LogManager; |
16 | 9 | import org.apache.lucene.util.SetOnce; |
17 | 10 | import org.elasticsearch.action.ActionListener; |
18 | 11 | import org.elasticsearch.action.ActionType; |
|
21 | 14 | import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; |
22 | 15 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; |
23 | 16 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; |
| 17 | +import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; |
| 18 | +import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; |
24 | 19 | import org.elasticsearch.action.support.IndicesOptions; |
25 | 20 | import org.elasticsearch.action.support.master.AcknowledgedRequest; |
26 | 21 | import org.elasticsearch.action.support.master.AcknowledgedResponse; |
27 | 22 | import org.elasticsearch.client.internal.Client; |
| 23 | +import org.elasticsearch.client.internal.OriginSettingClient; |
28 | 24 | import org.elasticsearch.cluster.ClusterName; |
29 | 25 | import org.elasticsearch.cluster.ClusterState; |
30 | 26 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; |
31 | 27 | import org.elasticsearch.cluster.metadata.ProjectMetadata; |
32 | 28 | import org.elasticsearch.cluster.service.ClusterService; |
33 | 29 | import org.elasticsearch.common.settings.Settings; |
| 30 | +import org.elasticsearch.common.unit.ByteSizeUnit; |
| 31 | +import org.elasticsearch.common.unit.ByteSizeValue; |
34 | 32 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
35 | 33 | import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; |
36 | 34 | import org.elasticsearch.common.util.set.Sets; |
37 | 35 | import org.elasticsearch.core.Predicates; |
38 | 36 | import org.elasticsearch.core.Releasable; |
39 | 37 | import org.elasticsearch.core.TimeValue; |
40 | 38 | import org.elasticsearch.core.Tuple; |
| 39 | +import org.elasticsearch.index.IndexNotFoundException; |
| 40 | +import org.elasticsearch.logging.LogManager; |
41 | 41 | import org.elasticsearch.persistent.PersistentTasksCustomMetadata; |
42 | 42 | import org.elasticsearch.tasks.TaskInfo; |
43 | 43 | import org.elasticsearch.threadpool.Scheduler; |
@@ -92,7 +92,6 @@ public class MlDailyMaintenanceService implements Releasable { |
92 | 92 |
|
93 | 93 | private final IndexNameExpressionResolver expressionResolver; |
94 | 94 |
|
95 | | - |
96 | 95 | private final boolean isAnomalyDetectionEnabled; |
97 | 96 | private final boolean isDataFrameAnalyticsEnabled; |
98 | 97 | private final boolean isNlpEnabled; |
@@ -164,22 +163,22 @@ void setDeleteExpiredDataRequestsPerSecond(float value) { |
164 | 163 | * @param clusterName the cluster name is used to seed the random offset |
165 | 164 | * @return the delay to the next time the maintenance should be triggered |
166 | 165 | */ |
167 | | -// private static TimeValue delayToNextTime(ClusterName clusterName) { |
168 | | -// Random random = new Random(clusterName.hashCode()); |
169 | | -// int minutesOffset = random.ints(0, MAX_TIME_OFFSET_MINUTES).findFirst().getAsInt(); |
170 | | -// |
171 | | -// ZonedDateTime now = ZonedDateTime.now(Clock.systemDefaultZone()); |
172 | | -// ZonedDateTime next = now.plusDays(1).toLocalDate().atStartOfDay(now.getZone()).plusMinutes(30).plusMinutes(minutesOffset); |
173 | | -// return TimeValue.timeValueMillis(next.toInstant().toEpochMilli() - now.toInstant().toEpochMilli()); |
174 | | -// } |
| 166 | + // private static TimeValue delayToNextTime(ClusterName clusterName) { |
| 167 | + // Random random = new Random(clusterName.hashCode()); |
| 168 | + // int minutesOffset = random.ints(0, MAX_TIME_OFFSET_MINUTES).findFirst().getAsInt(); |
| 169 | + // |
| 170 | + // ZonedDateTime now = ZonedDateTime.now(Clock.systemDefaultZone()); |
| 171 | + // ZonedDateTime next = now.plusDays(1).toLocalDate().atStartOfDay(now.getZone()).plusMinutes(30).plusMinutes(minutesOffset); |
| 172 | + // return TimeValue.timeValueMillis(next.toInstant().toEpochMilli() - now.toInstant().toEpochMilli()); |
| 173 | + // } |
175 | 174 |
|
176 | 175 | private static TimeValue delayToNextTime(ClusterName clusterName) { |
177 | 176 | Random random = new Random(clusterName.hashCode()); |
178 | 177 | int minutesOffset = 5; |
179 | 178 |
|
180 | 179 | ZonedDateTime now = ZonedDateTime.now(Clock.systemDefaultZone()); |
181 | 180 | ZonedDateTime next = now.plusMinutes(minutesOffset); |
182 | | - var ret = TimeValue.timeValueMillis(next.toInstant().toEpochMilli() - now.toInstant().toEpochMilli()); |
| 181 | + var ret = TimeValue.timeValueMillis(next.toInstant().toEpochMilli() - now.toInstant().toEpochMilli()); |
183 | 182 | logger.warn("Delay until next time [{}] is [{}]", next, ret); |
184 | 183 | return ret; |
185 | 184 | } |
@@ -246,57 +245,48 @@ private void triggerTasks() { |
246 | 245 |
|
247 | 246 | private void triggerAnomalyDetectionMaintenance() { |
248 | 247 | // Step 5: Log any error that could have happened |
249 | | - ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap( |
250 | | - response -> { |
251 | | - if (response.isAcknowledged() == false) { |
252 | | - logger.warn("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask failed"); |
253 | | - } else { |
254 | | - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask succeeded"); |
255 | | - } |
256 | | - }, |
257 | | - e -> logger.warn("An error occurred during [ML] maintenance tasks execution ", e) ); |
| 248 | + ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(response -> { |
| 249 | + if (response.isAcknowledged() == false) { |
| 250 | + logger.warn("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask failed"); |
| 251 | + } else { |
| 252 | + logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask succeeded"); |
| 253 | + } |
| 254 | + }, e -> logger.warn("An error occurred during [ML] maintenance tasks execution ", e)); |
258 | 255 |
|
259 | 256 | // Step 4: Roll over results indices if necessary |
260 | | - ActionListener<AcknowledgedResponse> rollResultsIndicesIfNecessaryListener = ActionListener.wrap( |
261 | | - unused -> { |
262 | | - logger.warn("1. About to call [triggerRollResultsIndicesIfNecessaryTask]"); |
263 | | - |
264 | | - triggerRollResultsIndicesIfNecessaryTask(finalListener);}, |
265 | | - e -> { |
266 | | - logger.warn("[ML] maintenance task: triggerDeleteExpiredDataTask failed ", e); |
267 | | - logger.warn("2. About to call [triggerRollResultsIndicesIfNecessaryTask]"); |
| 257 | + ActionListener<AcknowledgedResponse> rollResultsIndicesIfNecessaryListener = ActionListener.wrap(unused -> { |
| 258 | + logger.warn("1. About to call [triggerRollResultsIndicesIfNecessaryTask]"); |
268 | 259 |
|
| 260 | + triggerRollResultsIndicesIfNecessaryTask(finalListener); |
| 261 | + }, e -> { |
| 262 | + logger.warn("[ML] maintenance task: triggerDeleteExpiredDataTask failed ", e); |
| 263 | + logger.warn("2. About to call [triggerRollResultsIndicesIfNecessaryTask]"); |
269 | 264 |
|
270 | | - // Note: Steps 1-4 are independent, so continue upon errors. |
271 | | - triggerRollResultsIndicesIfNecessaryTask(finalListener); |
272 | | - } |
273 | | - ); |
| 265 | + // Note: Steps 1-4 are independent, so continue upon errors. |
| 266 | + triggerRollResultsIndicesIfNecessaryTask(finalListener); |
| 267 | + }); |
274 | 268 |
|
275 | 269 | // Step 3: Delete expired data |
276 | | - ActionListener<AcknowledgedResponse> deleteJobsListener = ActionListener.wrap( |
277 | | - unused -> { |
278 | | - logger.warn("About to call [triggerDeleteExpiredDataTask]"); |
279 | | - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener);}, |
280 | | - e -> { |
281 | | - logger.warn("[ML] maintenance task: triggerResetJobsInStateResetWithoutResetTask failed", e); |
282 | | - logger.warn("About to call [triggerDeleteExpiredDataTask]"); |
283 | | - // Note: Steps 1-4 are independent, so continue upon errors. |
284 | | - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); |
285 | | - } |
286 | | - ); |
| 270 | + ActionListener<AcknowledgedResponse> deleteJobsListener = ActionListener.wrap(unused -> { |
| 271 | + logger.warn("About to call [triggerDeleteExpiredDataTask]"); |
| 272 | + triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); |
| 273 | + }, e -> { |
| 274 | + logger.warn("[ML] maintenance task: triggerResetJobsInStateResetWithoutResetTask failed", e); |
| 275 | + logger.warn("About to call [triggerDeleteExpiredDataTask]"); |
| 276 | + // Note: Steps 1-4 are independent, so continue upon errors. |
| 277 | + triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); |
| 278 | + }); |
287 | 279 |
|
288 | 280 | // Step 2: Reset jobs that are in resetting state without task |
289 | | - ActionListener<AcknowledgedResponse> resetJobsListener = ActionListener.wrap( |
290 | | - unused -> { |
291 | | - logger.warn("About to call [triggerResetJobsInStateResetWithoutResetTask]"); |
292 | | - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener);}, |
293 | | - e -> { |
294 | | - logger.warn("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e); |
295 | | - logger.warn("About to call [triggerResetJobsInStateResetWithoutResetTask]"); |
296 | | - // Note: Steps 1-4 are independent, so continue upon errors. |
297 | | - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); |
298 | | - } |
299 | | - ); |
| 281 | + ActionListener<AcknowledgedResponse> resetJobsListener = ActionListener.wrap(unused -> { |
| 282 | + logger.warn("About to call [triggerResetJobsInStateResetWithoutResetTask]"); |
| 283 | + triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); |
| 284 | + }, e -> { |
| 285 | + logger.warn("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e); |
| 286 | + logger.warn("About to call [triggerResetJobsInStateResetWithoutResetTask]"); |
| 287 | + // Note: Steps 1-4 are independent, so continue upon errors. |
| 288 | + triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); |
| 289 | + }); |
300 | 290 |
|
301 | 291 | // Step 1: Delete jobs that are in deleting state without task |
302 | 292 | logger.warn("About to call [triggerDeleteJobsInStateDeletingWithoutDeletionTask]"); |
@@ -353,11 +343,7 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio |
353 | 343 | // We must still clean up the temporary alias from the original index. |
354 | 344 | // The index name is either the original one provided or the original with a suffix appended. |
355 | 345 | var indexName = MlIndexAndAlias.has6DigitSuffix(index) ? index : index + MlIndexAndAlias.FIRST_INDEX_SIX_DIGIT_SUFFIX; |
356 | | - logger.warn( |
357 | | - "[ML] Removing dangling rollover alias [{}] from index [{}].", |
358 | | - rolloverAlias, |
359 | | - indexName |
360 | | - ); |
| 346 | + logger.warn("[ML] Removing dangling rollover alias [{}] from index [{}].", rolloverAlias, indexName); |
361 | 347 |
|
362 | 348 | // Make sure we use a fresh IndicesAliasesRequestBuilder, the original one may have changed internal state. |
363 | 349 | IndicesAliasesRequestBuilder localAliasRequestBuilder = client.admin() |
@@ -415,13 +401,11 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio |
415 | 401 | .setNewIndexName(newIndexName) |
416 | 402 | // .setConditions(RolloverConditions.newBuilder().addMaxIndexSizeCondition(ByteSizeValue.of(50, |
417 | 403 | // ByteSizeUnit.GB)).build()) // TODO Make these settings? |
418 | | - .setConditions( |
419 | | - RolloverConditions.newBuilder().addMaxIndexSizeCondition(ByteSizeValue.of(2, ByteSizeUnit.MB)).build() |
420 | | - ) // TODO |
421 | | - // Make |
422 | | - // these |
423 | | - // changeable |
424 | | - // settings? |
| 404 | + .setConditions(RolloverConditions.newBuilder().addMaxIndexSizeCondition(ByteSizeValue.of(2, ByteSizeUnit.MB)).build()) // TODO |
| 405 | + // Make |
| 406 | + // these |
| 407 | + // changeable |
| 408 | + // settings? |
425 | 409 | .request(), |
426 | 410 | rolloverListener |
427 | 411 | ); |
@@ -452,7 +436,6 @@ private void triggerRollResultsIndicesIfNecessaryTask(ActionListener<Acknowledge |
452 | 436 | logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask"); |
453 | 437 | logger.warn("AD results indices [{}]", (Object) indices); |
454 | 438 |
|
455 | | - |
456 | 439 | for (String index : indices) { |
457 | 440 | logger.warn("Processing index [{}]", index); |
458 | 441 | // Check if this index has already been rolled over |
|
0 commit comments