@@ -163,25 +163,14 @@ void setDeleteExpiredDataRequestsPerSecond(float value) {
163163 * @param clusterName the cluster name is used to seed the random offset
164164 * @return the delay to the next time the maintenance should be triggered
165165 */
166- // private static TimeValue delayToNextTime(ClusterName clusterName) {
167- // Random random = new Random(clusterName.hashCode());
168- // int minutesOffset = random.ints(0, MAX_TIME_OFFSET_MINUTES).findFirst().getAsInt();
169- //
170- // ZonedDateTime now = ZonedDateTime.now(Clock.systemDefaultZone());
171- // ZonedDateTime next = now.plusDays(1).toLocalDate().atStartOfDay(now.getZone()).plusMinutes(30).plusMinutes(minutesOffset);
172- // return TimeValue.timeValueMillis(next.toInstant().toEpochMilli() - now.toInstant().toEpochMilli());
173- // }
174-
175- private static TimeValue delayToNextTime (ClusterName clusterName ) {
176- Random random = new Random (clusterName .hashCode ());
177- int minutesOffset = 5 ;
178-
179- ZonedDateTime now = ZonedDateTime .now (Clock .systemDefaultZone ());
180- ZonedDateTime next = now .plusMinutes (minutesOffset );
181- var ret = TimeValue .timeValueMillis (next .toInstant ().toEpochMilli () - now .toInstant ().toEpochMilli ());
182- logger .warn ("Delay until next time [{}] is [{}]" , next , ret );
183- return ret ;
184- }
166+ private static TimeValue delayToNextTime (ClusterName clusterName ) {
167+ Random random = new Random (clusterName .hashCode ());
168+ int minutesOffset = random .ints (0 , MAX_TIME_OFFSET_MINUTES ).findFirst ().getAsInt ();
169+
170+ ZonedDateTime now = ZonedDateTime .now (Clock .systemDefaultZone ());
171+ ZonedDateTime next = now .plusDays (1 ).toLocalDate ().atStartOfDay (now .getZone ()).plusMinutes (30 ).plusMinutes (minutesOffset );
172+ return TimeValue .timeValueMillis (next .toInstant ().toEpochMilli () - now .toInstant ().toEpochMilli ());
173+ }
185174
186175 public synchronized void start () {
187176 logger .info ("Starting ML daily maintenance service" );
@@ -255,41 +244,29 @@ private void triggerAnomalyDetectionMaintenance() {
255244
256245 // Step 4: Roll over results indices if necessary
257246 ActionListener <AcknowledgedResponse > rollResultsIndicesIfNecessaryListener = ActionListener .wrap (unused -> {
258- logger .warn ("1. About to call [triggerRollResultsIndicesIfNecessaryTask]" );
259-
260247 triggerRollResultsIndicesIfNecessaryTask (finalListener );
261248 }, e -> {
262- logger .warn ("[ML] maintenance task: triggerDeleteExpiredDataTask failed " , e );
263- logger .warn ("2. About to call [triggerRollResultsIndicesIfNecessaryTask]" );
264-
265249 // Note: Steps 1-4 are independent, so continue upon errors.
266250 triggerRollResultsIndicesIfNecessaryTask (finalListener );
267251 });
268252
269253 // Step 3: Delete expired data
270254 ActionListener <AcknowledgedResponse > deleteJobsListener = ActionListener .wrap (unused -> {
271- logger .warn ("About to call [triggerDeleteExpiredDataTask]" );
272255 triggerDeleteExpiredDataTask (rollResultsIndicesIfNecessaryListener );
273256 }, e -> {
274- logger .warn ("[ML] maintenance task: triggerResetJobsInStateResetWithoutResetTask failed" , e );
275- logger .warn ("About to call [triggerDeleteExpiredDataTask]" );
276257 // Note: Steps 1-4 are independent, so continue upon errors.
277258 triggerDeleteExpiredDataTask (rollResultsIndicesIfNecessaryListener );
278259 });
279260
280261 // Step 2: Reset jobs that are in resetting state without task
281262 ActionListener <AcknowledgedResponse > resetJobsListener = ActionListener .wrap (unused -> {
282- logger .warn ("About to call [triggerResetJobsInStateResetWithoutResetTask]" );
283263 triggerResetJobsInStateResetWithoutResetTask (deleteJobsListener );
284264 }, e -> {
285- logger .warn ("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed" , e );
286- logger .warn ("About to call [triggerResetJobsInStateResetWithoutResetTask]" );
287265 // Note: Steps 1-4 are independent, so continue upon errors.
288266 triggerResetJobsInStateResetWithoutResetTask (deleteJobsListener );
289267 });
290268
291269 // Step 1: Delete jobs that are in deleting state without task
292- logger .warn ("About to call [triggerDeleteJobsInStateDeletingWithoutDeletionTask]" );
293270 triggerDeleteJobsInStateDeletingWithoutDeletionTask (resetJobsListener );
294271 }
295272
@@ -331,19 +308,16 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio
331308 MachineLearning .HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT
332309 );
333310
334- // 3 Clean up any dangling aliases
311+ // 4 Clean up any dangling aliases
335312 ActionListener <Boolean > aliasListener = ActionListener .wrap (r -> {
336- logger .warn ("[ML] Update of aliases succeeded." , rolloverAlias );
337313 listener .onResponse (r );
338314 }, e -> {
339315 if (e instanceof IndexNotFoundException ) {
340- logger .warn ("[ML] Update of aliases failed: " , e );
341316 // Removal of the rollover alias may have failed in the case of rollover not occurring, e.g. when the rollover conditions
342317 // were not satisfied.
343318 // We must still clean up the temporary alias from the original index.
344319 // The index name is either the original one provided or the original with a suffix appended.
345320 var indexName = MlIndexAndAlias .has6DigitSuffix (index ) ? index : index + MlIndexAndAlias .FIRST_INDEX_SIX_DIGIT_SUFFIX ;
346- logger .warn ("[ML] Removing dangling rollover alias [{}] from index [{}]." , rolloverAlias , indexName );
347321
348322 // Make sure we use a fresh IndicesAliasesRequestBuilder, the original one may have changed internal state.
349323 IndicesAliasesRequestBuilder localAliasRequestBuilder = client .admin ()
@@ -362,11 +336,6 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio
362336
363337 // 3 Update aliases
364338 ActionListener <String > rolloverListener = ActionListener .wrap (newIndexNameResponse -> {
365- logger .warn (
366- "[ML] maintenance task: rollAndUpdateAliases for index [{}] succeeded. Cleaning up dangling alias [{}]." ,
367- newIndexNameResponse ,
368- rolloverAlias
369- );
370339 MlAnomaliesIndexUtils .addIndexAliasesRequests (aliasRequestBuilder , index , newIndexNameResponse , clusterState );
371340 // On success, the rollover alias may have been moved to the new index, so we attempt to remove it from there.
372341 // Note that the rollover request is considered "successful" even if it didn't occur due to a condition not being met
@@ -377,42 +346,24 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio
377346 // If rollover fails, we must still clean up the temporary alias from the original index.
378347 // The index name is either the original one provided or the original with a suffix appended.
379348 var indexName = MlIndexAndAlias .has6DigitSuffix (index ) ? index : index + MlIndexAndAlias .FIRST_INDEX_SIX_DIGIT_SUFFIX ;
380- logger .warn (
381- "[ML] maintenance task: rollAndUpdateAliases for index [{}] failed with exception [{}]. Cleaning up dangling alias [{}]" ,
382- indexName ,
383- e ,
384- rolloverAlias
385- );
386349 // Execute the cleanup, no need to propagate the original failure.
387350 removeRolloverAlias (indexName , rolloverAlias , aliasRequestBuilder , aliasListener );
388351 });
389352
390353 // 2 rollover the index alias to the new index name
391354 ActionListener <IndicesAliasesResponse > getIndicesAliasesListener = ActionListener .wrap (getIndicesAliasesResponse -> {
392- logger .info (
393- "[ML] getIndicesAliasesResponse: [{}] about to execute rollover request of alias [{}] to new concrete index name [{}]" ,
394- getIndicesAliasesResponse ,
395- rolloverAlias ,
396- newIndexName
397- );
398355 MlAnomaliesIndexUtils .rollover (
399356 client ,
400357 new RolloverRequestBuilder (client ).setRolloverTarget (rolloverAlias )
401358 .setNewIndexName (newIndexName )
402359 // TODO Make these conditions configurable settings?
403- // .setConditions(RolloverConditions.newBuilder().addMaxIndexSizeCondition(ByteSizeValue.of(50,
404- // ByteSizeUnit.GB)).build())
405- .setConditions (RolloverConditions .newBuilder ().addMaxIndexSizeCondition (ByteSizeValue .of (2 , ByteSizeUnit .MB )).build ())
360+ .setConditions (RolloverConditions .newBuilder ().addMaxIndexSizeCondition (ByteSizeValue .of (50 , ByteSizeUnit .GB )).build ())
406361 .request (),
407362 rolloverListener
408363 );
409- }, (e ) -> {
410- logger .warn ("XXX [ML] getIndicesAliasesResponse: [{}] rollover request failed " , e );
411- rolloverListener .onFailure (e );
412- });
364+ }, rolloverListener ::onFailure );
413365
414366 // 1. Create necessary aliases
415- logger .warn ("Creating rollover alias [{}] for index [{}]" , rolloverAlias , index );
416367 MlAnomaliesIndexUtils .createAliasForRollover (logger , client , index , rolloverAlias , getIndicesAliasesListener );
417368 }
418369
@@ -431,36 +382,32 @@ private void triggerRollResultsIndicesIfNecessaryTask(ActionListener<Acknowledge
431382 );
432383
433384 logger .info ("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask" );
434- logger .warn ("AD results indices [{}]" , (Object ) indices );
435385
436386 for (String index : indices ) {
437- logger .warn ("Processing index [{}]" , index );
438387 // Check if this index has already been rolled over
439388 String latestIndex = MlIndexAndAlias .latestIndexMatchingBaseName (index , expressionResolver , clusterState );
440389
441390 if (index .equals (latestIndex ) == false ) {
442- logger .warn ("index [{}] will not be rolled over as there is a later index [{}]" , index , latestIndex );
443391 continue ;
444392 }
445393
446394 ActionListener <Boolean > rollAndUpdateAliasesResponseListener = finalListener .delegateFailureAndWrap (
447395 (l , rolledAndUpdatedAliasesResponse ) -> {
448396 if (rolledAndUpdatedAliasesResponse ) {
449- logger .warn (
450- "2: Successfully completed [ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask for index [{}]" ,
397+ logger .info (
398+ "Successfully completed [ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask for index [{}]" ,
451399 index
452400 );
453401 } else {
454402 logger .warn (
455- "2: Unsuccessful run of [ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask for index [{}]" ,
403+ "Unsuccessful run of [ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask for index [{}]" ,
456404 index
457405 );
458406 }
459407 l .onResponse (AcknowledgedResponse .TRUE ); // TODO return false if operation failed for any index?
460408 }
461409 );
462410
463- logger .warn ("Executing [rollAndUpdateAliases]" );
464411 rollAndUpdateAliases (clusterState , index , rollAndUpdateAliasesResponseListener );
465412 }
466413 }
@@ -542,7 +489,6 @@ private void triggerJobsInStateWithoutMatchingTask(
542489 ) {
543490 SetOnce <Set <String >> jobsInStateHolder = new SetOnce <>();
544491
545- // 3. Filter job responses by those that were not acknowledged (failed) and log an appropriate message
546492 ActionListener <List <Tuple <String , AcknowledgedResponse >>> jobsActionListener = finalListener .delegateFailureAndWrap (
547493 (delegate , jobsResponses ) -> {
548494 List <String > jobIds = jobsResponses .stream ().filter (t -> t .v2 ().isAcknowledged () == false ).map (Tuple ::v1 ).collect (toList ());
@@ -551,31 +497,26 @@ private void triggerJobsInStateWithoutMatchingTask(
551497 } else {
552498 logger .info ("[ML] maintenance task {} failed for jobs: {}" , maintenanceTaskName , jobIds );
553499 }
554- delegate .onResponse (AcknowledgedResponse .TRUE ); // The overall return value is always true
500+ delegate .onResponse (AcknowledgedResponse .TRUE );
555501 }
556502 );
557503
558- // 2. Get all ML tasks
559504 ActionListener <ListTasksResponse > listTasksActionListener = ActionListener .wrap (listTasksResponse -> {
560- // 2a work out all jobs in the specified state that *don't* have an associated task
561505 Set <String > jobsInState = jobsInStateHolder .get ();
562506 Set <String > jobsWithTask = listTasksResponse .getTasks ().stream ().map (jobIdExtractor ).filter (Objects ::nonNull ).collect (toSet ());
563507 Set <String > jobsInStateWithoutTask = Sets .difference (jobsInState , jobsWithTask );
564508 if (jobsInStateWithoutTask .isEmpty ()) {
565- finalListener .onResponse (AcknowledgedResponse .TRUE ); // If nothing to do set true in finalListener and return, performing no
566- // further operations
509+ finalListener .onResponse (AcknowledgedResponse .TRUE );
510+
567511 return ;
568512 }
569513
570- // 2b Create a chained task executor whose associated responses will have return type Tuple<String, AcknowledgedResponse>
571514 TypedChainTaskExecutor <Tuple <String , AcknowledgedResponse >> chainTaskExecutor = new TypedChainTaskExecutor <>(
572515 EsExecutors .DIRECT_EXECUTOR_SERVICE ,
573516 Predicates .always (),
574517 Predicates .always ()
575518 );
576519
577- // 2c for each job in the specified state without an associated persistent task, add a supplied request to the list of chained
578- // tasks to execute
579520 for (String jobId : jobsInStateWithoutTask ) {
580521 chainTaskExecutor .add (
581522 listener -> executeAsyncWithOrigin (
@@ -588,24 +529,19 @@ private void triggerJobsInStateWithoutMatchingTask(
588529 );
589530 }
590531
591- // 2d Execute the list of chained requests
592532 chainTaskExecutor .execute (jobsActionListener );
593533 }, finalListener ::onFailure );
594534
595- // 1. Get all jobs
596535 ActionListener <GetJobsAction .Response > getJobsActionListener = ActionListener .wrap (getJobsResponse -> {
597- // 1a Filter jobs by specified particular state
598536 Set <String > jobsInState = getJobsResponse .getResponse ().results ().stream ().filter (jobFilter ).map (Job ::getId ).collect (toSet ());
599537 if (jobsInState .isEmpty ()) {
600538 logger .warn ("[{}]: no jobs in state [{}]" , maintenanceTaskName , jobsInState );
601- finalListener .onResponse (AcknowledgedResponse .TRUE ); // If nothing to do return true in the final listener, do not perform
602- // any more operations
539+ finalListener .onResponse (AcknowledgedResponse .TRUE );
540+
603541 return ;
604542 }
605- // 1b Stash the filtered jobs in a set for further operations, do this once and only once
606543 jobsInStateHolder .set (jobsInState );
607544
608- // 1c Execute another operation to list all permanent ML tasks
609545 executeAsyncWithOrigin (
610546 client ,
611547 ML_ORIGIN ,
@@ -615,7 +551,6 @@ private void triggerJobsInStateWithoutMatchingTask(
615551 );
616552 }, finalListener ::onFailure );
617553
618- logger .warn ("Executing GetJobsAction" );
619554 executeAsyncWithOrigin (client , ML_ORIGIN , GetJobsAction .INSTANCE , new GetJobsAction .Request ("*" ), getJobsActionListener );
620555 }
621556
0 commit comments