1616import org .elasticsearch .action .support .ActionFilters ;
1717import org .elasticsearch .action .support .CountDownActionListener ;
1818import org .elasticsearch .action .support .IndicesOptions ;
19- import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2019import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
2120import org .elasticsearch .cluster .ClusterState ;
2221import org .elasticsearch .cluster .block .ClusterBlockException ;
@@ -102,53 +101,46 @@ protected void masterOperation(
102101 request .indices ()
103102 );
104103 List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > dataStreamSettingsResponse = new ArrayList <>();
105- CountDownActionListener countDownListener = new CountDownActionListener (dataStreamNames .size () + 1 , new ActionListener <>() {
106- @ Override
107- public void onResponse (Void unused ) {
108- listener .onResponse (new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse ));
109- }
110-
111- @ Override
112- public void onFailure (Exception e ) {
113- listener .onFailure (e );
114- }
115- });
104+ CountDownActionListener countDownListener = new CountDownActionListener (
105+ dataStreamNames .size () + 1 ,
106+ listener .delegateFailure (
107+ (responseActionListener , unused ) -> responseActionListener .onResponse (
108+ new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse )
109+ )
110+ )
111+ );
116112 countDownListener .onResponse (null );
117113 for (String dataStreamName : dataStreamNames ) {
118114 updateSingleDataStream (
119115 dataStreamName ,
120116 request .getSettings (),
117+ request .isDryRun (),
121118 request .masterNodeTimeout (),
122119 request .ackTimeout (),
123- new ActionListener <>() {
124- @ Override
125- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse dataStreamResponse ) {
126- dataStreamSettingsResponse .add (dataStreamResponse );
127- countDownListener .onResponse (null );
128- }
129-
130- @ Override
131- public void onFailure (Exception e ) {
132- dataStreamSettingsResponse .add (
133- new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
134- dataStreamName ,
135- false ,
136- e .getMessage (),
137- EMPTY ,
138- EMPTY ,
139- UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
140- )
141- );
142- countDownListener .onResponse (null );
143- }
144- }
120+ ActionListener .wrap (dataStreamResponse -> {
121+ dataStreamSettingsResponse .add (dataStreamResponse );
122+ countDownListener .onResponse (null );
123+ }, e -> {
124+ dataStreamSettingsResponse .add (
125+ new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
126+ dataStreamName ,
127+ false ,
128+ e .getMessage (),
129+ EMPTY ,
130+ EMPTY ,
131+ UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
132+ )
133+ );
134+ countDownListener .onResponse (null );
135+ })
145136 );
146137 }
147138 }
148139
149140 private void updateSingleDataStream (
150141 String dataStreamName ,
151142 Settings settingsOverrides ,
143+ boolean dryRun ,
152144 TimeValue masterNodeTimeout ,
153145 TimeValue ackTimeout ,
154146 ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -187,13 +179,17 @@ private void updateSingleDataStream(
187179 );
188180 return ;
189181 }
190- metadataDataStreamsService .updateSettings (masterNodeTimeout , ackTimeout , dataStreamName , settingsOverrides , new ActionListener <>() {
191- @ Override
192- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
193- if (acknowledgedResponse .isAcknowledged ()) {
194- updateSettingsOnIndices (dataStreamName , settingsOverrides , masterNodeTimeout , ackTimeout , listener );
182+ metadataDataStreamsService .updateSettings (
183+ masterNodeTimeout ,
184+ ackTimeout ,
185+ dataStreamName ,
186+ settingsOverrides ,
187+ dryRun ,
188+ listener .delegateFailure ((dataStreamSettingsResponseActionListener , dataStream ) -> {
189+ if (dataStream != null ) {
190+ updateSettingsOnIndices (dataStream , settingsOverrides , dryRun , masterNodeTimeout , ackTimeout , listener );
195191 } else {
196- listener .onResponse (
192+ dataStreamSettingsResponseActionListener .onResponse (
197193 new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
198194 dataStreamName ,
199195 false ,
@@ -204,18 +200,14 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
204200 )
205201 );
206202 }
207- }
208-
209- @ Override
210- public void onFailure (Exception e ) {
211- listener .onFailure (e );
212- }
213- });
203+ })
204+ );
214205 }
215206
216207 private void updateSettingsOnIndices (
217- String dataStreamName ,
208+ DataStream dataStream ,
218209 Settings requestSettings ,
210+ boolean dryRun ,
219211 TimeValue masterNodeTimeout ,
220212 TimeValue ackTimeout ,
221213 ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -231,17 +223,15 @@ private void updateSettingsOnIndices(
231223 appliedToDataStreamOnly .add (settingName );
232224 }
233225 }
234- final List <Index > concreteIndices = clusterService . state (). metadata (). dataStreams (). get ( dataStreamName ) .getIndices ();
226+ final List <Index > concreteIndices = dataStream .getIndices ();
235227 final List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > indexSettingErrors = new ArrayList <>();
236228
237- CountDownActionListener indexCountDownListener = new CountDownActionListener (concreteIndices .size () + 1 , new ActionListener <>() {
238- // Called when all indices for all settings are complete
239- @ Override
240- public void onResponse (Void unused ) {
241- DataStream dataStream = clusterService .state ().metadata ().dataStreams ().get (dataStreamName );
242- listener .onResponse (
229+ CountDownActionListener indexCountDownListener = new CountDownActionListener (
230+ concreteIndices .size () + 1 ,
231+ listener .delegateFailure (
232+ (dataStreamSettingsResponseActionListener , unused ) -> dataStreamSettingsResponseActionListener .onResponse (
243233 new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
244- dataStreamName ,
234+ dataStream . getName () ,
245235 true ,
246236 null ,
247237 settingsFilter .filter (dataStream .getSettings ()),
@@ -252,37 +242,33 @@ public void onResponse(Void unused) {
252242 indexSettingErrors
253243 )
254244 )
255- );
256- }
245+ )
246+ )
247+ );
257248
258- @ Override
259- public void onFailure (Exception e ) {
260- listener .onFailure (e );
261- }
262- });
263249 indexCountDownListener .onResponse (null ); // handles the case where there were zero indices
264250 Settings applyToIndexSettings = builder ().loadFromMap (settingsToApply ).build ();
265251 for (Index index : concreteIndices ) {
266- updateSettingsOnSingleIndex (index , applyToIndexSettings , masterNodeTimeout , ackTimeout , new ActionListener <>() {
267- @ Override
268- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError indexSettingError ) {
252+ updateSettingsOnSingleIndex (
253+ index ,
254+ applyToIndexSettings ,
255+ dryRun ,
256+ masterNodeTimeout ,
257+ ackTimeout ,
258+ indexCountDownListener .delegateFailure ((listener1 , indexSettingError ) -> {
269259 if (indexSettingError != null ) {
270260 indexSettingErrors .add (indexSettingError );
271261 }
272- indexCountDownListener .onResponse (null );
273- }
274-
275- @ Override
276- public void onFailure (Exception e ) {
277- indexCountDownListener .onFailure (e );
278- }
279- });
262+ listener1 .onResponse (null );
263+ })
264+ );
280265 }
281266 }
282267
283268 private void updateSettingsOnSingleIndex (
284269 Index index ,
285270 Settings requestSettings ,
271+ boolean dryRun ,
286272 TimeValue masterNodeTimeout ,
287273 TimeValue ackTimeout ,
288274 ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > listener
@@ -302,18 +288,23 @@ private void updateSettingsOnSingleIndex(
302288 );
303289 return ;
304290 }
305- updateSettingsService .updateSettings (
306- new UpdateSettingsClusterStateUpdateRequest (
307- masterNodeTimeout ,
308- ackTimeout ,
309- requestSettings ,
310- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
311- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
312- index
313- ),
314- new ActionListener <>() {
315- @ Override
316- public void onResponse (AcknowledgedResponse response ) {
291+ if (dryRun ) {
292+ /*
293+ * This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
294+ * are not blocked, but there is no value in going beyond this. So just respond to the listener and move on.
295+ */
296+ listener .onResponse (null );
297+ } else {
298+ updateSettingsService .updateSettings (
299+ new UpdateSettingsClusterStateUpdateRequest (
300+ masterNodeTimeout ,
301+ ackTimeout ,
302+ requestSettings ,
303+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
304+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
305+ index
306+ ),
307+ ActionListener .wrap (response -> {
317308 UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError error ;
318309 if (response .isAcknowledged () == false ) {
319310 error = new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (
@@ -324,16 +315,13 @@ public void onResponse(AcknowledgedResponse response) {
324315 error = null ;
325316 }
326317 listener .onResponse (error );
327- }
328-
329- @ Override
330- public void onFailure (Exception e ) {
331- listener .onResponse (
318+ },
319+ e -> listener .onResponse (
332320 new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (index .getName (), e .getMessage ())
333- );
334- }
335- }
336- );
321+ )
322+ )
323+ );
324+ }
337325 }
338326
339327 }
0 commit comments