Skip to content

Commit ae92ed1

Browse files
authored
Setting parent tasks on all requests for data stream reindex (#119034) (#119170)
1 parent fdc401e commit ae92ed1

File tree

3 files changed

+76
-53
lines changed

3 files changed

+76
-53
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.index.IndexNotFoundException;
2727
import org.elasticsearch.injection.guice.Inject;
2828
import org.elasticsearch.tasks.Task;
29+
import org.elasticsearch.tasks.TaskId;
2930
import org.elasticsearch.threadpool.ThreadPool;
3031
import org.elasticsearch.transport.TransportService;
3132
import org.elasticsearch.xcontent.XContentType;
@@ -96,6 +97,7 @@ protected void doExecute(Task task, CreateIndexFromSourceAction.Request request,
9697
if (mergeMappings.isEmpty() == false) {
9798
createIndexRequest.mapping(mergeMappings);
9899
}
100+
createIndexRequest.setParentTask(new TaskId(clusterService.localNode().getId(), task.getId()));
99101

100102
client.admin().indices().create(createIndexRequest, listener.map(response -> response));
101103
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.reindex.ReindexRequest;
3131
import org.elasticsearch.injection.guice.Inject;
3232
import org.elasticsearch.tasks.Task;
33+
import org.elasticsearch.tasks.TaskId;
3334
import org.elasticsearch.threadpool.ThreadPool;
3435
import org.elasticsearch.transport.TransportService;
3536
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
@@ -76,6 +77,7 @@ protected void doExecute(
7677
) {
7778
var sourceIndexName = request.getSourceIndex();
7879
var destIndexName = generateDestIndexName(sourceIndexName);
80+
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
7981
IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName);
8082
Settings settingsBefore = sourceIndex.getSettings();
8183

@@ -89,17 +91,17 @@ protected void doExecute(
8991
);
9092
}
9193

92-
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l))
93-
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l))
94-
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l))
95-
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l))
96-
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l))
97-
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l))
94+
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
95+
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
96+
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
97+
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
98+
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId))
99+
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId))
98100
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
99101
.addListener(listener);
100102
}
101103

102-
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener) {
104+
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
103105
logger.debug("Setting write block on source index [{}]", sourceIndexName);
104106
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
105107
@Override
@@ -121,18 +123,24 @@ public void onFailure(Exception e) {
121123
listener.onFailure(e);
122124
}
123125
}
124-
});
126+
}, parentTaskId);
125127
}
126128

127-
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener) {
129+
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
128130
logger.debug("Attempting to delete index [{}]", destIndexName);
129131
var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS)
130132
.masterNodeTimeout(TimeValue.MAX_VALUE);
133+
deleteIndexRequest.setParentTask(parentTaskId);
131134
var errorMessage = String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", destIndexName);
132135
client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage));
133136
}
134137

135-
private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener<AcknowledgedResponse> listener) {
138+
private void createIndex(
139+
IndexMetadata sourceIndex,
140+
String destIndexName,
141+
ActionListener<AcknowledgedResponse> listener,
142+
TaskId parentTaskId
143+
) {
136144
logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName());
137145

138146
// override read-only settings if they exist
@@ -147,29 +155,32 @@ private void createIndex(IndexMetadata sourceIndex, String destIndexName, Action
147155
removeReadOnlyOverride,
148156
Map.of()
149157
);
158+
request.setParentTask(parentTaskId);
150159
var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex());
151160
client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage));
152161
}
153162

154-
private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener) {
163+
private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener, TaskId parentTaskId) {
155164
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
156165
var reindexRequest = new ReindexRequest();
157166
reindexRequest.setSourceIndices(sourceIndexName);
158167
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
159168
reindexRequest.getSearchRequest().source().fetchSource(true);
160169
reindexRequest.setDestIndex(destIndexName);
170+
reindexRequest.setParentTask(parentTaskId);
161171
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
162172
}
163173

164174
private void addBlockIfFromSource(
165175
IndexMetadata.APIBlock block,
166176
Settings settingsBefore,
167177
String destIndexName,
168-
ActionListener<AddIndexBlockResponse> listener
178+
ActionListener<AddIndexBlockResponse> listener,
179+
TaskId parentTaskId
169180
) {
170181
if (settingsBefore.getAsBoolean(block.settingName(), false)) {
171182
var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName);
172-
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage));
183+
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage), parentTaskId);
173184
} else {
174185
listener.onResponse(null);
175186
}
@@ -192,7 +203,14 @@ private static <U extends AcknowledgedResponse> ActionListener<U> failIfNotAckno
192203
});
193204
}
194205

