1616import org .elasticsearch .action .support .ActionFilters ;
1717import org .elasticsearch .action .support .CountDownActionListener ;
1818import org .elasticsearch .action .support .IndicesOptions ;
19- import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2019import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
2120import org .elasticsearch .cluster .ClusterState ;
2221import org .elasticsearch .cluster .block .ClusterBlockException ;
@@ -107,53 +106,46 @@ protected void masterOperation(
107106 request .indices ()
108107 );
109108 List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > dataStreamSettingsResponse = new ArrayList <>();
110- CountDownActionListener countDownListener = new CountDownActionListener (dataStreamNames .size () + 1 , new ActionListener <>() {
111- @ Override
112- public void onResponse (Void unused ) {
113- listener .onResponse (new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse ));
114- }
115-
116- @ Override
117- public void onFailure (Exception e ) {
118- listener .onFailure (e );
119- }
120- });
109+ CountDownActionListener countDownListener = new CountDownActionListener (
110+ dataStreamNames .size () + 1 ,
111+ listener .delegateFailure (
112+ (responseActionListener , unused ) -> responseActionListener .onResponse (
113+ new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse )
114+ )
115+ )
116+ );
121117 countDownListener .onResponse (null );
122118 for (String dataStreamName : dataStreamNames ) {
123119 updateSingleDataStream (
124120 dataStreamName ,
125121 request .getSettings (),
122+ request .isDryRun (),
126123 request .masterNodeTimeout (),
127124 request .ackTimeout (),
128- new ActionListener <>() {
129- @ Override
130- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse dataStreamResponse ) {
131- dataStreamSettingsResponse .add (dataStreamResponse );
132- countDownListener .onResponse (null );
133- }
134-
135- @ Override
136- public void onFailure (Exception e ) {
137- dataStreamSettingsResponse .add (
138- new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
139- dataStreamName ,
140- false ,
141- e .getMessage (),
142- EMPTY ,
143- EMPTY ,
144- UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
145- )
146- );
147- countDownListener .onResponse (null );
148- }
149- }
125+ ActionListener .wrap (dataStreamResponse -> {
126+ dataStreamSettingsResponse .add (dataStreamResponse );
127+ countDownListener .onResponse (null );
128+ }, e -> {
129+ dataStreamSettingsResponse .add (
130+ new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
131+ dataStreamName ,
132+ false ,
133+ e .getMessage (),
134+ EMPTY ,
135+ EMPTY ,
136+ UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
137+ )
138+ );
139+ countDownListener .onResponse (null );
140+ })
150141 );
151142 }
152143 }
153144
154145 private void updateSingleDataStream (
155146 String dataStreamName ,
156147 Settings settingsOverrides ,
148+ boolean dryRun ,
157149 TimeValue masterNodeTimeout ,
158150 TimeValue ackTimeout ,
159151 ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -198,36 +190,30 @@ private void updateSingleDataStream(
198190 ackTimeout ,
199191 dataStreamName ,
200192 settingsOverrides ,
201- new ActionListener <>() {
202- @ Override
203- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
204- if (acknowledgedResponse .isAcknowledged ()) {
205- updateSettingsOnIndices (dataStreamName , settingsOverrides , masterNodeTimeout , ackTimeout , listener );
206- } else {
207- listener .onResponse (
208- new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
209- dataStreamName ,
210- false ,
211- "Updating settings not accepted for unknown reasons" ,
212- EMPTY ,
213- EMPTY ,
214- UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
215- )
216- );
217- }
218- }
219-
220- @ Override
221- public void onFailure (Exception e ) {
222- listener .onFailure (e );
193+ dryRun ,
194+ listener .delegateFailure ((dataStreamSettingsResponseActionListener , dataStream ) -> {
195+ if (dataStream != null ) {
196+ updateSettingsOnIndices (dataStream , settingsOverrides , dryRun , masterNodeTimeout , ackTimeout , listener );
197+ } else {
198+ dataStreamSettingsResponseActionListener .onResponse (
199+ new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
200+ dataStreamName ,
201+ false ,
202+ "Updating settings not accepted for unknown reasons" ,
203+ EMPTY ,
204+ EMPTY ,
205+ UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
206+ )
207+ );
223208 }
224- }
209+ })
225210 );
226211 }
227212
228213 private void updateSettingsOnIndices (
229- String dataStreamName ,
214+ DataStream dataStream ,
230215 Settings requestSettings ,
216+ boolean dryRun ,
231217 TimeValue masterNodeTimeout ,
232218 TimeValue ackTimeout ,
233219 ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -243,26 +229,15 @@ private void updateSettingsOnIndices(
243229 appliedToDataStreamOnly .add (settingName );
244230 }
245231 }
246- final List <Index > concreteIndices = clusterService .state ()
247- .projectState (projectResolver .getProjectId ())
248- .metadata ()
249- .dataStreams ()
250- .get (dataStreamName )
251- .getIndices ();
232+ final List <Index > concreteIndices = dataStream .getIndices ();
252233 final List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > indexSettingErrors = new ArrayList <>();
253234
254- CountDownActionListener indexCountDownListener = new CountDownActionListener (concreteIndices .size () + 1 , new ActionListener <>() {
255- // Called when all indices for all settings are complete
256- @ Override
257- public void onResponse (Void unused ) {
258- DataStream dataStream = clusterService .state ()
259- .projectState (projectResolver .getProjectId ())
260- .metadata ()
261- .dataStreams ()
262- .get (dataStreamName );
263- listener .onResponse (
235+ CountDownActionListener indexCountDownListener = new CountDownActionListener (
236+ concreteIndices .size () + 1 ,
237+ listener .delegateFailure (
238+ (dataStreamSettingsResponseActionListener , unused ) -> dataStreamSettingsResponseActionListener .onResponse (
264239 new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
265- dataStreamName ,
240+ dataStream . getName () ,
266241 true ,
267242 null ,
268243 settingsFilter .filter (dataStream .getSettings ()),
@@ -275,37 +250,33 @@ public void onResponse(Void unused) {
275250 indexSettingErrors
276251 )
277252 )
278- );
279- }
253+ )
254+ )
255+ );
280256
281- @ Override
282- public void onFailure (Exception e ) {
283- listener .onFailure (e );
284- }
285- });
286257 indexCountDownListener .onResponse (null ); // handles the case where there were zero indices
287258 Settings applyToIndexSettings = builder ().loadFromMap (settingsToApply ).build ();
288259 for (Index index : concreteIndices ) {
289- updateSettingsOnSingleIndex (index , applyToIndexSettings , masterNodeTimeout , ackTimeout , new ActionListener <>() {
290- @ Override
291- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError indexSettingError ) {
260+ updateSettingsOnSingleIndex (
261+ index ,
262+ applyToIndexSettings ,
263+ dryRun ,
264+ masterNodeTimeout ,
265+ ackTimeout ,
266+ indexCountDownListener .delegateFailure ((listener1 , indexSettingError ) -> {
292267 if (indexSettingError != null ) {
293268 indexSettingErrors .add (indexSettingError );
294269 }
295- indexCountDownListener .onResponse (null );
296- }
297-
298- @ Override
299- public void onFailure (Exception e ) {
300- indexCountDownListener .onFailure (e );
301- }
302- });
270+ listener1 .onResponse (null );
271+ })
272+ );
303273 }
304274 }
305275
306276 private void updateSettingsOnSingleIndex (
307277 Index index ,
308278 Settings requestSettings ,
279+ boolean dryRun ,
309280 TimeValue masterNodeTimeout ,
310281 TimeValue ackTimeout ,
311282 ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > listener
@@ -326,19 +297,24 @@ private void updateSettingsOnSingleIndex(
326297 );
327298 return ;
328299 }
329- updateSettingsService .updateSettings (
330- new UpdateSettingsClusterStateUpdateRequest (
331- projectResolver .getProjectId (),
332- masterNodeTimeout ,
333- ackTimeout ,
334- requestSettings ,
335- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
336- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
337- index
338- ),
339- new ActionListener <>() {
340- @ Override
341- public void onResponse (AcknowledgedResponse response ) {
300+ if (dryRun ) {
301+ /*
302+ * This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
303+ * are not blocked, but there is no value in going beyond this. So just respond to the listener and move on.
304+ */
305+ listener .onResponse (null );
306+ } else {
307+ updateSettingsService .updateSettings (
308+ new UpdateSettingsClusterStateUpdateRequest (
309+ projectResolver .getProjectId (),
310+ masterNodeTimeout ,
311+ ackTimeout ,
312+ requestSettings ,
313+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
314+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
315+ index
316+ ),
317+ ActionListener .wrap (response -> {
342318 UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError error ;
343319 if (response .isAcknowledged () == false ) {
344320 error = new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (
@@ -349,16 +325,13 @@ public void onResponse(AcknowledgedResponse response) {
349325 error = null ;
350326 }
351327 listener .onResponse (error );
352- }
353-
354- @ Override
355- public void onFailure (Exception e ) {
356- listener .onResponse (
328+ },
329+ e -> listener .onResponse (
357330 new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (index .getName (), e .getMessage ())
358- );
359- }
360- }
361- );
331+ )
332+ )
333+ );
334+ }
362335 }
363336
364337 }
0 commit comments