Skip to content

Commit 77b9c43

Browse files
authored
Drop project-id from threadcontext for CCS (elastic#136730)
When making a request to a remote cluster, we don't want to include the current project-id ("X-Elastic-Project-Id") header in the thread context that we send to the remote cluster because the current project-id is not relevant to the thread context for the remote execution. Backport of: elastic#136664,elastic#136675
1 parent ee717f9 commit 77b9c43

File tree

5 files changed

+58
-15
lines changed

5 files changed

+58
-15
lines changed

docs/changelog/136664.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136664
2+
summary: Drop project-id from threadcontext for CCS
3+
area: Authorization
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ public final class ThreadContext implements Writeable, TraceContext {
7979
public static final String PREFIX = "request.headers";
8080
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
8181

82+
public enum HeadersFor {
83+
LOCAL_CLUSTER,
84+
REMOTE_CLUSTER
85+
}
86+
8287
/**
8388
* Name for the {@link #stashWithOrigin origin} attribute.
8489
*/
@@ -108,14 +113,14 @@ public ThreadContext(Settings settings) {
108113
* @return a stored context that will restore the current context to its state at the point this method was called
109114
*/
110115
public StoredContext stashContext() {
111-
return stashContextPreservingRequestHeaders(Collections.emptySet());
116+
return stashContextPreservingRequestHeaders(HeadersFor.LOCAL_CLUSTER, Collections.emptySet());
112117
}
113118

114119
/**
115120
* Just like {@link #stashContext()} but also preserves request headers specified via {@code requestHeaders},
116121
* if these exist in the context before stashing.
117122
*/
118-
public StoredContext stashContextPreservingRequestHeaders(Set<String> requestHeaders) {
123+
public StoredContext stashContextPreservingRequestHeaders(HeadersFor headersFor, Set<String> requestHeaders) {
119124
final ThreadContextStruct context = threadLocal.get();
120125

121126
/*
@@ -126,7 +131,7 @@ public StoredContext stashContextPreservingRequestHeaders(Set<String> requestHea
126131
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
127132
* The same is applied to Task.TRACE_ID and other values specified in `Task.HEADERS_TO_COPY`.
128133
*/
129-
final Set<String> requestHeadersToCopy = getRequestHeadersToCopy(requestHeaders);
134+
final Set<String> requestHeadersToCopy = getRequestHeadersToCopy(headersFor, requestHeaders);
130135
boolean hasHeadersToCopy = false;
131136
if (context.requestHeaders.isEmpty() == false) {
132137
for (String header : requestHeadersToCopy) {
@@ -157,8 +162,8 @@ public StoredContext stashContextPreservingRequestHeaders(Set<String> requestHea
157162
return storedOriginalContext(context);
158163
}
159164

160-
public StoredContext stashContextPreservingRequestHeaders(final String... requestHeaders) {
161-
return stashContextPreservingRequestHeaders(Set.of(requestHeaders));
165+
public StoredContext stashContextPreservingRequestHeaders(HeadersFor headersFor, final String... requestHeaders) {
166+
return stashContextPreservingRequestHeaders(headersFor, Set.of(requestHeaders));
162167
}
163168

164169
/**
@@ -275,12 +280,16 @@ private StoredContext storedOriginalContext(ThreadContextStruct originalContext)
275280
return () -> threadLocal.set(originalContext);
276281
}
277282

278-
private static Set<String> getRequestHeadersToCopy(Set<String> requestHeaders) {
279-
if (requestHeaders.isEmpty()) {
283+
private static Set<String> getRequestHeadersToCopy(HeadersFor headersFor, Set<String> requestHeaders) {
284+
if (requestHeaders.isEmpty() && headersFor == HeadersFor.LOCAL_CLUSTER) {
280285
return HEADERS_TO_COPY;
281286
}
282-
final Set<String> allRequestHeadersToCopy = new HashSet<>(requestHeaders);
283-
allRequestHeadersToCopy.addAll(HEADERS_TO_COPY);
287+
final Set<String> allRequestHeadersToCopy = new HashSet<>(HEADERS_TO_COPY);
288+
if (headersFor == HeadersFor.REMOTE_CLUSTER) {
289+
// Don't send the current project id to a remote cluster/project
290+
allRequestHeadersToCopy.remove(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER);
291+
}
292+
allRequestHeadersToCopy.addAll(requestHeaders);
284293
return Set.copyOf(allRequestHeadersToCopy);
285294
}
286295

server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,14 @@ public void testStashContextPreservingRequestHeaders() {
8484
assertEquals("bar", threadContext.getHeader("foo"));
8585
assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo"));
8686
assertEquals("1", threadContext.getHeader("default"));
87-
try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders("foo", "ctx.foo", "missing")) {
87+
try (
88+
ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(
89+
randomFrom(ThreadContext.HeadersFor.values()),
90+
"foo",
91+
"ctx.foo",
92+
"missing"
93+
)
94+
) {
8895
assertEquals("bar", threadContext.getHeader("foo"));
8996
// only request headers preserved, not transient
9097
assertNull(threadContext.getTransient("ctx.foo"));
@@ -99,15 +106,28 @@ public void testStashContextPreservingRequestHeaders() {
99106
assertEquals("1", threadContext.getHeader("default"));
100107
}
101108

102-
public void testStashContextPreservingHeadersWithDefaultHeadersToCopy() {
109+
public void testStashContextPreservingHeadersWithDefaultHeadersToCopyInLocalCluster() {
110+
doTestStashContextPreservingHeadersWithDefaultHeadersToCopy(ThreadContext.HeadersFor.LOCAL_CLUSTER);
111+
}
112+
113+
public void testStashContextPreservingHeadersWithDefaultHeadersToCopyForRemoteCluster() {
114+
doTestStashContextPreservingHeadersWithDefaultHeadersToCopy(ThreadContext.HeadersFor.REMOTE_CLUSTER);
115+
}
116+
117+
private static void doTestStashContextPreservingHeadersWithDefaultHeadersToCopy(final ThreadContext.HeadersFor headersFor) {
103118
for (String header : HEADERS_TO_COPY) {
104119
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
105120
threadContext.putHeader(header, "bar");
106-
try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders()) {
107-
assertEquals("bar", threadContext.getHeader(header));
121+
try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(headersFor)) {
122+
if (headersFor == ThreadContext.HeadersFor.REMOTE_CLUSTER && header.equals(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER)) {
123+
assertNull(threadContext.getHeader(header));
124+
} else {
125+
assertEquals("bar", threadContext.getHeader(header));
126+
}
108127
}
109128
// Also works if we pass it explicitly
110-
try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(header)) {
129+
try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(headersFor, header)) {
130+
// If we pass it explicitly, then we expect it to be preserved even if it's not normally included in `HeadersFor`
111131
assertEquals("bar", threadContext.getHeader(header));
112132
}
113133
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,12 @@ private <T extends TransportResponse> void sendWithCrossClusterAccessHeaders(
426426
) {
427427
final ThreadContext threadContext = securityContext.getThreadContext();
428428
final var contextRestoreHandler = new ContextRestoreResponseHandler<>(threadContext.newRestorableContext(true), handler);
429-
try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(AuditUtil.AUDIT_REQUEST_ID)) {
429+
try (
430+
ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(
431+
ThreadContext.HeadersFor.REMOTE_CLUSTER,
432+
AuditUtil.AUDIT_REQUEST_ID
433+
)
434+
) {
430435
crossClusterAccessHeaders.writeToContext(threadContext);
431436
sender.sendRequest(connection, action, request, options, contextRestoreHandler);
432437
} catch (Exception e) {

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,9 @@ private void doTestSendWithCrossClusterAccessHeaders(
728728
) throws IOException {
729729
authentication.writeToContext(threadContext);
730730
final String expectedRequestId = AuditUtil.getOrGenerateRequestId(threadContext);
731+
if (randomBoolean()) {
732+
threadContext.putHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, randomProjectIdOrDefault().id());
733+
}
731734
final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10);
732735
final String encodedApiKey = randomAlphaOfLengthBetween(10, 42);
733736
final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey);
@@ -772,6 +775,7 @@ public <T extends TransportResponse> void sendRequest(
772775
}
773776
assertThat(securityContext.getAuthentication(), nullValue());
774777
assertThat(AuditUtil.extractRequestId(securityContext.getThreadContext()), equalTo(expectedRequestId));
778+
assertThat(threadContext.getHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER), nullValue());
775779
sentAction.set(action);
776780
sentCredential.set(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY));
777781
try {

0 commit comments

Comments
 (0)