Skip to content

Commit afa3abc

Browse files
authored
Reindex data stream persistent task (#116780)
1 parent 4cc9f5d commit afa3abc

File tree

14 files changed

+1197
-1
lines changed

14 files changed

+1197
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.action;
11+
12+
import org.elasticsearch.ResourceNotFoundException;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.ActionType;
15+
import org.elasticsearch.action.DocWriteRequest;
16+
import org.elasticsearch.action.DocWriteResponse;
17+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
18+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
19+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
20+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
21+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest;
22+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse;
23+
import org.elasticsearch.action.index.IndexRequest;
24+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
25+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
26+
import org.elasticsearch.cluster.metadata.Template;
27+
import org.elasticsearch.datastreams.DataStreamsPlugin;
28+
import org.elasticsearch.datastreams.task.ReindexDataStreamTask;
29+
import org.elasticsearch.plugins.Plugin;
30+
import org.elasticsearch.tasks.CancellableTask;
31+
import org.elasticsearch.tasks.TaskManager;
32+
import org.elasticsearch.test.ESIntegTestCase;
33+
import org.elasticsearch.transport.TransportService;
34+
import org.elasticsearch.xcontent.XContentType;
35+
36+
import java.util.Collection;
37+
import java.util.List;
38+
import java.util.Locale;
39+
import java.util.Map;
40+
import java.util.Optional;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.atomic.AtomicReference;
43+
44+
import static org.hamcrest.Matchers.equalTo;
45+
import static org.hamcrest.Matchers.is;
46+
47+
public class ReindexDataStreamTransportActionIT extends ESIntegTestCase {
48+
49+
@Override
50+
protected Collection<Class<? extends Plugin>> nodePlugins() {
51+
return List.of(DataStreamsPlugin.class);
52+
}
53+
54+
public void testNonExistentDataStream() {
55+
String nonExistentDataStreamName = randomAlphaOfLength(50);
56+
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(nonExistentDataStreamName);
57+
assertThrows(
58+
ResourceNotFoundException.class,
59+
() -> client().execute(new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME), reindexDataStreamRequest)
60+
.actionGet()
61+
);
62+
}
63+
64+
public void testAlreadyUpToDateDataStream() throws Exception {
65+
String dataStreamName = randomAlphaOfLength(50).toLowerCase(Locale.ROOT);
66+
ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(dataStreamName);
67+
createDataStream(dataStreamName);
68+
ReindexDataStreamResponse response = client().execute(
69+
new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME),
70+
reindexDataStreamRequest
71+
).actionGet();
72+
String persistentTaskId = response.getTaskId();
73+
assertThat(persistentTaskId, equalTo("reindex-data-stream-" + dataStreamName));
74+
AtomicReference<ReindexDataStreamTask> runningTask = new AtomicReference<>();
75+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
76+
TaskManager taskManager = transportService.getTaskManager();
77+
Map<Long, CancellableTask> tasksMap = taskManager.getCancellableTasks();
78+
Optional<Map.Entry<Long, CancellableTask>> optionalTask = taskManager.getCancellableTasks()
79+
.entrySet()
80+
.stream()
81+
.filter(entry -> entry.getValue().getType().equals("persistent"))
82+
.filter(
83+
entry -> entry.getValue() instanceof ReindexDataStreamTask
84+
&& persistentTaskId.equals((((ReindexDataStreamTask) entry.getValue()).getPersistentTaskId()))
85+
)
86+
.findAny();
87+
optionalTask.ifPresent(
88+
longCancellableTaskEntry -> runningTask.compareAndSet(null, (ReindexDataStreamTask) longCancellableTaskEntry.getValue())
89+
);
90+
}
91+
ReindexDataStreamTask task = runningTask.get();
92+
assertNotNull(task);
93+
assertThat(task.getStatus().complete(), equalTo(true));
94+
assertNull(task.getStatus().exception());
95+
assertThat(task.getStatus().pending(), equalTo(0));
96+
assertThat(task.getStatus().inProgress(), equalTo(0));
97+
assertThat(task.getStatus().errors().size(), equalTo(0));
98+
}
99+
100+
private void createDataStream(String dataStreamName) {
101+
final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest =
102+
new TransportPutComposableIndexTemplateAction.Request("my-template");
103+
putComposableTemplateRequest.indexTemplate(
104+
ComposableIndexTemplate.builder()
105+
.indexPatterns(List.of(dataStreamName))
106+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
107+
.template(Template.builder().build())
108+
.build()
109+
);
110+
final AcknowledgedResponse putComposableTemplateResponse = safeGet(
111+
client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest)
112+
);
113+
assertThat(putComposableTemplateResponse.isAcknowledged(), is(true));
114+
115+
final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
116+
TEST_REQUEST_TIMEOUT,
117+
TEST_REQUEST_TIMEOUT,
118+
dataStreamName
119+
);
120+
final AcknowledgedResponse createDataStreamResponse = safeGet(
121+
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
122+
);
123+
assertThat(createDataStreamResponse.isAcknowledged(), is(true));
124+
indexDocs(dataStreamName);
125+
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
126+
indexDocs(dataStreamName);
127+
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
128+
}
129+
130+
private void indexDocs(String dataStreamName) {
131+
int docs = randomIntBetween(5, 10);
132+
CountDownLatch countDownLatch = new CountDownLatch(docs);
133+
for (int i = 0; i < docs; i++) {
134+
var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
135+
final String doc = "{ \"@timestamp\": \"2099-05-06T16:21:15.000Z\", \"message\": \"something cool happened\" }";
136+
indexRequest.source(doc, XContentType.JSON);
137+
client().index(indexRequest, new ActionListener<>() {
138+
@Override
139+
public void onResponse(DocWriteResponse docWriteResponse) {
140+
countDownLatch.countDown();
141+
}
142+
143+
@Override
144+
public void onFailure(Exception e) {
145+
fail("Indexing request should have succeeded eventually, failed with " + e.getMessage());
146+
}
147+
});
148+
}
149+
safeAwait(countDownLatch);
150+
}
151+
152+
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,23 @@
1919
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
2020
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2121
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
22+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
2223
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
2324
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
2425
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
26+
import org.elasticsearch.client.internal.Client;
2527
import org.elasticsearch.client.internal.OriginSettingClient;
2628
import org.elasticsearch.cluster.metadata.DataStream;
2729
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2830
import org.elasticsearch.cluster.node.DiscoveryNodes;
31+
import org.elasticsearch.cluster.service.ClusterService;
2932
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3033
import org.elasticsearch.common.settings.ClusterSettings;
3134
import org.elasticsearch.common.settings.IndexScopedSettings;
3235
import org.elasticsearch.common.settings.Setting;
3336
import org.elasticsearch.common.settings.Settings;
3437
import org.elasticsearch.common.settings.SettingsFilter;
38+
import org.elasticsearch.common.settings.SettingsModule;
3539
import org.elasticsearch.core.IOUtils;
3640
import org.elasticsearch.core.TimeValue;
3741
import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction;
@@ -40,6 +44,7 @@
4044
import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction;
4145
import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction;
4246
import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
47+
import org.elasticsearch.datastreams.action.ReindexDataStreamTransportAction;
4348
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
4449
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
4550
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
@@ -73,14 +78,27 @@
7378
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
7479
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
7580
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
81+
import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskExecutor;
82+
import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskState;
83+
import org.elasticsearch.datastreams.task.ReindexDataStreamStatus;
84+
import org.elasticsearch.datastreams.task.ReindexDataStreamTask;
85+
import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams;
7686
import org.elasticsearch.features.NodeFeature;
7787
import org.elasticsearch.health.HealthIndicatorService;
7888
import org.elasticsearch.index.IndexSettingProvider;
89+
import org.elasticsearch.persistent.PersistentTaskParams;
90+
import org.elasticsearch.persistent.PersistentTaskState;
91+
import org.elasticsearch.persistent.PersistentTasksExecutor;
7992
import org.elasticsearch.plugins.ActionPlugin;
8093
import org.elasticsearch.plugins.HealthPlugin;
94+
import org.elasticsearch.plugins.PersistentTaskPlugin;
8195
import org.elasticsearch.plugins.Plugin;
8296
import org.elasticsearch.rest.RestController;
8397
import org.elasticsearch.rest.RestHandler;
98+
import org.elasticsearch.tasks.Task;
99+
import org.elasticsearch.threadpool.ThreadPool;
100+
import org.elasticsearch.xcontent.NamedXContentRegistry;
101+
import org.elasticsearch.xcontent.ParseField;
84102

