Skip to content

Commit 2b91dd5

Browse files
Refactor remote cluster interceptor and tests (#135747)
Initially, the `RemoteClusterTransportInterceptor#getProfileTransportFilters` required remote-cluster security extensions to provide transport filters for all transport profiles. The method was too generic and not specific to only `_remote_cluster` transport profile. This meant that RCS extensions were free to decide which filter they wanted to "override" with its custom transport filter implementation. This turned out to be unnecessary, because RCS extensions only ever need to provide a custom implementation for the remote cluster profile. This refactoring removes the need to provide the "default" `ServerTransportFilter` in order for security to work. Followups to: - #134785 (comment) - #134785 (comment) - #134245 (comment) --- - Converted `ServerTransportFilter` to interface with a default implementation. - Refactored `RemoteClusterTransportInterceptor` to allow optionally providing only a custom remote cluster transport filter. It's no longer required from RCS extensions to provide the default `ServerTransportFilter` implementation. - Split transport interceptor and filter tests: - `ServerTransportFilterTests` became abstract and got split into two tests: `CrossClusterAccessServerTransportFilterTests` and `DefaultServerTransportFilterTests` - Cross-cluster access tests got extracted from `SecurityServerTransportInterceptorTests` into its own `CrossClusterAccessTransportInterceptorTests`class
1 parent 40f90fb commit 2b91dd5

12 files changed

+1347
-1020
lines changed

x-pack/plugin/security/qa/rcs-extension/src/main/java/org/elasticsearch/xpack/security/rcs/extension/TestRemoteClusterSecurityExtension.java

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,20 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.DestructiveOperations;
1212
import org.elasticsearch.common.settings.Setting;
13-
import org.elasticsearch.common.settings.Settings;
14-
import org.elasticsearch.common.ssl.SslConfiguration;
15-
import org.elasticsearch.common.util.Maps;
1613
import org.elasticsearch.transport.Transport;
1714
import org.elasticsearch.transport.TransportInterceptor;
1815
import org.elasticsearch.transport.TransportRequest;
19-
import org.elasticsearch.xpack.core.XPackSettings;
2016
import org.elasticsearch.xpack.core.security.SecurityContext;
2117
import org.elasticsearch.xpack.core.security.authc.Authentication;
22-
import org.elasticsearch.xpack.core.ssl.SSLService;
2318
import org.elasticsearch.xpack.core.ssl.SslProfile;
2419
import org.elasticsearch.xpack.security.authc.RemoteClusterAuthenticationService;
2520
import org.elasticsearch.xpack.security.transport.RemoteClusterTransportInterceptor;
2621
import org.elasticsearch.xpack.security.transport.ServerTransportFilter;
2722
import org.elasticsearch.xpack.security.transport.extension.RemoteClusterSecurityExtension;
2823

29-
import java.util.Collections;
3024
import java.util.List;
3125
import java.util.Map;
26+
import java.util.Optional;
3227

3328
public class TestRemoteClusterSecurityExtension implements RemoteClusterSecurityExtension {
3429

@@ -58,40 +53,16 @@ public boolean isRemoteClusterConnection(Transport.Connection connection) {
5853
return false;
5954
}
6055

61-
public boolean hasRemoteClusterAccessHeadersInContext(SecurityContext securityContext) {
62-
return false;
63-
}
64-
6556
@Override
66-
public Map<String, ServerTransportFilter> getProfileTransportFilters(
67-
Map<String, SslProfile> profileConfigurations,
57+
public Optional<ServerTransportFilter> getRemoteProfileTransportFilter(
58+
SslProfile sslProfile,
6859
DestructiveOperations destructiveOperations
6960
) {
70-
Map<String, ServerTransportFilter> profileFilters = Maps.newMapWithExpectedSize(profileConfigurations.size() + 1);
71-
Settings settings = components.settings();
72-
final boolean transportSSLEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
73-
74-
for (Map.Entry<String, SslProfile> entry : profileConfigurations.entrySet()) {
75-
final String profileName = entry.getKey();
76-
final SslProfile sslProfile = entry.getValue();
77-
final SslConfiguration profileConfiguration = sslProfile.configuration();
78-
profileFilters.put(
79-
profileName,
80-
new ServerTransportFilter(
81-
components.authenticationService(),
82-
components.authorizationService(),
83-
components.threadPool().getThreadContext(),
84-
transportSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
85-
destructiveOperations,
86-
components.securityContext()
87-
)
88-
);
89-
}
90-
// We need to register here the default security
91-
// server transport filter which ensures that all
92-
// incoming transport requests are properly
93-
// authenticated and authorized.
94-
return Collections.unmodifiableMap(profileFilters);
61+
return Optional.empty();
62+
}
63+
64+
public boolean hasRemoteClusterAccessHeadersInContext(SecurityContext securityContext) {
65+
return false;
9566
}
9667

9768
};

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,8 @@ Collection<Object> createComponents(
12071207
new SecurityServerTransportInterceptor(
12081208
settings,
12091209
threadPool,
1210+
authcService.get(),
1211+
authzService,
12101212
getSslService(),
12111213
securityContext.get(),
12121214
destructiveOperations,

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessTransportInterceptor.java

Lines changed: 17 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.common.settings.SecureString;
1616
import org.elasticsearch.common.settings.Settings;
1717
import org.elasticsearch.common.ssl.SslConfiguration;
18-
import org.elasticsearch.common.util.Maps;
1918
import org.elasticsearch.common.util.concurrent.ThreadContext;
2019
import org.elasticsearch.license.LicenseUtils;
2120
import org.elasticsearch.license.XPackLicenseState;
@@ -49,13 +48,11 @@
4948
import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders;
5049
import org.elasticsearch.xpack.security.authz.AuthorizationService;
5150

52-
import java.util.Collections;
5351
import java.util.Map;
5452
import java.util.Optional;
5553
import java.util.function.Function;
5654

5755
import static org.elasticsearch.core.Strings.format;
58-
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
5956
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;
6057
import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY;
6158

@@ -83,7 +80,6 @@ public class CrossClusterAccessTransportInterceptor implements RemoteClusterTran
8380
private final Function<Transport.Connection, Optional<RemoteClusterAliasWithCredentials>> remoteClusterCredentialsResolver;
8481
private final CrossClusterAccessAuthenticationService crossClusterAccessAuthcService;
8582
private final CrossClusterApiKeySignatureManager crossClusterApiKeySignatureManager;
86-
private final AuthenticationService authcService;
8783
private final AuthorizationService authzService;
8884
private final XPackLicenseState licenseState;
8985
private final SecurityContext securityContext;
@@ -128,7 +124,6 @@ public CrossClusterAccessTransportInterceptor(
128124
this.remoteClusterCredentialsResolver = remoteClusterCredentialsResolver;
129125
this.crossClusterAccessAuthcService = crossClusterAccessAuthcService;
130126
this.crossClusterApiKeySignatureManager = crossClusterApiKeySignatureManager;
131-
this.authcService = authcService;
132127
this.authzService = authzService;
133128
this.licenseState = licenseState;
134129
this.securityContext = securityContext;
@@ -328,51 +323,27 @@ public boolean isRemoteClusterConnection(Transport.Connection connection) {
328323
}
329324

330325
@Override
331-
public Map<String, ServerTransportFilter> getProfileTransportFilters(
332-
Map<String, SslProfile> profileConfigurations,
326+
public Optional<ServerTransportFilter> getRemoteProfileTransportFilter(
327+
SslProfile sslProfile,
333328
DestructiveOperations destructiveOperations
334329
) {
335-
Map<String, ServerTransportFilter> profileFilters = Maps.newMapWithExpectedSize(profileConfigurations.size() + 1);
336-
337-
final boolean transportSSLEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
338-
final boolean remoteClusterPortEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
330+
final SslConfiguration profileConfiguration = sslProfile.configuration();
331+
final boolean remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
339332
final boolean remoteClusterServerSSLEnabled = XPackSettings.REMOTE_CLUSTER_SERVER_SSL_ENABLED.get(settings);
340-
341-
for (Map.Entry<String, SslProfile> entry : profileConfigurations.entrySet()) {
342-
final String profileName = entry.getKey();
343-
final SslProfile sslProfile = entry.getValue();
344-
final SslConfiguration profileConfiguration = sslProfile.configuration();
345-
assert profileConfiguration != null : "Ssl Profile [" + sslProfile + "] for [" + profileName + "] has a null configuration";
346-
final boolean useRemoteClusterProfile = remoteClusterPortEnabled && profileName.equals(REMOTE_CLUSTER_PROFILE);
347-
if (useRemoteClusterProfile) {
348-
profileFilters.put(
349-
profileName,
350-
new CrossClusterAccessServerTransportFilter(
351-
crossClusterAccessAuthcService,
352-
authzService,
353-
threadPool.getThreadContext(),
354-
remoteClusterServerSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
355-
destructiveOperations,
356-
securityContext,
357-
licenseState
358-
)
359-
);
360-
} else {
361-
profileFilters.put(
362-
profileName,
363-
new ServerTransportFilter(
364-
authcService,
365-
authzService,
366-
threadPool.getThreadContext(),
367-
transportSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
368-
destructiveOperations,
369-
securityContext
370-
)
371-
);
372-
}
333+
if (remoteClusterServerEnabled) {
334+
return Optional.of(
335+
new CrossClusterAccessServerTransportFilter(
336+
crossClusterAccessAuthcService,
337+
authzService,
338+
threadPool.getThreadContext(),
339+
remoteClusterServerSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
340+
destructiveOperations,
341+
securityContext,
342+
licenseState
343+
)
344+
);
373345
}
374-
375-
return Collections.unmodifiableMap(profileFilters);
346+
return Optional.empty();
376347
}
377348

378349
@Override

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterTransportInterceptor.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
package org.elasticsearch.xpack.security.transport;
99

1010
import org.elasticsearch.action.support.DestructiveOperations;
11+
import org.elasticsearch.transport.RemoteClusterPortSettings;
1112
import org.elasticsearch.transport.Transport;
1213
import org.elasticsearch.transport.TransportInterceptor;
1314
import org.elasticsearch.xpack.core.security.SecurityContext;
1415
import org.elasticsearch.xpack.core.ssl.SslProfile;
1516

16-
import java.util.Map;
17+
import java.util.Optional;
1718

1819
/**
1920
* Allows to provide remote cluster interception that's capable of intercepting remote connections
@@ -32,16 +33,20 @@ public interface RemoteClusterTransportInterceptor {
3233
boolean isRemoteClusterConnection(Transport.Connection connection);
3334

3435
/**
35-
* Allows interceptors to provide a custom {@link ServerTransportFilter} implementations per transport profile.
36-
* The transport filter is called on the receiver side to filter incoming requests
37-
* and execute authentication and authorization for all requests.
36+
* Allows interceptors to provide a custom {@link ServerTransportFilter} implementation
37+
* for intercepting requests for {@link RemoteClusterPortSettings#REMOTE_CLUSTER_PROFILE}
38+
* transport profile.
39+
* <p>
40+
* The transport filter is called on the receiver side to filter incoming remote cluster requests
41+
* and to execute authentication and authorization for all incoming requests.
42+
* <p>
43+
* This method is only called when setting {@link RemoteClusterPortSettings#REMOTE_CLUSTER_SERVER_ENABLED}
44+
* is set to {@code true}.
3845
*
39-
* @return map of {@link ServerTransportFilter}s per transport profile name
46+
* @return a custom {@link ServerTransportFilter}s for the given transport profile,
47+
* or an empty optional to fall back to the default transport filter
4048
*/
41-
Map<String, ServerTransportFilter> getProfileTransportFilters(
42-
Map<String, SslProfile> profileConfigurations,
43-
DestructiveOperations destructiveOperations
44-
);
49+
Optional<ServerTransportFilter> getRemoteProfileTransportFilter(SslProfile sslProfile, DestructiveOperations destructiveOperations);
4550

4651
/**
4752
* Returns {@code true} if any of the remote cluster access headers are in the security context.

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.support.DestructiveOperations;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.common.ssl.SslConfiguration;
16+
import org.elasticsearch.common.util.Maps;
1517
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1618
import org.elasticsearch.common.util.concurrent.EsExecutors;
1719
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -29,44 +31,92 @@
2931
import org.elasticsearch.transport.TransportResponseHandler;
3032
import org.elasticsearch.transport.TransportService;
3133
import org.elasticsearch.transport.TransportService.ContextRestoreResponseHandler;
34+
import org.elasticsearch.xpack.core.XPackSettings;
3235
import org.elasticsearch.xpack.core.security.SecurityContext;
3336
import org.elasticsearch.xpack.core.security.transport.ProfileConfigurations;
3437
import org.elasticsearch.xpack.core.ssl.SSLService;
3538
import org.elasticsearch.xpack.core.ssl.SslProfile;
39+
import org.elasticsearch.xpack.security.authc.AuthenticationService;
40+
import org.elasticsearch.xpack.security.authz.AuthorizationService;
3641
import org.elasticsearch.xpack.security.authz.AuthorizationUtils;
3742
import org.elasticsearch.xpack.security.authz.PreAuthorizationUtils;
3843

44+
import java.util.Collections;
3945
import java.util.Map;
4046
import java.util.concurrent.Executor;
4147

4248
import static org.elasticsearch.core.Strings.format;
49+
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
4350

4451
public class SecurityServerTransportInterceptor implements TransportInterceptor {
4552

4653
private static final Logger logger = LogManager.getLogger(SecurityServerTransportInterceptor.class);
4754

55+
private final AuthenticationService authcService;
56+
private final AuthorizationService authzService;
4857
private final RemoteClusterTransportInterceptor remoteClusterTransportInterceptor;
4958
private final Map<String, ServerTransportFilter> profileFilters;
5059
private final ThreadPool threadPool;
5160
private final SecurityContext securityContext;
61+
private final Settings settings;
5262

5363
public SecurityServerTransportInterceptor(
5464
Settings settings,
5565
ThreadPool threadPool,
66+
AuthenticationService authcService,
67+
AuthorizationService authzService,
5668
SSLService sslService,
5769
SecurityContext securityContext,
5870
DestructiveOperations destructiveOperations,
5971
RemoteClusterTransportInterceptor remoteClusterTransportInterceptor
60-
6172
) {
6273
this.remoteClusterTransportInterceptor = remoteClusterTransportInterceptor;
6374
this.securityContext = securityContext;
6475
this.threadPool = threadPool;
76+
this.settings = settings;
77+
this.authcService = authcService;
78+
this.authzService = authzService;
6579
final Map<String, SslProfile> profileConfigurations = ProfileConfigurations.get(settings, sslService, false);
66-
this.profileFilters = this.remoteClusterTransportInterceptor.getProfileTransportFilters(
67-
profileConfigurations,
68-
destructiveOperations
69-
);
80+
this.profileFilters = initializeProfileFilters(profileConfigurations, destructiveOperations);
81+
}
82+
83+
private Map<String, ServerTransportFilter> initializeProfileFilters(
84+
final Map<String, SslProfile> profileConfigurations,
85+
final DestructiveOperations destructiveOperations
86+
) {
87+
final Map<String, ServerTransportFilter> profileFilters = Maps.newMapWithExpectedSize(profileConfigurations.size() + 1);
88+
final boolean transportSSLEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
89+
90+
for (Map.Entry<String, SslProfile> entry : profileConfigurations.entrySet()) {
91+
final String profileName = entry.getKey();
92+
final SslProfile sslProfile = entry.getValue();
93+
if (profileName.equals(REMOTE_CLUSTER_PROFILE)) {
94+
var remoteProfileTransportFilter = this.remoteClusterTransportInterceptor.getRemoteProfileTransportFilter(
95+
sslProfile,
96+
destructiveOperations
97+
);
98+
if (remoteProfileTransportFilter.isPresent()) {
99+
profileFilters.put(profileName, remoteProfileTransportFilter.get());
100+
continue;
101+
}
102+
}
103+
104+
final SslConfiguration profileConfiguration = sslProfile.configuration();
105+
assert profileConfiguration != null : "SSL Profile [" + sslProfile + "] for [" + profileName + "] has a null configuration";
106+
profileFilters.put(
107+
profileName,
108+
new ServerTransportFilter(
109+
authcService,
110+
authzService,
111+
threadPool.getThreadContext(),
112+
transportSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
113+
destructiveOperations,
114+
securityContext
115+
)
116+
);
117+
}
118+
119+
return Collections.unmodifiableMap(profileFilters);
70120
}
71121

72122
@Override

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ public ServerTransportFilter(
6262
}
6363

6464
/**
65-
* Called just after the given request was received by the transport. Any exception
66-
* thrown by this method will stop the request from being handled and the error will
67-
* be sent back to the sender.
65+
* Called just after the given request was received by the transport service.
66+
* <p>
67+
* Any exception thrown by this method will stop the request from being handled
68+
* and the error will be sent back to the sender.
6869
*/
69-
void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener<Void> listener) {
70+
public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener<Void> listener) {
7071
if (TransportCloseIndexAction.NAME.equals(action)
7172
|| OpenIndexAction.NAME.equals(action)
7273
|| TransportDeleteIndexAction.TYPE.name().equals(action)) {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.security.transport;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.test.ESTestCase;
12+
import org.elasticsearch.xpack.core.security.authc.Authentication;
13+
import org.mockito.stubbing.Answer;
14+
15+
import static org.hamcrest.Matchers.arrayWithSize;
16+
17+
public abstract class AbstractServerTransportFilterTests extends ESTestCase {
18+
19+
protected static Answer<Class<Void>> getAnswer(Authentication authentication) {
20+
return getAnswer(authentication, false);
21+
}
22+
23+
protected static Answer<Class<Void>> getAnswer(Authentication authentication, boolean crossClusterAccess) {
24+
return i -> {
25+
final Object[] args = i.getArguments();
26+
assertThat(args, arrayWithSize(crossClusterAccess ? 3 : 4));
27+
@SuppressWarnings("unchecked")
28+
ActionListener<Authentication> callback = (ActionListener<Authentication>) args[args.length - 1];
29+
callback.onResponse(authentication);
30+
return Void.TYPE;
31+
};
32+
}
33+
34+
}

0 commit comments

Comments
 (0)