|
9 | 9 |
|
10 | 10 | import org.apache.logging.log4j.LogManager; |
11 | 11 | import org.apache.logging.log4j.Logger; |
| 12 | +import org.elasticsearch.action.ActionListener; |
| 13 | +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; |
12 | 14 | import org.elasticsearch.action.support.DestructiveOperations; |
| 15 | +import org.elasticsearch.common.settings.SecureString; |
13 | 16 | import org.elasticsearch.common.settings.Settings; |
| 17 | +import org.elasticsearch.common.util.concurrent.ThreadContext; |
| 18 | +import org.elasticsearch.license.LicenseUtils; |
14 | 19 | import org.elasticsearch.license.XPackLicenseState; |
| 20 | +import org.elasticsearch.tasks.TaskCancellationService; |
15 | 21 | import org.elasticsearch.threadpool.ThreadPool; |
16 | 22 | import org.elasticsearch.transport.RemoteConnectionManager; |
| 23 | +import org.elasticsearch.transport.SendRequestTransportException; |
17 | 24 | import org.elasticsearch.transport.Transport; |
18 | 25 | import org.elasticsearch.transport.TransportInterceptor; |
| 26 | +import org.elasticsearch.transport.TransportRequest; |
| 27 | +import org.elasticsearch.transport.TransportRequestOptions; |
| 28 | +import org.elasticsearch.transport.TransportResponse; |
| 29 | +import org.elasticsearch.transport.TransportResponseHandler; |
| 30 | +import org.elasticsearch.transport.TransportService; |
19 | 31 | import org.elasticsearch.xpack.core.security.SecurityContext; |
| 32 | +import org.elasticsearch.xpack.core.security.authc.Authentication; |
| 33 | +import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo; |
| 34 | +import org.elasticsearch.xpack.core.security.user.InternalUser; |
| 35 | +import org.elasticsearch.xpack.core.security.user.SystemUser; |
| 36 | +import org.elasticsearch.xpack.core.security.user.User; |
20 | 37 | import org.elasticsearch.xpack.core.ssl.SslProfile; |
| 38 | +import org.elasticsearch.xpack.security.Security; |
| 39 | +import org.elasticsearch.xpack.security.action.SecurityActionMapper; |
| 40 | +import org.elasticsearch.xpack.security.audit.AuditUtil; |
| 41 | +import org.elasticsearch.xpack.security.authc.ApiKeyService; |
21 | 42 | import org.elasticsearch.xpack.security.authc.AuthenticationService; |
22 | 43 | import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService; |
| 44 | +import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders; |
23 | 45 | import org.elasticsearch.xpack.security.authz.AuthorizationService; |
24 | 46 |
|
25 | 47 | import java.util.Map; |
26 | 48 | import java.util.Optional; |
27 | 49 | import java.util.function.Function; |
28 | 50 |
|
| 51 | +import static org.elasticsearch.core.Strings.format; |
| 52 | +import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY; |
| 53 | + |
29 | 54 | public class CrossClusterAccessTransportInterceptor implements RemoteClusterTransportInterceptor { |
30 | 55 |
|
31 | 56 | private static final Logger logger = LogManager.getLogger(CrossClusterAccessTransportInterceptor.class); |
32 | 57 |
|
| 58 | + private static final Map<String, String> RCS_INTERNAL_ACTIONS_REPLACEMENTS = Map.of( |
| 59 | + "internal:admin/ccr/restore/session/put", |
| 60 | + "indices:internal/admin/ccr/restore/session/put", |
| 61 | + "internal:admin/ccr/restore/session/clear", |
| 62 | + "indices:internal/admin/ccr/restore/session/clear", |
| 63 | + "internal:admin/ccr/restore/file_chunk/get", |
| 64 | + "indices:internal/admin/ccr/restore/file_chunk/get", |
| 65 | + "internal:data/read/esql/open_exchange", |
| 66 | + "cluster:internal:data/read/esql/open_exchange", |
| 67 | + "internal:data/read/esql/exchange", |
| 68 | + "cluster:internal:data/read/esql/exchange", |
| 69 | + TaskCancellationService.BAN_PARENT_ACTION_NAME, |
| 70 | + TaskCancellationService.REMOTE_CLUSTER_BAN_PARENT_ACTION_NAME, |
| 71 | + TaskCancellationService.CANCEL_CHILD_ACTION_NAME, |
| 72 | + TaskCancellationService.REMOTE_CLUSTER_CANCEL_CHILD_ACTION_NAME |
| 73 | + ); |
| 74 | + |
33 | 75 | private final Function< |
34 | 76 | Transport.Connection, |
35 | 77 | Optional<RemoteConnectionManager.RemoteClusterAliasWithCredentials>> remoteClusterCredentialsResolver; |
@@ -85,7 +127,198 @@ public CrossClusterAccessTransportInterceptor( |
85 | 127 |
|
86 | 128 | @Override |
87 | 129 | public TransportInterceptor.AsyncSender interceptSender(TransportInterceptor.AsyncSender sender) { |
88 | | - return null; |
| 130 | + return interceptForCrossClusterAccessRequests(sender); |
| 131 | + } |
| 132 | + |
| 133 | + private TransportInterceptor.AsyncSender interceptForCrossClusterAccessRequests(final TransportInterceptor.AsyncSender sender) { |
| 134 | + return new TransportInterceptor.AsyncSender() { |
| 135 | + @Override |
| 136 | + public <T extends TransportResponse> void sendRequest( |
| 137 | + Transport.Connection connection, |
| 138 | + String action, |
| 139 | + TransportRequest request, |
| 140 | + TransportRequestOptions options, |
| 141 | + TransportResponseHandler<T> handler |
| 142 | + ) { |
| 143 | + final Optional<SecurityServerTransportInterceptor.RemoteClusterCredentials> remoteClusterCredentials = |
| 144 | + getRemoteClusterCredentials(connection); |
| 145 | + if (remoteClusterCredentials.isPresent()) { |
| 146 | + sendWithCrossClusterAccessHeaders(remoteClusterCredentials.get(), connection, action, request, options, handler); |
| 147 | + } else { |
| 148 | + // Send regular request, without cross cluster access headers |
| 149 | + try { |
| 150 | + sender.sendRequest(connection, action, request, options, handler); |
| 151 | + } catch (Exception e) { |
| 152 | + handler.handleException(new SendRequestTransportException(connection.getNode(), action, e)); |
| 153 | + } |
| 154 | + } |
| 155 | + } |
| 156 | + |
| 157 | + /** |
| 158 | + * Returns cluster credentials if the connection is remote, and cluster credentials are set up for the target cluster. |
| 159 | + */ |
| 160 | + private Optional<SecurityServerTransportInterceptor.RemoteClusterCredentials> getRemoteClusterCredentials( |
| 161 | + Transport.Connection connection |
| 162 | + ) { |
| 163 | + final Optional<RemoteConnectionManager.RemoteClusterAliasWithCredentials> remoteClusterAliasWithCredentials = |
| 164 | + remoteClusterCredentialsResolver.apply(connection); |
| 165 | + if (remoteClusterAliasWithCredentials.isEmpty()) { |
| 166 | + logger.trace("Connection is not remote"); |
| 167 | + return Optional.empty(); |
| 168 | + } |
| 169 | + |
| 170 | + final String remoteClusterAlias = remoteClusterAliasWithCredentials.get().clusterAlias(); |
| 171 | + final SecureString remoteClusterCredentials = remoteClusterAliasWithCredentials.get().credentials(); |
| 172 | + if (remoteClusterCredentials == null) { |
| 173 | + logger.trace("No cluster credentials are configured for remote cluster [{}]", remoteClusterAlias); |
| 174 | + return Optional.empty(); |
| 175 | + } |
| 176 | + |
| 177 | + return Optional.of( |
| 178 | + new SecurityServerTransportInterceptor.RemoteClusterCredentials( |
| 179 | + remoteClusterAlias, |
| 180 | + ApiKeyService.withApiKeyPrefix(remoteClusterCredentials.toString()) |
| 181 | + ) |
| 182 | + ); |
| 183 | + } |
| 184 | + |
| 185 | + private <T extends TransportResponse> void sendWithCrossClusterAccessHeaders( |
| 186 | + final SecurityServerTransportInterceptor.RemoteClusterCredentials remoteClusterCredentials, |
| 187 | + final Transport.Connection connection, |
| 188 | + final String action, |
| 189 | + final TransportRequest request, |
| 190 | + final TransportRequestOptions options, |
| 191 | + final TransportResponseHandler<T> handler |
| 192 | + ) { |
| 193 | + if (false == Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.check(licenseState)) { |
| 194 | + throw LicenseUtils.newComplianceException(Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.getName()); |
| 195 | + } |
| 196 | + final String remoteClusterAlias = remoteClusterCredentials.clusterAlias(); |
| 197 | + |
| 198 | + if (connection.getTransportVersion().before(TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY)) { |
| 199 | + throw illegalArgumentExceptionWithDebugLog( |
| 200 | + "Settings for remote cluster [" |
| 201 | + + remoteClusterAlias |
| 202 | + + "] indicate cross cluster access headers should be sent but target cluster version [" |
| 203 | + + connection.getTransportVersion().toReleaseVersion() |
| 204 | + + "] does not support receiving them" |
| 205 | + ); |
| 206 | + } |
| 207 | + |
| 208 | + logger.trace( |
| 209 | + () -> format( |
| 210 | + "Sending [%s] request for [%s] action to [%s] with cross cluster access headers", |
| 211 | + request.getClass(), |
| 212 | + action, |
| 213 | + remoteClusterAlias |
| 214 | + ) |
| 215 | + ); |
| 216 | + |
| 217 | + final Authentication authentication = securityContext.getAuthentication(); |
| 218 | + assert authentication != null : "authentication must be present in security context"; |
| 219 | + |
| 220 | + final User user = authentication.getEffectiveSubject().getUser(); |
| 221 | + if (user instanceof InternalUser && false == SystemUser.is(user)) { |
| 222 | + final String message = "Internal user [" + user.principal() + "] should not be used for cross cluster requests"; |
| 223 | + assert false : message; |
| 224 | + throw illegalArgumentExceptionWithDebugLog(message); |
| 225 | + } else if (SystemUser.is(user) || action.equals(ClusterStateAction.NAME)) { |
| 226 | + if (SystemUser.is(user)) { |
| 227 | + logger.trace( |
| 228 | + "Request [{}] for action [{}] towards [{}] initiated by the system user. " |
| 229 | + + "Sending request with internal cross cluster access user headers", |
| 230 | + request.getClass(), |
| 231 | + action, |
| 232 | + remoteClusterAlias |
| 233 | + ); |
| 234 | + } else { |
| 235 | + // Use system user for cluster state requests (CCR has many calls of cluster state with end-user context) |
| 236 | + logger.trace( |
| 237 | + () -> format( |
| 238 | + "Switching to the system user for cluster state action towards [{}]. Original user is [%s]", |
| 239 | + remoteClusterAlias, |
| 240 | + user |
| 241 | + ) |
| 242 | + ); |
| 243 | + } |
| 244 | + final var crossClusterAccessHeaders = new CrossClusterAccessHeaders( |
| 245 | + remoteClusterCredentials.credentials(), |
| 246 | + SystemUser.crossClusterAccessSubjectInfo( |
| 247 | + authentication.getEffectiveSubject().getTransportVersion(), |
| 248 | + authentication.getEffectiveSubject().getRealm().getNodeName() |
| 249 | + ) |
| 250 | + ); |
| 251 | + // To be able to enforce index-level privileges under the new remote cluster security model, |
| 252 | + // we switch from old-style internal actions to their new equivalent indices actions so that |
| 253 | + // they will be checked for index privileges against the index specified in the requests |
| 254 | + final String effectiveAction = RCS_INTERNAL_ACTIONS_REPLACEMENTS.getOrDefault(action, action); |
| 255 | + if (false == effectiveAction.equals(action)) { |
| 256 | + logger.trace("switching internal action from [{}] to [{}]", action, effectiveAction); |
| 257 | + } |
| 258 | + sendWithCrossClusterAccessHeaders(crossClusterAccessHeaders, connection, effectiveAction, request, options, handler); |
| 259 | + } else { |
| 260 | + assert false == action.startsWith("internal:") : "internal action must be sent with system user"; |
| 261 | + authzService.getRoleDescriptorsIntersectionForRemoteCluster( |
| 262 | + remoteClusterAlias, |
| 263 | + connection.getTransportVersion(), |
| 264 | + authentication.getEffectiveSubject(), |
| 265 | + ActionListener.wrap(roleDescriptorsIntersection -> { |
| 266 | + logger.trace( |
| 267 | + () -> format( |
| 268 | + "Subject [%s] has role descriptors intersection [%s] for action [%s] towards remote cluster [%s]", |
| 269 | + authentication.getEffectiveSubject(), |
| 270 | + roleDescriptorsIntersection, |
| 271 | + action, |
| 272 | + remoteClusterAlias |
| 273 | + ) |
| 274 | + ); |
| 275 | + if (roleDescriptorsIntersection.isEmpty()) { |
| 276 | + throw authzService.remoteActionDenied( |
| 277 | + authentication, |
| 278 | + SecurityActionMapper.action(action, request), |
| 279 | + remoteClusterAlias |
| 280 | + ); |
| 281 | + } |
| 282 | + final var crossClusterAccessHeaders = new CrossClusterAccessHeaders( |
| 283 | + remoteClusterCredentials.credentials(), |
| 284 | + new CrossClusterAccessSubjectInfo(authentication, roleDescriptorsIntersection) |
| 285 | + ); |
| 286 | + sendWithCrossClusterAccessHeaders(crossClusterAccessHeaders, connection, action, request, options, handler); |
| 287 | + }, // it's safe to not use a context restore handler here since `getRoleDescriptorsIntersectionForRemoteCluster` |
| 288 | + // uses a context preserving listener internally, and `sendWithCrossClusterAccessHeaders` uses a context restore |
| 289 | + // handler |
| 290 | + e -> handler.handleException(new SendRequestTransportException(connection.getNode(), action, e)) |
| 291 | + ) |
| 292 | + ); |
| 293 | + } |
| 294 | + } |
| 295 | + |
| 296 | + private <T extends TransportResponse> void sendWithCrossClusterAccessHeaders( |
| 297 | + final CrossClusterAccessHeaders crossClusterAccessHeaders, |
| 298 | + final Transport.Connection connection, |
| 299 | + final String action, |
| 300 | + final TransportRequest request, |
| 301 | + final TransportRequestOptions options, |
| 302 | + final TransportResponseHandler<T> handler |
| 303 | + ) { |
| 304 | + final ThreadContext threadContext = securityContext.getThreadContext(); |
| 305 | + final var contextRestoreHandler = new TransportService.ContextRestoreResponseHandler<>( |
| 306 | + threadContext.newRestorableContext(true), |
| 307 | + handler |
| 308 | + ); |
| 309 | + try (ThreadContext.StoredContext ignored = threadContext.stashContextPreservingRequestHeaders(AuditUtil.AUDIT_REQUEST_ID)) { |
| 310 | + crossClusterAccessHeaders.writeToContext(threadContext); |
| 311 | + sender.sendRequest(connection, action, request, options, contextRestoreHandler); |
| 312 | + } catch (Exception e) { |
| 313 | + contextRestoreHandler.handleException(new SendRequestTransportException(connection.getNode(), action, e)); |
| 314 | + } |
| 315 | + } |
| 316 | + |
| 317 | + private static IllegalArgumentException illegalArgumentExceptionWithDebugLog(String message) { |
| 318 | + logger.debug(message); |
| 319 | + return new IllegalArgumentException(message); |
| 320 | + } |
| 321 | + }; |
89 | 322 | } |
90 | 323 |
|
91 | 324 | @Override |
|
0 commit comments