Skip to content

Commit fc1182b

Browse files
authored
Using OriginSettingClient for reindex data streams (elastic#120661) (elastic#120713)
1 parent 7b2afee commit fc1182b

File tree

9 files changed

+200
-107
lines changed

9 files changed

+200
-107
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion(
195195
public static final String INFERENCE_ORIGIN = "inference";
196196
public static final String APM_ORIGIN = "apm";
197197
public static final String OTEL_ORIGIN = "otel";
198+
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
198199

199200
private ClientHelper() {}
200201

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,25 @@
77

88
package org.elasticsearch.xpack.core.security.user;
99

10+
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
1011
import org.elasticsearch.action.admin.indices.analyze.TransportReloadAnalyzersAction;
12+
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
13+
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1114
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
1215
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
1316
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1417
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
1518
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1619
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
1720
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
21+
import org.elasticsearch.action.bulk.TransportBulkAction;
22+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
23+
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
1824
import org.elasticsearch.action.downsample.DownsampleAction;
25+
import org.elasticsearch.action.index.TransportIndexAction;
26+
import org.elasticsearch.action.search.TransportSearchAction;
27+
import org.elasticsearch.action.search.TransportSearchScrollAction;
28+
import org.elasticsearch.index.reindex.ReindexAction;
1929
import org.elasticsearch.xpack.core.XPackPlugin;
2030
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
2131
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
@@ -180,6 +190,43 @@ public class InternalUsers {
180190
)
181191
);
182192

193+
public static final InternalUser REINDEX_DATA_STREAM_USER = new InternalUser(
194+
UsernamesField.REINDEX_DATA_STREAM_NAME,
195+
new RoleDescriptor(
196+
UsernamesField.REINDEX_DATA_STREAM_ROLE,
197+
new String[] {},
198+
new RoleDescriptor.IndicesPrivileges[] {
199+
RoleDescriptor.IndicesPrivileges.builder()
200+
.indices("*")
201+
.privileges(
202+
GetDataStreamAction.NAME,
203+
RolloverAction.NAME,
204+
IndicesStatsAction.NAME,
205+
TransportDeleteIndexAction.TYPE.name(),
206+
"indices:admin/data_stream/index/reindex",
207+
"indices:admin/index/create_from_source",
208+
TransportAddIndexBlockAction.TYPE.name(),
209+
TransportCreateIndexAction.TYPE.name(),
210+
TransportClusterSearchShardsAction.TYPE.name(),
211+
TransportUpdateSettingsAction.TYPE.name(),
212+
RefreshAction.NAME,
213+
ReindexAction.NAME,
214+
TransportSearchAction.NAME,
215+
TransportBulkAction.NAME,
216+
TransportIndexAction.NAME,
217+
TransportSearchScrollAction.TYPE.name(),
218+
ModifyDataStreamsAction.NAME
219+
)
220+
.allowRestrictedIndices(false)
221+
.build() },
222+
null,
223+
null,
224+
new String[] {},
225+
MetadataUtils.DEFAULT_RESERVED_METADATA,
226+
Map.of()
227+
)
228+
);
229+
183230
/**
184231
* Internal user that can rollover an index/data stream.
185232
*/
@@ -234,6 +281,7 @@ public class InternalUsers {
234281
ASYNC_SEARCH_USER,
235282
STORAGE_USER,
236283
DATA_STREAM_LIFECYCLE_USER,
284+
REINDEX_DATA_STREAM_USER,
237285
SYNONYMS_USER,
238286
LAZY_ROLLOVER_USER
239287
).collect(Collectors.toUnmodifiableMap(InternalUser::principal, Function.identity()));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public final class UsernamesField {
4040
public static final String REMOTE_MONITORING_INDEXING_ROLE = "remote_monitoring_agent";
4141
public static final String LAZY_ROLLOVER_NAME = "_lazy_rollover";
4242
public static final String LAZY_ROLLOVER_ROLE = "_lazy_rollover";
43+
public static final String REINDEX_DATA_STREAM_NAME = "_reindex_data_stream";
44+
public static final String REINDEX_DATA_STREAM_ROLE = "_reindex_data_stream";
4345

4446
private UsernamesField() {}
4547
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.util.automaton.Operations;
1313
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
1414
import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
15+
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
1516
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
1617
import org.elasticsearch.action.admin.cluster.storedscripts.TransportDeleteStoredScriptAction;
1718
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -26,13 +27,18 @@
2627
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
2728
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
2829
import org.elasticsearch.action.bulk.TransportBulkAction;
30+
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2931
import org.elasticsearch.action.downsample.DownsampleAction;
3032
import org.elasticsearch.action.get.TransportGetAction;
33+
import org.elasticsearch.action.index.TransportIndexAction;
34+
import org.elasticsearch.action.search.TransportSearchAction;
35+
import org.elasticsearch.action.search.TransportSearchScrollAction;
3136
import org.elasticsearch.cluster.metadata.DataStream;
3237
import org.elasticsearch.cluster.metadata.IndexAbstraction;
3338
import org.elasticsearch.cluster.metadata.IndexMetadata;
3439
import org.elasticsearch.common.settings.Settings;
3540
import org.elasticsearch.index.IndexVersion;
41+
import org.elasticsearch.index.reindex.ReindexAction;
3642
import org.elasticsearch.test.ESTestCase;
3743
import org.elasticsearch.transport.TransportRequest;
3844
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -56,6 +62,7 @@
5662
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_TOKENS_INDEX_7;
5763
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_MAIN_ALIAS;
5864
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_TOKENS_ALIAS;
65+
import static org.elasticsearch.xpack.core.security.user.UsernamesField.REINDEX_DATA_STREAM_NAME;
5966
import static org.hamcrest.Matchers.arrayContaining;
6067
import static org.hamcrest.Matchers.equalTo;
6168
import static org.hamcrest.Matchers.hasSize;
@@ -286,6 +293,48 @@ public void testDataStreamLifecycleUser() {
286293
checkIndexAccess(role, randomFrom(sampleSystemDataStreamActions), randomFrom(TestRestrictedIndices.SAMPLE_RESTRICTED_NAMES), false);
287294
}
288295

296+
public void testReindexDataStreamUser() {
297+
assertThat(InternalUsers.getUser(REINDEX_DATA_STREAM_NAME), is(InternalUsers.REINDEX_DATA_STREAM_USER));
298+
assertThat(
299+
InternalUsers.REINDEX_DATA_STREAM_USER.getLocalClusterRoleDescriptor().get().getMetadata(),
300+
equalTo(MetadataUtils.DEFAULT_RESERVED_METADATA)
301+
);
302+
303+
final SimpleRole role = getLocalClusterRole(InternalUsers.REINDEX_DATA_STREAM_USER);
304+
305+
assertThat(role.cluster(), is(ClusterPermission.NONE));
306+
assertThat(role.runAs(), is(RunAsPermission.NONE));
307+
assertThat(role.application(), is(ApplicationPermission.NONE));
308+
assertThat(role.remoteIndices(), is(RemoteIndicesPermission.NONE));
309+
310+
final List<String> sampleIndexActions = List.of(
311+
TransportDeleteIndexAction.TYPE.name(),
312+
"indices:admin/data_stream/index/reindex",
313+
"indices:admin/index/create_from_source",
314+
TransportAddIndexBlockAction.TYPE.name(),
315+
TransportCreateIndexAction.TYPE.name(),
316+
TransportClusterSearchShardsAction.TYPE.name(),
317+
TransportUpdateSettingsAction.TYPE.name(),
318+
RefreshAction.NAME,
319+
ReindexAction.NAME,
320+
TransportSearchAction.NAME,
321+
TransportBulkAction.NAME,
322+
TransportIndexAction.NAME,
323+
TransportSearchScrollAction.TYPE.name(),
324+
ModifyDataStreamsAction.NAME
325+
);
326+
327+
final String dataStream = randomAlphaOfLengthBetween(3, 12);
328+
checkIndexAccess(role, randomFrom(sampleIndexActions), dataStream, true);
329+
// Also check backing index access
330+
checkIndexAccess(
331+
role,
332+
randomFrom(sampleIndexActions),
333+
DataStream.BACKING_INDEX_PREFIX + dataStream + randomAlphaOfLengthBetween(4, 8),
334+
true
335+
);
336+
}
337+
289338
public void testRegularUser() {
290339
var username = randomAlphaOfLengthBetween(4, 12);
291340
expectThrows(IllegalStateException.class, () -> InternalUsers.getUser(username));

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.ActionRequest;
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.client.internal.Client;
13+
import org.elasticsearch.client.internal.OriginSettingClient;
1314
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1415
import org.elasticsearch.cluster.node.DiscoveryNodes;
1516
import org.elasticsearch.cluster.service.ClusterService;
@@ -58,6 +59,7 @@
5859
import java.util.function.Predicate;
5960
import java.util.function.Supplier;
6061

62+
import static org.elasticsearch.xpack.core.ClientHelper.REINDEX_DATA_STREAM_ORIGIN;
6163
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
6264
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING;
6365
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;
@@ -150,7 +152,12 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
150152
) {
151153
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
152154
return List.of(
153-
new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)
155+
new ReindexDataStreamPersistentTaskExecutor(
156+
new OriginSettingClient(client, REINDEX_DATA_STREAM_ORIGIN),
157+
clusterService,
158+
ReindexDataStreamTask.TASK_NAME,
159+
threadPool
160+
)
154161
);
155162
} else {
156163
return List.of();

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ExecuteWithHeadersClient.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,14 @@ protected void nodeOperation(
106106
request.setParentTask(taskId);
107107
assert task instanceof ReindexDataStreamTask;
108108
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
109-
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
110-
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
109+
client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
111110
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
112111
if (dataStreamInfos.size() == 1) {
113112
DataStream dataStream = dataStreamInfos.get(0).getDataStream();
114113
if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
115114
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
116115
rolloverRequest.setParentTask(taskId);
117-
reindexClient.execute(
116+
client.execute(
118117
RolloverAction.INSTANCE,
119118
rolloverRequest,
120119
ActionListener.wrap(
@@ -124,7 +123,6 @@ protected void nodeOperation(
124123
reindexDataStreamTask,
125124
params,
126125
state,
127-
reindexClient,
128126
sourceDataStream,
129127
taskId
130128
),
@@ -138,7 +136,6 @@ protected void nodeOperation(
138136
reindexDataStreamTask,
139137
params,
140138
state,
141-
reindexClient,
142139
sourceDataStream,
143140
taskId
144141
);
@@ -155,7 +152,6 @@ private void reindexIndices(
155152
ReindexDataStreamTask reindexDataStreamTask,
156153
ReindexDataStreamTaskParams params,
157154
ReindexDataStreamPersistentTaskState state,
158-
ExecuteWithHeadersClient reindexClient,
159155
String sourceDataStream,
160156
TaskId parentTaskId
161157
) {
@@ -187,7 +183,7 @@ private void reindexIndices(
187183
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
188184
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
189185
for (int i = 0; i < maxConcurrentIndices; i++) {
190-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
186+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
191187
}
192188
// This takes care of the additional latch count referenced above:
193189
listener.onResponse(null);
@@ -196,7 +192,6 @@ private void reindexIndices(
196192
private void maybeProcessNextIndex(
197193
List<Index> indicesRemaining,
198194
ReindexDataStreamTask reindexDataStreamTask,
199-
ExecuteWithHeadersClient reindexClient,
200195
String sourceDataStream,
201196
CountDownActionListener listener,
202197
TaskId parentTaskId
@@ -215,16 +210,16 @@ private void maybeProcessNextIndex(
215210
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
216211

217212
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
218-
l -> reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
213+
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
219214
)
220215
.<AcknowledgedResponse>andThen(
221-
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, reindexClient, parentTaskId)
216+
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
222217
)
223-
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), reindexClient, parentTaskId, l))
218+
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
224219
.addListener(ActionListener.wrap(unused -> {
225220
reindexDataStreamTask.reindexSucceeded(index.getName());
226221
listener.onResponse(null);
227-
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
222+
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
228223
}, e -> {
229224
reindexDataStreamTask.reindexFailed(index.getName(), e);
230225
listener.onResponse(null);
@@ -236,7 +231,6 @@ private void updateDataStream(
236231
String oldIndex,
237232
String newIndex,
238233
ActionListener<AcknowledgedResponse> listener,
239-
ExecuteWithHeadersClient reindexClient,
240234
TaskId parentTaskId
241235
) {
242236
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
@@ -245,18 +239,13 @@ private void updateDataStream(
245239
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
246240
);
247241
modifyDataStreamRequest.setParentTask(parentTaskId);
248-
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
242+
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
249243
}
250244

251-
private void deleteIndex(
252-
String indexName,
253-
ExecuteWithHeadersClient reindexClient,
254-
TaskId parentTaskId,
255-
ActionListener<AcknowledgedResponse> listener
256-
) {
245+
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
257246
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
258247
deleteIndexRequest.setParentTask(parentTaskId);
259-
reindexClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
248+
client.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
260249
}
261250

262251
private void completeSuccessfulPersistentTask(

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
4242
import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN;
4343
import static org.elasticsearch.xpack.core.ClientHelper.PROFILING_ORIGIN;
44+
import static org.elasticsearch.xpack.core.ClientHelper.REINDEX_DATA_STREAM_ORIGIN;
4445
import static org.elasticsearch.xpack.core.ClientHelper.ROLLUP_ORIGIN;
4546
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
4647
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
@@ -136,6 +137,9 @@ public static void switchUserBasedOnActionOriginAndExecute(
136137
case DATA_STREAM_LIFECYCLE_ORIGIN:
137138
securityContext.executeAsInternalUser(InternalUsers.DATA_STREAM_LIFECYCLE_USER, version, consumer);
138139
break;
140+
case REINDEX_DATA_STREAM_ORIGIN:
141+
securityContext.executeAsInternalUser(InternalUsers.REINDEX_DATA_STREAM_USER, version, consumer);
142+
break;
139143
case LAZY_ROLLOVER_ORIGIN:
140144
securityContext.executeAsInternalUser(InternalUsers.LAZY_ROLLOVER_USER, version, consumer);
141145
break;

0 commit comments

Comments
 (0)