Skip to content

Commit 1e069c3

Browse files
authored
Using OriginSettingClient for reindex data streams (#120661)
1 parent dd5e467 commit 1e069c3

File tree

9 files changed

+200
-107
lines changed

9 files changed

+200
-107
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion(
195195
public static final String INFERENCE_ORIGIN = "inference";
196196
public static final String APM_ORIGIN = "apm";
197197
public static final String OTEL_ORIGIN = "otel";
198+
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
198199

199200
private ClientHelper() {}
200201

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,25 @@
77

88
package org.elasticsearch.xpack.core.security.user;
99

10+
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
1011
import org.elasticsearch.action.admin.indices.analyze.TransportReloadAnalyzersAction;
12+
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
13+
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1114
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
1215
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
1316
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1417
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
1518
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1619
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
1720
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
21+
import org.elasticsearch.action.bulk.TransportBulkAction;
22+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
23+
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
1824
import org.elasticsearch.action.downsample.DownsampleAction;
25+
import org.elasticsearch.action.index.TransportIndexAction;
26+
import org.elasticsearch.action.search.TransportSearchAction;
27+
import org.elasticsearch.action.search.TransportSearchScrollAction;
28+
import org.elasticsearch.index.reindex.ReindexAction;
1929
import org.elasticsearch.xpack.core.XPackPlugin;
2030
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
2131
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
@@ -180,6 +190,43 @@ public class InternalUsers {
180190
)
181191
);
182192

193+
public static final InternalUser REINDEX_DATA_STREAM_USER = new InternalUser(
194+
UsernamesField.REINDEX_DATA_STREAM_NAME,
195+
new RoleDescriptor(
196+
UsernamesField.REINDEX_DATA_STREAM_ROLE,
197+
new String[] {},
198+
new RoleDescriptor.IndicesPrivileges[] {
199+
RoleDescriptor.IndicesPrivileges.builder()
200+
.indices("*")
201+
.privileges(
202+
GetDataStreamAction.NAME,
203+
RolloverAction.NAME,
204+
IndicesStatsAction.NAME,
205+
TransportDeleteIndexAction.TYPE.name(),
206+
"indices:admin/data_stream/index/reindex",
207+
"indices:admin/index/create_from_source",
208+
TransportAddIndexBlockAction.TYPE.name(),
209+
TransportCreateIndexAction.TYPE.name(),
210+
TransportClusterSearchShardsAction.TYPE.name(),
211+
TransportUpdateSettingsAction.TYPE.name(),
212+
RefreshAction.NAME,
213+
ReindexAction.NAME,
214+
TransportSearchAction.NAME,
215+
TransportBulkAction.NAME,
216+
TransportIndexAction.NAME,
217+
TransportSearchScrollAction.TYPE.name(),
218+
ModifyDataStreamsAction.NAME
219+
)
220+
.allowRestrictedIndices(false)
221+
.build() },
222+
null,
223+
null,
224+
new String[] {},
225+
MetadataUtils.DEFAULT_RESERVED_METADATA,
226+
Map.of()
227+
)
228+
);
229+
183230
/**
184231
* Internal user that can rollover an index/data stream.
185232
*/
@@ -234,6 +281,7 @@ public class InternalUsers {
234281
ASYNC_SEARCH_USER,
235282
STORAGE_USER,
236283
DATA_STREAM_LIFECYCLE_USER,
284+
REINDEX_DATA_STREAM_USER,
237285
SYNONYMS_USER,
238286
LAZY_ROLLOVER_USER
239287
).collect(Collectors.toUnmodifiableMap(InternalUser::principal, Function.identity()));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public final class UsernamesField {
4040
public static final String REMOTE_MONITORING_INDEXING_ROLE = "remote_monitoring_agent";
4141
public static final String LAZY_ROLLOVER_NAME = "_lazy_rollover";
4242
public static final String LAZY_ROLLOVER_ROLE = "_lazy_rollover";
43+
public static final String REINDEX_DATA_STREAM_NAME = "_reindex_data_stream";
44+
public static final String REINDEX_DATA_STREAM_ROLE = "_reindex_data_stream";
4345

4446
private UsernamesField() {}
4547
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.util.automaton.Operations;
1313
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
1414
import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
15+
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
1516
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1617
import org.elasticsearch.action.admin.cluster.storedscripts.TransportDeleteStoredScriptAction;
1718
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -26,13 +27,18 @@
2627
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
2728
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
2829
import org.elasticsearch.action.bulk.TransportBulkAction;
30+
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2931
import org.elasticsearch.action.downsample.DownsampleAction;
3032
import org.elasticsearch.action.get.TransportGetAction;
33+
import org.elasticsearch.action.index.TransportIndexAction;
34+
import org.elasticsearch.action.search.TransportSearchAction;
35+
import org.elasticsearch.action.search.TransportSearchScrollAction;
3136
import org.elasticsearch.cluster.metadata.DataStream;
3237
import org.elasticsearch.cluster.metadata.IndexAbstraction;
3338
import org.elasticsearch.cluster.metadata.IndexMetadata;
3439
import org.elasticsearch.common.settings.Settings;
3540
import org.elasticsearch.index.IndexVersion;
41+
import org.elasticsearch.index.reindex.ReindexAction;
3642
import org.elasticsearch.test.ESTestCase;
3743
import org.elasticsearch.transport.TransportRequest;
3844
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -56,6 +62,7 @@
5662
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_TOKENS_INDEX_7;
5763
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_MAIN_ALIAS;
5864
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_TOKENS_ALIAS;
65+
import static org.elasticsearch.xpack.core.security.user.UsernamesField.REINDEX_DATA_STREAM_NAME;
5966
import static org.hamcrest.Matchers.arrayContaining;
6067
import static org.hamcrest.Matchers.equalTo;
6168
import static org.hamcrest.Matchers.hasSize;
@@ -286,6 +293,48 @@ public void testDataStreamLifecycleUser() {
286293
checkIndexAccess(role, randomFrom(sampleSystemDataStreamActions), randomFrom(TestRestrictedIndices.SAMPLE_RESTRICTED_NAMES), false);
287294
}
288295

296+
public void testReindexDataStreamUser() {
297+
assertThat(InternalUsers.getUser(REINDEX_DATA_STREAM_NAME), is(InternalUsers.REINDEX_DATA_STREAM_USER));
298+
assertThat(
299+
InternalUsers.REINDEX_DATA_STREAM_USER.getLocalClusterRoleDescriptor().get().getMetadata(),
300+
equalTo(MetadataUtils.DEFAULT_RESERVED_METADATA)
301+
);
302+
303+
final SimpleRole role = getLocalClusterRole(InternalUsers.REINDEX_DATA_STREAM_USER);
304+
305+
assertThat(role.cluster(), is(ClusterPermission.NONE));
306+
assertThat(role.runAs(), is(RunAsPermission.NONE));
307+
assertThat(role.application(), is(ApplicationPermission.NONE));
308+
assertThat(role.remoteIndices(), is(RemoteIndicesPermission.NONE));
309+
310+
final List<String> sampleIndexActions = List.of(
311+
TransportDeleteIndexAction.TYPE.name(),
312+
"indices:admin/data_stream/index/reindex",
313+
"indices:admin/index/create_from_source",
314+
TransportAddIndexBlockAction.TYPE.name(),
315+
TransportCreateIndexAction.TYPE.name(),
316+
TransportClusterSearchShardsAction.TYPE.name(),
317+
TransportUpdateSettingsAction.TYPE.name(),
318+
RefreshAction.NAME,
319+
ReindexAction.NAME,
320+
TransportSearchAction.NAME,
321+
TransportBulkAction.NAME,
322+
TransportIndexAction.NAME,
323+
TransportSearchScrollAction.TYPE.name(),
324+
ModifyDataStreamsAction.NAME
325+
);
326+
327+
final String dataStream = randomAlphaOfLengthBetween(3, 12);
328+
checkIndexAccess(role, randomFrom(sampleIndexActions), dataStream, true);
329+
// Also check backing index access
330+
checkIndexAccess(
331+
role,
332+
randomFrom(sampleIndexActions),
333+
DataStream.BACKING_INDEX_PREFIX + dataStream + randomAlphaOfLengthBetween(4, 8),
334+
true
335+
);
336+
}
337+
289338
public void testRegularUser() {
290339
var username = randomAlphaOfLengthBetween(4, 12);
291340
expectThrows(IllegalStateException.class, () -> InternalUsers.getUser(username));

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.ActionRequest;
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.client.internal.Client;
13+
import org.elasticsearch.client.internal.OriginSettingClient;
1314
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1415
import org.elasticsearch.cluster.node.DiscoveryNodes;
1516
import org.elasticsearch.cluster.service.ClusterService;
@@ -58,6 +59,7 @@
5859
import java.util.function.Predicate;
5960
import java.util.function.Supplier;
6061

62+
import static org.elasticsearch.xpack.core.ClientHelper.REINDEX_DATA_STREAM_ORIGIN;
6163
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
6264
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING;
6365
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;
@@ -150,7 +152,12 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
150152
) {
151153
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
152154
return List.of(
153-
new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)
155+
new ReindexDataStreamPersistentTaskExecutor(
156+
new OriginSettingClient(client, REINDEX_DATA_STREAM_ORIGIN),
157+
clusterService,
158+
ReindexDataStreamTask.TASK_NAME,
159+
threadPool
160+
)
154161
);
155162
} else {
156163
return List.of();

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ExecuteWithHeadersClient.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,14 @@ protected void nodeOperation(
107107
request.setParentTask(taskId);
108108
assert task instanceof ReindexDataStreamTask;
109109
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
110-
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
111-
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
110+
client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
112111
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
113112
if (dataStreamInfos.size() == 1) {
114113
DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
115114
if (getReindexRequiredPredicate(clusterService.state().metadata(), false).test(dataStream.getWriteIndex())) {
116115
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
117116
rolloverRequest.setParentTask(taskId);
118-
reindexClient.execute(
117+
client.execute(
119118
RolloverAction.INSTANCE,
120119
rolloverRequest,
121120
ActionListener.wrap(
@@ -125,7 +124,6 @@ protected void nodeOperation(
125124
reindexDataStreamTask,
126125
params,
127126
state,
128-
reindexClient,
129127
sourceDataStream,
130128
taskId
131129
),
@@ -139,7 +137,6 @@ protected void nodeOperation(
139137
reindexDataStreamTask,
140138
params,
141139
state,
142-
reindexClient,
143140
sourceDataStream,
144141
taskId
145142
);
@@ -156,7 +153,6 @@ private void reindexIndices(
156153
ReindexDataStreamTask reindexDataStreamTask,
157154
ReindexDataStreamTaskParams params,
158155
ReindexDataStreamPersistentTaskState state,
159-
ExecuteWithHeadersClient reindexClient,
160156
String sourceDataStream,
161157
TaskId parentTaskId
162158
) {
@@ -190,7 +186,7 @@ private void reindexIndices(
190186
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
191187
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
192188
for (int i = 0; i < maxConcurrentIndices; i++) {
193-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
189+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
194190
}
195191
// This takes care of the additional latch count referenced above:
196192
listener.onResponse(null);
@@ -199,7 +195,6 @@ private void reindexIndices(
199195
private void maybeProcessNextIndex(
200196
List<Index> indicesRemaining,
201197
ReindexDataStreamTask reindexDataStreamTask,
202-
ExecuteWithHeadersClient reindexClient,
203198
String sourceDataStream,
204199
CountDownActionListener listener,
205200
TaskId parentTaskId
@@ -218,16 +213,16 @@ private void maybeProcessNextIndex(
218213
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
219214

220215
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
221-
l -> reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
216+
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
222217
)
223218
.<AcknowledgedResponse>andThen(
224-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, reindexClient, parentTaskId)
219+
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
225220
)
226-
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), reindexClient, parentTaskId, l))
221+
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
227222
.addListener(ActionListener.wrap(unused -> {
228223
reindexDataStreamTask.reindexSucceeded(index.getName());
229224
listener.onResponse(null);
230-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
225+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
231226
}, e -> {
232227
reindexDataStreamTask.reindexFailed(index.getName(), e);
233228
listener.onResponse(null);
@@ -239,7 +234,6 @@ private void updateDataStream(
239234
String oldIndex,
240235
String newIndex,
241236
ActionListener<AcknowledgedResponse> listener,
242-
ExecuteWithHeadersClient reindexClient,
243237
TaskId parentTaskId
244238
) {
245239
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -248,18 +242,13 @@ private void updateDataStream(
248242
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
249243
);
250244
modifyDataStreamRequest.setParentTask(parentTaskId);
251-
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
245+
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
252246
}
253247

254-
private void deleteIndex(
255-
String indexName,
256-
ExecuteWithHeadersClient reindexClient,
257-
TaskId parentTaskId,
258-
ActionListener<AcknowledgedResponse> listener
259-
) {
248+
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
260249
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
261250
deleteIndexRequest.setParentTask(parentTaskId);
262-
reindexClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
251+
client.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
263252
}
264253

265254
private void completeSuccessfulPersistentTask(

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
4242
import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN;
4343
import static org.elasticsearch.xpack.core.ClientHelper.PROFILING_ORIGIN;
44+
import static org.elasticsearch.xpack.core.ClientHelper.REINDEX_DATA_STREAM_ORIGIN;
4445
import static org.elasticsearch.xpack.core.ClientHelper.ROLLUP_ORIGIN;
4546
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
4647
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
@@ -136,6 +137,9 @@ public static void switchUserBasedOnActionOriginAndExecute(
136137
case DATA_STREAM_LIFECYCLE_ORIGIN:
137138
securityContext.executeAsInternalUser(InternalUsers.DATA_STREAM_LIFECYCLE_USER, version, consumer);
138139
break;
140+
case REINDEX_DATA_STREAM_ORIGIN:
141+
securityContext.executeAsInternalUser(InternalUsers.REINDEX_DATA_STREAM_USER, version, consumer);
142+
break;
139143
case LAZY_ROLLOVER_ORIGIN:
140144
securityContext.executeAsInternalUser(InternalUsers.LAZY_ROLLOVER_USER, version, consumer);
141145
break;

0 commit comments

Comments
 (0)