|
4 | 4 | * 2.0; you may not use this file except in compliance with the Elastic License
|
5 | 5 | * 2.0.
|
6 | 6 | */
|
7 |
| - |
8 | 7 | package org.elasticsearch.xpack.security.transport;
|
9 | 8 |
|
| 9 | +import org.apache.logging.log4j.LogManager; |
| 10 | +import org.apache.logging.log4j.Logger; |
| 11 | +import org.elasticsearch.TransportVersion; |
10 | 12 | import org.elasticsearch.action.ActionListener;
|
| 13 | +import org.elasticsearch.action.IndicesRequest; |
| 14 | +import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; |
| 15 | +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; |
| 16 | +import org.elasticsearch.action.admin.indices.open.OpenIndexAction; |
| 17 | +import org.elasticsearch.action.support.DestructiveOperations; |
| 18 | +import org.elasticsearch.common.util.concurrent.ThreadContext; |
| 19 | +import org.elasticsearch.transport.TaskTransportChannel; |
| 20 | +import org.elasticsearch.transport.TcpChannel; |
| 21 | +import org.elasticsearch.transport.TcpTransportChannel; |
11 | 22 | import org.elasticsearch.transport.TransportChannel;
|
12 | 23 | import org.elasticsearch.transport.TransportRequest;
|
| 24 | +import org.elasticsearch.transport.TransportService; |
| 25 | +import org.elasticsearch.transport.netty4.Netty4TcpChannel; |
| 26 | +import org.elasticsearch.xpack.core.security.SecurityContext; |
| 27 | +import org.elasticsearch.xpack.core.security.authc.Authentication; |
| 28 | +import org.elasticsearch.xpack.core.security.user.SystemUser; |
| 29 | +import org.elasticsearch.xpack.security.action.SecurityActionMapper; |
| 30 | +import org.elasticsearch.xpack.security.authc.AuthenticationService; |
| 31 | +import org.elasticsearch.xpack.security.authz.AuthorizationService; |
13 | 32 |
|
14 | 33 | /**
|
15 |
| - * Transport filter which is called after a transport request is received by the transport service. |
| 34 | + * The server transport filter that should be used in nodes as it ensures that an incoming |
| 35 | + * request is properly authenticated and authorized |
16 | 36 | */
|
17 |
| -public interface ServerTransportFilter { |
| 37 | +public class ServerTransportFilter { |
| 38 | + |
| 39 | + private static final Logger logger = LogManager.getLogger(ServerTransportFilter.class); |
| 40 | + |
| 41 | + private final AuthenticationService authcService; |
| 42 | + private final AuthorizationService authzService; |
| 43 | + private final ThreadContext threadContext; |
| 44 | + private final boolean extractClientCert; |
| 45 | + private final DestructiveOperations destructiveOperations; |
| 46 | + private final SecurityContext securityContext; |
| 47 | + |
| 48 | + public ServerTransportFilter( |
| 49 | + AuthenticationService authcService, |
| 50 | + AuthorizationService authzService, |
| 51 | + ThreadContext threadContext, |
| 52 | + boolean extractClientCert, |
| 53 | + DestructiveOperations destructiveOperations, |
| 54 | + SecurityContext securityContext |
| 55 | + ) { |
| 56 | + this.authcService = authcService; |
| 57 | + this.authzService = authzService; |
| 58 | + this.threadContext = threadContext; |
| 59 | + this.extractClientCert = extractClientCert; |
| 60 | + this.destructiveOperations = destructiveOperations; |
| 61 | + this.securityContext = securityContext; |
| 62 | + } |
18 | 63 |
|
19 | 64 | /**
|
20 | 65 | * Called just after the given request was received by the transport service.
|
21 | 66 | * <p>
|
22 | 67 | * Any exception thrown by this method will stop the request from being handled
|
23 | 68 | * and the error will be sent back to the sender.
|
24 | 69 | */
|
25 |
| - void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener<Void> listener); |
| 70 | + public void inbound(String action, TransportRequest request, TransportChannel transportChannel, ActionListener<Void> listener) { |
| 71 | + if (TransportCloseIndexAction.NAME.equals(action) |
| 72 | + || OpenIndexAction.NAME.equals(action) |
| 73 | + || TransportDeleteIndexAction.TYPE.name().equals(action)) { |
| 74 | + IndicesRequest indicesRequest = (IndicesRequest) request; |
| 75 | + try { |
| 76 | + destructiveOperations.failDestructive(indicesRequest.indices()); |
| 77 | + } catch (IllegalArgumentException e) { |
| 78 | + listener.onFailure(e); |
| 79 | + return; |
| 80 | + } |
| 81 | + } |
| 82 | + /* |
| 83 | + here we don't have a fallback user, as all incoming request are |
| 84 | + expected to have a user attached (either in headers or in context) |
| 85 | + We can make this assumption because in nodes we make sure all outgoing |
| 86 | + requests from all the nodes are attached with a user (either a serialize |
| 87 | + user an authentication token |
| 88 | + */ |
| 89 | + String securityAction = SecurityActionMapper.action(action, request); |
| 90 | + |
| 91 | + TransportChannel unwrappedChannel = transportChannel; |
| 92 | + if (unwrappedChannel instanceof TaskTransportChannel) { |
| 93 | + unwrappedChannel = ((TaskTransportChannel) unwrappedChannel).getChannel(); |
| 94 | + } |
| 95 | + |
| 96 | + if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel)) { |
| 97 | + TcpChannel tcpChannel = ((TcpTransportChannel) unwrappedChannel).getChannel(); |
| 98 | + if (tcpChannel instanceof Netty4TcpChannel) { |
| 99 | + if (tcpChannel.isOpen()) { |
| 100 | + SSLEngineUtils.extractClientCertificates(logger, threadContext, tcpChannel); |
| 101 | + } |
| 102 | + } |
| 103 | + } |
| 104 | + |
| 105 | + TransportVersion version = transportChannel.getVersion(); |
| 106 | + authenticate(securityAction, request, listener.delegateFailureAndWrap((l, authentication) -> { |
| 107 | + if (authentication != null) { |
| 108 | + if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME) |
| 109 | + && SystemUser.is(authentication.getEffectiveSubject().getUser()) == false) { |
| 110 | + securityContext.executeAsSystemUser(version, original -> { |
| 111 | + final Authentication replaced = securityContext.getAuthentication(); |
| 112 | + authzService.authorize(replaced, securityAction, request, l); |
| 113 | + }); |
| 114 | + } else { |
| 115 | + authzService.authorize(authentication, securityAction, request, l); |
| 116 | + } |
| 117 | + } else { |
| 118 | + l.onFailure(new IllegalStateException("no authentication present but auth is allowed")); |
| 119 | + } |
| 120 | + })); |
| 121 | + } |
| 122 | + |
| 123 | + protected void authenticate( |
| 124 | + final String securityAction, |
| 125 | + final TransportRequest request, |
| 126 | + final ActionListener<Authentication> authenticationListener |
| 127 | + ) { |
| 128 | + authcService.authenticate(securityAction, request, true, authenticationListener); |
| 129 | + } |
| 130 | + |
| 131 | + protected final ThreadContext getThreadContext() { |
| 132 | + return threadContext; |
| 133 | + } |
| 134 | + |
| 135 | + // Package private for testing |
| 136 | + boolean isExtractClientCert() { |
| 137 | + return extractClientCert; |
| 138 | + } |
| 139 | + |
26 | 140 | }
|
0 commit comments