Skip to content

Commit dfcea62

Browse files
Add pipeline to clean docs during data stream reindex (elastic#121617) (elastic#121728)
Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices. This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version.
1 parent b3d679d commit dfcea62

File tree

8 files changed

+223
-13
lines changed

8 files changed

+223
-13
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ public class InternalUsers {
225225
.build() },
226226
null,
227227
null,
228+
228229
new String[] {},
229230
MetadataUtils.DEFAULT_RESERVED_METADATA,
230231
Map.of()
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"description": "This pipeline sanitizes documents that are being reindexed into a data stream using the reindex data stream API. It is an internal pipeline and should not be modified.",
3+
"processors": [
4+
{
5+
"set": {
6+
"field": "@timestamp",
7+
"value": 0,
8+
"override": false
9+
}
10+
}
11+
],
12+
"_meta": {
13+
"managed": true
14+
},
15+
"version": ${xpack.migrate.reindex.pipeline.version}
16+
}

x-pack/plugin/migrate/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
testImplementation project(xpackModule('ccr'))
2020
testImplementation project(':modules:data-streams')
2121
testImplementation project(path: ':modules:reindex')
22+
testImplementation project(path: ':modules:ingest-common')
2223
}
2324

2425
addQaCheckDependencies(project)

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 134 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@
2424
import org.elasticsearch.action.bulk.BulkRequest;
2525
import org.elasticsearch.action.bulk.BulkResponse;
2626
import org.elasticsearch.action.index.IndexRequest;
27+
import org.elasticsearch.action.ingest.DeletePipelineRequest;
28+
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
29+
import org.elasticsearch.action.ingest.PutPipelineRequest;
30+
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
2731
import org.elasticsearch.cluster.block.ClusterBlockException;
2832
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2933
import org.elasticsearch.cluster.metadata.IndexMetadata;
3034
import org.elasticsearch.cluster.metadata.MappingMetadata;
3135
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
3236
import org.elasticsearch.cluster.metadata.Template;
37+
import org.elasticsearch.common.bytes.BytesArray;
3338
import org.elasticsearch.common.compress.CompressedXContent;
3439
import org.elasticsearch.common.settings.Settings;
3540
import org.elasticsearch.common.time.DateFormatter;
@@ -38,12 +43,15 @@
3843
import org.elasticsearch.datastreams.DataStreamsPlugin;
3944
import org.elasticsearch.index.IndexSettings;
4045
import org.elasticsearch.index.mapper.DateFieldMapper;
46+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
4147
import org.elasticsearch.plugins.Plugin;
4248
import org.elasticsearch.reindex.ReindexPlugin;
4349
import org.elasticsearch.test.ESIntegTestCase;
4450
import org.elasticsearch.test.transport.MockTransportService;
4551
import org.elasticsearch.xcontent.XContentType;
4652
import org.elasticsearch.xpack.migrate.MigratePlugin;
53+
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
54+
import org.junit.After;
4755

4856
import java.io.IOException;
4957
import java.time.Instant;
@@ -56,27 +64,144 @@
5664
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
5765
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5866
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
67+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
5968
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
6069
import static org.hamcrest.Matchers.equalTo;
6170

6271
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
72+
@After
73+
private void cleanupCluster() throws Exception {
74+
clusterAdmin().execute(
75+
DeletePipelineTransportAction.TYPE,
76+
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
77+
);
78+
super.cleanUpCluster();
79+
}
6380

6481
private static final String MAPPING = """
6582
{
6683
"_doc":{
6784
"dynamic":"strict",
6885
"properties":{
69-
"foo1":{
70-
"type":"text"
71-
}
86+
"foo1": {"type":"text"},
87+
"@timestamp": {"type":"date"}
7288
}
7389
}
7490
}
7591
""";
7692

7793
@Override
7894
protected Collection<Class<? extends Plugin>> nodePlugins() {
79-
return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class);
95+
return List.of(
96+
MigratePlugin.class,
97+
ReindexPlugin.class,
98+
MockTransportService.TestPlugin.class,
99+
DataStreamsPlugin.class,
100+
IngestCommonPlugin.class
101+
);
102+
}
103+
104+
private static String DATA_STREAM_MAPPING = """
105+
{
106+
"dynamic": true,
107+
"_data_stream_timestamp": {
108+
"enabled": true
109+
},
110+
"properties": {
111+
"@timestamp": {"type":"date"}
112+
}
113+
}
114+
""";
115+
116+
public void testTimestamp0AddedIfMissing() {
117+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
118+
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
119+
120+
// add doc without timestamp
121+
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
122+
123+
// add timestamp to source mapping
124+
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
125+
126+
// call reindex
127+
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
128+
.actionGet()
129+
.getDestIndex();
130+
131+
assertResponse(prepareSearch(destIndex), response -> {
132+
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
133+
assertEquals(Integer.valueOf(0), sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
134+
});
135+
}
136+
137+
public void testTimestampNotAddedIfExists() {
138+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
139+
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
140+
141+
// add doc with timestamp
142+
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
143+
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
144+
addDoc(sourceIndex, doc);
145+
146+
// add timestamp to source mapping
147+
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
148+
149+
// call reindex
150+
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
151+
.actionGet()
152+
.getDestIndex();
153+
154+
assertResponse(prepareSearch(destIndex), response -> {
155+
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
156+
assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
157+
});
158+
}
159+
160+
public void testCustomReindexPipeline() {
161+
String customPipeline = """
162+
{
163+
"processors": [
164+
{
165+
"set": {
166+
"field": "cheese",
167+
"value": "gorgonzola"
168+
}
169+
}
170+
],
171+
"version": 1000
172+
}
173+
""";
174+
175+
PutPipelineRequest putRequest = new PutPipelineRequest(
176+
TEST_REQUEST_TIMEOUT,
177+
TEST_REQUEST_TIMEOUT,
178+
MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME,
179+
new BytesArray(customPipeline),
180+
XContentType.JSON
181+
);
182+
183+
clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest).actionGet();
184+
185+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
186+
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
187+
188+
// add doc with timestamp
189+
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
190+
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
191+
addDoc(sourceIndex, doc);
192+
193+
// add timestamp to source mapping
194+
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
195+
196+
String destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
197+
.actionGet()
198+
.getDestIndex();
199+
200+
assertResponse(prepareSearch(destIndex), response -> {
201+
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
202+
assertEquals("gorgonzola", sourceAsMap.get("cheese"));
203+
assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
204+
});
80205
}
81206

