Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,20 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.ssl.SslConfiguration;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.core.ssl.SslProfile;
import org.elasticsearch.xpack.security.authc.RemoteClusterAuthenticationService;
import org.elasticsearch.xpack.security.transport.RemoteClusterTransportInterceptor;
import org.elasticsearch.xpack.security.transport.ServerTransportFilter;
import org.elasticsearch.xpack.security.transport.extension.RemoteClusterSecurityExtension;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class TestRemoteClusterSecurityExtension implements RemoteClusterSecurityExtension {

Expand Down Expand Up @@ -58,40 +53,17 @@ public boolean isRemoteClusterConnection(Transport.Connection connection) {
return false;
}

public boolean hasRemoteClusterAccessHeadersInContext(SecurityContext securityContext) {
return false;
}

@Override
public Map<String, ServerTransportFilter> getProfileTransportFilters(
Map<String, SslProfile> profileConfigurations,
public Optional<ServerTransportFilter> getRemoteProfileTransportFilter(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning Optional<ServerTransportFilter> is more of a preference here. We could as well return ServerTransportFilter and annotate the method as @Nullable

String profileName,
SslProfile sslProfile,
DestructiveOperations destructiveOperations
) {
Map<String, ServerTransportFilter> profileFilters = Maps.newMapWithExpectedSize(profileConfigurations.size() + 1);
Settings settings = components.settings();
final boolean transportSSLEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);

for (Map.Entry<String, SslProfile> entry : profileConfigurations.entrySet()) {
final String profileName = entry.getKey();
final SslProfile sslProfile = entry.getValue();
final SslConfiguration profileConfiguration = sslProfile.configuration();
profileFilters.put(
profileName,
new ServerTransportFilter(
components.authenticationService(),
components.authorizationService(),
components.threadPool().getThreadContext(),
transportSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
destructiveOperations,
components.securityContext()
)
);
}
// We need to register here the default security
// server transport filter which ensures that all
// incoming transport requests are properly
// authenticated and authorized.
return Collections.unmodifiableMap(profileFilters);
return Optional.empty();
}

public boolean hasRemoteClusterAccessHeadersInContext(SecurityContext securityContext) {
return false;
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,8 @@ Collection<Object> createComponents(
new SecurityServerTransportInterceptor(
settings,
threadPool,
authcService.get(),
authzService,
getSslService(),
securityContext.get(),
destructiveOperations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo.CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY;
import static org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY;

final class CrossClusterAccessServerTransportFilter extends ServerTransportFilter {
final class CrossClusterAccessServerTransportFilter extends DefaultServerTransportFilter implements ServerTransportFilter {
Copy link
Contributor

@jfreden jfreden Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implements ServerTransportFilter is implicit since CrossClusterAccessServerTransportFilter is a DefaultServerTransportFilter that in turn implements ServerTransportFilter.

Might be out of scope for this PR, but the inheritance here is a little confusing. CrossClusterAccessServerTransportFilter is only overriding the authenticate method which leads me to believe that the code in CrossClusterAccessServerTransportFilter actually belongs in CrossClusterAccessAuthenticationService and this class could be dropped for that reason.

Copy link
Contributor Author

@slobodanadamovic slobodanadamovic Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very good point. Thanks for bringing this up. I think the end goal could be to have a single ServerTransportFilter implementation that accept a custom authentication service (CrossClusterAccessAuthenticationService or AuthenticationService). I like this idea, but that might be more involved at this point as it would require more refactoring. As a first step towards that, I could start by reverting the interface introduction, and later following up with generalizing the authentication part. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I didn't realize they were different interfaces, yes that makes it more difficult. One more thing that could be done in this PR is to move the logic into the CrossClusterAccessAuthenticationService and just do the authenticate call. I'll leave it up to you, might also make sense to not increase the scope.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, them being different interfaces with a different number of parameters makes it not that straightforward, but doable. In the interest of unblocking downstream work, I'd prefer to get back to this in a followup.


private static final Logger logger = LogManager.getLogger(CrossClusterAccessServerTransportFilter.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.ssl.SslConfiguration;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
Expand Down Expand Up @@ -49,7 +48,6 @@
import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders;
import org.elasticsearch.xpack.security.authz.AuthorizationService;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -328,51 +326,30 @@ public boolean isRemoteClusterConnection(Transport.Connection connection) {
}

@Override
public Map<String, ServerTransportFilter> getProfileTransportFilters(
Map<String, SslProfile> profileConfigurations,
public Optional<ServerTransportFilter> getRemoteProfileTransportFilter(
String profileName,
SslProfile sslProfile,
DestructiveOperations destructiveOperations
) {
Map<String, ServerTransportFilter> profileFilters = Maps.newMapWithExpectedSize(profileConfigurations.size() + 1);

final boolean transportSSLEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
final boolean remoteClusterPortEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
assert REMOTE_CLUSTER_PROFILE.equals(profileName) : "should only be called for remote cluster transport profiles";
final SslConfiguration profileConfiguration = sslProfile.configuration();
assert profileConfiguration != null : "SSL Profile [" + sslProfile + "] for [" + profileName + "] has a null configuration";
final boolean remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
final boolean remoteClusterServerSSLEnabled = XPackSettings.REMOTE_CLUSTER_SERVER_SSL_ENABLED.get(settings);

for (Map.Entry<String, SslProfile> entry : profileConfigurations.entrySet()) {
final String profileName = entry.getKey();
final SslProfile sslProfile = entry.getValue();
final SslConfiguration profileConfiguration = sslProfile.configuration();
assert profileConfiguration != null : "Ssl Profile [" + sslProfile + "] for [" + profileName + "] has a null configuration";
final boolean useRemoteClusterProfile = remoteClusterPortEnabled && profileName.equals(REMOTE_CLUSTER_PROFILE);
if (useRemoteClusterProfile) {
profileFilters.put(
profileName,
new CrossClusterAccessServerTransportFilter(
crossClusterAccessAuthcService,
authzService,
threadPool.getThreadContext(),
remoteClusterServerSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
destructiveOperations,
securityContext,
licenseState
)
);
} else {
profileFilters.put(
profileName,
new ServerTransportFilter(
authcService,
authzService,
threadPool.getThreadContext(),
transportSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
destructiveOperations,
securityContext
)
);
}
if (remoteClusterServerEnabled) {
return Optional.of(
new CrossClusterAccessServerTransportFilter(
crossClusterAccessAuthcService,
authzService,
threadPool.getThreadContext(),
remoteClusterServerSSLEnabled && SSLService.isSSLClientAuthEnabled(profileConfiguration),
destructiveOperations,
securityContext,
licenseState
)
);
}

return Collections.unmodifiableMap(profileFilters);
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.security.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.transport.TaskTransportChannel;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.user.SystemUser;
import org.elasticsearch.xpack.security.action.SecurityActionMapper;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authz.AuthorizationService;

/**
* The server transport filter that should be used in nodes as it ensures that an incoming
* request is properly authenticated and authorized
*/
public class DefaultServerTransportFilter implements ServerTransportFilter {

private static final Logger logger = LogManager.getLogger(DefaultServerTransportFilter.class);

private final AuthenticationService authcService;
private final AuthorizationService authzService;
private final ThreadContext threadContext;
private final boolean extractClientCert;
private final DestructiveOperations destructiveOperations;
private final SecurityContext securityContext;

public DefaultServerTransportFilter(
AuthenticationService authcService,
AuthorizationService authzService,
ThreadContext threadContext,
boolean extractClientCert,
DestructiveOperations destructiveOperations,
SecurityContext securityContext
) {
this.authcService = authcService;
this.authzService = authzService;
this.threadContext = threadContext;
this.extractClientCert = extractClientCert;
this.destructiveOperations = destructiveOperations;
this.securityContext = securityContext;
}

@Override
public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener<Void> listener) {
if (TransportCloseIndexAction.NAME.equals(action)
|| OpenIndexAction.NAME.equals(action)
|| TransportDeleteIndexAction.TYPE.name().equals(action)) {
IndicesRequest indicesRequest = (IndicesRequest) request;
try {
destructiveOperations.failDestructive(indicesRequest.indices());
} catch (IllegalArgumentException e) {
listener.onFailure(e);
return;
}
}
/*
here we don't have a fallback user, as all incoming request are
expected to have a user attached (either in headers or in context)
We can make this assumption because in nodes we make sure all outgoing
requests from all the nodes are attached with a user (either a serialize
user an authentication token
*/
String securityAction = SecurityActionMapper.action(action, request);

TransportChannel unwrappedChannel = transportChannel;
if (unwrappedChannel instanceof TaskTransportChannel) {
unwrappedChannel = ((TaskTransportChannel) unwrappedChannel).getChannel();
}

if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel)) {
TcpChannel tcpChannel = ((TcpTransportChannel) unwrappedChannel).getChannel();
if (tcpChannel instanceof Netty4TcpChannel) {
if (tcpChannel.isOpen()) {
SSLEngineUtils.extractClientCertificates(logger, threadContext, tcpChannel);
}
}
}

TransportVersion version = transportChannel.getVersion();
authenticate(securityAction, request, listener.delegateFailureAndWrap((l, authentication) -> {
if (authentication != null) {
if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME)
&& SystemUser.is(authentication.getEffectiveSubject().getUser()) == false) {
securityContext.executeAsSystemUser(version, original -> {
final Authentication replaced = securityContext.getAuthentication();
authzService.authorize(replaced, securityAction, request, l);
});
} else {
authzService.authorize(authentication, securityAction, request, l);
}
} else {
l.onFailure(new IllegalStateException("no authentication present but auth is allowed"));
}
}));
}

protected void authenticate(
final String securityAction,
final TransportRequest request,
final ActionListener<Authentication> authenticationListener
) {
authcService.authenticate(securityAction, request, true, authenticationListener);
}

protected final ThreadContext getThreadContext() {
return threadContext;
}

// Package private for testing
boolean isExtractClientCert() {
return extractClientCert;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
package org.elasticsearch.xpack.security.transport;

import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.transport.RemoteClusterPortSettings;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.ssl.SslProfile;

import java.util.Map;
import java.util.Optional;

/**
* Allows to provide remote cluster interception that's capable of intercepting remote connections
Expand All @@ -32,14 +33,22 @@ public interface RemoteClusterTransportInterceptor {
boolean isRemoteClusterConnection(Transport.Connection connection);

/**
* Allows interceptors to provide a custom {@link ServerTransportFilter} implementations per transport profile.
* The transport filter is called on the receiver side to filter incoming requests
* and execute authentication and authorization for all requests.
* Allows interceptors to provide a custom {@link ServerTransportFilter} implementation
* for intercepting requests for {@link RemoteClusterPortSettings#REMOTE_CLUSTER_PROFILE}
* transport profile.
* <p>
* The transport filter is called on the receiver side to filter incoming remote cluster requests
* and to execute authentication and authorization for all incoming requests.
* <p>
* This method is only called when setting {@link RemoteClusterPortSettings#REMOTE_CLUSTER_SERVER_ENABLED}
* is set to {@code true}.
*
* @return map of {@link ServerTransportFilter}s per transport profile name
* @return a custom {@link ServerTransportFilter}s for the given transport profile,
* or an empty optional to fall back to the default transport filter
*/
Map<String, ServerTransportFilter> getProfileTransportFilters(
Map<String, SslProfile> profileConfigurations,
Optional<ServerTransportFilter> getRemoteProfileTransportFilter(
String profileName,
SslProfile sslProfile,
DestructiveOperations destructiveOperations
);

Expand Down
Loading