11
11
12
12
import org .apache .logging .log4j .LogManager ;
13
13
import org .apache .logging .log4j .Logger ;
14
- import org .elasticsearch .action .admin .indices .delete .DeleteIndexClusterStateUpdateRequest ;
14
+ import org .elasticsearch .action .ActionListener ;
15
+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
15
16
import org .elasticsearch .cluster .ClusterState ;
16
17
import org .elasticsearch .cluster .ClusterStateAckListener ;
17
18
import org .elasticsearch .cluster .ClusterStateTaskExecutor ;
19
+ import org .elasticsearch .cluster .ClusterStateTaskListener ;
18
20
import org .elasticsearch .cluster .RestoreInProgress ;
19
21
import org .elasticsearch .cluster .SimpleBatchedAckListenerTaskExecutor ;
20
22
import org .elasticsearch .cluster .block .ClusterBlocks ;
23
+ import org .elasticsearch .cluster .node .DiscoveryNode ;
21
24
import org .elasticsearch .cluster .routing .RoutingTable ;
22
25
import org .elasticsearch .cluster .routing .allocation .AllocationService ;
23
26
import org .elasticsearch .cluster .service .ClusterService ;
24
27
import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
25
28
import org .elasticsearch .common .Priority ;
26
29
import org .elasticsearch .common .collect .ImmutableOpenMap ;
27
30
import org .elasticsearch .common .settings .Settings ;
28
- import org .elasticsearch .common . util . set . Sets ;
31
+ import org .elasticsearch .core . TimeValue ;
29
32
import org .elasticsearch .core .Tuple ;
30
33
import org .elasticsearch .index .Index ;
31
34
import org .elasticsearch .injection .guice .Inject ;
32
35
import org .elasticsearch .snapshots .RestoreService ;
33
36
import org .elasticsearch .snapshots .SnapshotInProgressException ;
34
37
import org .elasticsearch .snapshots .SnapshotsService ;
35
38
36
- import java .util .Arrays ;
37
39
import java .util .HashMap ;
38
40
import java .util .HashSet ;
39
41
import java .util .Map ;
42
+ import java .util .Objects ;
40
43
import java .util .Set ;
41
44
42
45
import static org .elasticsearch .cluster .routing .allocation .allocator .AllocationActionListener .rerouteCompletionIsNotRequired ;
@@ -48,22 +51,19 @@ public class MetadataDeleteIndexService {
48
51
49
52
private static final Logger logger = LogManager .getLogger (MetadataDeleteIndexService .class );
50
53
51
- private final Settings settings ;
52
-
53
54
// package private for tests
54
- final ClusterStateTaskExecutor <DeleteIndexClusterStateUpdateRequest > executor ;
55
- private final MasterServiceTaskQueue <DeleteIndexClusterStateUpdateRequest > taskQueue ;
55
+ final ClusterStateTaskExecutor <DeleteIndicesClusterStateUpdateTask > executor ;
56
+ private final MasterServiceTaskQueue <DeleteIndicesClusterStateUpdateTask > taskQueue ;
56
57
57
58
@ Inject
58
59
public MetadataDeleteIndexService (Settings settings , ClusterService clusterService , AllocationService allocationService ) {
59
- this .settings = settings ;
60
60
executor = new SimpleBatchedAckListenerTaskExecutor <>() {
61
61
@ Override
62
62
public Tuple <ClusterState , ClusterStateAckListener > executeTask (
63
- DeleteIndexClusterStateUpdateRequest task ,
63
+ DeleteIndicesClusterStateUpdateTask task ,
64
64
ClusterState clusterState
65
65
) {
66
- return Tuple .tuple (MetadataDeleteIndexService .deleteIndices (clusterState , Sets . newHashSet ( task .indices ()) , settings ), task );
66
+ return Tuple .tuple (MetadataDeleteIndexService .deleteIndices (clusterState , task .indices , settings ), task );
67
67
}
68
68
69
69
@ Override
@@ -81,11 +81,64 @@ public ClusterState afterBatchExecution(ClusterState clusterState, boolean clust
81
81
taskQueue = clusterService .createTaskQueue ("delete-index" , Priority .URGENT , executor );
82
82
}
83
83
84
- public void deleteIndices (final DeleteIndexClusterStateUpdateRequest request ) {
85
- if (request .indices () == null || request .indices ().length == 0 ) {
86
- throw new IllegalArgumentException ("Index name is required" );
84
+ public void deleteIndices (
85
+ TimeValue masterNodeTimeout ,
86
+ TimeValue ackTimeout ,
87
+ Set <Index > indices ,
88
+ ActionListener <AcknowledgedResponse > listener
89
+ ) {
90
+ if (indices == null || indices .isEmpty ()) {
91
+ throw new IllegalArgumentException ("Indices are required" );
92
+ }
93
+ taskQueue .submitTask (
94
+ "delete-index " + indices ,
95
+ new DeleteIndicesClusterStateUpdateTask (indices , ackTimeout , listener ),
96
+ masterNodeTimeout
97
+ );
98
+ }
99
+
100
+ // package private for tests
101
+ static class DeleteIndicesClusterStateUpdateTask implements ClusterStateTaskListener , ClusterStateAckListener {
102
+
103
+ private final Set <Index > indices ;
104
+ private final TimeValue ackTimeout ;
105
+ private final ActionListener <AcknowledgedResponse > listener ;
106
+
107
+ DeleteIndicesClusterStateUpdateTask (Set <Index > indices , TimeValue ackTimeout , ActionListener <AcknowledgedResponse > listener ) {
108
+ this .indices = Objects .requireNonNull (indices );
109
+ this .ackTimeout = Objects .requireNonNull (ackTimeout );
110
+ this .listener = Objects .requireNonNull (listener );
111
+ }
112
+
113
+ @ Override
114
+ public boolean mustAck (DiscoveryNode discoveryNode ) {
115
+ return true ;
116
+ }
117
+
118
+ @ Override
119
+ public void onAllNodesAcked () {
120
+ listener .onResponse (AcknowledgedResponse .TRUE );
121
+ }
122
+
123
+ @ Override
124
+ public void onAckFailure (Exception e ) {
125
+ listener .onResponse (AcknowledgedResponse .FALSE );
126
+ }
127
+
128
+ @ Override
129
+ public void onAckTimeout () {
130
+ listener .onResponse (AcknowledgedResponse .FALSE );
131
+ }
132
+
133
+ @ Override
134
+ public TimeValue ackTimeout () {
135
+ return ackTimeout ;
136
+ }
137
+
138
+ @ Override
139
+ public void onFailure (Exception e ) {
140
+ listener .onFailure (e );
87
141
}
88
- taskQueue .submitTask ("delete-index " + Arrays .toString (request .indices ()), request , request .masterNodeTimeout ());
89
142
}
90
143
91
144
/**
0 commit comments