1212import  org .elasticsearch .ElasticsearchException ;
1313import  org .elasticsearch .ElasticsearchStatusException ;
1414import  org .elasticsearch .action .ActionListener ;
15+ import  org .elasticsearch .action .DocWriteResponse ;
16+ import  org .elasticsearch .action .search .SearchResponse ;
17+ import  org .elasticsearch .action .support .WriteRequest ;
1518import  org .elasticsearch .client .internal .Client ;
1619import  org .elasticsearch .common .CheckedSupplier ;
1720import  org .elasticsearch .common .util .concurrent .AbstractRunnable ;
1821import  org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
19- import  org .elasticsearch .common .util .concurrent .FutureUtils ;
2022import  org .elasticsearch .common .util .concurrent .ThreadContext ;
2123import  org .elasticsearch .core .IOUtils ;
24+ import  org .elasticsearch .core .Nullable ;
25+ import  org .elasticsearch .index .query .QueryBuilders ;
2226import  org .elasticsearch .persistent .PersistentTasksCustomMetadata .PersistentTask ;
2327import  org .elasticsearch .rest .RestStatus ;
28+ import  org .elasticsearch .search .SearchHit ;
2429import  org .elasticsearch .threadpool .ThreadPool ;
2530import  org .elasticsearch .xpack .core .ml .job .config .AnalysisConfig ;
2631import  org .elasticsearch .xpack .core .ml .job .config .Job ;
32+ import  org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
2733import  org .elasticsearch .xpack .core .ml .job .process .autodetect .output .FlushAcknowledgement ;
34+ import  org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSizeStats ;
35+ import  org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSnapshot ;
2836import  org .elasticsearch .xpack .core .ml .job .snapshot .upgrade .SnapshotUpgradeState ;
2937import  org .elasticsearch .xpack .core .ml .job .snapshot .upgrade .SnapshotUpgradeTaskState ;
3038import  org .elasticsearch .xpack .core .ml .utils .ExceptionsHelper ;
4452import  java .util .HashMap ;
4553import  java .util .Map ;
4654import  java .util .Objects ;
47- import  java .util .concurrent .ExecutionException ;
4855import  java .util .concurrent .ExecutorService ;
49- import  java .util .concurrent .Future ;
5056import  java .util .concurrent .TimeoutException ;
5157import  java .util .function .BiConsumer ;
5258import  java .util .function .Consumer ;
@@ -153,6 +159,55 @@ synchronized void start() {
153159        executor .execute ();
154160    }
155161
162+     private  void  removeDuplicateModelSnapshotDoc (Consumer <Exception > runAfter ) {
163+         String  snapshotDocId  = jobId  + "_model_snapshot_"  + snapshotId ;
164+         client .prepareSearch (AnomalyDetectorsIndex .jobResultsIndexPattern ())
165+             .setQuery (QueryBuilders .constantScoreQuery (QueryBuilders .idsQuery ().addIds (snapshotDocId )))
166+             .setSize (2 )
167+             .addSort (ModelSnapshot .MIN_VERSION .getPreferredName (), org .elasticsearch .search .sort .SortOrder .ASC )
168+             .execute (ActionListener .wrap (searchResponse  -> {
169+                 if  (searchResponse .getHits ().getTotalHits ().value () > 1 ) {
170+                     deleteOlderSnapshotDoc (searchResponse , runAfter );
171+                 } else  {
172+                     onFinish .accept (null );
173+                 }
174+             }, e  -> {
175+                 logger .warn (() -> format ("[%s] [%s] error during search for model snapshot documents" , jobId , snapshotId ), e );
176+                 onFinish .accept (null );
177+             }));
178+     }
179+ 
180+     private  void  deleteOlderSnapshotDoc (SearchResponse  searchResponse , Consumer <Exception > runAfter ) {
181+         SearchHit  firstHit  = searchResponse .getHits ().getAt (0 );
182+         logger .debug (() -> format ("[%s] deleting duplicate model snapshot doc [%s]" , jobId , firstHit .getId ()));
183+         client .prepareDelete ()
184+             .setIndex (firstHit .getIndex ())
185+             .setId (firstHit .getId ())
186+             .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE )
187+             .execute (ActionListener .runAfter (ActionListener .wrap (deleteResponse  -> {
188+                 if  ((deleteResponse .getResult () == DocWriteResponse .Result .DELETED ) == false ) {
189+                     logger .warn (
190+                         () -> format (
191+                             "[%s] [%s] failed to delete old snapshot [%s] result document, document not found" ,
192+                             jobId ,
193+                             snapshotId ,
194+                             ModelSizeStats .RESULT_TYPE_FIELD .getPreferredName ()
195+                         )
196+                     );
197+                 }
198+             }, e  -> {
199+                 logger .warn (
200+                     () -> format (
201+                         "[%s] [%s] failed to delete old snapshot [%s] result document" ,
202+                         jobId ,
203+                         snapshotId ,
204+                         ModelSizeStats .RESULT_TYPE_FIELD .getPreferredName ()
205+                     ),
206+                     e 
207+                 );
208+             }), () -> runAfter .accept (null )));
209+     }
210+ 
156211    void  setTaskToFailed (String  reason , ActionListener <PersistentTask <?>> listener ) {
157212        SnapshotUpgradeTaskState  taskState  = new  SnapshotUpgradeTaskState (SnapshotUpgradeState .FAILED , task .getAllocationId (), reason );
158213        task .updatePersistentTaskState (taskState , ActionListener .wrap (listener ::onResponse , f  -> {
@@ -259,7 +314,7 @@ void restoreState() {
259314                logger .error (() -> format ("[%s] [%s] failed to write old state" , jobId , snapshotId ), e );
260315                setTaskToFailed (
261316                    "Failed to write old state due to: "  + e .getMessage (),
262-                     ActionListener .wrap ( t  ->  shutdown ( e ),  f   -> shutdown (e ))
317+                     ActionListener .running (()  -> shutdownWithFailure (e ))
263318                );
264319                return ;
265320            }
@@ -273,7 +328,7 @@ void restoreState() {
273328                    logger .error (() -> format ("[%s] [%s] failed to flush after writing old state" , jobId , snapshotId ), e );
274329                    nextStep  = () -> setTaskToFailed (
275330                        "Failed to flush after writing old state due to: "  + e .getMessage (),
276-                         ActionListener .wrap ( t  ->  shutdown ( e ),  f   -> shutdown (e ))
331+                         ActionListener .running (()  -> shutdownWithFailure (e ))
277332                    );
278333                } else  {
279334                    logger .debug (
@@ -295,7 +350,7 @@ private void requestStateWrite() {
295350                new  SnapshotUpgradeTaskState (SnapshotUpgradeState .SAVING_NEW_STATE , task .getAllocationId (), "" ),
296351                ActionListener .wrap (readingNewState  -> {
297352                    if  (continueRunning .get () == false ) {
298-                         shutdown (null );
353+                         shutdownWithFailure (null );
299354                        return ;
300355                    }
301356                    submitOperation (() -> {
@@ -310,12 +365,12 @@ private void requestStateWrite() {
310365                        // Execute callback in the UTILITY thread pool, as the current thread in the callback will be one in the 
311366                        // autodetectWorkerExecutor. Trying to run the callback in that executor will cause a dead lock as that 
312367                        // executor has a single processing queue. 
313-                         (aVoid , e ) -> threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> shutdown (e ))
368+                         (aVoid , e ) -> threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> handlePersistingState (e ))
314369                    );
315370                    logger .debug ("[{}] [{}] asked for state to be persisted" , jobId , snapshotId );
316371                }, f  -> {
317372                    logger .error (() -> format ("[%s] [%s] failed to update snapshot upgrader task to started" , jobId , snapshotId ), f );
318-                     shutdown (
373+                     shutdownWithFailure (
319374                        new  ElasticsearchStatusException (
320375                            "Failed to start snapshot upgrade [{}] for job [{}]" ,
321376                            RestStatus .INTERNAL_SERVER_ERROR ,
@@ -378,17 +433,45 @@ private void checkResultsProcessorIsAlive() {
378433            }
379434        }
380435
381-         void  shutdown (Exception  e ) {
436+         private  void  handlePersistingState (@ Nullable  Exception  exception ) {
437+             assert  Thread .currentThread ().getName ().contains (UTILITY_THREAD_POOL_NAME );
438+ 
439+             if  (exception  != null ) {
440+                 shutdownWithFailure (exception );
441+             } else  {
442+                 stopProcess ((aVoid , e ) -> {
443+                     threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> {
444+                         autodetectWorkerExecutor .shutdownNow ();
445+                         // If there are two snapshot documents in the results indices with the same snapshot id, 
446+                         // remove the old one. This can happen when the result index has been rolled over and 
447+                         // the write alias is pointing to the new index. 
448+                         removeDuplicateModelSnapshotDoc (onFinish );
449+                     });
450+ 
451+                 });
452+             }
453+         }
454+ 
455+         void  shutdownWithFailure (Exception  e ) {
456+             stopProcess ((aVoid , ignored ) -> {
457+                 threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> {
458+                     onFinish .accept (e );
459+                     autodetectWorkerExecutor .shutdownNow ();
460+                 });
461+             });
462+         }
463+ 
464+         private  void  stopProcess (BiConsumer <Class <Void >, Exception > runNext ) {
382465            logger .debug ("[{}] [{}] shutdown initiated" , jobId , snapshotId );
383466            // No point in sending an action to the executor if the process has died 
384467            if  (process .isProcessAlive () == false ) {
385468                logger .debug ("[{}] [{}] process is dead, no need to shutdown" , jobId , snapshotId );
386-                 onFinish .accept (e );
387-                 autodetectWorkerExecutor .shutdownNow ();
388469                stateStreamer .cancel ();
470+                 runNext .accept (null , null );
389471                return ;
390472            }
391-             Future <?> future  = autodetectWorkerExecutor .submit (() -> {
473+ 
474+             submitOperation (() -> {
392475                try  {
393476                    logger .debug ("[{}] [{}] shutdown is now occurring" , jobId , snapshotId );
394477                    if  (process .isReady ()) {
@@ -401,24 +484,10 @@ void shutdown(Exception e) {
401484                    processor .awaitCompletion ();
402485                } catch  (IOException  | TimeoutException  exc ) {
403486                    logger .warn (() -> format ("[%s] [%s] failed to shutdown process" , jobId , snapshotId ), exc );
404-                 } finally  {
405-                     onFinish .accept (e );
406487                }
407488                logger .debug ("[{}] [{}] connection for upgrade has been closed, process is shutdown" , jobId , snapshotId );
408-             });
409-             try  {
410-                 future .get ();
411-                 autodetectWorkerExecutor .shutdownNow ();
412-             } catch  (InterruptedException  interrupt ) {
413-                 Thread .currentThread ().interrupt ();
414-             } catch  (ExecutionException  executionException ) {
415-                 if  (processor .isProcessKilled ()) {
416-                     // In this case the original exception is spurious and highly misleading 
417-                     throw  ExceptionsHelper .conflictStatusException ("close snapshot upgrade interrupted by kill request" );
418-                 } else  {
419-                     throw  FutureUtils .rethrowExecutionException (executionException );
420-                 }
421-             }
489+                 return  Void .TYPE ;
490+             }, runNext );
422491        }
423492    }
424493}
0 commit comments