Skip to content

Commit b3e2466

Browse files
committed
Add data stream reindex pipeline to sanitize docs
1 parent 92d1d31 commit b3e2466

File tree

4 files changed

+259
-13
lines changed

4 files changed

+259
-13
lines changed

x-pack/plugin/migrate/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
testImplementation project(xpackModule('ccr'))
2020
testImplementation project(':modules:data-streams')
2121
testImplementation project(path: ':modules:reindex')
22+
testImplementation project(path: ':modules:ingest-common')
2223
}
2324

2425
addQaCheckDependencies(project)

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 133 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@
2323
import org.elasticsearch.action.bulk.BulkRequest;
2424
import org.elasticsearch.action.bulk.BulkResponse;
2525
import org.elasticsearch.action.index.IndexRequest;
26+
import org.elasticsearch.action.ingest.DeletePipelineRequest;
27+
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
28+
import org.elasticsearch.action.ingest.PutPipelineRequest;
29+
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
2630
import org.elasticsearch.cluster.block.ClusterBlockException;
2731
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2832
import org.elasticsearch.cluster.metadata.IndexMetadata;
2933
import org.elasticsearch.cluster.metadata.MappingMetadata;
3034
import org.elasticsearch.cluster.metadata.Template;
35+
import org.elasticsearch.common.bytes.BytesArray;
3136
import org.elasticsearch.common.compress.CompressedXContent;
3237
import org.elasticsearch.common.settings.Settings;
3338
import org.elasticsearch.common.time.DateFormatter;
@@ -36,12 +41,15 @@
3641
import org.elasticsearch.datastreams.DataStreamsPlugin;
3742
import org.elasticsearch.index.IndexSettings;
3843
import org.elasticsearch.index.mapper.DateFieldMapper;
44+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
3945
import org.elasticsearch.plugins.Plugin;
4046
import org.elasticsearch.reindex.ReindexPlugin;
4147
import org.elasticsearch.test.ESIntegTestCase;
4248
import org.elasticsearch.test.transport.MockTransportService;
4349
import org.elasticsearch.xcontent.XContentType;
4450
import org.elasticsearch.xpack.migrate.MigratePlugin;
51+
import org.elasticsearch.xpack.migrate.ReindexDataStreamPipeline;
52+
import org.junit.After;
4553

4654
import java.io.IOException;
4755
import java.time.Instant;
@@ -53,27 +61,143 @@
5361
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
5462
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5563
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
64+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
5665
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
5766
import static org.hamcrest.Matchers.equalTo;
5867

5968
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
69+
@After
70+
private void cleanupCluster() throws Exception {
71+
clusterAdmin().execute(
72+
DeletePipelineTransportAction.TYPE,
73+
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, ReindexDataStreamPipeline.PIPELINE_NAME)
74+
);
75+
super.cleanUpCluster();
76+
}
6077

6178
private static final String MAPPING = """
6279
{
6380
"_doc":{
6481
"dynamic":"strict",
6582
"properties":{
66-
"foo1":{
67-
"type":"text"
68-
}
83+
"foo1": {"type":"text"},
84+
"@timestamp": {"type":"date"}
6985
}
7086
}
7187
}
7288
""";
7389

