8
8
9
9
package org .elasticsearch .indices ;
10
10
11
+ import org .apache .logging .log4j .LogManager ;
12
+ import org .apache .logging .log4j .Logger ;
11
13
import org .apache .logging .log4j .message .ParameterizedMessage ;
12
14
import org .apache .lucene .util .automaton .Automata ;
13
15
import org .apache .lucene .util .automaton .Automaton ;
14
16
import org .apache .lucene .util .automaton .CharacterRunAutomaton ;
15
17
import org .apache .lucene .util .automaton .MinimizationOperations ;
16
18
import org .apache .lucene .util .automaton .Operations ;
17
19
import org .elasticsearch .action .ActionListener ;
20
+ import org .elasticsearch .action .admin .cluster .snapshots .features .ResetFeatureStateResponse ;
18
21
import org .elasticsearch .action .admin .cluster .snapshots .features .ResetFeatureStateResponse .ResetFeatureStateStatus ;
19
22
import org .elasticsearch .action .admin .indices .delete .DeleteIndexAction ;
20
23
import org .elasticsearch .action .admin .indices .delete .DeleteIndexRequest ;
24
+ import org .elasticsearch .action .support .GroupedActionListener ;
21
25
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
22
26
import org .elasticsearch .client .internal .Client ;
27
+ import org .elasticsearch .client .internal .OriginSettingClient ;
23
28
import org .elasticsearch .cluster .metadata .IndexMetadata ;
24
29
import org .elasticsearch .cluster .metadata .Metadata ;
25
30
import org .elasticsearch .cluster .service .ClusterService ;
@@ -66,6 +71,8 @@ public class SystemIndices {
66
71
67
72
private static final Automaton EMPTY = Automata .makeEmpty ();
68
73
74
+ private static final Logger logger = LogManager .getLogger (SystemIndices .class );
75
+
69
76
/**
70
77
* This is the source for non-plugin system features.
71
78
*/
@@ -288,6 +295,11 @@ public ExecutorSelector getExecutorSelector() {
288
295
* @return The matching {@link SystemIndexDescriptor} or {@code null} if no descriptor is found
289
296
*/
290
297
public @ Nullable SystemIndexDescriptor findMatchingDescriptor (String name ) {
298
+ return findMatchingDescriptor (indexDescriptors , name );
299
+ }
300
+
301
+ @ Nullable
302
+ static SystemIndexDescriptor findMatchingDescriptor (SystemIndexDescriptor [] indexDescriptors , String name ) {
291
303
SystemIndexDescriptor matchingDescriptor = null ;
292
304
for (SystemIndexDescriptor systemIndexDescriptor : indexDescriptors ) {
293
305
if (systemIndexDescriptor .matchesIndexPattern (name )) {
@@ -798,6 +810,27 @@ public MigrationCompletionHandler getPostMigrationFunction() {
798
810
return postMigrationFunction ;
799
811
}
800
812
813
+ private static void cleanUpFeatureForIndices (
814
+ String name ,
815
+ Client client ,
816
+ String [] indexNames ,
817
+ final ActionListener <ResetFeatureStateStatus > listener
818
+ ) {
819
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest ();
820
+ deleteIndexRequest .indices (indexNames );
821
+ client .execute (DeleteIndexAction .INSTANCE , deleteIndexRequest , new ActionListener <>() {
822
+ @ Override
823
+ public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
824
+ listener .onResponse (ResetFeatureStateStatus .success (name ));
825
+ }
826
+
827
+ @ Override
828
+ public void onFailure (Exception e ) {
829
+ listener .onResponse (ResetFeatureStateStatus .failure (name , e ));
830
+ }
831
+ });
832
+ }
833
+
801
834
/**
802
835
* Clean up the state of a feature
803
836
* @param indexDescriptors List of descriptors of a feature's system indices
@@ -808,39 +841,66 @@ public MigrationCompletionHandler getPostMigrationFunction() {
808
841
* @param listener A listener to return success or failure of cleanup
809
842
*/
810
843
public static void cleanUpFeature (
811
- Collection <? extends IndexPatternMatcher > indexDescriptors ,
844
+ Collection <SystemIndexDescriptor > indexDescriptors ,
812
845
Collection <? extends IndexPatternMatcher > associatedIndexDescriptors ,
813
846
String name ,
814
847
ClusterService clusterService ,
815
848
Client client ,
816
- ActionListener <ResetFeatureStateStatus > listener
849
+ final ActionListener <ResetFeatureStateStatus > listener
817
850
) {
818
851
Metadata metadata = clusterService .state ().getMetadata ();
819
852
820
- List <String > allIndices = Stream . concat ( indexDescriptors . stream (), associatedIndexDescriptors .stream () )
853
+ List <String > associatedIndices = associatedIndexDescriptors .stream ()
821
854
.map (descriptor -> descriptor .getMatchingIndices (metadata ))
822
855
.flatMap (List ::stream )
823
856
.toList ();
824
857
825
- if (allIndices .isEmpty ()) {
826
- // if no actual indices match the pattern, we can stop here
858
+ final int taskCount = ((associatedIndices .size () > 0 ) ? 1 : 0 ) + (int ) indexDescriptors .stream ()
859
+ .filter (id -> id .getMatchingIndices (metadata ).isEmpty () == false )
860
+ .count ();
861
+
862
+ // check if there's nothing to do and take an early out
863
+ if (taskCount == 0 ) {
827
864
listener .onResponse (ResetFeatureStateStatus .success (name ));
828
865
return ;
829
866
}
830
867
831
- DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest ();
832
- deleteIndexRequest .indices (allIndices .toArray (Strings .EMPTY_ARRAY ));
833
- client .execute (DeleteIndexAction .INSTANCE , deleteIndexRequest , new ActionListener <>() {
834
- @ Override
835
- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
836
- listener .onResponse (ResetFeatureStateStatus .success (name ));
837
- }
868
+ GroupedActionListener <ResetFeatureStateStatus > groupedListener = new GroupedActionListener <>(
869
+ ActionListener .wrap (listenerResults -> {
870
+ List <ResetFeatureStateStatus > errors = listenerResults .stream ()
871
+ .filter (status -> status .getStatus () == ResetFeatureStateResponse .ResetFeatureStateStatus .Status .FAILURE )
872
+ .collect (Collectors .toList ());
838
873
839
- @ Override
840
- public void onFailure (Exception e ) {
841
- listener .onResponse (ResetFeatureStateStatus .failure (name , e ));
874
+ if (errors .isEmpty ()) {
875
+ listener .onResponse (ResetFeatureStateStatus .success (name ));
876
+ } else {
877
+ StringBuilder exceptions = new StringBuilder ("[" );
878
+ exceptions .append (errors .stream ().map (e -> e .getException ().getMessage ()).collect (Collectors .joining (", " )));
879
+ exceptions .append (']' );
880
+ errors .forEach (e -> logger .warn (() -> "error while resetting feature [" + name + "]" , e .getException ()));
881
+ listener .onResponse (ResetFeatureStateStatus .failure (name , new Exception (exceptions .toString ())));
882
+ }
883
+ }, listener ::onFailure ),
884
+ taskCount
885
+ );
886
+
887
+ // Send cleanup for the associated indices, they don't need special origin since they are not protected
888
+ if (associatedIndices .isEmpty () == false ) {
889
+ cleanUpFeatureForIndices (name , client , associatedIndices .toArray (Strings .EMPTY_ARRAY ), groupedListener );
890
+ }
891
+
892
+ // One descriptor at a time, create an originating client and clean up the feature
893
+ for (var indexDescriptor : indexDescriptors ) {
894
+ List <String > matchingIndices = indexDescriptor .getMatchingIndices (metadata );
895
+
896
+ if (matchingIndices .isEmpty () == false ) {
897
+ final Client clientWithOrigin = (indexDescriptor .getOrigin () == null )
898
+ ? client
899
+ : new OriginSettingClient (client , indexDescriptor .getOrigin ());
900
+
901
+ cleanUpFeatureForIndices (name , clientWithOrigin , matchingIndices .toArray (Strings .EMPTY_ARRAY ), groupedListener );
842
902
}
843
- });
903
+ }
844
904
}
845
905
846
906
// No-op pre-migration function to be used as the default in case none are provided.
0 commit comments