Skip to content

Commit c8178ba

Browse files
committed
Replace adhoc pipeline registration with IndexTemplateRegistry
1 parent 0d444d9 commit c8178ba

File tree

10 files changed

+97
-135
lines changed

10 files changed

+97
-135
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion(
196196
public static final String APM_ORIGIN = "apm";
197197
public static final String OTEL_ORIGIN = "otel";
198198
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
199+
public static final String MIGRATE_ORIGIN = "migrate";
199200

200201
private ClientHelper() {}
201202

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2626
import org.elasticsearch.action.downsample.DownsampleAction;
2727
import org.elasticsearch.action.index.TransportIndexAction;
28-
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
2928
import org.elasticsearch.action.search.TransportSearchAction;
3029
import org.elasticsearch.action.search.TransportSearchScrollAction;
3130
import org.elasticsearch.index.reindex.ReindexAction;
@@ -197,9 +196,7 @@ public class InternalUsers {
197196
UsernamesField.REINDEX_DATA_STREAM_NAME,
198197
new RoleDescriptor(
199198
UsernamesField.REINDEX_DATA_STREAM_ROLE,
200-
new String[] {
201-
PutPipelineTransportAction.TYPE.name()
202-
},
199+
new String[] {},
203200
new RoleDescriptor.IndicesPrivileges[] {
204201
RoleDescriptor.IndicesPrivileges.builder()
205202
.indices("*")

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,7 @@ public void testReindexDataStreamUser() {
303303

304304
final SimpleRole role = getLocalClusterRole(InternalUsers.REINDEX_DATA_STREAM_USER);
305305

306-
assertThat(role.cluster().privileges(), hasSize(1));
307-
checkClusterAccess(InternalUsers.REINDEX_DATA_STREAM_USER, role, PutPipelineTransportAction.TYPE.name(), true);
306+
assertThat(role.cluster(), is(ClusterPermission.NONE));
308307
assertThat(role.runAs(), is(RunAsPermission.NONE));
309308
assertThat(role.application(), is(ApplicationPermission.NONE));
310309
assertThat(role.remoteIndices(), is(RemoteIndicesPermission.NONE));
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"description": "This pipeline sanitizes documents that are being reindexed into a data stream using the reindex data stream API. It is an internal pipeline and should not be modified.",
3+
"processors": [
4+
{
5+
"set": {
6+
"field": "@timestamp",
7+
"value": 0,
8+
"override": false
9+
}
10+
}
11+
],
12+
"_meta": {
13+
"managed": true
14+
},
15+
"version": ${xpack.migrate.reindex.pipeline.version}
16+
}

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import org.elasticsearch.test.transport.MockTransportService;
5151
import org.elasticsearch.xcontent.XContentType;
5252
import org.elasticsearch.xpack.migrate.MigratePlugin;
53-
import org.elasticsearch.xpack.migrate.ReindexDataStreamPipeline;
53+
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5454
import org.junit.After;
5555

5656
import java.io.IOException;
@@ -73,7 +73,7 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
7373
private void cleanupCluster() throws Exception {
7474
clusterAdmin().execute(
7575
DeletePipelineTransportAction.TYPE,
76-
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, ReindexDataStreamPipeline.PIPELINE_NAME)
76+
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
7777
);
7878
super.cleanUpCluster();
7979
}
@@ -167,14 +167,15 @@ public void testCustomReindexPipeline() {
167167
"value": "gorgonzola"
168168
}
169169
}
170-
]
170+
],
171+
"version": 1000
171172
}
172173
""";
173174

174175
PutPipelineRequest putRequest = new PutPipelineRequest(
175176
TEST_REQUEST_TIMEOUT,
176177
TEST_REQUEST_TIMEOUT,
177-
ReindexDataStreamPipeline.PIPELINE_NAME,
178+
MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME,
178179
new BytesArray(customPipeline),
179180
XContentType.JSON
180181
);

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;
5656

5757
import java.util.ArrayList;
58+
import java.util.Collection;
5859
import java.util.List;
5960
import java.util.function.Predicate;
6061
import java.util.function.Supplier;
@@ -64,6 +65,18 @@
6465
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;
6566

6667
public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
68+
@Override
69+
public Collection<?> createComponents(PluginServices services) {
70+
var registry = new MigrateTemplateRegistry(
71+
services.environment().settings(),
72+
services.clusterService(),
73+
services.threadPool(),
74+
services.client(),
75+
services.xContentRegistry()
76+
);
77+
registry.initialize();
78+
return List.of(registry);
79+
}
6780

6881
@Override
6982
public List<RestHandler> getRestHandlers(
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.migrate;
8+
9+
import org.elasticsearch.client.internal.Client;
10+
import org.elasticsearch.cluster.service.ClusterService;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.threadpool.ThreadPool;
13+
import org.elasticsearch.xcontent.NamedXContentRegistry;
14+
import org.elasticsearch.xpack.core.ClientHelper;
15+
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
16+
import org.elasticsearch.xpack.core.template.IngestPipelineConfig;
17+
import org.elasticsearch.xpack.core.template.JsonIngestPipelineConfig;
18+
19+
import java.util.List;
20+
21+
public class MigrateTemplateRegistry extends IndexTemplateRegistry {
22+
23+
// This number must be incremented when we make changes to built-in pipeline.
24+
// If a specific user pipeline is needed instead, its version should be set to a value higher than the REGISTRY_VERSION.
25+
static final int REGISTRY_VERSION = 1;
26+
public static final String REINDEX_DATA_STREAM_PIPELINE_NAME = "reindex-data-stream-pipeline";
27+
private static final String TEMPLATE_VERSION_VARIABLE = "xpack.migrate.reindex.pipeline.version";
28+
29+
public MigrateTemplateRegistry(
30+
Settings nodeSettings,
31+
ClusterService clusterService,
32+
ThreadPool threadPool,
33+
Client client,
34+
NamedXContentRegistry xContentRegistry
35+
) {
36+
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
37+
}
38+
39+
@Override
40+
protected List<IngestPipelineConfig> getIngestPipelines() {
41+
return List.of(
42+
new JsonIngestPipelineConfig(
43+
REINDEX_DATA_STREAM_PIPELINE_NAME,
44+
"/" + REINDEX_DATA_STREAM_PIPELINE_NAME + ".json",
45+
REGISTRY_VERSION,
46+
TEMPLATE_VERSION_VARIABLE
47+
)
48+
);
49+
}
50+
51+
@Override
52+
protected String getOrigin() {
53+
return ClientHelper.MIGRATE_ORIGIN;
54+
}
55+
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/ReindexDataStreamPipeline.java

Lines changed: 0 additions & 113 deletions
This file was deleted.

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.elasticsearch.threadpool.ThreadPool;
5454
import org.elasticsearch.transport.TransportService;
5555
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
56-
import org.elasticsearch.xpack.migrate.ReindexDataStreamPipeline;
56+
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5757

5858
import java.util.Locale;
5959
import java.util.Map;
@@ -156,8 +156,7 @@ protected void doExecute(
156156
return;
157157
}
158158
final boolean wasClosed = isClosed(sourceIndex);
159-
SubscribableListener.<AcknowledgedResponse>newForked(l -> prepareReindexOperation(l, taskId))
160-
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))
159+
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
161160
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
162161
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
163162
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
@@ -201,14 +200,6 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
201200
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
202201
}
203202

204-
private void prepareReindexOperation(ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
205-
if (ReindexDataStreamPipeline.exists(clusterService.state())) {
206-
listener.onResponse(null);
207-
} else {
208-
ReindexDataStreamPipeline.create(client, listener, parentTaskId);
209-
}
210-
}
211-
212203
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
213204
logger.debug("Setting write block on source index [{}]", sourceIndexName);
214205
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
@@ -281,7 +272,7 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
281272
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
282273
var reindexRequest = new ReindexRequest();
283274
reindexRequest.setSourceIndices(sourceIndexName);
284-
reindexRequest.setDestPipeline(ReindexDataStreamPipeline.PIPELINE_NAME);
275+
reindexRequest.setDestPipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME);
285276
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
286277
reindexRequest.getSearchRequest().source().fetchSource(true);
287278
reindexRequest.setDestIndex(destIndexName);

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN;
3838
import static org.elasticsearch.xpack.core.ClientHelper.INFERENCE_ORIGIN;
3939
import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN;
40+
import static org.elasticsearch.xpack.core.ClientHelper.MIGRATE_ORIGIN;
4041
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
4142
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
4243
import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN;
@@ -164,6 +165,7 @@ public static void switchUserBasedOnActionOriginAndExecute(
164165
case ENT_SEARCH_ORIGIN:
165166
case CONNECTORS_ORIGIN:
166167
case INFERENCE_ORIGIN:
168+
case MIGRATE_ORIGIN:
167169
case TASKS_ORIGIN: // TODO use a more limited user for tasks
168170
securityContext.executeAsInternalUser(InternalUsers.XPACK_USER, version, consumer);
169171
break;

0 commit comments

Comments
 (0)