diff --git a/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java b/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java index b0db261d67e19..ea99880117f17 100644 --- a/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java +++ b/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java @@ -10,6 +10,7 @@ package org.elasticsearch.example; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse; @@ -86,14 +87,14 @@ public void authorizeClusterAction(RequestInfo requestInfo, AuthorizationInfo au } @Override - public void authorizeIndexAction( + SubscribableListener void authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata project, - ActionListener listener + ProjectMetadata project ) { if (isSuperuser(requestInfo.getAuthentication().getEffectiveSubject().getUser())) { + ActionListener listener = new SubscribableListener<>(); indicesAsyncSupplier.getAsync(ActionListener.wrap(resolvedIndices -> { Map indexAccessControlMap = new HashMap<>(); for (String name : resolvedIndices.getLocal()) { @@ -103,8 +104,9 @@ public void authorizeIndexAction( new IndicesAccessControl(true, Collections.unmodifiableMap(indexAccessControlMap)); listener.onResponse(new IndexAuthorizationResult(indicesAccessControl)); }, listener::onFailure)); + return listener; } else { - listener.onResponse(new IndexAuthorizationResult(IndicesAccessControl.DENIED)); + return SubscribableListener.succcess(new IndexAuthorizationResult(IndicesAccessControl.DENIED)); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index c6c240e3b6759..3056f7cda6429 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -266,6 +266,13 @@ public final boolean isDone() { return isDone(state); } + /** + * @return return {@code true} if and only if this listener is done and has been completed successfully + */ + public final boolean isSuccess() { + return state instanceof SuccessResult; + } + /** * @return the result with which this listener completed successfully, or throw the exception with which it failed. * diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java index c36a5c350658e..9f18e7915a725 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.bytes.BytesReference; @@ -75,7 +76,7 @@ * can actually impersonate the user running the request. *
  • {@link #authorizeClusterAction(RequestInfo, AuthorizationInfo, ActionListener)} if the * request is a cluster level operation.
  • - *
  • {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} if + *
  • {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} if * the request is a an index action. This method may be called multiple times for a single * request as the request may be made up of sub-requests that also need to be authorized. The async supplier * for resolved indices will invoke the @@ -85,7 +86,7 @@ *

    * NOTE: the {@link #loadAuthorizedIndices(RequestInfo, AuthorizationInfo, Map, ActionListener)} * method may be called prior to - * {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} + * {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} * in cases where wildcards need to be expanded. *


    * Authorization engines can be called from various threads including network threads that should @@ -161,14 +162,13 @@ public interface AuthorizationEngine { * attempting to operate on * @param metadata a map of a string name to the cluster metadata specific to that * alias or index - * @param listener the listener to be notified of the authorization result + * @return a listener to be notified of the authorization result */ - void authorizeIndexAction( + SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ); /** @@ -766,6 +766,6 @@ interface AsyncSupplier { * Asynchronously retrieves the value that is being supplied and notifies the listener upon * completion. */ - void getAsync(ActionListener listener); + SubscribableListener getAsync(); } } diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java index 23807a318981c..0f832b83a177f 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.Cancellable; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -406,14 +407,13 @@ public void authorizeClusterAction( } @Override - public void authorizeIndexAction( + public SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ) { - listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 137237457d816..6cd937cb2f8c7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -482,16 +483,16 @@ private void authorizeAction( } else if (isIndexAction(action)) { final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); assert projectMetadata != null; - final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> { + final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> { if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) { var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest); - resolvedIndicesListener.onResponse(resolvedIndices); - return; + return SubscribableListener.newSucceeded(resolvedIndices); } final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request); if (resolvedIndices != null) { - resolvedIndicesListener.onResponse(resolvedIndices); + return SubscribableListener.newSucceeded(resolvedIndices); } else { + final SubscribableListener resolvedIndicesListener = new SubscribableListener<>(); authzEngine.loadAuthorizedIndices( requestInfo, authzInfo, @@ -510,33 +511,31 @@ private void authorizeAction( } ) ); + return resolvedIndicesListener; } }); - authzEngine.authorizeIndexAction( - requestInfo, - authzInfo, - resolvedIndicesAsyncSupplier, - projectMetadata, - wrapPreservingContext( - new AuthorizationResultListener<>( - result -> handleIndexActionAuthorizationResult( - result, + authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, projectMetadata) + .addListener( + wrapPreservingContext( + new AuthorizationResultListener<>( + result -> handleIndexActionAuthorizationResult( + result, + requestInfo, + requestId, + authzInfo, + authzEngine, + resolvedIndicesAsyncSupplier, + projectMetadata, + listener + ), + listener::onFailure, requestInfo, requestId, - authzInfo, - authzEngine, - resolvedIndicesAsyncSupplier, - projectMetadata, - listener + authzInfo ), - listener::onFailure, - requestInfo, - requestId, - authzInfo - ), - threadContext - ) - ); + threadContext + ) + ); } else { logger.warn("denying access for [{}] as action [{}] is not an index or cluster action", authentication, action); auditTrail.accessDenied(requestId, authentication, action, request, authzInfo); @@ -580,29 +579,30 @@ private void handleIndexActionAuthorizationResult( TransportIndicesAliasesAction.NAME, authzContext ); - authzEngine.authorizeIndexAction( - aliasesRequestInfo, - authzInfo, - ril -> resolvedIndicesAsyncSupplier.getAsync(ril.delegateFailureAndWrap((l, resolvedIndices) -> { + authzEngine.authorizeIndexAction(aliasesRequestInfo, authzInfo, () -> { + SubscribableListener ril = new SubscribableListener<>(); + resolvedIndicesAsyncSupplier.getAsync().addListener(ril.delegateFailureAndWrap((l, resolvedIndices) -> { List aliasesAndIndices = new ArrayList<>(resolvedIndices.getLocal()); for (Alias alias : aliases) { aliasesAndIndices.add(alias.name()); } ResolvedIndices withAliases = new ResolvedIndices(aliasesAndIndices, Collections.emptyList()); l.onResponse(withAliases); - })), - projectMetadata, - wrapPreservingContext( - new AuthorizationResultListener<>( - authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener), - listener::onFailure, - aliasesRequestInfo, - requestId, - authzInfo - ), - threadContext - ) - ); + })); + return ril; + }, projectMetadata) + .addListener( + wrapPreservingContext( + new AuthorizationResultListener<>( + authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener), + listener::onFailure, + aliasesRequestInfo, + requestId, + authzInfo + ), + threadContext + ) + ); } } else if (action.equals(TransportShardBulkAction.ACTION_NAME)) { // if this is performing multiple actions on the index, then check each of those actions. @@ -631,22 +631,26 @@ private void runRequestInterceptors( AuthorizationEngine authorizationEngine, ActionListener listener ) { - if (requestInterceptors.isEmpty()) { - listener.onResponse(null); - } else { - final Iterator requestInterceptorIterator = requestInterceptors.iterator(); - requestInterceptorIterator.next() - .intercept(requestInfo, authorizationEngine, authorizationInfo, new DelegatingActionListener<>(listener) { + final Iterator requestInterceptorIterator = requestInterceptors.iterator(); + while (requestInterceptorIterator.hasNext()) { + var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo); + if (res.isSuccess() == false) { + res.addListener(new DelegatingActionListener<>(listener) { @Override public void onResponse(Void unused) { if (requestInterceptorIterator.hasNext()) { - requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this); + requestInterceptorIterator.next() + .intercept(requestInfo, authorizationEngine, authorizationInfo) + .addListener(this); } else { - listener.onResponse(null); + delegate.onResponse(null); } } }); + return; + } } + listener.onResponse(null); } // pkg-private for testing @@ -776,7 +780,7 @@ private void authorizeBulkItems( final Map> actionToIndicesMap = new HashMap<>(4); final AuditTrail auditTrail = auditTrailService.get(); - resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> { + resolvedIndicesAsyncSupplier.getAsync().addListener(ActionListener.wrap(overallResolvedIndices -> { final Set localIndices = new HashSet<>(overallResolvedIndices.getLocal()); for (BulkItemRequest item : request.items()) { final String itemAction = getAction(item); @@ -871,12 +875,14 @@ private void authorizeBulkItems( authzEngine.authorizeIndexAction( bulkItemInfo, authzInfo, - ril -> ril.onResponse(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), - projectMetadata, - groupedActionListener.delegateFailureAndWrap( - (l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult)) - ) - ); + () -> SubscribableListener.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), + projectMetadata + ) + .addListener( + groupedActionListener.delegateFailureAndWrap( + (l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult)) + ) + ); }); }, listener::onFailure)); } @@ -1068,7 +1074,7 @@ private CachingAsyncSupplier(AsyncSupplier supplier) { } @Override - public void getAsync(ActionListener listener) { + public SubscribableListener getAsync() { if (valueFuture == null) { boolean firstInvocation = false; synchronized (this) { @@ -1078,10 +1084,10 @@ public void getAsync(ActionListener listener) { } } if (firstInvocation) { - asyncSupplier.getAsync(valueFuture); + asyncSupplier.getAsync().addListener(valueFuture); } } - valueFuture.addListener(listener); + return valueFuture; } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 24fc7480eda42..771cc4185bbed 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -314,12 +315,11 @@ private static boolean shouldAuthorizeIndexActionNameOnly(String action, Transpo } @Override - public void authorizeIndexAction( + public SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ) { final String action = requestInfo.getAction(); final TransportRequest request = requestInfo.getRequest(); @@ -327,13 +327,14 @@ public void authorizeIndexAction( try { role = ensureRBAC(authorizationInfo).getRole(); } catch (Exception e) { - listener.onFailure(e); - return; + return SubscribableListener.newFailed(e); } if (TransportActionProxy.isProxyAction(action) || shouldAuthorizeIndexActionNameOnly(action, request)) { // we've already validated that the request is a proxy request so we can skip that but we still // need to validate that the action is allowed and then move on - listener.onResponse(role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED); + return SubscribableListener.newSucceeded( + role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED + ); } else if (request instanceof IndicesRequest == false) { if (SCROLL_RELATED_ACTIONS.contains(action)) { // scroll is special @@ -351,6 +352,7 @@ public void authorizeIndexAction( // index and if they cannot, we can fail the request early before we allow the execution of the action and in // turn the shard actions if (TransportSearchScrollAction.TYPE.name().equals(action)) { + final SubscribableListener listener = new SubscribableListener<>(); ActionRunnable.supply(listener.delegateFailureAndWrap((l, parsedScrollId) -> { if (parsedScrollId.hasLocalIndices()) { l.onResponse( @@ -360,6 +362,7 @@ public void authorizeIndexAction( l.onResponse(IndexAuthorizationResult.EMPTY); } }), ((SearchScrollRequest) request)::parseScrollId).run(); + return listener; } else { // RBACEngine simply authorizes scroll related actions without filling in any DLS/FLS permissions. // Scroll related actions have special security logic, where the security context of the initial search @@ -369,26 +372,26 @@ public void authorizeIndexAction( // The DLS/FLS permissions are used inside the {@code DirectoryReader} that {@code SecurityIndexReaderWrapper} // built while handling the initial search request. In addition, for consistency, the DLS/FLS permissions from // the originating search request are attached to the thread context upon validating the scroll. - listener.onResponse(IndexAuthorizationResult.EMPTY); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.EMPTY); } } else if (isAsyncRelatedAction(action)) { if (SubmitAsyncSearchAction.NAME.equals(action)) { // authorize submit async search but don't fill in the DLS/FLS permissions // the `null` IndicesAccessControl parameter indicates that this action has *not* determined // which DLS/FLS controls should be applied to this action - listener.onResponse(IndexAuthorizationResult.EMPTY); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.EMPTY); } else { // async-search actions other than submit have a custom security layer that checks if the current user is // the same as the user that submitted the original request so no additional checks are needed here. - listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } } else if (action.equals(TransportClosePointInTimeAction.TYPE.name())) { - listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } else { assert false : "only scroll and async-search related requests are known indices api that don't " + "support retrieving the indices they relate to"; - listener.onFailure( + return SubscribableListener.newFailed( new IllegalStateException( "only scroll and async-search related requests are known indices " + "api that don't support retrieving the indices they relate to" @@ -396,13 +399,16 @@ public void authorizeIndexAction( ); } } else if (isChildActionAuthorizedByParentOnLocalNode(requestInfo, authorizationInfo)) { - listener.onResponse(new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl())); + return SubscribableListener.newSucceeded( + new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl()) + ); } else if (PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authorizationInfo)) { // We only pre-authorize child actions if DLS/FLS is not configured, // hence we can allow here access for all requested indices. - listener.onResponse(new IndexAuthorizationResult(IndicesAccessControl.allowAll())); + return SubscribableListener.newSucceeded(new IndexAuthorizationResult(IndicesAccessControl.allowAll())); } else if (allowsRemoteIndices(request) || role.checkIndicesAction(action)) { - indicesAsyncSupplier.getAsync(listener.delegateFailureAndWrap((delegateListener, resolvedIndices) -> { + final SubscribableListener listener = new SubscribableListener<>(); + indicesAsyncSupplier.getAsync().addListener(listener.delegateFailureAndWrap((delegateListener, resolvedIndices) -> { assert resolvedIndices.isEmpty() == false : "every indices request needs to have its indices set thus the resolved indices must not be empty"; // all wildcard expressions have been resolved and only the security plugin could have set '-*' here. @@ -437,8 +443,9 @@ public void authorizeIndexAction( delegateListener.onResponse(result); } })); + return listener; } else { - listener.onResponse(IndexAuthorizationResult.DENIED); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.DENIED); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java index 715cbdbf06752..fb9dc02e387e7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java @@ -9,9 +9,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; @@ -42,11 +42,10 @@ public BulkShardRequestInterceptor(ThreadPool threadPool, XPackLicenseState lice } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authzEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { final boolean isDlsLicensed = DOCUMENT_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); final boolean isFlsLicensed = FIELD_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); @@ -82,6 +81,6 @@ public void intercept( } } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java index 73cb2bea3441d..9d739f6db54ad 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java @@ -10,8 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; @@ -40,11 +40,10 @@ public DlsFlsLicenseRequestInterceptor(ThreadContext threadContext, XPackLicense } @Override - public void intercept( + public SubscribableListener intercept( AuthorizationEngine.RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { if (requestInfo.getRequest() instanceof IndicesRequest && false == TransportActionProxy.isProxyAction(requestInfo.getAction())) { final Role role = RBACEngine.maybeGetRBACEngineRole(threadContext.getTransient(AUTHORIZATION_INFO_KEY)); @@ -96,13 +95,12 @@ public void intercept( "es.indices_with_dls_or_fls", indicesAccessControl.getIndicesWithFieldOrDocumentLevelSecurity() ); - listener.onFailure(licenseException); - return; + return SubscribableListener.newFailed(licenseException); } } } } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java index 83edb0f1115ac..93e07dabbdcf3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.TransportActionProxy; @@ -42,11 +43,10 @@ abstract class FieldAndDocumentLevelSecurityRequestInterceptor implements Reques } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { final boolean isDlsLicensed = DOCUMENT_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); final boolean isFlsLicensed = FIELD_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); @@ -72,11 +72,12 @@ && supports(indicesRequest) } } if (false == accessControlByIndex.isEmpty()) { + final SubscribableListener listener = new SubscribableListener<>(); disableFeatures(indicesRequest, accessControlByIndex, listener); - return; + return listener; } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } abstract void disableFeatures( diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java index 01cc1e9dd4cdf..93f06efa32e93 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Tuple; import org.elasticsearch.license.XPackLicenseState; @@ -53,11 +54,10 @@ public IndicesAliasesRequestInterceptor( } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { if (requestInfo.getRequest() instanceof IndicesAliasesRequest request) { final AuditTrail auditTrail = auditTrailService.get(); @@ -72,14 +72,13 @@ public void intercept( if (indexAccessControl != null && (indexAccessControl.getFieldPermissions().hasFieldLevelSecurity() || indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions())) { - listener.onFailure( + return SubscribableListener.newFailed( new ElasticsearchSecurityException( "Alias requests are not allowed for " + "users who have field or document level security enabled on one of the indices", RestStatus.BAD_REQUEST ) ); - return; } } } @@ -99,6 +98,7 @@ public void intercept( list.addAll(toMerge); return list; })); + final SubscribableListener listener = new SubscribableListener<>(); authorizationEngine.validateIndexPermissionsAreSubset( requestInfo, authorizationInfo, @@ -123,8 +123,9 @@ public void intercept( } }, listener::onFailure), threadContext) ); + return listener; } else { - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java index ba36cd2b78bb0..8f968f747bb74 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java @@ -6,7 +6,7 @@ */ package org.elasticsearch.xpack.security.authz.interceptor; -import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestInfo; @@ -20,10 +20,9 @@ public interface RequestInterceptor { * This interceptor will introspect the request and potentially modify it. If the interceptor does not apply * to the request then the request will not be modified. */ - void intercept( + SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java index c2bea70613b46..e3f13fe1e10cf 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; @@ -49,11 +50,10 @@ public ResizeRequestInterceptor( } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { if (requestInfo.getRequest() instanceof ResizeRequest request) { final AuditTrail auditTrail = auditTrailService.get(); @@ -67,17 +67,17 @@ public void intercept( if (indexAccessControl != null && (indexAccessControl.getFieldPermissions().hasFieldLevelSecurity() || indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions())) { - listener.onFailure( + return SubscribableListener.newFailed( new ElasticsearchSecurityException( "Resize requests are not allowed for users when " + "field or document level security is enabled on the source index", RestStatus.BAD_REQUEST ) ); - return; } } + final SubscribableListener listener = new SubscribableListener<>(); authorizationEngine.validateIndexPermissionsAreSubset( requestInfo, authorizationInfo, @@ -101,8 +101,9 @@ public void intercept( } }, listener::onFailure), threadContext) ); + return listener; } else { - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java index d8ec078507bfe..830ea90e3beed 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java @@ -6,8 +6,8 @@ */ package org.elasticsearch.xpack.security.authz.interceptor; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; @@ -33,11 +33,10 @@ public SearchRequestCacheDisablingInterceptor(ThreadPool threadPool, XPackLicens } @Override - public void intercept( + public SubscribableListener intercept( AuthorizationEngine.RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationEngine.AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationEngine.AuthorizationInfo authorizationInfo ) { final boolean isDlsLicensed = DOCUMENT_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); final boolean isFlsLicensed = FIELD_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); @@ -50,7 +49,7 @@ && hasRemoteIndices(searchRequest) searchRequest.requestCache(false); } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } // package private for test diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index e1250a7dd2081..904c9af18c7e2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -76,6 +76,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; @@ -3456,12 +3457,11 @@ public void authorizeClusterAction( } @Override - public void authorizeIndexAction( + public SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ) { throw new UnsupportedOperationException("not implemented"); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java index 4ce74f942a228..59fa2e417432c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ElasticsearchClient; import org.elasticsearch.cluster.metadata.DataStream; @@ -1985,7 +1986,7 @@ private void authorizeIndicesAction( final ResolvedIndices resolvedIndices = new ResolvedIndices(List.of(indices), List.of()); final TransportRequest searchRequest = new SearchRequest(indices); final RequestInfo requestInfo = createRequestInfo(searchRequest, action, parentAuthorization); - final AsyncSupplier indicesAsyncSupplier = s -> s.onResponse(resolvedIndices); + final AsyncSupplier indicesAsyncSupplier = () -> SubscribableListener.newSucceeded(resolvedIndices); Metadata.Builder metadata = Metadata.builder(); Stream.of(indices) @@ -1996,7 +1997,7 @@ private void authorizeIndicesAction( ) ); - engine.authorizeIndexAction(requestInfo, authzInfo, indicesAsyncSupplier, metadata.build().getProject(), listener); + engine.authorizeIndexAction(requestInfo, authzInfo, indicesAsyncSupplier, metadata.build().getProject()).addListener(listener); } private static RequestInfo createRequestInfo(TransportRequest request, String action, ParentActionAuthorization parentAuthorization) { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java index 0dacf6b37e2df..500a1aadf9f28 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java @@ -128,7 +128,7 @@ public void checkInterceptorWithDlsFlsConfigured(boolean dlsFlsFeatureEnabled, S }).when(mockEngine) .validateIndexPermissionsAreSubset(eq(requestInfo), eq(EmptyAuthorizationInfo.INSTANCE), anyMap(), anyActionListener()); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals(expectedErrorMessage, securityException.getMessage()); @@ -184,7 +184,7 @@ public void testInterceptorThrowsWhenTargetHasGreaterPermissions() throws Except anyActionListener() ); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals( @@ -217,7 +217,7 @@ public void testInterceptorThrowsWhenTargetHasGreaterPermissions() throws Except any(Map.class), anyActionListener() ); - interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java index 68c86a561025a..e3b402d96d416 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java @@ -120,7 +120,7 @@ public void checkResizeWithDlsFlsConfigured(boolean dlsFlsFeatureEnabled, String }).when(mockEngine) .validateIndexPermissionsAreSubset(eq(requestInfo), eq(EmptyAuthorizationInfo.INSTANCE), anyMap(), anyActionListener()); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals(expectedErrorMessage, securityException.getMessage()); @@ -160,7 +160,7 @@ public void testResizeRequestInterceptorThrowsWhenTargetHasGreaterPermissions() anyActionListener() ); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals( @@ -184,7 +184,7 @@ public void testResizeRequestInterceptorThrowsWhenTargetHasGreaterPermissions() any(Map.class), anyActionListener() ); - resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java index b09527061f0d5..f75080f5f9792 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java @@ -103,7 +103,7 @@ public void testRequestCacheWillBeDisabledWhenSearchRemoteIndices() { threadPool.getThreadContext().putTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY, indicesAccessControl); final PlainActionFuture future = new PlainActionFuture<>(); - interceptor.intercept(requestInfo, mock(AuthorizationEngine.class), mock(AuthorizationInfo.class), future); + interceptor.intercept(requestInfo, mock(AuthorizationEngine.class), mock(AuthorizationInfo.class)).addListener(future); future.actionGet(); if (remoteIndices.length > 0) {