Skip to content

Commit 1c763d7

Browse files
committed
simplifying
1 parent 6e82593 commit 1c763d7

File tree

1 file changed

+47
-33
lines changed

1 file changed

+47
-33
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.List;
4545
import java.util.Map;
4646
import java.util.NoSuchElementException;
47+
import java.util.function.BiConsumer;
4748

4849
import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;
4950

@@ -217,41 +218,58 @@ private void maybeProcessNextIndex(
217218
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
218219

219220
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
220-
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
221+
l -> client.execute(
222+
ReindexDataStreamIndexAction.INSTANCE,
223+
reindexDataStreamIndexRequest,
224+
l.delegateResponse(new BiConsumer<ActionListener<ReindexDataStreamIndexAction.Response>, Exception>() {
225+
@Override
226+
public void accept(ActionListener<ReindexDataStreamIndexAction.Response> responseActionListener, Exception e) {
227+
IndexMetadata sourceIndex = clusterService.state().getMetadata().index(index.getName());
228+
if (sourceIndex == null) {
229+
/*
230+
* One possible cause of exception here is that the source index no longer exists. This can happen if ILM
231+
* performs certain actions while reindex is happening, or if a user manually deletes it. In this case we don't
232+
* want to fail the task. We treat it as a success and move on. The updateDataStreamOrCleanup method will make
233+
* an attempt at deleting the destination index if it has been created.
234+
*/
235+
logger.debug(
236+
"The source index {} in the data stream {} was removed during processing",
237+
index.getName(),
238+
sourceDataStream
239+
);
240+
// We need the destination index name in the response so that updateDataStreamOrCleanup can delete it
241+
l.onResponse(
242+
new ReindexDataStreamIndexAction.Response(
243+
ReindexDataStreamIndexTransportAction.generateDestIndexName(index.getName())
244+
)
245+
);
246+
} else {
247+
l.onFailure(e);
248+
}
249+
}
250+
})
251+
)
221252
)
222253
.<AcknowledgedResponse>andThen(
223-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
254+
(l, result) -> updateDataStreamOrCleanup(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
224255
)
225256
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
226257
.addListener(ActionListener.wrap(unused -> {
227258
reindexDataStreamTask.reindexSucceeded(index.getName());
228259
listener.onResponse(null);
229260
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
230261
}, e -> {
231-
/*
232-
* One possible cause of exception here is that the source index no longer exists. This can happen if ILM performs certain
233-
* actions while reindex is happening, or if a user manually deletes it. In this case we don't want to fail the task. We
234-
* treat it as a success and move on after making an attempt at deleting the destination index if it has been created yet.
235-
*/
236-
IndexMetadata sourceIndex = clusterService.state().getMetadata().index(index.getName());
237-
if (sourceIndex == null) {
238-
String destIndexName = ReindexDataStreamIndexTransportAction.generateDestIndexName(index.getName());
239-
logMissingSourceIndex(sourceDataStream, index.getName(), destIndexName);
240-
deleteIndex(destIndexName, parentTaskId, ActionListener.wrap(acknowledgedResponse -> {
241-
reindexDataStreamTask.reindexSucceeded(index.getName());
242-
listener.onResponse(null);
243-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
244-
}, listener::onFailure));
245-
} else {
246-
// The source index still exists, so this is a real problem we want to record
247-
reindexDataStreamTask.reindexFailed(index.getName(), e);
248-
listener.onResponse(null);
249-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
250-
}
262+
reindexDataStreamTask.reindexFailed(index.getName(), e);
263+
listener.onResponse(null);
264+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
251265
}));
252266
}
253267

254-
private void updateDataStream(
268+
/*
269+
* If the oldIndex still exists, this method swaps newIndex into the dataStream and swaps out oldIndex. If oldIndex no longer exists,
270+
* this method cleans up by deleting newIndex (if it exists).
271+
*/
272+
private void updateDataStreamOrCleanup(
255273
String dataStream,
256274
String oldIndex,
257275
String newIndex,
@@ -260,7 +278,12 @@ private void updateDataStream(
260278
) {
261279
IndexMetadata sourceIndex = clusterService.state().getMetadata().index(oldIndex);
262280
if (sourceIndex == null) {
263-
logMissingSourceIndex(dataStream, oldIndex, newIndex);
281+
logger.debug(
282+
"Index {} in data stream {} no longer exists after reindexing completed. Deleting reindexed index {}",
283+
dataStream,
284+
oldIndex,
285+
newIndex
286+
);
264287
deleteIndex(newIndex, parentTaskId, listener);
265288
} else {
266289
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -273,15 +296,6 @@ private void updateDataStream(
273296
}
274297
}
275298

276-
private void logMissingSourceIndex(String dataStreamName, String sourceIndexName, String destinationIndexName) {
277-
logger.debug(
278-
"Index {} in data stream {} no longer exists after reindexing completed. Deleting reindexed index {}",
279-
sourceIndexName,
280-
dataStreamName,
281-
destinationIndexName
282-
);
283-
}
284-
285299
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
286300
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName).indicesOptions(IGNORE_MISSING_OPTIONS);
287301
deleteIndexRequest.setParentTask(parentTaskId);

0 commit comments

Comments
 (0)