82207
public void testDestIndexDeletedIfExists() throws Exception {
@@ -200,7 +325,7 @@ public void testSettingsAddedBeforeReindex() throws Exception {
200325
assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey()));
201326
}
202327

203-
public void testMappingsAddedToDestIndex() throws Exception {
328+
public void testMappingsAddedToDestIndex() {
204329
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
205330
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet();
206331

@@ -479,12 +604,9 @@ private static String formatInstant(Instant instant) {
479604
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
480605
}
481606

482-
private static String getIndexUUID(String index) {
483-
return indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index))
484-
.actionGet()
485-
.getSettings()
486-
.get(index)
487-
.get(IndexMetadata.SETTING_INDEX_UUID);
607+
void addDoc(String index, String doc) {
608+
BulkRequest bulkRequest = new BulkRequest();
609+
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
610+
client().bulk(bulkRequest).actionGet();
488611
}
489-
490612
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;
5656

5757
import java.util.ArrayList;
58+
import java.util.Collection;
5859
import java.util.List;
5960
import java.util.function.Predicate;
6061
import java.util.function.Supplier;
@@ -64,6 +65,18 @@
6465
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;
6566

6667
public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
68+
@Override
69+
public Collection<?> createComponents(PluginServices services) {
70+
var registry = new MigrateTemplateRegistry(
71+
services.environment().settings(),
72+
services.clusterService(),
73+
services.threadPool(),
74+
services.client(),
75+
services.xContentRegistry()
76+
);
77+
registry.initialize();
78+
return List.of(registry);
79+
}
6780

6881
@Override
6982
public List<RestHandler> getRestHandlers(
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.migrate;
8+
9+
import org.elasticsearch.client.internal.Client;
10+
import org.elasticsearch.cluster.service.ClusterService;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.threadpool.ThreadPool;
13+
import org.elasticsearch.xcontent.NamedXContentRegistry;
14+
import org.elasticsearch.xpack.core.ClientHelper;
15+
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
16+
import org.elasticsearch.xpack.core.template.IngestPipelineConfig;
17+
import org.elasticsearch.xpack.core.template.JsonIngestPipelineConfig;
18+
19+
import java.util.List;
20+
21+
public class MigrateTemplateRegistry extends IndexTemplateRegistry {
22+
23+
// This number must be incremented when we make changes to built-in pipeline.
24+
// If a specific user pipeline is needed instead, its version should be set to a value higher than the REGISTRY_VERSION.
25+
static final int REGISTRY_VERSION = 1;
26+
public static final String REINDEX_DATA_STREAM_PIPELINE_NAME = "reindex-data-stream-pipeline";
27+
private static final String TEMPLATE_VERSION_VARIABLE = "xpack.migrate.reindex.pipeline.version";
28+
29+
public MigrateTemplateRegistry(
30+
Settings nodeSettings,
31+
ClusterService clusterService,
32+
ThreadPool threadPool,
33+
Client client,
34+
NamedXContentRegistry xContentRegistry
35+
) {
36+
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
37+
}
38+
39+
@Override
40+
protected List<IngestPipelineConfig> getIngestPipelines() {
41+
return List.of(
42+
new JsonIngestPipelineConfig(
43+
REINDEX_DATA_STREAM_PIPELINE_NAME,
44+
"/" + REINDEX_DATA_STREAM_PIPELINE_NAME + ".json",
45+
REGISTRY_VERSION,
46+
TEMPLATE_VERSION_VARIABLE
47+
)
48+
);
49+
}
50+
51+
@Override
52+
protected String getOrigin() {
53+
return ClientHelper.STACK_ORIGIN;
54+
}
55+
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.threadpool.ThreadPool;
5454
import org.elasticsearch.transport.TransportService;
5555
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
56+
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5657

5758
import java.util.Locale;
5859
import java.util.Map;
@@ -271,6 +272,7 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
271272
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
272273
var reindexRequest = new ReindexRequest();
273274
reindexRequest.setSourceIndices(sourceIndexName);
275+
reindexRequest.setDestPipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME);
274276
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
275277
reindexRequest.getSearchRequest().source().fetchSource(true);
276278
reindexRequest.setDestIndex(destIndexName);

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public void testUpgradeDataStream() throws Exception {
195195
createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
196196
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
197197
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0);
198-
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 0, 1);
198+
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0);
199199
}
200200
}
201201

0 commit comments

Comments
 (0)