Skip to content

Commit c883386

Browse files
committed
breaking into methods
1 parent ec78c0d commit c883386

File tree

1 file changed

+130
-107
lines changed

1 file changed

+130
-107
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PostDataStreamTransportAction.java

Lines changed: 130 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -155,111 +155,7 @@ private void updateSingleDataStream(
155155
@Override
156156
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
157157
if (acknowledgedResponse.isAcknowledged()) {
158-
final List<Index> concreteIndices = clusterService.state()
159-
.projectState(projectResolver.getProjectId())
160-
.metadata()
161-
.dataStreams()
162-
.get(dataStreamName)
163-
.getIndices();
164-
final Settings requestSettings = templateOverrides.template().settings();
165-
final List<PostDataStreamAction.DataStreamResponse.IndexSettingResult> indexSettingResults = new ArrayList<>();
166-
CountDownActionListener settingCountDownListener = new CountDownActionListener(
167-
requestSettings.size() + 1,
168-
new ActionListener<>() {
169-
// Called once all settings are completed for all indices
170-
@Override
171-
public void onResponse(Void unused) {
172-
ComposableIndexTemplate effectiveIndexTemplate = clusterService.state()
173-
.projectState(projectResolver.getProjectId())
174-
.metadata()
175-
.dataStreams()
176-
.get(dataStreamName)
177-
.getEffectiveIndexTemplate(
178-
clusterService.state().projectState(projectResolver.getProjectId()).metadata()
179-
);
180-
listener.onResponse(
181-
new PostDataStreamAction.DataStreamResponse(
182-
dataStreamName,
183-
true,
184-
null,
185-
effectiveIndexTemplate,
186-
indexSettingResults
187-
)
188-
);
189-
}
190-
191-
@Override
192-
public void onFailure(Exception e) {
193-
listener.onFailure(e);
194-
}
195-
}
196-
);
197-
settingCountDownListener.onResponse(null); // handles the case when there were zero settings
198-
for (String setting : requestSettings.keySet()) {
199-
if (APPLY_TO_BACKING_INDICES.contains(setting)) {
200-
final List<PostDataStreamAction.DataStreamResponse.IndexSettingError> errors = new ArrayList<>();
201-
CountDownActionListener indexCountDownListener = new CountDownActionListener(
202-
concreteIndices.size() + 1,
203-
new ActionListener<>() {
204-
// Called when all indices for a single setting are complete
205-
@Override
206-
public void onResponse(Void unused) {
207-
indexSettingResults.add(
208-
new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, true, errors)
209-
);
210-
settingCountDownListener.onResponse(null);
211-
}
212-
213-
@Override
214-
public void onFailure(Exception e) {
215-
settingCountDownListener.onFailure(e);
216-
}
217-
}
218-
);
219-
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
220-
for (Index index : concreteIndices) {
221-
updateSingleSettingForSingleIndex(
222-
setting,
223-
requestSettings.get(setting),
224-
index,
225-
masterNodeTimeout,
226-
ackTimeout,
227-
new ActionListener<>() {
228-
// Called when a single setting for a single index is complete
229-
@Override
230-
public void onResponse(AcknowledgedResponse response) {
231-
if (response.isAcknowledged() == false) {
232-
errors.add(
233-
new PostDataStreamAction.DataStreamResponse.IndexSettingError(
234-
index.getName(),
235-
"Updating setting not acknowledged for unknown reason"
236-
)
237-
);
238-
}
239-
indexCountDownListener.onResponse(null);
240-
}
241-
242-
@Override
243-
public void onFailure(Exception e) {
244-
errors.add(
245-
new PostDataStreamAction.DataStreamResponse.IndexSettingError(
246-
index.getName(),
247-
e.getMessage()
248-
)
249-
);
250-
indexCountDownListener.onResponse(null);
251-
}
252-
}
253-
);
254-
}
255-
} else {
256-
// This is not a setting that we will apply to backing indices
257-
indexSettingResults.add(
258-
new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, false, List.of())
259-
);
260-
settingCountDownListener.onResponse(null);
261-
}
262-
}
158+
updateSettingsOnIndices(dataStreamName, templateOverrides, masterNodeTimeout, ackTimeout, listener);
263159
} else {
264160
listener.onResponse(
265161
new PostDataStreamAction.DataStreamResponse(
@@ -281,13 +177,121 @@ public void onFailure(Exception e) {
281177
);
282178
}
283179

180+
private void updateSettingsOnIndices(
181+
String dataStreamName,
182+
ComposableIndexTemplate templateOverrides,
183+
TimeValue masterNodeTimeout,
184+
TimeValue ackTimeout,
185+
ActionListener<PostDataStreamAction.DataStreamResponse> listener
186+
) {
187+
final List<Index> concreteIndices = clusterService.state()
188+
.projectState(projectResolver.getProjectId())
189+
.metadata()
190+
.dataStreams()
191+
.get(dataStreamName)
192+
.getIndices();
193+
final Settings requestSettings = templateOverrides.template().settings();
194+
final List<PostDataStreamAction.DataStreamResponse.IndexSettingResult> indexSettingResults = new ArrayList<>();
195+
CountDownActionListener settingCountDownListener = new CountDownActionListener(requestSettings.size() + 1, new ActionListener<>() {
196+
// Called once all settings are completed for all indices
197+
@Override
198+
public void onResponse(Void unused) {
199+
ComposableIndexTemplate effectiveIndexTemplate = clusterService.state()
200+
.projectState(projectResolver.getProjectId())
201+
.metadata()
202+
.dataStreams()
203+
.get(dataStreamName)
204+
.getEffectiveIndexTemplate(clusterService.state().projectState(projectResolver.getProjectId()).metadata());
205+
listener.onResponse(
206+
new PostDataStreamAction.DataStreamResponse(dataStreamName, true, null, effectiveIndexTemplate, indexSettingResults)
207+
);
208+
}
209+
210+
@Override
211+
public void onFailure(Exception e) {
212+
listener.onFailure(e);
213+
}
214+
});
215+
settingCountDownListener.onResponse(null); // handles the case when there were zero settings
216+
for (String setting : requestSettings.keySet()) {
217+
updateSingleSettingOnIndices(
218+
setting,
219+
requestSettings.get(setting),
220+
concreteIndices,
221+
masterNodeTimeout,
222+
ackTimeout,
223+
new ActionListener<PostDataStreamAction.DataStreamResponse.IndexSettingResult>() {
224+
@Override
225+
public void onResponse(PostDataStreamAction.DataStreamResponse.IndexSettingResult indexSettingResult) {
226+
indexSettingResults.add(indexSettingResult);
227+
settingCountDownListener.onResponse(null);
228+
}
229+
230+
@Override
231+
public void onFailure(Exception e) {
232+
settingCountDownListener.onFailure(e);
233+
}
234+
}
235+
);
236+
}
237+
}
238+
239+
private void updateSingleSettingOnIndices(
240+
String setting,
241+
Object settingValue,
242+
List<Index> concreteIndices,
243+
TimeValue masterNodeTimeout,
244+
TimeValue ackTimeout,
245+
ActionListener<PostDataStreamAction.DataStreamResponse.IndexSettingResult> listener
246+
) {
247+
if (APPLY_TO_BACKING_INDICES.contains(setting)) {
248+
final List<PostDataStreamAction.DataStreamResponse.IndexSettingError> errors = new ArrayList<>();
249+
CountDownActionListener indexCountDownListener = new CountDownActionListener(
250+
concreteIndices.size() + 1,
251+
new ActionListener<>() {
252+
// Called when all indices for a single setting are complete
253+
@Override
254+
public void onResponse(Void unused) {
255+
listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, true, errors));
256+
}
257+
258+
@Override
259+
public void onFailure(Exception e) {
260+
listener.onFailure(e);
261+
}
262+
}
263+
);
264+
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
265+
for (Index index : concreteIndices) {
266+
updateSingleSettingForSingleIndex(setting, settingValue, index, masterNodeTimeout, ackTimeout, new ActionListener<>() {
267+
// Called when a single setting for a single index is complete
268+
@Override
269+
public void onResponse(PostDataStreamAction.DataStreamResponse.IndexSettingError response) {
270+
if (response != null) {
271+
errors.add(response);
272+
}
273+
indexCountDownListener.onResponse(null);
274+
}
275+
276+
@Override
277+
public void onFailure(Exception e) {
278+
indexCountDownListener.onFailure(e);
279+
}
280+
});
281+
}
282+
} else {
283+
// This is not a setting that we will apply to backing indices
284+
listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, false, List.of()));
285+
}
286+
}
287+
284288
private void updateSingleSettingForSingleIndex(
285289
String settingName,
286290
Object settingValue,
287291
Index index,
288292
TimeValue masterNodeTimeout,
289293
TimeValue ackTimeout,
290-
ActionListener<AcknowledgedResponse> listener
294+
ActionListener<PostDataStreamAction.DataStreamResponse.IndexSettingError> listener
291295
) {
292296
updateSettingsService.updateSettings(
293297
new UpdateSettingsClusterStateUpdateRequest(
@@ -299,7 +303,26 @@ private void updateSingleSettingForSingleIndex(
299303
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
300304
index
301305
),
302-
listener
306+
new ActionListener<>() {
307+
@Override
308+
public void onResponse(AcknowledgedResponse response) {
309+
PostDataStreamAction.DataStreamResponse.IndexSettingError error;
310+
if (response.isAcknowledged() == false) {
311+
error = new PostDataStreamAction.DataStreamResponse.IndexSettingError(
312+
index.getName(),
313+
"Updating setting not acknowledged for unknown reason"
314+
);
315+
} else {
316+
error = null;
317+
}
318+
listener.onResponse(error);
319+
}
320+
321+
@Override
322+
public void onFailure(Exception e) {
323+
listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingError(index.getName(), e.getMessage()));
324+
}
325+
}
303326
);
304327
}
305328

0 commit comments

Comments
 (0)