7490
@Override
7591
protected Collection<Class<? extends Plugin>> nodePlugins() {
76-
return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class);
92+
return List.of(
93+
MigratePlugin.class,
94+
ReindexPlugin.class,
95+
MockTransportService.TestPlugin.class,
96+
DataStreamsPlugin.class,
97+
IngestCommonPlugin.class
98+
);
99+
}
100+
101+
private static String DATA_STREAM_MAPPING = """
102+
{
103+
"dynamic": true,
104+
"_data_stream_timestamp": {
105+
"enabled": true
106+
},
107+
"properties": {
108+
"@timestamp": {"type":"date"}
109+
}
110+
}
111+
""";
112+
113+
public void testTimestamp0AddedIfMissing() {
114+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
115+
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
116+
117+
// add doc without timestamp
118+
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
119+
120+
// add timestamp to source mapping
121+
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
122+
123+
// call reindex
124+
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
125+
.actionGet()
126+
.getDestIndex();
127+
128+
assertResponse(prepareSearch(destIndex), response -> {
129+
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
130+
assertEquals(Integer.valueOf(0), sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
131+
});
132+
}
133+
134+
public void testTimestampNotAddedIfExists() {
135+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
136+
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
137+
138+
// add doc with timestamp
139+
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
140+
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
141+
addDoc(sourceIndex, doc);
142+
143+
// add timestamp to source mapping
144+
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
145+
146+
// call reindex
147+
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
148+
.actionGet()
149+
.getDestIndex();
150+
151+
assertResponse(prepareSearch(destIndex), response -> {
152+
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
153+
assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
154+
});
155+
}
156+
157+
public void testCustomReindexPipeline() {
158+
String customPipeline = """
159+
{
160+
"processors": [
161+
{
162+
"set": {
163+
"field": "cheese",
164+
"value": "gorgonzola"
165+
}
166+
}
167+
]
168+
}
169+
""";
170+
171+
PutPipelineRequest putRequest = new PutPipelineRequest(
172+
TEST_REQUEST_TIMEOUT,
173+
TEST_REQUEST_TIMEOUT,
174+
ReindexDataStreamPipeline.PIPELINE_NAME,
175+
new BytesArray(customPipeline),
176+
XContentType.JSON
177+
);
178+
179+
clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest).actionGet();
180+
181+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
182+
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
183+
184+
// add doc with timestamp
185+
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
186+
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
187+
addDoc(sourceIndex, doc);
188+
189+
// add timestamp to source mapping
190+
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
191+
192+
String destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
193+
.actionGet()
194+
.getDestIndex();
195+
196+
assertResponse(prepareSearch(destIndex), response -> {
197+
Map<String, Object> sourceAsMap = response.getHits().getAt(0).getSourceAsMap();
198+
assertEquals("gorgonzola", sourceAsMap.get("cheese"));
199+
assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD));
200+
});
77201
}
78202

79203
public void testDestIndexDeletedIfExists() throws Exception {
@@ -190,7 +314,7 @@ public void testSettingsAddedBeforeReindex() throws Exception {
190314
assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey()));
191315
}
192316

