Skip to content

Commit 39cf10e

Browse files
committed
using helper methods for action listeners
1 parent 59b8079 commit 39cf10e

File tree

1 file changed

+72
-107
lines changed

1 file changed

+72
-107
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamSettingsAction.java

Lines changed: 72 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.action.support.ActionFilters;
1717
import org.elasticsearch.action.support.CountDownActionListener;
1818
import org.elasticsearch.action.support.IndicesOptions;
19-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2019
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2120
import org.elasticsearch.cluster.ClusterState;
2221
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -107,17 +106,14 @@ protected void masterOperation(
107106
request.indices()
108107
);
109108
List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> dataStreamSettingsResponse = new ArrayList<>();
110-
CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() {
111-
@Override
112-
public void onResponse(Void unused) {
113-
listener.onResponse(new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse));
114-
}
115-
116-
@Override
117-
public void onFailure(Exception e) {
118-
listener.onFailure(e);
119-
}
120-
});
109+
CountDownActionListener countDownListener = new CountDownActionListener(
110+
dataStreamNames.size() + 1,
111+
listener.delegateFailure(
112+
(responseActionListener, unused) -> responseActionListener.onResponse(
113+
new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse)
114+
)
115+
)
116+
);
121117
countDownListener.onResponse(null);
122118
for (String dataStreamName : dataStreamNames) {
123119
updateSingleDataStream(
@@ -126,28 +122,22 @@ public void onFailure(Exception e) {
126122
request.isDryRun(),
127123
request.masterNodeTimeout(),
128124
request.ackTimeout(),
129-
new ActionListener<>() {
130-
@Override
131-
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamResponse) {
132-
dataStreamSettingsResponse.add(dataStreamResponse);
133-
countDownListener.onResponse(null);
134-
}
135-
136-
@Override
137-
public void onFailure(Exception e) {
138-
dataStreamSettingsResponse.add(
139-
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
140-
dataStreamName,
141-
false,
142-
e.getMessage(),
143-
EMPTY,
144-
EMPTY,
145-
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
146-
)
147-
);
148-
countDownListener.onResponse(null);
149-
}
150-
}
125+
ActionListener.wrap(dataStreamResponse -> {
126+
dataStreamSettingsResponse.add(dataStreamResponse);
127+
countDownListener.onResponse(null);
128+
}, e -> {
129+
dataStreamSettingsResponse.add(
130+
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
131+
dataStreamName,
132+
false,
133+
e.getMessage(),
134+
EMPTY,
135+
EMPTY,
136+
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
137+
)
138+
);
139+
countDownListener.onResponse(null);
140+
})
151141
);
152142
}
153143
}
@@ -201,30 +191,22 @@ private void updateSingleDataStream(
201191
dataStreamName,
202192
settingsOverrides,
203193
dryRun,
204-
new ActionListener<>() {
205-
@Override
206-
public void onResponse(DataStream dataStream) {
207-
if (dataStream != null) {
208-
updateSettingsOnIndices(dataStream, settingsOverrides, dryRun, masterNodeTimeout, ackTimeout, listener);
209-
} else {
210-
listener.onResponse(
211-
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
212-
dataStreamName,
213-
false,
214-
"Updating settings not accepted for unknown reasons",
215-
EMPTY,
216-
EMPTY,
217-
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
218-
)
219-
);
220-
}
221-
}
222-
223-
@Override
224-
public void onFailure(Exception e) {
225-
listener.onFailure(e);
194+
listener.delegateFailure((dataStreamSettingsResponseActionListener, dataStream) -> {
195+
if (dataStream != null) {
196+
updateSettingsOnIndices(dataStream, settingsOverrides, dryRun, masterNodeTimeout, ackTimeout, listener);
197+
} else {
198+
dataStreamSettingsResponseActionListener.onResponse(
199+
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
200+
dataStreamName,
201+
false,
202+
"Updating settings not accepted for unknown reasons",
203+
EMPTY,
204+
EMPTY,
205+
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
206+
)
207+
);
226208
}
227-
}
209+
})
228210
);
229211
}
230212

@@ -250,11 +232,10 @@ private void updateSettingsOnIndices(
250232
final List<Index> concreteIndices = dataStream.getIndices();
251233
final List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> indexSettingErrors = new ArrayList<>();
252234

253-
CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() {
254-
// Called when all indices for all settings are complete
255-
@Override
256-
public void onResponse(Void unused) {
257-
listener.onResponse(
235+
CountDownActionListener indexCountDownListener = new CountDownActionListener(
236+
concreteIndices.size() + 1,
237+
listener.delegateFailure(
238+
(dataStreamSettingsResponseActionListener, unused) -> dataStreamSettingsResponseActionListener.onResponse(
258239
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
259240
dataStream.getName(),
260241
true,
@@ -269,31 +250,26 @@ public void onResponse(Void unused) {
269250
indexSettingErrors
270251
)
271252
)
272-
);
273-
}
253+
)
254+
)
255+
);
274256

275-
@Override
276-
public void onFailure(Exception e) {
277-
listener.onFailure(e);
278-
}
279-
});
280257
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
281258
Settings applyToIndexSettings = builder().loadFromMap(settingsToApply).build();
282259
for (Index index : concreteIndices) {
283-
updateSettingsOnSingleIndex(index, applyToIndexSettings, dryRun, masterNodeTimeout, ackTimeout, new ActionListener<>() {
284-
@Override
285-
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) {
260+
updateSettingsOnSingleIndex(
261+
index,
262+
applyToIndexSettings,
263+
dryRun,
264+
masterNodeTimeout,
265+
ackTimeout,
266+
indexCountDownListener.delegateFailure((listener1, indexSettingError) -> {
286267
if (indexSettingError != null) {
287268
indexSettingErrors.add(indexSettingError);
288269
}
289-
indexCountDownListener.onResponse(null);
290-
}
291-
292-
@Override
293-
public void onFailure(Exception e) {
294-
indexCountDownListener.onFailure(e);
295-
}
296-
});
270+
listener1.onResponse(null);
271+
})
272+
);
297273
}
298274
}
299275

@@ -338,37 +314,26 @@ private void updateSettingsOnSingleIndex(
338314
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
339315
index
340316
),
341-
new UpdateSingleIndexSettingsListener(index.getName(), listener)
342-
);
343-
}
344-
}
345-
346-
}
347-
348-
private record UpdateSingleIndexSettingsListener(
349-
String indexName,
350-
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> delegateListener
351-
) implements ActionListener<AcknowledgedResponse> {
352-
@Override
353-
public void onResponse(AcknowledgedResponse response) {
354-
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
355-
if (response.isAcknowledged() == false) {
356-
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
357-
indexName,
358-
"Updating settings not acknowledged for unknown reason"
317+
listener.delegateResponse(
318+
(listener1, e) -> listener1.onResponse(
319+
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage())
320+
)
321+
).delegateFailure((listener1, response) -> {
322+
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
323+
if (response.isAcknowledged() == false) {
324+
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
325+
index.getName(),
326+
"Updating settings not acknowledged for unknown reason"
327+
);
328+
} else {
329+
error = null;
330+
}
331+
listener1.onResponse(error);
332+
})
359333
);
360-
} else {
361-
error = null;
362334
}
363-
delegateListener.onResponse(error);
364335
}
365336

366-
@Override
367-
public void onFailure(Exception e) {
368-
delegateListener.onResponse(
369-
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(indexName, e.getMessage())
370-
);
371-
}
372337
}
373338

374339
@Override

0 commit comments

Comments
 (0)