1212import org .elasticsearch .ElasticsearchException ;
1313import org .elasticsearch .ElasticsearchStatusException ;
1414import org .elasticsearch .action .ActionListener ;
15+ import org .elasticsearch .action .DocWriteResponse ;
16+ import org .elasticsearch .action .search .SearchResponse ;
17+ import org .elasticsearch .action .support .WriteRequest ;
1518import org .elasticsearch .client .internal .Client ;
1619import org .elasticsearch .common .CheckedSupplier ;
1720import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
1821import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
19- import org .elasticsearch .common .util .concurrent .FutureUtils ;
2022import org .elasticsearch .common .util .concurrent .ThreadContext ;
2123import org .elasticsearch .core .IOUtils ;
24+ import org .elasticsearch .core .Nullable ;
25+ import org .elasticsearch .index .query .QueryBuilders ;
2226import org .elasticsearch .persistent .PersistentTasksCustomMetadata .PersistentTask ;
2327import org .elasticsearch .rest .RestStatus ;
28+ import org .elasticsearch .search .SearchHit ;
2429import org .elasticsearch .threadpool .ThreadPool ;
2530import org .elasticsearch .xpack .core .ml .job .config .AnalysisConfig ;
2631import org .elasticsearch .xpack .core .ml .job .config .Job ;
32+ import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
2733import org .elasticsearch .xpack .core .ml .job .process .autodetect .output .FlushAcknowledgement ;
34+ import org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSizeStats ;
35+ import org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSnapshot ;
2836import org .elasticsearch .xpack .core .ml .job .snapshot .upgrade .SnapshotUpgradeState ;
2937import org .elasticsearch .xpack .core .ml .job .snapshot .upgrade .SnapshotUpgradeTaskState ;
3038import org .elasticsearch .xpack .core .ml .utils .ExceptionsHelper ;
4452import java .util .HashMap ;
4553import java .util .Map ;
4654import java .util .Objects ;
47- import java .util .concurrent .ExecutionException ;
4855import java .util .concurrent .ExecutorService ;
49- import java .util .concurrent .Future ;
5056import java .util .concurrent .TimeoutException ;
5157import java .util .function .BiConsumer ;
5258import java .util .function .Consumer ;
@@ -153,6 +159,55 @@ synchronized void start() {
153159 executor .execute ();
154160 }
155161
162+ private void removeDuplicateModelSnapshotDoc (Consumer <Exception > runAfter ) {
163+ String snapshotDocId = jobId + "_model_snapshot_" + snapshotId ;
164+ client .prepareSearch (AnomalyDetectorsIndex .jobResultsIndexPattern ())
165+ .setQuery (QueryBuilders .constantScoreQuery (QueryBuilders .idsQuery ().addIds (snapshotDocId )))
166+ .setSize (2 )
167+ .addSort (ModelSnapshot .MIN_VERSION .getPreferredName (), org .elasticsearch .search .sort .SortOrder .ASC )
168+ .execute (ActionListener .wrap (searchResponse -> {
169+ if (searchResponse .getHits ().getTotalHits ().value > 1 ) {
170+ deleteOlderSnapshotDoc (searchResponse , runAfter );
171+ } else {
172+ onFinish .accept (null );
173+ }
174+ }, e -> {
175+ logger .warn (() -> format ("[%s] [%s] error during search for model snapshot documents" , jobId , snapshotId ), e );
176+ onFinish .accept (null );
177+ }));
178+ }
179+
180+ private void deleteOlderSnapshotDoc (SearchResponse searchResponse , Consumer <Exception > runAfter ) {
181+ SearchHit firstHit = searchResponse .getHits ().getAt (0 );
182+ logger .debug (() -> format ("[%s] deleting duplicate model snapshot doc [%s]" , jobId , firstHit .getId ()));
183+ client .prepareDelete ()
184+ .setIndex (firstHit .getIndex ())
185+ .setId (firstHit .getId ())
186+ .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE )
187+ .execute (ActionListener .runAfter (ActionListener .wrap (deleteResponse -> {
188+ if ((deleteResponse .getResult () == DocWriteResponse .Result .DELETED ) == false ) {
189+ logger .warn (
190+ () -> format (
191+ "[%s] [%s] failed to delete old snapshot [%s] result document, document not found" ,
192+ jobId ,
193+ snapshotId ,
194+ ModelSizeStats .RESULT_TYPE_FIELD .getPreferredName ()
195+ )
196+ );
197+ }
198+ }, e -> {
199+ logger .warn (
200+ () -> format (
201+ "[%s] [%s] failed to delete old snapshot [%s] result document" ,
202+ jobId ,
203+ snapshotId ,
204+ ModelSizeStats .RESULT_TYPE_FIELD .getPreferredName ()
205+ ),
206+ e
207+ );
208+ }), () -> runAfter .accept (null )));
209+ }
210+
156211 void setTaskToFailed (String reason , ActionListener <PersistentTask <?>> listener ) {
157212 SnapshotUpgradeTaskState taskState = new SnapshotUpgradeTaskState (SnapshotUpgradeState .FAILED , task .getAllocationId (), reason );
158213 task .updatePersistentTaskState (taskState , ActionListener .wrap (listener ::onResponse , f -> {
@@ -259,7 +314,7 @@ void restoreState() {
259314 logger .error (() -> format ("[%s] [%s] failed to write old state" , jobId , snapshotId ), e );
260315 setTaskToFailed (
261316 "Failed to write old state due to: " + e .getMessage (),
262- ActionListener .wrap ( t -> shutdown ( e ), f -> shutdown (e ))
317+ ActionListener .running (() -> shutdownWithFailure (e ))
263318 );
264319 return ;
265320 }
@@ -273,7 +328,7 @@ void restoreState() {
273328 logger .error (() -> format ("[%s] [%s] failed to flush after writing old state" , jobId , snapshotId ), e );
274329 nextStep = () -> setTaskToFailed (
275330 "Failed to flush after writing old state due to: " + e .getMessage (),
276- ActionListener .wrap ( t -> shutdown ( e ), f -> shutdown (e ))
331+ ActionListener .running (() -> shutdownWithFailure (e ))
277332 );
278333 } else {
279334 logger .debug (
@@ -295,7 +350,7 @@ private void requestStateWrite() {
295350 new SnapshotUpgradeTaskState (SnapshotUpgradeState .SAVING_NEW_STATE , task .getAllocationId (), "" ),
296351 ActionListener .wrap (readingNewState -> {
297352 if (continueRunning .get () == false ) {
298- shutdown (null );
353+ shutdownWithFailure (null );
299354 return ;
300355 }
301356 submitOperation (() -> {
@@ -310,12 +365,12 @@ private void requestStateWrite() {
310365 // Execute callback in the UTILITY thread pool, as the current thread in the callback will be one in the
311366 // autodetectWorkerExecutor. Trying to run the callback in that executor will cause a dead lock as that
312367 // executor has a single processing queue.
313- (aVoid , e ) -> threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> shutdown (e ))
368+ (aVoid , e ) -> threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> handlePersistingState (e ))
314369 );
315370 logger .debug ("[{}] [{}] asked for state to be persisted" , jobId , snapshotId );
316371 }, f -> {
317372 logger .error (() -> format ("[%s] [%s] failed to update snapshot upgrader task to started" , jobId , snapshotId ), f );
318- shutdown (
373+ shutdownWithFailure (
319374 new ElasticsearchStatusException (
320375 "Failed to start snapshot upgrade [{}] for job [{}]" ,
321376 RestStatus .INTERNAL_SERVER_ERROR ,
@@ -378,17 +433,45 @@ private void checkResultsProcessorIsAlive() {
378433 }
379434 }
380435
381- void shutdown (Exception e ) {
436+ private void handlePersistingState (@ Nullable Exception exception ) {
437+ assert Thread .currentThread ().getName ().contains (UTILITY_THREAD_POOL_NAME );
438+
439+ if (exception != null ) {
440+ shutdownWithFailure (exception );
441+ } else {
442+ stopProcess ((aVoid , e ) -> {
443+ threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> {
444+ autodetectWorkerExecutor .shutdownNow ();
445+ // If there are two snapshot documents in the results indices with the same snapshot id,
446+ // remove the old one. This can happen when the result index has been rolled over and
447+ // the write alias is pointing to the new index.
448+ removeDuplicateModelSnapshotDoc (onFinish );
449+ });
450+
451+ });
452+ }
453+ }
454+
455+ void shutdownWithFailure (Exception e ) {
456+ stopProcess ((aVoid , ignored ) -> {
457+ threadPool .executor (UTILITY_THREAD_POOL_NAME ).execute (() -> {
458+ onFinish .accept (e );
459+ autodetectWorkerExecutor .shutdownNow ();
460+ });
461+ });
462+ }
463+
464+ private void stopProcess (BiConsumer <Class <Void >, Exception > runNext ) {
382465 logger .debug ("[{}] [{}] shutdown initiated" , jobId , snapshotId );
383466 // No point in sending an action to the executor if the process has died
384467 if (process .isProcessAlive () == false ) {
385468 logger .debug ("[{}] [{}] process is dead, no need to shutdown" , jobId , snapshotId );
386- onFinish .accept (e );
387- autodetectWorkerExecutor .shutdownNow ();
388469 stateStreamer .cancel ();
470+ runNext .accept (null , null );
389471 return ;
390472 }
391- Future <?> future = autodetectWorkerExecutor .submit (() -> {
473+
474+ submitOperation (() -> {
392475 try {
393476 logger .debug ("[{}] [{}] shutdown is now occurring" , jobId , snapshotId );
394477 if (process .isReady ()) {
@@ -401,24 +484,10 @@ void shutdown(Exception e) {
401484 processor .awaitCompletion ();
402485 } catch (IOException | TimeoutException exc ) {
403486 logger .warn (() -> format ("[%s] [%s] failed to shutdown process" , jobId , snapshotId ), exc );
404- } finally {
405- onFinish .accept (e );
406487 }
407488 logger .debug ("[{}] [{}] connection for upgrade has been closed, process is shutdown" , jobId , snapshotId );
408- });
409- try {
410- future .get ();
411- autodetectWorkerExecutor .shutdownNow ();
412- } catch (InterruptedException interrupt ) {
413- Thread .currentThread ().interrupt ();
414- } catch (ExecutionException executionException ) {
415- if (processor .isProcessKilled ()) {
416- // In this case the original exception is spurious and highly misleading
417- throw ExceptionsHelper .conflictStatusException ("close snapshot upgrade interrupted by kill request" );
418- } else {
419- throw FutureUtils .rethrowExecutionException (executionException );
420- }
421- }
489+ return Void .TYPE ;
490+ }, runNext );
422491 }
423492 }
424493}
0 commit comments