Skip to content

Commit aefea5b

Browse files
committed
review feedback
1 parent 7916c28 commit aefea5b

File tree

4 files changed

+22
-16
lines changed

4 files changed

+22
-16
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2626
import org.elasticsearch.action.downsample.DownsampleAction;
2727
import org.elasticsearch.action.index.TransportIndexAction;
28+
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
2829
import org.elasticsearch.action.search.TransportSearchAction;
2930
import org.elasticsearch.action.search.TransportSearchScrollAction;
3031
import org.elasticsearch.index.reindex.ReindexAction;
@@ -196,7 +197,9 @@ public class InternalUsers {
196197
UsernamesField.REINDEX_DATA_STREAM_NAME,
197198
new RoleDescriptor(
198199
UsernamesField.REINDEX_DATA_STREAM_ROLE,
199-
new String[] {},
200+
new String[] {
201+
PutPipelineTransportAction.TYPE.name()
202+
},
200203
new RoleDescriptor.IndicesPrivileges[] {
201204
RoleDescriptor.IndicesPrivileges.builder()
202205
.indices("*")
@@ -225,6 +228,7 @@ public class InternalUsers {
225228
.build() },
226229
null,
227230
null,
231+
228232
new String[] {},
229233
MetadataUtils.DEFAULT_RESERVED_METADATA,
230234
Map.of()

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.downsample.DownsampleAction;
3232
import org.elasticsearch.action.get.TransportGetAction;
3333
import org.elasticsearch.action.index.TransportIndexAction;
34+
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
3435
import org.elasticsearch.action.search.TransportSearchAction;
3536
import org.elasticsearch.action.search.TransportSearchScrollAction;
3637
import org.elasticsearch.cluster.metadata.DataStream;
@@ -302,7 +303,8 @@ public void testReindexDataStreamUser() {
302303

303304
final SimpleRole role = getLocalClusterRole(InternalUsers.REINDEX_DATA_STREAM_USER);
304305

305-
assertThat(role.cluster(), is(ClusterPermission.NONE));
306+
assertThat(role.cluster().privileges(), hasSize(1));
307+
checkClusterAccess(InternalUsers.REINDEX_DATA_STREAM_USER, role, PutPipelineTransportAction.TYPE.name(), true);
306308
assertThat(role.runAs(), is(RunAsPermission.NONE));
307309
assertThat(role.application(), is(ApplicationPermission.NONE));
308310
assertThat(role.remoteIndices(), is(RemoteIndicesPermission.NONE));

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/ReindexDataStreamPipeline.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.bytes.BytesReference;
1818
import org.elasticsearch.ingest.IngestMetadata;
1919
import org.elasticsearch.ingest.PipelineConfiguration;
20+
import org.elasticsearch.tasks.TaskId;
2021
import org.elasticsearch.xcontent.XContentBuilder;
2122
import org.elasticsearch.xcontent.XContentType;
2223

@@ -61,20 +62,19 @@ public static boolean exists(ClusterState clusterState) {
6162
* Creates a pipeline with the current version's pipeline definition
6263
* @param client Client used to execute put pipeline
6364
* @param listener Callback used after pipeline has been created
65+
* @param parentTaskId parent task id so that request can be cancelled
6466
*/
65-
public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
67+
public static void create(Client client, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
6668
final BytesReference pipeline = BytesReference.bytes(currentPipelineDefinition());
67-
client.execute(
68-
PutPipelineTransportAction.TYPE,
69-
new PutPipelineRequest(
70-
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
71-
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
72-
PIPELINE_NAME,
73-
pipeline,
74-
XContentType.JSON
75-
),
76-
listener
69+
var putPipelineRequest = new PutPipelineRequest(
70+
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
71+
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
72+
PIPELINE_NAME,
73+
pipeline,
74+
XContentType.JSON
7775
);
76+
putPipelineRequest.setParentTask(parentTaskId);
77+
client.execute(PutPipelineTransportAction.TYPE, putPipelineRequest, listener);
7878
}
7979

8080
private static XContentBuilder currentPipelineDefinition() {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ protected void doExecute(
154154
return;
155155
}
156156
final boolean wasClosed = isClosed(sourceIndex);
157-
SubscribableListener.newForked(this::prepareReindexOperation)
157+
SubscribableListener.<AcknowledgedResponse>newForked(l -> prepareReindexOperation(l, taskId))
158158
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))
159159
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
160160
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
@@ -199,11 +199,11 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
199199
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
200200
}
201201

202-
private void prepareReindexOperation(ActionListener<AcknowledgedResponse> listener) {
202+
private void prepareReindexOperation(ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
203203
if (ReindexDataStreamPipeline.exists(clusterService.state())) {
204204
listener.onResponse(null);
205205
} else {
206-
ReindexDataStreamPipeline.create(client, listener);
206+
ReindexDataStreamPipeline.create(client, listener, parentTaskId);
207207
}
208208
}
209209

0 commit comments

Comments
 (0)