193-
public void testMappingsAddedToDestIndex() throws Exception {
317+
public void testMappingsAddedToDestIndex() {
194318
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
195319
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet();
196320

@@ -469,12 +593,9 @@ private static String formatInstant(Instant instant) {
469593
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
470594
}
471595

472-
private static String getIndexUUID(String index) {
473-
return indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index))
474-
.actionGet()
475-
.getSettings()
476-
.get(index)
477-
.get(IndexMetadata.SETTING_INDEX_UUID);
596+
void addDoc(String index, String doc) {
597+
BulkRequest bulkRequest = new BulkRequest();
598+
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
599+
client().bulk(bulkRequest).actionGet();
478600
}
479-
480601
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.migrate;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ingest.PutPipelineRequest;
12+
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.action.support.master.MasterNodeRequest;
15+
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.common.bytes.BytesReference;
18+
import org.elasticsearch.ingest.IngestMetadata;
19+
import org.elasticsearch.ingest.PipelineConfiguration;
20+
import org.elasticsearch.xcontent.XContentBuilder;
21+
import org.elasticsearch.xcontent.XContentType;
22+
23+
import java.io.IOException;
24+
import java.io.UncheckedIOException;
25+
26+
/**
27+
* Manages the definitions and lifecycle of the ingest pipeline used by the reindex data stream operation.
28+
*/
29+
public class ReindexDataStreamPipeline {
30+
31+
/**
32+
* The last version of the distribution that updated the pipeline definition.
33+
*/
34+
static final int LAST_UPDATED_VERSION = Version.V_8_18_0.id;
35+
36+
public static final String PIPELINE_NAME = "reindex-data-stream";
37+
38+
/**
39+
* Checks if the current version of the pipeline definition is installed in the cluster
40+
* @param clusterState The cluster state to check
41+
* @return true if a pipeline exists that is compatible with this version, false otherwise
42+
*/
43+
public static boolean exists(ClusterState clusterState) {
44+
final IngestMetadata ingestMetadata = clusterState.getMetadata().custom(IngestMetadata.TYPE);
45+
// we ensure that we both have the pipeline and its version represents the current (or later) version
46+
if (ingestMetadata != null) {
47+
final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(PIPELINE_NAME);
48+
if (pipeline != null) {
49+
Object version = pipeline.getConfig().get("version");
50+
// do not replace if pipeline was created by user and has no version
51+
if (version == null) {
52+
return true;
53+
}
54+
return version instanceof Number number && number.intValue() >= LAST_UPDATED_VERSION;
55+
}
56+
}
57+
return false;
58+
}
59+
60+
/**
61+
* Creates a pipeline with the current version's pipeline definition
62+
* @param client Client used to execute put pipeline
63+
* @param listener Callback used after pipeline has been created
64+
*/
65+
public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
66+
final BytesReference pipeline = BytesReference.bytes(currentPipelineDefinition());
67+
client.execute(
68+
PutPipelineTransportAction.TYPE,
69+
new PutPipelineRequest(
70+
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
71+
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
72+
PIPELINE_NAME,
73+
pipeline,
74+
XContentType.JSON
75+
),
76+
listener
77+
);
78+
}
79+
80+
private static XContentBuilder currentPipelineDefinition() {
81+
try {
82+
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
83+
builder.startObject();
84+
{
85+
builder.field(
86+
"description",
87+
"This pipeline sanitizes documents that are being reindexed into a data stream using the reindex data stream API. "
88+
+ "It is an internal pipeline and should not be modified."
89+
);
90+
builder.field("version", LAST_UPDATED_VERSION);
91+
builder.startArray("processors");
92+
{
93+
builder.startObject();
94+
{
95+
builder.startObject("set");
96+
{
97+
builder.field("field", "@timestamp");
98+
builder.field("value", 0);
99+
builder.field("override", false);
100+
}
101+
builder.endObject();
102+
}
103+
builder.endObject();
104+
}
105+
builder.endArray();
106+
}
107+
builder.endObject();
108+
return builder;
109+
} catch (final IOException e) {
110+
throw new UncheckedIOException("Failed to create pipeline for reindex data stream document sanitization", e);
111+
}
112+
}
113+
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.threadpool.ThreadPool;
5252
import org.elasticsearch.transport.TransportService;
5353
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
54+
import org.elasticsearch.xpack.migrate.ReindexDataStreamPipeline;
5455

5556
import java.util.Locale;
5657
import java.util.Map;
@@ -153,7 +154,8 @@ protected void doExecute(
153154
return;
154155
}
155156
final boolean wasClosed = isClosed(sourceIndex);
156-
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
157+
SubscribableListener.newForked(this::prepareReindexOperation)
158+
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))
157159
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
158160
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
159161
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
@@ -197,6 +199,14 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
197199
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
198200
}
199201

202+
private void prepareReindexOperation(ActionListener<AcknowledgedResponse> listener) {
203+
if (ReindexDataStreamPipeline.exists(clusterService.state())) {
204+
listener.onResponse(null);
205+
} else {
206+
ReindexDataStreamPipeline.create(client, listener);
207+
}
208+
}
209+
200210
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
201211
logger.debug("Setting write block on source index [{}]", sourceIndexName);
202212
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
@@ -269,6 +279,7 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
269279
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
270280
var reindexRequest = new ReindexRequest();
271281
reindexRequest.setSourceIndices(sourceIndexName);
282+
reindexRequest.setDestPipeline(ReindexDataStreamPipeline.PIPELINE_NAME);
272283
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
273284
reindexRequest.getSearchRequest().source().fetchSource(true);
274285
reindexRequest.setDestIndex(destIndexName);

0 commit comments

Comments
 (0)