Skip to content

Commit 6ec8f95

Browse files
authored
Setting parent tasks on all requests for data stream reindex (#119034)
1 parent 2f25f67 commit 6ec8f95

File tree

3 files changed

+76
-53
lines changed

3 files changed

+76
-53
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.index.IndexNotFoundException;
2727
import org.elasticsearch.injection.guice.Inject;
2828
import org.elasticsearch.tasks.Task;
29+
import org.elasticsearch.tasks.TaskId;
2930
import org.elasticsearch.threadpool.ThreadPool;
3031
import org.elasticsearch.transport.TransportService;
3132
import org.elasticsearch.xcontent.XContentType;
@@ -96,6 +97,7 @@ protected void doExecute(Task task, CreateIndexFromSourceAction.Request request,
9697
if (mergeMappings.isEmpty() == false) {
9798
createIndexRequest.mapping(mergeMappings);
9899
}
100+
createIndexRequest.setParentTask(new TaskId(clusterService.localNode().getId(), task.getId()));
99101

100102
client.admin().indices().create(createIndexRequest, listener.map(response -> response));
101103
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.reindex.ReindexRequest;
3131
import org.elasticsearch.injection.guice.Inject;
3232
import org.elasticsearch.tasks.Task;
33+
import org.elasticsearch.tasks.TaskId;
3334
import org.elasticsearch.threadpool.ThreadPool;
3435
import org.elasticsearch.transport.TransportService;
3536
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
@@ -76,6 +77,7 @@ protected void doExecute(
7677
) {
7778
var sourceIndexName = request.getSourceIndex();
7879
var destIndexName = generateDestIndexName(sourceIndexName);
80+
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
7981
IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName);
8082
Settings settingsBefore = sourceIndex.getSettings();
8183

@@ -89,17 +91,17 @@ protected void doExecute(
8991
);
9092
}
9193

92-
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l))
93-
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l))
94-
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l))
95-
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l))
96-
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l))
97-
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l))
94+
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
95+
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
96+
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
97+
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
98+
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId))
99+
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId))
98100
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
99101
.addListener(listener);
100102
}
101103

102-
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener) {
104+
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
103105
logger.debug("Setting write block on source index [{}]", sourceIndexName);
104106
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
105107
@Override
@@ -121,18 +123,24 @@ public void onFailure(Exception e) {
121123
listener.onFailure(e);
122124
}
123125
}
124-
});
126+
}, parentTaskId);
125127
}
126128

127-
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener) {
129+
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
128130
logger.debug("Attempting to delete index [{}]", destIndexName);
129131
var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS)
130132
.masterNodeTimeout(TimeValue.MAX_VALUE);
133+
deleteIndexRequest.setParentTask(parentTaskId);
131134
var errorMessage = String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", destIndexName);
132135
client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage));
133136
}
134137

135-
private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener<AcknowledgedResponse> listener) {
138+
private void createIndex(
139+
IndexMetadata sourceIndex,
140+
String destIndexName,
141+
ActionListener<AcknowledgedResponse> listener,
142+
TaskId parentTaskId
143+
) {
136144
logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName());
137145

138146
// override read-only settings if they exist
@@ -147,29 +155,32 @@ private void createIndex(IndexMetadata sourceIndex, String destIndexName, Action
147155
removeReadOnlyOverride,
148156
Map.of()
149157
);
158+
request.setParentTask(parentTaskId);
150159
var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex());
151160
client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage));
152161
}
153162

154-
private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener) {
163+
private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener, TaskId parentTaskId) {
155164
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
156165
var reindexRequest = new ReindexRequest();
157166
reindexRequest.setSourceIndices(sourceIndexName);
158167
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
159168
reindexRequest.getSearchRequest().source().fetchSource(true);
160169
reindexRequest.setDestIndex(destIndexName);
170+
reindexRequest.setParentTask(parentTaskId);
161171
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
162172
}
163173

164174
private void addBlockIfFromSource(
165175
IndexMetadata.APIBlock block,
166176
Settings settingsBefore,
167177
String destIndexName,
168-
ActionListener<AddIndexBlockResponse> listener
178+
ActionListener<AddIndexBlockResponse> listener,
179+
TaskId parentTaskId
169180
) {
170181
if (settingsBefore.getAsBoolean(block.settingName(), false)) {
171182
var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName);
172-
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage));
183+
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage), parentTaskId);
173184
} else {
174185
listener.onResponse(null);
175186
}
@@ -192,7 +203,14 @@ private static <U extends AcknowledgedResponse> ActionListener<U> failIfNotAckno
192203
});
193204
}
194205

