2222import org .elasticsearch .cluster .block .ClusterBlockException ;
2323import org .elasticsearch .cluster .block .ClusterBlockLevel ;
2424import org .elasticsearch .cluster .metadata .ComposableIndexTemplate ;
25- import org .elasticsearch .cluster .metadata .DataStream ;
2625import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
2726import org .elasticsearch .cluster .metadata .MetadataDataStreamsService ;
2827import org .elasticsearch .cluster .metadata .MetadataUpdateSettingsService ;
3938import org .elasticsearch .transport .TransportService ;
4039
4140import java .util .ArrayList ;
42- import java .util .Arrays ;
4341import java .util .List ;
42+ import java .util .Map ;
4443import java .util .Set ;
4544
4645public class PostDataStreamTransportAction extends TransportMasterNodeAction <PostDataStreamAction .Request , PostDataStreamAction .Response > {
@@ -156,119 +155,122 @@ private void updateSingleDataStream(
156155 @ Override
157156 public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
158157 if (acknowledgedResponse .isAcknowledged ()) {
159- final Index [] concreteIndices = clusterService .state ()
158+ final List < Index > concreteIndices = clusterService .state ()
160159 .projectState (projectResolver .getProjectId ())
161160 .metadata ()
162161 .dataStreams ()
163162 .get (dataStreamName )
164- .getIndices ()
165- .toArray (Index .EMPTY_ARRAY );
163+ .getIndices ();
166164 final Settings requestSettings = templateOverrides .template ().settings ();
167- final Settings .Builder settingsBuilder = Settings .builder ();
168- for (String setting : requestSettings .keySet ()) {
169- if (APPLY_TO_BACKING_INDICES .contains (setting )) {
170- settingsBuilder .put (setting , requestSettings .get (setting ));
171- }
172- }
173- Settings settingsToUpdate = settingsBuilder .build ();
174- // TODO do we want to do this once per setting per index?
175- updateSettingsService .updateSettings (
176- new UpdateSettingsClusterStateUpdateRequest (
177- projectResolver .getProjectId (),
178- masterNodeTimeout ,
179- ackTimeout ,
180- settingsToUpdate ,
181- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
182- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
183- concreteIndices
184- ),
165+ final List <PostDataStreamAction .DataStreamResponse .IndexSettingResult > indexSettingResults = new ArrayList <>();
166+ CountDownActionListener settingCountDownListener = new CountDownActionListener (
167+ requestSettings .size () + 1 ,
185168 new ActionListener <>() {
186169 @ Override
187- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
188- DataStream dataStream = clusterService .state ()
170+ public void onResponse (Void unused ) {
171+ ComposableIndexTemplate effectiveIndexTemplate = clusterService .state ()
189172 .projectState (projectResolver .getProjectId ())
190173 .metadata ()
191174 .dataStreams ()
192- .get (dataStreamName );
193- List <PostDataStreamAction .DataStreamResponse .IndexSettingResult > indexSettingResults =
194- new ArrayList <>();
195- for (String setting : requestSettings .keySet ()) {
196- PostDataStreamAction .DataStreamResponse .IndexSettingResult indexSettingResult =
197- new PostDataStreamAction .DataStreamResponse .IndexSettingResult (
198- setting ,
199- APPLY_TO_BACKING_INDICES .contains (setting ),
200- List .of ()
201- );
202- indexSettingResults .add (indexSettingResult );
203- }
204- PostDataStreamAction .DataStreamResponse dataStreamResponse =
175+ .get (dataStreamName )
176+ .getEffectiveIndexTemplate (
177+ clusterService .state ().projectState (projectResolver .getProjectId ()).metadata ()
178+ );
179+ listener .onResponse (
205180 new PostDataStreamAction .DataStreamResponse (
206181 dataStreamName ,
207182 true ,
208183 null ,
209- dataStream .getEffectiveIndexTemplate (
210- clusterService .state ().projectState (projectResolver .getProjectId ()).metadata ()
211- ),
184+ effectiveIndexTemplate ,
212185 indexSettingResults
213- );
214- listener . onResponse ( dataStreamResponse );
186+ )
187+ );
215188 }
216189
217190 @ Override
218191 public void onFailure (Exception e ) {
219- logger .debug (
220- () -> "failed to update settings on indices [" + Arrays .toString (concreteIndices ) + "]" ,
221- e
222- );
223- if (e instanceof IllegalArgumentException ) {
224- DataStream dataStream = clusterService .state ()
225- .projectState (projectResolver .getProjectId ())
226- .metadata ()
227- .dataStreams ()
228- .get (dataStreamName );
229- List <PostDataStreamAction .DataStreamResponse .IndexSettingResult > indexSettingResults =
230- new ArrayList <>();
231- for (String setting : requestSettings .keySet ()) {
232- boolean attemptedToSet = APPLY_TO_BACKING_INDICES .contains (setting );
233- List <PostDataStreamAction .DataStreamResponse .IndexSettingError > indexSettingErrors ;
234- if (attemptedToSet ) {
235- indexSettingErrors = new ArrayList <>();
236- for (Index index : concreteIndices ) {
237- indexSettingErrors .add (
192+ listener .onFailure (e );
193+ }
194+ }
195+ );
196+ settingCountDownListener .onResponse (null ); // handles the case when there were zero settings
197+ ActionListener <PostDataStreamAction .DataStreamResponse .IndexSettingResult > indexSettingResultListener =
198+ new ActionListener <>() {
199+ // Called each time we have results for all indices for a single setting
200+ @ Override
201+ public void onResponse (PostDataStreamAction .DataStreamResponse .IndexSettingResult indexSettingResult ) {
202+ indexSettingResults .add (indexSettingResult );
203+ settingCountDownListener .onResponse (null );
204+ }
205+
206+ @ Override
207+ public void onFailure (Exception e ) {
208+ settingCountDownListener .onFailure (e );
209+ }
210+ };
211+ for (String setting : requestSettings .keySet ()) {
212+ if (APPLY_TO_BACKING_INDICES .contains (setting )) {
213+ final List <PostDataStreamAction .DataStreamResponse .IndexSettingError > errors = new ArrayList <>();
214+ CountDownActionListener indexCountDownListener = new CountDownActionListener (
215+ concreteIndices .size () + 1 ,
216+ new ActionListener <>() {
217+ @ Override
218+ public void onResponse (Void unused ) {
219+ indexSettingResultListener .onResponse (
220+ new PostDataStreamAction .DataStreamResponse .IndexSettingResult (setting , true , errors )
221+ );
222+ }
223+
224+ @ Override
225+ public void onFailure (Exception e ) {
226+ indexSettingResultListener .onFailure (e );
227+ }
228+ }
229+ );
230+ indexCountDownListener .onResponse (null ); // handles the case where there were zero indices
231+ for (Index index : concreteIndices ) {
232+ updateSingleSettingForSingleIndex (
233+ setting ,
234+ requestSettings .get (setting ),
235+ index ,
236+ masterNodeTimeout ,
237+ ackTimeout ,
238+ new ActionListener <>() {
239+ @ Override
240+ public void onResponse (AcknowledgedResponse response ) {
241+ if (response .isAcknowledged () == false ) {
242+ errors .add (
238243 new PostDataStreamAction .DataStreamResponse .IndexSettingError (
239244 index .getName (),
240- e . getMessage ()
245+ "Updating setting not acknowledged for unknown reason"
241246 )
242247 );
243248 }
244- } else {
245- indexSettingErrors = List . of ();
249+ indexCountDownListener . onResponse ( null );
250+
246251 }
247- PostDataStreamAction .DataStreamResponse .IndexSettingResult indexSettingResult =
248- new PostDataStreamAction .DataStreamResponse .IndexSettingResult (
249- setting ,
250- attemptedToSet ,
251- indexSettingErrors
252+
253+ @ Override
254+ public void onFailure (Exception e ) {
255+ errors .add (
256+ new PostDataStreamAction .DataStreamResponse .IndexSettingError (
257+ index .getName (),
258+ e .getMessage ()
259+ )
252260 );
253- indexSettingResults .add (indexSettingResult );
261+ indexCountDownListener .onResponse (null );
262+ }
254263 }
255- PostDataStreamAction .DataStreamResponse dataStreamResponse =
256- new PostDataStreamAction .DataStreamResponse (
257- dataStreamName ,
258- true ,
259- null ,
260- dataStream .getEffectiveIndexTemplate (
261- clusterService .state ().projectState (projectResolver .getProjectId ()).metadata ()
262- ),
263- indexSettingResults
264- );
265- listener .onResponse (dataStreamResponse );
266- } else {
267- listener .onFailure (e );
268- }
264+ );
269265 }
266+ } else {
267+ // This is not a setting that we will apply to backing indices
268+ indexSettingResults .add (
269+ new PostDataStreamAction .DataStreamResponse .IndexSettingResult (setting , false , List .of ())
270+ );
271+ settingCountDownListener .onResponse (null );
270272 }
271- );
273+ }
272274 } else {
273275 listener .onResponse (
274276 new PostDataStreamAction .DataStreamResponse (
@@ -290,6 +292,28 @@ public void onFailure(Exception e) {
290292 );
291293 }
292294
295+ private void updateSingleSettingForSingleIndex (
296+ String settingName ,
297+ Object settingValue ,
298+ Index index ,
299+ TimeValue masterNodeTimeout ,
300+ TimeValue ackTimeout ,
301+ ActionListener <AcknowledgedResponse > listener
302+ ) {
303+ updateSettingsService .updateSettings (
304+ new UpdateSettingsClusterStateUpdateRequest (
305+ projectResolver .getProjectId (),
306+ masterNodeTimeout ,
307+ ackTimeout ,
308+ Settings .builder ().loadFromMap (Map .of (settingName , settingValue )).build (),
309+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
310+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
311+ index
312+ ),
313+ listener
314+ );
315+ }
316+
293317 @ Override
294318 protected ClusterBlockException checkBlock (PostDataStreamAction .Request request , ClusterState state ) {
295319 return state .blocks ().globalBlockedException (ClusterBlockLevel .METADATA_WRITE );
0 commit comments