85103
import java.io.IOException;
86104
import java.time.Clock;
@@ -93,7 +111,7 @@
93111

94112
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;
95113

96-
public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin {
114+
public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, PersistentTaskPlugin {
97115

98116
public static final Setting<TimeValue> TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
99117
"time_series.poll_interval",
@@ -244,6 +262,7 @@ public Collection<?> createComponents(PluginServices services) {
244262
actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
245263
actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
246264
}
265+
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
247266
return actions;
248267
}
249268

@@ -302,4 +321,48 @@ public void close() throws IOException {
302321
public Collection<HealthIndicatorService> getHealthIndicatorServices() {
303322
return List.of(dataStreamLifecycleHealthIndicatorService.get());
304323
}
324+
325+
@Override
326+
public List<NamedXContentRegistry.Entry> getNamedXContent() {
327+
return List.of(
328+
new NamedXContentRegistry.Entry(
329+
PersistentTaskState.class,
330+
new ParseField(ReindexDataStreamPersistentTaskState.NAME),
331+
ReindexDataStreamPersistentTaskState::fromXContent
332+
),
333+
new NamedXContentRegistry.Entry(
334+
PersistentTaskParams.class,
335+
new ParseField(ReindexDataStreamTaskParams.NAME),
336+
ReindexDataStreamTaskParams::fromXContent
337+
)
338+
);
339+
}
340+
341+
@Override
342+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
343+
return List.of(
344+
new NamedWriteableRegistry.Entry(
345+
PersistentTaskState.class,
346+
ReindexDataStreamPersistentTaskState.NAME,
347+
ReindexDataStreamPersistentTaskState::new
348+
),
349+
new NamedWriteableRegistry.Entry(
350+
PersistentTaskParams.class,
351+
ReindexDataStreamTaskParams.NAME,
352+
ReindexDataStreamTaskParams::new
353+
),
354+
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new)
355+
);
356+
}
357+
358+
@Override
359+
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
360+
ClusterService clusterService,
361+
ThreadPool threadPool,
362+
Client client,
363+
SettingsModule settingsModule,
364+
IndexNameExpressionResolver expressionResolver
365+
) {
366+
return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool));
367+
}
305368
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.action;
11+
12+
import org.elasticsearch.ResourceAlreadyExistsException;
13+
import org.elasticsearch.ResourceNotFoundException;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction;
16+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest;
17+
import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse;
18+
import org.elasticsearch.action.support.ActionFilters;
19+
import org.elasticsearch.action.support.HandledTransportAction;
20+
import org.elasticsearch.cluster.metadata.DataStream;
21+
import org.elasticsearch.cluster.metadata.Metadata;
22+
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.datastreams.task.ReindexDataStreamTask;
24+
import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams;
25+
import org.elasticsearch.injection.guice.Inject;
26+
import org.elasticsearch.persistent.PersistentTasksService;
27+
import org.elasticsearch.tasks.Task;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.TransportService;
30+
31+
/*
32+
* This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation
33+
* of the persistent task, it responds with the persistent task id so that the user can monitor the persistent task.
34+
*/
35+
public class ReindexDataStreamTransportAction extends HandledTransportAction<ReindexDataStreamRequest, ReindexDataStreamResponse> {
36+
private final PersistentTasksService persistentTasksService;
37+
private final TransportService transportService;
38+
private final ClusterService clusterService;
39+
40+
@Inject
41+
public ReindexDataStreamTransportAction(
42+
TransportService transportService,
43+
ActionFilters actionFilters,
44+
PersistentTasksService persistentTasksService,
45+
ClusterService clusterService
46+
) {
47+
super(
48+
ReindexDataStreamAction.NAME,
49+
true,
50+
transportService,
51+
actionFilters,
52+
ReindexDataStreamRequest::new,
53+
transportService.getThreadPool().executor(ThreadPool.Names.GENERIC)
54+
);
55+
this.transportService = transportService;
56+
this.persistentTasksService = persistentTasksService;
57+
this.clusterService = clusterService;
58+
}
59+
60+
@Override
61+
protected void doExecute(Task task, ReindexDataStreamRequest request, ActionListener<ReindexDataStreamResponse> listener) {
62+
String sourceDataStreamName = request.getSourceDataStream();
63+
Metadata metadata = clusterService.state().metadata();
64+
DataStream dataStream = metadata.dataStreams().get(sourceDataStreamName);
65+
if (dataStream == null) {
66+
listener.onFailure(new ResourceNotFoundException("Data stream named [{}] does not exist", sourceDataStreamName));
67+
return;
68+
}
69+
int totalIndices = dataStream.getIndices().size();
70+
int totalIndicesToBeUpgraded = (int) dataStream.getIndices()
71+
.stream()
72+
.filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion())
73+
.count();
74+
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
75+
sourceDataStreamName,
76+
transportService.getThreadPool().absoluteTimeInMillis(),
77+
totalIndices,
78+
totalIndicesToBeUpgraded
79+
);
80+
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
81+
persistentTasksService.sendStartRequest(
82+
persistentTaskId,
83+
ReindexDataStreamTask.TASK_NAME,
84+
params,
85+
null,
86+
ActionListener.wrap(startedTask -> listener.onResponse(new ReindexDataStreamResponse(persistentTaskId)), listener::onFailure)
87+
);
88+
}
89+
90+
private String getPersistentTaskId(String dataStreamName) throws ResourceAlreadyExistsException {
91+
return "reindex-data-stream-" + dataStreamName;
92+
}
93+
}

0 commit comments

Comments
 (0)