Skip to content

Commit ca99157

Browse files
authored
Using the credentials of the user who calls reindex data stream (#117938) (#118149)
1 parent d08c26b commit ca99157

File tree

5 files changed

+163
-13
lines changed

5 files changed

+163
-13
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.tasks.Task;
2121
import org.elasticsearch.threadpool.ThreadPool;
2222
import org.elasticsearch.transport.TransportService;
23+
import org.elasticsearch.xpack.core.ClientHelper;
2324
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest;
2425
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse;
2526
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask;
@@ -72,7 +73,8 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
7273
sourceDataStreamName,
7374
transportService.getThreadPool().absoluteTimeInMillis(),
7475
totalIndices,
75-
totalIndicesToBeUpgraded
76+
totalIndicesToBeUpgraded,
77+
ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
7678
);
7779
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
7880
persistentTasksService.sendStartRequest(
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.migrate.task;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ActionRequest;
12+
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.client.internal.support.AbstractClient;
16+
import org.elasticsearch.xpack.core.ClientHelper;
17+
18+
import java.util.Map;
19+
20+
public class ExecuteWithHeadersClient extends AbstractClient {
21+
22+
private final Client client;
23+
private final Map<String, String> headers;
24+
25+
public ExecuteWithHeadersClient(Client client, Map<String, String> headers) {
26+
super(client.settings(), client.threadPool());
27+
this.client = client;
28+
this.headers = headers;
29+
}
30+
31+
@Override
32+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
33+
ActionType<Response> action,
34+
Request request,
35+
ActionListener<Response> listener
36+
) {
37+
ClientHelper.executeWithHeadersAsync(headers, null, client, action, request, listener);
38+
}
39+
40+
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
6666
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
6767
assert task instanceof ReindexDataStreamTask;
6868
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
69-
client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
69+
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
70+
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
7071
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
7172
if (dataStreamInfos.size() == 1) {
7273
List<Index> indices = dataStreamInfos.get(0).getDataStream().getIndices();

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,41 +9,65 @@
99

1010
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.TransportVersions;
12+
import org.elasticsearch.cluster.metadata.Metadata;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.persistent.PersistentTaskParams;
1516
import org.elasticsearch.xcontent.ConstructingObjectParser;
17+
import org.elasticsearch.xcontent.ObjectParser;
1618
import org.elasticsearch.xcontent.ParseField;
1719
import org.elasticsearch.xcontent.XContentBuilder;
1820
import org.elasticsearch.xcontent.XContentParser;
1921

2022
import java.io.IOException;
23+
import java.util.Map;
2124

2225
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
2326

24-
public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded)
25-
implements
26-
PersistentTaskParams {
27+
public record ReindexDataStreamTaskParams(
28+
String sourceDataStream,
29+
long startTime,
30+
int totalIndices,
31+
int totalIndicesToBeUpgraded,
32+
Map<String, String> headers
33+
) implements PersistentTaskParams {
34+
35+
private static final String API_CONTEXT = Metadata.XContentContext.API.toString();
2736

2837
public static final String NAME = ReindexDataStreamTask.TASK_NAME;
2938
private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream";
3039
private static final String START_TIME_FIELD = "start_time";
3140
private static final String TOTAL_INDICES_FIELD = "total_indices";
3241
private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded";
42+
private static final String HEADERS_FIELD = "headers";
43+
@SuppressWarnings("unchecked")
3344
private static final ConstructingObjectParser<ReindexDataStreamTaskParams, Void> PARSER = new ConstructingObjectParser<>(
3445
NAME,
3546
true,
36-
args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3])
47+
args -> new ReindexDataStreamTaskParams(
48+
(String) args[0],
49+
(long) args[1],
50+
(int) args[2],
51+
(int) args[3],
52+
args[4] == null ? Map.of() : (Map<String, String>) args[4]
53+
)
3754
);
3855
static {
3956
PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD));
4057
PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD));
4158
PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD));
4259
PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD));
60+
PARSER.declareField(
61+
ConstructingObjectParser.optionalConstructorArg(),
62+
XContentParser::mapStrings,
63+
new ParseField(HEADERS_FIELD),
64+
ObjectParser.ValueType.OBJECT
65+
);
4366
}
4467

68+
@SuppressWarnings("unchecked")
4569
public ReindexDataStreamTaskParams(StreamInput in) throws IOException {
46-
this(in.readString(), in.readLong(), in.readInt(), in.readInt());
70+
this(in.readString(), in.readLong(), in.readInt(), in.readInt(), (Map<String, String>) in.readGenericValue());
4771
}
4872

4973
@Override
@@ -62,16 +86,22 @@ public void writeTo(StreamOutput out) throws IOException {
6286
out.writeLong(startTime);
6387
out.writeInt(totalIndices);
6488
out.writeInt(totalIndicesToBeUpgraded);
89+
out.writeGenericValue(headers);
6590
}
6691

6792
@Override
6893
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
69-
return builder.startObject()
94+
builder.startObject()
7095
.field(SOURCE_DATA_STREAM_FIELD, sourceDataStream)
7196
.field(START_TIME_FIELD, startTime)
7297
.field(TOTAL_INDICES_FIELD, totalIndices)
73-
.field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded)
74-
.endObject();
98+
.field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded);
99+
if (API_CONTEXT.equals(params.param(Metadata.CONTEXT_MODE_PARAM, API_CONTEXT)) == false) {
100+
// This makes sure that we don't return the headers to an api request, like _cluster/state
101+
builder.stringStringMap(HEADERS_FIELD, headers);
102+
}
103+
builder.endObject();
104+
return builder;
75105
}
76106