195-
private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener<AddIndexBlockResponse> listener) {
196-
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener);
206+
private void addBlockToIndex(
207+
IndexMetadata.APIBlock block,
208+
String index,
209+
ActionListener<AddIndexBlockResponse> listener,
210+
TaskId parentTaskId
211+
) {
212+
AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(block, index);
213+
addIndexBlockRequest.setParentTask(parentTaskId);
214+
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
197215
}
198216
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ protected ReindexDataStreamTask createTask(
7575
@Override
7676
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
7777
String sourceDataStream = params.getSourceDataStream();
78+
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
7879
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
80+
request.setParentTask(taskId);
7981
assert task instanceof ReindexDataStreamTask;
8082
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
8183
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
@@ -84,16 +86,18 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
8486
if (dataStreamInfos.size() == 1) {
8587
DataStream dataStream = dataStreamInfos.get(0).getDataStream();
8688
if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
89+
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
90+
rolloverRequest.setParentTask(taskId);
8791
reindexClient.execute(
8892
RolloverAction.INSTANCE,
89-
new RolloverRequest(sourceDataStream, null),
93+
rolloverRequest,
9094
ActionListener.wrap(
91-
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream),
95+
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
9296
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
9397
)
9498
);
9599
} else {
96-
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream);
100+
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
97101
}
98102
} else {
99103
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
@@ -105,7 +109,8 @@ private void reindexIndices(
105109
DataStream dataStream,
106110
ReindexDataStreamTask reindexDataStreamTask,
107111
ExecuteWithHeadersClient reindexClient,
108-
String sourceDataStream
112+
String sourceDataStream,
113+
TaskId parentTaskId
109114
) {
110115
List<Index> indices = dataStream.getIndices();
111116
List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
@@ -117,7 +122,7 @@ private void reindexIndices(
117122
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
118123
final int maxConcurrentIndices = 1;
119124
for (int i = 0; i < maxConcurrentIndices; i++) {
120-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
125+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
121126
}
122127
// This takes care of the additional latch count referenced above:
123128
listener.onResponse(null);
@@ -128,7 +133,8 @@ private void maybeProcessNextIndex(
128133
ReindexDataStreamTask reindexDataStreamTask,
129134
ExecuteWithHeadersClient reindexClient,
130135
String sourceDataStream,
131-
CountDownActionListener listener
136+
CountDownActionListener listener,
137+
TaskId parentTaskId
132138
) {
133139
if (indicesRemaining.isEmpty()) {
134140
return;
@@ -140,51 +146,48 @@ private void maybeProcessNextIndex(
140146
return;
141147
}
142148
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
143-
reindexClient.execute(
144-
ReindexDataStreamIndexAction.INSTANCE,
145-
new ReindexDataStreamIndexAction.Request(index.getName()),
146-
ActionListener.wrap(response1 -> {
147-
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
148-
reindexDataStreamTask.reindexSucceeded(index.getName());
149-
listener.onResponse(null);
150-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
151-
}, exception -> {
152-
reindexDataStreamTask.reindexFailed(index.getName(), exception);
153-
listener.onResponse(null);
154-
}), reindexClient);
149+
ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName());
150+
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
151+
reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> {
152+
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
153+
reindexDataStreamTask.reindexSucceeded(index.getName());
154+
listener.onResponse(null);
155+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
155156
}, exception -> {
156157
reindexDataStreamTask.reindexFailed(index.getName(), exception);
157158
listener.onResponse(null);
158-
})
159-
);
159+
}), reindexClient, parentTaskId);
160+
}, exception -> {
161+
reindexDataStreamTask.reindexFailed(index.getName(), exception);
162+
listener.onResponse(null);
163+
}));
160164
}
161165

162166
private void updateDataStream(
163167
String dataStream,
164168
String oldIndex,
165169
String newIndex,
166170
ActionListener<Void> listener,
167-
ExecuteWithHeadersClient reindexClient
171+
ExecuteWithHeadersClient reindexClient,
172+
TaskId parentTaskId
168173
) {
169-
reindexClient.execute(
170-
ModifyDataStreamsAction.INSTANCE,
171-
new ModifyDataStreamsAction.Request(
172-
TimeValue.MAX_VALUE,
173-
TimeValue.MAX_VALUE,
174-
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
175-
),
176-
new ActionListener<>() {
177-
@Override
178-
public void onResponse(AcknowledgedResponse response) {
179-
listener.onResponse(null);
180-
}
174+
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
175+
TimeValue.MAX_VALUE,
176+
TimeValue.MAX_VALUE,
177+
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
178+
);
179+
modifyDataStreamRequest.setParentTask(parentTaskId);
180+
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() {
181+
@Override
182+
public void onResponse(AcknowledgedResponse response) {
183+
listener.onResponse(null);
184+
}
181185

182-
@Override
183-
public void onFailure(Exception e) {
184-
listener.onFailure(e);
185-
}
186+
@Override
187+
public void onFailure(Exception e) {
188+
listener.onFailure(e);
186189
}
187-
);
190+
});
188191
}
189192

190193
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {

0 commit comments

Comments
 (0)