|
15 | 15 | import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; |
16 | 16 | import org.elasticsearch.action.admin.indices.open.OpenIndexAction; |
17 | 17 | import org.elasticsearch.action.support.DestructiveOperations; |
| 18 | +import org.elasticsearch.common.util.concurrent.ListenableFuture; |
18 | 19 | import org.elasticsearch.common.util.concurrent.ThreadContext; |
19 | 20 | import org.elasticsearch.transport.TaskTransportChannel; |
20 | 21 | import org.elasticsearch.transport.TcpChannel; |
@@ -102,29 +103,44 @@ requests from all the nodes are attached with a user (either a serialize |
102 | 103 | } |
103 | 104 |
|
104 | 105 | TransportVersion version = transportChannel.getVersion(); |
105 | | - authenticate(securityAction, request, listener.delegateFailureAndWrap((l, authentication) -> { |
106 | | - if (authentication != null) { |
107 | | - if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME) |
108 | | - && SystemUser.is(authentication.getEffectiveSubject().getUser()) == false) { |
109 | | - securityContext.executeAsSystemUser(version, original -> { |
110 | | - final Authentication replaced = securityContext.getAuthentication(); |
111 | | - authzService.authorize(replaced, securityAction, request, l); |
112 | | - }); |
113 | | - } else { |
114 | | - authzService.authorize(authentication, securityAction, request, l); |
115 | | - } |
| 106 | + var authFuture = authenticate(securityAction, request); |
| 107 | + if (authFuture.isSuccess()) { |
| 108 | + handleAuthentication(request, listener, authFuture.result(), securityAction, version); |
| 109 | + } else { |
| 110 | + authFuture.addListener( |
| 111 | + listener.delegateFailureAndWrap( |
| 112 | + (l, authentication) -> handleAuthentication(request, l, authentication, securityAction, version) |
| 113 | + ) |
| 114 | + ); |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + private void handleAuthentication( |
| 119 | + TransportRequest request, |
| 120 | + ActionListener<Void> listener, |
| 121 | + Authentication authentication, |
| 122 | + String securityAction, |
| 123 | + TransportVersion version |
| 124 | + ) { |
| 125 | + if (authentication != null) { |
| 126 | + if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME) |
| 127 | + && SystemUser.is(authentication.getEffectiveSubject().getUser()) == false) { |
| 128 | + securityContext.executeAsSystemUser(version, original -> { |
| 129 | + final Authentication replaced = securityContext.getAuthentication(); |
| 130 | + authzService.authorize(replaced, securityAction, request, listener); |
| 131 | + }); |
116 | 132 | } else { |
117 | | - l.onFailure(new IllegalStateException("no authentication present but auth is allowed")); |
| 133 | + authzService.authorize(authentication, securityAction, request, listener); |
118 | 134 | } |
119 | | - })); |
| 135 | + } else { |
| 136 | + listener.onFailure(new IllegalStateException("no authentication present but auth is allowed")); |
| 137 | + } |
120 | 138 | } |
121 | 139 |
|
122 | | - protected void authenticate( |
123 | | - final String securityAction, |
124 | | - final TransportRequest request, |
125 | | - final ActionListener<Authentication> authenticationListener |
126 | | - ) { |
127 | | - authcService.authenticate(securityAction, request, true, authenticationListener); |
| 140 | + protected ListenableFuture<Authentication> authenticate(final String securityAction, final TransportRequest request) { |
| 141 | + final ListenableFuture<Authentication> listener = new ListenableFuture<>(); |
| 142 | + authcService.authenticate(securityAction, request, true, listener); |
| 143 | + return listener; |
128 | 144 | } |
129 | 145 |
|
130 | 146 | protected final ThreadContext getThreadContext() { |
|
0 commit comments