77107
public String getSourceDataStream() {
@@ -81,4 +111,8 @@ public String getSourceDataStream() {
81111
public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) {
82112
return PARSER.apply(parser, null);
83113
}
114+
115+
public Map<String, String> getHeaders() {
116+
return headers;
117+
}
84118
}

x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.elasticsearch.cluster.metadata.Metadata;
1011
import org.elasticsearch.common.bytes.BytesReference;
1112
import org.elasticsearch.common.io.stream.Writeable;
1213
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
14+
import org.elasticsearch.xcontent.ToXContent;
1315
import org.elasticsearch.xcontent.XContentBuilder;
1416
import org.elasticsearch.xcontent.XContentParser;
17+
import org.elasticsearch.xcontent.XContentType;
1518
import org.elasticsearch.xcontent.json.JsonXContent;
1619

1720
import java.io.IOException;
@@ -29,7 +32,26 @@ protected Writeable.Reader<ReindexDataStreamTaskParams> instanceReader() {
2932

3033
@Override
3134
protected ReindexDataStreamTaskParams createTestInstance() {
32-
return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt());
35+
return createTestInstance(randomBoolean());
36+
}
37+
38+
@Override
39+
protected ReindexDataStreamTaskParams createXContextTestInstance(XContentType xContentType) {
40+
/*
41+
* Since we filter out headers from xcontent in some cases, we can't use them in the standard xcontent round trip testing.
42+
* Headers are covered in testToXContentContextMode
43+
*/
44+
return createTestInstance(false);
45+
}
46+
47+
private ReindexDataStreamTaskParams createTestInstance(boolean withHeaders) {
48+
return new ReindexDataStreamTaskParams(
49+
randomAlphaOfLength(50),
50+
randomLong(),
51+
randomNonNegativeInt(),
52+
randomNonNegativeInt(),
53+
getTestHeaders(withHeaders)
54+
);
3355
}
3456

3557
@Override
@@ -38,21 +60,35 @@ protected ReindexDataStreamTaskParams mutateInstance(ReindexDataStreamTaskParams
3860
long startTime = instance.startTime();
3961
int totalIndices = instance.totalIndices();
4062
int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded();
41-
switch (randomIntBetween(0, 3)) {
63+
Map<String, String> headers = instance.headers();
64+
switch (randomIntBetween(0, 4)) {
4265
case 0 -> sourceDataStream = randomAlphaOfLength(50);
4366
case 1 -> startTime = randomLong();
4467
case 2 -> totalIndices = totalIndices + 1;
4568
case 3 -> totalIndices = totalIndicesToBeUpgraded + 1;
69+
case 4 -> headers = headers.isEmpty() ? getTestHeaders(true) : getTestHeaders();
4670
default -> throw new UnsupportedOperationException();
4771
}
48-
return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded);
72+
return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded, headers);
4973
}
5074

5175
@Override
5276
protected ReindexDataStreamTaskParams doParseInstance(XContentParser parser) {
5377
return ReindexDataStreamTaskParams.fromXContent(parser);
5478
}
5579

80+
private Map<String, String> getTestHeaders() {
81+
return getTestHeaders(randomBoolean());
82+
}
83+
84+
private Map<String, String> getTestHeaders(boolean nonEmpty) {
85+
if (nonEmpty) {
86+
return Map.of(randomAlphaOfLength(20), randomAlphaOfLength(30));
87+
} else {
88+
return Map.of();
89+
}
90+
}
91+
5692
public void testToXContent() throws IOException {
5793
ReindexDataStreamTaskParams params = createTestInstance();
5894
try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
@@ -65,4 +101,41 @@ public void testToXContent() throws IOException {
65101
}
66102
}
67103
}
104+
105+
public void testToXContentContextMode() throws IOException {
106+
ReindexDataStreamTaskParams params = createTestInstance(true);
107+
108+
// We do not expect to get headers if the "content_mode" is "api"
109+
try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
110+
builder.humanReadable(true);
111+
ToXContent.Params xContentParams = new ToXContent.MapParams(
112+
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.XContentContext.API.toString())
113+
);
114+
params.toXContent(builder, xContentParams);
115+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
116+
Map<String, Object> parserMap = parser.map();
117+
assertThat(parserMap.get("source_data_stream"), equalTo(params.sourceDataStream()));
118+
assertThat(((Number) parserMap.get("start_time")).longValue(), equalTo(params.startTime()));
119+
assertThat(parserMap.containsKey("headers"), equalTo(false));
120+
}
121+
}
122+
123+
// We do expect to get headers if the "content_mode" is anything but "api"
124+
try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
125+
builder.humanReadable(true);
126+
ToXContent.Params xContentParams = new ToXContent.MapParams(
127+
Map.of(
128+
Metadata.CONTEXT_MODE_PARAM,
129+
randomFrom(Metadata.XContentContext.GATEWAY.toString(), Metadata.XContentContext.SNAPSHOT.toString())
130+
)
131+
);
132+
params.toXContent(builder, xContentParams);
133+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
134+
Map<String, Object> parserMap = parser.map();
135+
assertThat(parserMap.get("source_data_stream"), equalTo(params.sourceDataStream()));
136+
assertThat(((Number) parserMap.get("start_time")).longValue(), equalTo(params.startTime()));
137+
assertThat(parserMap.get("headers"), equalTo(params.getHeaders()));
138+
}
139+
}
140+
}
68141
}

0 commit comments

Comments
 (0)