195-
private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener<AddIndexBlockResponse> listener) {
196-
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener);
206+
private void addBlockToIndex(
207+
IndexMetadata.APIBlock block,
208+
String index,
209+
ActionListener<AddIndexBlockResponse> listener,
210+
TaskId parentTaskId
211+
) {
212+
AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(block, index);
213+
addIndexBlockRequest.setParentTask(parentTaskId);
214+
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
197215
}
198216
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ protected ReindexDataStreamTask createTask(
7676
@Override
7777
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
7878
String sourceDataStream = params.getSourceDataStream();
79+
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
7980
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
81+
request.setParentTask(taskId);
8082
assert task instanceof ReindexDataStreamTask;
8183
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
8284
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
@@ -85,16 +87,18 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
8587
if (dataStreamInfos.size() == 1) {
8688
DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
8789
if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
90+
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
91+
rolloverRequest.setParentTask(taskId);
8892
reindexClient.execute(
8993
RolloverAction.INSTANCE,
90-
new RolloverRequest(sourceDataStream, null),
94+
rolloverRequest,
9195
ActionListener.wrap(
92-
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream),
96+
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
9397
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
9498
)
9599
);
96100
} else {
97-
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream);
101+
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
98102
}
99103
} else {
100104
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
@@ -106,7 +110,8 @@ private void reindexIndices(
106110
DataStream dataStream,
107111
ReindexDataStreamTask reindexDataStreamTask,
108112
ExecuteWithHeadersClient reindexClient,
109-
String sourceDataStream
113+
String sourceDataStream,
114+
TaskId parentTaskId
110115
) {
111116
List<Index> indices = dataStream.getIndices();
112117
List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
@@ -118,7 +123,7 @@ private void reindexIndices(
118123
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
119124
final int maxConcurrentIndices = 1;
120125
for (int i = 0; i < maxConcurrentIndices; i++) {
121-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
126+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
122127
}
123128
// This takes care of the additional latch count referenced above:
124129
listener.onResponse(null);
@@ -129,7 +134,8 @@ private void maybeProcessNextIndex(
129134
ReindexDataStreamTask reindexDataStreamTask,
130135
ExecuteWithHeadersClient reindexClient,
131136
String sourceDataStream,
132-
CountDownActionListener listener
137+
CountDownActionListener listener,
138+
TaskId parentTaskId
133139
) {
134140
if (indicesRemaining.isEmpty()) {
135141
return;
@@ -141,51 +147,48 @@ private void maybeProcessNextIndex(
141147
return;
142148
}
143149
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
144-
reindexClient.execute(
145-
ReindexDataStreamIndexAction.INSTANCE,
146-
new ReindexDataStreamIndexAction.Request(index.getName()),
147-
ActionListener.wrap(response1 -> {
148-
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
149-
reindexDataStreamTask.reindexSucceeded(index.getName());
150-
listener.onResponse(null);
151-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener);
152-
}, exception -> {
153-
reindexDataStreamTask.reindexFailed(index.getName(), exception);
154-
listener.onResponse(null);
155-
}), reindexClient);
150+
ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName());
151+
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
152+
reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> {
153+
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
154+
reindexDataStreamTask.reindexSucceeded(index.getName());
155+
listener.onResponse(null);
156+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
156157
}, exception -> {
157158
reindexDataStreamTask.reindexFailed(index.getName(), exception);
158159
listener.onResponse(null);
159-
})
160-
);
160+
}), reindexClient, parentTaskId);
161+
}, exception -> {
162+
reindexDataStreamTask.reindexFailed(index.getName(), exception);
163+
listener.onResponse(null);
164+
}));
161165
}
162166

163167
private void updateDataStream(
164168
String dataStream,
165169
String oldIndex,
166170
String newIndex,
167171
ActionListener<Void> listener,
168-
ExecuteWithHeadersClient reindexClient
172+
ExecuteWithHeadersClient reindexClient,
173+
TaskId parentTaskId
169174
) {
170-
reindexClient.execute(
171-
ModifyDataStreamsAction.INSTANCE,
172-
new ModifyDataStreamsAction.Request(
173-
TimeValue.MAX_VALUE,
174-
TimeValue.MAX_VALUE,
175-
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
176-
),
177-
new ActionListener<>() {
178-
@Override
179-
public void onResponse(AcknowledgedResponse response) {
180-
listener.onResponse(null);
181-
}
175+
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
176+
TimeValue.MAX_VALUE,
177+
TimeValue.MAX_VALUE,
178+
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
179+
);
180+
modifyDataStreamRequest.setParentTask(parentTaskId);
181+
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() {
182+
@Override
183+
public void onResponse(AcknowledgedResponse response) {
184+
listener.onResponse(null);
185+
}
182186

183-
@Override
184-
public void onFailure(Exception e) {
185-
listener.onFailure(e);
186-
}
187+
@Override
188+
public void onFailure(Exception e) {
189+
listener.onFailure(e);
187190
}
188-
);
191+
});
189192
}
190193

191194
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {

0 commit comments

Comments
 (0)