From d1e52857bcf8a05c87413fbdca04705d3d42d128 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 1 Mar 2025 22:11:45 +0100 Subject: [PATCH 1/4] Move some security APIs to using promises in place of callbacks We ahve some incredibly deep callstacks in security that seem to visibly raise context switch costs, make profiling more complicated and generally make the code rather hard to follow. Since the methods adjusted here return a result synchronously we can both save overhead and make things a little easier to follow by using promises as returns in place of consuming callbacks. --- .../action/support/SubscribableListener.java | 2 +- .../security/authz/AuthorizationEngine.java | 14 +- .../ProfileCancellationIntegTests.java | 9 +- .../security/authz/AuthorizationService.java | 144 ++++++++++-------- .../xpack/security/authz/RBACEngine.java | 38 +++-- .../BulkShardRequestInterceptor.java | 9 +- .../DlsFlsLicenseRequestInterceptor.java | 12 +- ...cumentLevelSecurityRequestInterceptor.java | 11 +- .../IndicesAliasesRequestInterceptor.java | 14 +- .../authz/interceptor/RequestInterceptor.java | 7 +- .../interceptor/ResizeRequestInterceptor.java | 13 +- ...earchRequestCacheDisablingInterceptor.java | 9 +- .../authz/AuthorizationServiceTests.java | 6 +- .../xpack/security/authz/RBACEngineTests.java | 5 +- ...IndicesAliasesRequestInterceptorTests.java | 6 +- .../ResizeRequestInterceptorTests.java | 6 +- ...RequestCacheDisablingInterceptorTests.java | 2 +- 17 files changed, 167 insertions(+), 140 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index c6c240e3b6759..1b4421a92208b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -273,7 +273,7 @@ public final boolean isDone() { * @throws IllegalStateException if this listener is not complete yet and assertions are disabled. */ @SuppressWarnings({ "unchecked", "rawtypes" }) - protected final T rawResult() throws Exception { + public final T rawResult() throws Exception { final Object currentState = state; if (currentState instanceof SuccessResult result) { return (T) result.result(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java index c36a5c350658e..9f18e7915a725 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.bytes.BytesReference; @@ -75,7 +76,7 @@ * can actually impersonate the user running the request. *
  • {@link #authorizeClusterAction(RequestInfo, AuthorizationInfo, ActionListener)} if the * request is a cluster level operation.
  • - *
  • {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} if + *
  • {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} if * the request is a an index action. This method may be called multiple times for a single * request as the request may be made up of sub-requests that also need to be authorized. The async supplier * for resolved indices will invoke the @@ -85,7 +86,7 @@ *

    * NOTE: the {@link #loadAuthorizedIndices(RequestInfo, AuthorizationInfo, Map, ActionListener)} * method may be called prior to - * {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} + * {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} * in cases where wildcards need to be expanded. *


    * Authorization engines can be called from various threads including network threads that should @@ -161,14 +162,13 @@ public interface AuthorizationEngine { * attempting to operate on * @param metadata a map of a string name to the cluster metadata specific to that * alias or index - * @param listener the listener to be notified of the authorization result + * @return a listener to be notified of the authorization result */ - void authorizeIndexAction( + SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ); /** @@ -766,6 +766,6 @@ interface AsyncSupplier { * Asynchronously retrieves the value that is being supplied and notifies the listener upon * completion. */ - void getAsync(ActionListener listener); + SubscribableListener getAsync(); } } diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java index 23807a318981c..b619d3cfc7563 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.Cancellable; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -21,6 +22,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.plugins.ActionPlugin; @@ -406,14 +408,13 @@ public void authorizeClusterAction( } @Override - public void authorizeIndexAction( + public SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ) { - listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES); + return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 137237457d816..adb25f2ed8b0e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -482,16 +483,16 @@ private void authorizeAction( } else if (isIndexAction(action)) { final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); assert projectMetadata != null; - final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> { + final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> { if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) { var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest); - resolvedIndicesListener.onResponse(resolvedIndices); - return; + return ListenableFuture.newSucceeded(resolvedIndices); } final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request); if (resolvedIndices != null) { - resolvedIndicesListener.onResponse(resolvedIndices); + return ListenableFuture.newSucceeded(resolvedIndices); } else { + final SubscribableListener resolvedIndicesListener = new SubscribableListener(); authzEngine.loadAuthorizedIndices( requestInfo, authzInfo, @@ -510,33 +511,31 @@ private void authorizeAction( } ) ); + return resolvedIndicesListener; } }); - authzEngine.authorizeIndexAction( - requestInfo, - authzInfo, - resolvedIndicesAsyncSupplier, - projectMetadata, - wrapPreservingContext( - new AuthorizationResultListener<>( - result -> handleIndexActionAuthorizationResult( - result, + authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, projectMetadata) + .addListener( + wrapPreservingContext( + new AuthorizationResultListener<>( + result -> handleIndexActionAuthorizationResult( + result, + requestInfo, + requestId, + authzInfo, + authzEngine, + resolvedIndicesAsyncSupplier, + projectMetadata, + listener + ), + listener::onFailure, requestInfo, requestId, - authzInfo, - authzEngine, - resolvedIndicesAsyncSupplier, - projectMetadata, - listener + authzInfo ), - listener::onFailure, - requestInfo, - requestId, - authzInfo - ), - threadContext - ) - ); + threadContext + ) + ); } else { logger.warn("denying access for [{}] as action [{}] is not an index or cluster action", authentication, action); auditTrail.accessDenied(requestId, authentication, action, request, authzInfo); @@ -580,29 +579,30 @@ private void handleIndexActionAuthorizationResult( TransportIndicesAliasesAction.NAME, authzContext ); - authzEngine.authorizeIndexAction( - aliasesRequestInfo, - authzInfo, - ril -> resolvedIndicesAsyncSupplier.getAsync(ril.delegateFailureAndWrap((l, resolvedIndices) -> { + authzEngine.authorizeIndexAction(aliasesRequestInfo, authzInfo, () -> { + SubscribableListener ril = new SubscribableListener<>(); + resolvedIndicesAsyncSupplier.getAsync().addListener(ril.delegateFailureAndWrap((l, resolvedIndices) -> { List aliasesAndIndices = new ArrayList<>(resolvedIndices.getLocal()); for (Alias alias : aliases) { aliasesAndIndices.add(alias.name()); } ResolvedIndices withAliases = new ResolvedIndices(aliasesAndIndices, Collections.emptyList()); l.onResponse(withAliases); - })), - projectMetadata, - wrapPreservingContext( - new AuthorizationResultListener<>( - authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener), - listener::onFailure, - aliasesRequestInfo, - requestId, - authzInfo - ), - threadContext - ) - ); + })); + return ril; + }, projectMetadata) + .addListener( + wrapPreservingContext( + new AuthorizationResultListener<>( + authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener), + listener::onFailure, + aliasesRequestInfo, + requestId, + authzInfo + ), + threadContext + ) + ); } } else if (action.equals(TransportShardBulkAction.ACTION_NAME)) { // if this is performing multiple actions on the index, then check each of those actions. @@ -635,17 +635,33 @@ private void runRequestInterceptors( listener.onResponse(null); } else { final Iterator requestInterceptorIterator = requestInterceptors.iterator(); - requestInterceptorIterator.next() - .intercept(requestInfo, authorizationEngine, authorizationInfo, new DelegatingActionListener<>(listener) { - @Override - public void onResponse(Void unused) { - if (requestInterceptorIterator.hasNext()) { - requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this); - } else { - listener.onResponse(null); + while (requestInterceptorIterator.hasNext()) { + var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo); + if (res.isDone() == false) { + res.addListener(new DelegatingActionListener<>(listener) { + @Override + public void onResponse(Void unused) { + if (requestInterceptorIterator.hasNext()) { + var nestedRest = requestInterceptorIterator.next() + .intercept(requestInfo, authorizationEngine, authorizationInfo); + if (nestedRest.isDone() == false) { + nestedRest.addListener(this); + } + } else { + listener.onResponse(null); + } } - } - }); + }); + return; + } + try { + res.rawResult(); + } catch (Exception e) { + listener.onFailure(e); + return; + } + } + listener.onResponse(null); } } @@ -776,7 +792,7 @@ private void authorizeBulkItems( final Map> actionToIndicesMap = new HashMap<>(4); final AuditTrail auditTrail = auditTrailService.get(); - resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> { + resolvedIndicesAsyncSupplier.getAsync().addListener(ActionListener.wrap(overallResolvedIndices -> { final Set localIndices = new HashSet<>(overallResolvedIndices.getLocal()); for (BulkItemRequest item : request.items()) { final String itemAction = getAction(item); @@ -871,12 +887,14 @@ private void authorizeBulkItems( authzEngine.authorizeIndexAction( bulkItemInfo, authzInfo, - ril -> ril.onResponse(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), - projectMetadata, - groupedActionListener.delegateFailureAndWrap( - (l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult)) - ) - ); + () -> ListenableFuture.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), + projectMetadata + ) + .addListener( + groupedActionListener.delegateFailureAndWrap( + (l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult)) + ) + ); }); }, listener::onFailure)); } @@ -1068,7 +1086,7 @@ private CachingAsyncSupplier(AsyncSupplier supplier) { } @Override - public void getAsync(ActionListener listener) { + public SubscribableListener getAsync() { if (valueFuture == null) { boolean firstInvocation = false; synchronized (this) { @@ -1078,10 +1096,10 @@ public void getAsync(ActionListener listener) { } } if (firstInvocation) { - asyncSupplier.getAsync(valueFuture); + asyncSupplier.getAsync().addListener(valueFuture); } } - valueFuture.addListener(listener); + return valueFuture; } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 24fc7480eda42..0f5703aaa5228 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -40,6 +41,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CachedSupplier; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -314,12 +316,11 @@ private static boolean shouldAuthorizeIndexActionNameOnly(String action, Transpo } @Override - public void authorizeIndexAction( + public SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ) { final String action = requestInfo.getAction(); final TransportRequest request = requestInfo.getRequest(); @@ -327,13 +328,14 @@ public void authorizeIndexAction( try { role = ensureRBAC(authorizationInfo).getRole(); } catch (Exception e) { - listener.onFailure(e); - return; + return ListenableFuture.newFailed(e); } if (TransportActionProxy.isProxyAction(action) || shouldAuthorizeIndexActionNameOnly(action, request)) { // we've already validated that the request is a proxy request so we can skip that but we still // need to validate that the action is allowed and then move on - listener.onResponse(role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED); + return ListenableFuture.newSucceeded( + role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED + ); } else if (request instanceof IndicesRequest == false) { if (SCROLL_RELATED_ACTIONS.contains(action)) { // scroll is special @@ -351,6 +353,7 @@ public void authorizeIndexAction( // index and if they cannot, we can fail the request early before we allow the execution of the action and in // turn the shard actions if (TransportSearchScrollAction.TYPE.name().equals(action)) { + final ListenableFuture listener = new ListenableFuture<>(); ActionRunnable.supply(listener.delegateFailureAndWrap((l, parsedScrollId) -> { if (parsedScrollId.hasLocalIndices()) { l.onResponse( @@ -360,6 +363,7 @@ public void authorizeIndexAction( l.onResponse(IndexAuthorizationResult.EMPTY); } }), ((SearchScrollRequest) request)::parseScrollId).run(); + return listener; } else { // RBACEngine simply authorizes scroll related actions without filling in any DLS/FLS permissions. // Scroll related actions have special security logic, where the security context of the initial search @@ -369,26 +373,26 @@ public void authorizeIndexAction( // The DLS/FLS permissions are used inside the {@code DirectoryReader} that {@code SecurityIndexReaderWrapper} // built while handling the initial search request. In addition, for consistency, the DLS/FLS permissions from // the originating search request are attached to the thread context upon validating the scroll. - listener.onResponse(IndexAuthorizationResult.EMPTY); + return ListenableFuture.newSucceeded(IndexAuthorizationResult.EMPTY); } } else if (isAsyncRelatedAction(action)) { if (SubmitAsyncSearchAction.NAME.equals(action)) { // authorize submit async search but don't fill in the DLS/FLS permissions // the `null` IndicesAccessControl parameter indicates that this action has *not* determined // which DLS/FLS controls should be applied to this action - listener.onResponse(IndexAuthorizationResult.EMPTY); + return ListenableFuture.newSucceeded(IndexAuthorizationResult.EMPTY); } else { // async-search actions other than submit have a custom security layer that checks if the current user is // the same as the user that submitted the original request so no additional checks are needed here. - listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES); + return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } } else if (action.equals(TransportClosePointInTimeAction.TYPE.name())) { - listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES); + return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } else { assert false : "only scroll and async-search related requests are known indices api that don't " + "support retrieving the indices they relate to"; - listener.onFailure( + return ListenableFuture.newFailed( new IllegalStateException( "only scroll and async-search related requests are known indices " + "api that don't support retrieving the indices they relate to" @@ -396,13 +400,16 @@ public void authorizeIndexAction( ); } } else if (isChildActionAuthorizedByParentOnLocalNode(requestInfo, authorizationInfo)) { - listener.onResponse(new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl())); + return ListenableFuture.newSucceeded( + new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl()) + ); } else if (PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authorizationInfo)) { // We only pre-authorize child actions if DLS/FLS is not configured, // hence we can allow here access for all requested indices. - listener.onResponse(new IndexAuthorizationResult(IndicesAccessControl.allowAll())); + return ListenableFuture.newSucceeded(new IndexAuthorizationResult(IndicesAccessControl.allowAll())); } else if (allowsRemoteIndices(request) || role.checkIndicesAction(action)) { - indicesAsyncSupplier.getAsync(listener.delegateFailureAndWrap((delegateListener, resolvedIndices) -> { + final ListenableFuture listener = new ListenableFuture<>(); + indicesAsyncSupplier.getAsync().addListener(listener.delegateFailureAndWrap((delegateListener, resolvedIndices) -> { assert resolvedIndices.isEmpty() == false : "every indices request needs to have its indices set thus the resolved indices must not be empty"; // all wildcard expressions have been resolved and only the security plugin could have set '-*' here. @@ -437,8 +444,9 @@ public void authorizeIndexAction( delegateListener.onResponse(result); } })); + return listener; } else { - listener.onResponse(IndexAuthorizationResult.DENIED); + return ListenableFuture.newSucceeded(IndexAuthorizationResult.DENIED); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java index 715cbdbf06752..fb9dc02e387e7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java @@ -9,9 +9,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; @@ -42,11 +42,10 @@ public BulkShardRequestInterceptor(ThreadPool threadPool, XPackLicenseState lice } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authzEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { final boolean isDlsLicensed = DOCUMENT_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); final boolean isFlsLicensed = FIELD_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); @@ -82,6 +81,6 @@ public void intercept( } } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java index 73cb2bea3441d..9d739f6db54ad 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java @@ -10,8 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; @@ -40,11 +40,10 @@ public DlsFlsLicenseRequestInterceptor(ThreadContext threadContext, XPackLicense } @Override - public void intercept( + public SubscribableListener intercept( AuthorizationEngine.RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { if (requestInfo.getRequest() instanceof IndicesRequest && false == TransportActionProxy.isProxyAction(requestInfo.getAction())) { final Role role = RBACEngine.maybeGetRBACEngineRole(threadContext.getTransient(AUTHORIZATION_INFO_KEY)); @@ -96,13 +95,12 @@ public void intercept( "es.indices_with_dls_or_fls", indicesAccessControl.getIndicesWithFieldOrDocumentLevelSecurity() ); - listener.onFailure(licenseException); - return; + return SubscribableListener.newFailed(licenseException); } } } } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java index 83edb0f1115ac..93e07dabbdcf3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.TransportActionProxy; @@ -42,11 +43,10 @@ abstract class FieldAndDocumentLevelSecurityRequestInterceptor implements Reques } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { final boolean isDlsLicensed = DOCUMENT_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); final boolean isFlsLicensed = FIELD_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); @@ -72,11 +72,12 @@ && supports(indicesRequest) } } if (false == accessControlByIndex.isEmpty()) { + final SubscribableListener listener = new SubscribableListener<>(); disableFeatures(indicesRequest, accessControlByIndex, listener); - return; + return listener; } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } abstract void disableFeatures( diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java index 01cc1e9dd4cdf..013b8c952b7bd 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java @@ -9,6 +9,8 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Tuple; import org.elasticsearch.license.XPackLicenseState; @@ -53,11 +55,10 @@ public IndicesAliasesRequestInterceptor( } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { if (requestInfo.getRequest() instanceof IndicesAliasesRequest request) { final AuditTrail auditTrail = auditTrailService.get(); @@ -72,14 +73,13 @@ public void intercept( if (indexAccessControl != null && (indexAccessControl.getFieldPermissions().hasFieldLevelSecurity() || indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions())) { - listener.onFailure( + return SubscribableListener.newFailed( new ElasticsearchSecurityException( "Alias requests are not allowed for " + "users who have field or document level security enabled on one of the indices", RestStatus.BAD_REQUEST ) ); - return; } } } @@ -99,6 +99,7 @@ public void intercept( list.addAll(toMerge); return list; })); + final SubscribableListener listener = new ListenableFuture<>(); authorizationEngine.validateIndexPermissionsAreSubset( requestInfo, authorizationInfo, @@ -123,8 +124,9 @@ public void intercept( } }, listener::onFailure), threadContext) ); + return listener; } else { - listener.onResponse(null); + return ListenableFuture.newSucceeded(null); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java index ba36cd2b78bb0..8f968f747bb74 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/RequestInterceptor.java @@ -6,7 +6,7 @@ */ package org.elasticsearch.xpack.security.authz.interceptor; -import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestInfo; @@ -20,10 +20,9 @@ public interface RequestInterceptor { * This interceptor will introspect the request and potentially modify it. If the interceptor does not apply * to the request then the request will not be modified. */ - void intercept( + SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java index c2bea70613b46..e3f13fe1e10cf 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; @@ -49,11 +50,10 @@ public ResizeRequestInterceptor( } @Override - public void intercept( + public SubscribableListener intercept( RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationInfo authorizationInfo ) { if (requestInfo.getRequest() instanceof ResizeRequest request) { final AuditTrail auditTrail = auditTrailService.get(); @@ -67,17 +67,17 @@ public void intercept( if (indexAccessControl != null && (indexAccessControl.getFieldPermissions().hasFieldLevelSecurity() || indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions())) { - listener.onFailure( + return SubscribableListener.newFailed( new ElasticsearchSecurityException( "Resize requests are not allowed for users when " + "field or document level security is enabled on the source index", RestStatus.BAD_REQUEST ) ); - return; } } + final SubscribableListener listener = new SubscribableListener<>(); authorizationEngine.validateIndexPermissionsAreSubset( requestInfo, authorizationInfo, @@ -101,8 +101,9 @@ public void intercept( } }, listener::onFailure), threadContext) ); + return listener; } else { - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java index d8ec078507bfe..830ea90e3beed 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java @@ -6,8 +6,8 @@ */ package org.elasticsearch.xpack.security.authz.interceptor; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; @@ -33,11 +33,10 @@ public SearchRequestCacheDisablingInterceptor(ThreadPool threadPool, XPackLicens } @Override - public void intercept( + public SubscribableListener intercept( AuthorizationEngine.RequestInfo requestInfo, AuthorizationEngine authorizationEngine, - AuthorizationEngine.AuthorizationInfo authorizationInfo, - ActionListener listener + AuthorizationEngine.AuthorizationInfo authorizationInfo ) { final boolean isDlsLicensed = DOCUMENT_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); final boolean isFlsLicensed = FIELD_LEVEL_SECURITY_FEATURE.checkWithoutTracking(licenseState); @@ -50,7 +49,7 @@ && hasRemoteIndices(searchRequest) searchRequest.requestCache(false); } } - listener.onResponse(null); + return SubscribableListener.newSucceeded(null); } // package private for test diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 163017c61a4c3..7709e6d6d6ed8 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -76,6 +76,7 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; @@ -3452,12 +3453,11 @@ public void authorizeClusterAction( } @Override - public void authorizeIndexAction( + public SubscribableListener authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata metadata, - ActionListener listener + ProjectMetadata metadata ) { throw new UnsupportedOperationException("not implemented"); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java index 353ce48dd7aab..7cb4bda0f232f 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexVersion; @@ -1970,7 +1971,7 @@ private void authorizeIndicesAction( final ResolvedIndices resolvedIndices = new ResolvedIndices(List.of(indices), List.of()); final TransportRequest searchRequest = new SearchRequest(indices); final RequestInfo requestInfo = createRequestInfo(searchRequest, action, parentAuthorization); - final AsyncSupplier indicesAsyncSupplier = s -> s.onResponse(resolvedIndices); + final AsyncSupplier indicesAsyncSupplier = () -> ListenableFuture.newSucceeded(resolvedIndices); Metadata.Builder metadata = Metadata.builder(); Stream.of(indices) @@ -1981,7 +1982,7 @@ private void authorizeIndicesAction( ) ); - engine.authorizeIndexAction(requestInfo, authzInfo, indicesAsyncSupplier, metadata.build().getProject(), listener); + engine.authorizeIndexAction(requestInfo, authzInfo, indicesAsyncSupplier, metadata.build().getProject()).addListener(listener); } private static RequestInfo createRequestInfo(TransportRequest request, String action, ParentActionAuthorization parentAuthorization) { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java index 4609767aa7926..3a2781fb8d404 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptorTests.java @@ -128,7 +128,7 @@ public void checkInterceptorWithDlsFlsConfigured(boolean dlsFlsFeatureEnabled, S }).when(mockEngine) .validateIndexPermissionsAreSubset(eq(requestInfo), eq(EmptyAuthorizationInfo.INSTANCE), anyMap(), anyActionListener()); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals(expectedErrorMessage, securityException.getMessage()); @@ -184,7 +184,7 @@ public void testInterceptorThrowsWhenTargetHasGreaterPermissions() throws Except anyActionListener() ); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals( @@ -217,7 +217,7 @@ public void testInterceptorThrowsWhenTargetHasGreaterPermissions() throws Except any(Map.class), anyActionListener() ); - interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + interceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java index 68c86a561025a..e3b402d96d416 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptorTests.java @@ -120,7 +120,7 @@ public void checkResizeWithDlsFlsConfigured(boolean dlsFlsFeatureEnabled, String }).when(mockEngine) .validateIndexPermissionsAreSubset(eq(requestInfo), eq(EmptyAuthorizationInfo.INSTANCE), anyMap(), anyActionListener()); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals(expectedErrorMessage, securityException.getMessage()); @@ -160,7 +160,7 @@ public void testResizeRequestInterceptorThrowsWhenTargetHasGreaterPermissions() anyActionListener() ); ElasticsearchSecurityException securityException = expectThrows(ElasticsearchSecurityException.class, () -> { - resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); }); assertEquals( @@ -184,7 +184,7 @@ public void testResizeRequestInterceptorThrowsWhenTargetHasGreaterPermissions() any(Map.class), anyActionListener() ); - resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE, plainActionFuture); + resizeRequestInterceptor.intercept(requestInfo, mockEngine, EmptyAuthorizationInfo.INSTANCE).addListener(plainActionFuture); plainActionFuture.actionGet(); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java index b09527061f0d5..f75080f5f9792 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptorTests.java @@ -103,7 +103,7 @@ public void testRequestCacheWillBeDisabledWhenSearchRemoteIndices() { threadPool.getThreadContext().putTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY, indicesAccessControl); final PlainActionFuture future = new PlainActionFuture<>(); - interceptor.intercept(requestInfo, mock(AuthorizationEngine.class), mock(AuthorizationInfo.class), future); + interceptor.intercept(requestInfo, mock(AuthorizationEngine.class), mock(AuthorizationInfo.class)).addListener(future); future.actionGet(); if (remoteIndices.length > 0) { From 83f2ef88159198814e6b74e40309cdac252c2b2d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 3 Mar 2025 11:17:48 +0100 Subject: [PATCH 2/4] CR: comments --- .../action/support/SubscribableListener.java | 9 ++++++- .../ProfileCancellationIntegTests.java | 3 +-- .../security/authz/AuthorizationService.java | 26 +++++++------------ .../xpack/security/authz/RBACEngine.java | 25 +++++++++--------- .../IndicesAliasesRequestInterceptor.java | 5 ++-- .../xpack/security/authz/RBACEngineTests.java | 4 +-- 6 files changed, 34 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index 1b4421a92208b..3056f7cda6429 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -266,6 +266,13 @@ public final boolean isDone() { return isDone(state); } + /** + * @return return {@code true} if and only if this listener is done and has been completed successfully + */ + public final boolean isSuccess() { + return state instanceof SuccessResult; + } + /** * @return the result with which this listener completed successfully, or throw the exception with which it failed. * @@ -273,7 +280,7 @@ public final boolean isDone() { * @throws IllegalStateException if this listener is not complete yet and assertions are disabled. */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public final T rawResult() throws Exception { + protected final T rawResult() throws Exception { final Object currentState = state; if (currentState instanceof SuccessResult result) { return (T) result.result(); diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java index b619d3cfc7563..0f832b83a177f 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.plugins.ActionPlugin; @@ -414,7 +413,7 @@ public SubscribableListener authorizeIndexAction( AsyncSupplier indicesAsyncSupplier, ProjectMetadata metadata ) { - return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index adb25f2ed8b0e..0ce9e4cbca9c9 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -486,13 +486,13 @@ private void authorizeAction( final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> { if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) { var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest); - return ListenableFuture.newSucceeded(resolvedIndices); + return SubscribableListener.newSucceeded(resolvedIndices); } final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request); if (resolvedIndices != null) { - return ListenableFuture.newSucceeded(resolvedIndices); + return SubscribableListener.newSucceeded(resolvedIndices); } else { - final SubscribableListener resolvedIndicesListener = new SubscribableListener(); + final SubscribableListener resolvedIndicesListener = new SubscribableListener<>(); authzEngine.loadAuthorizedIndices( requestInfo, authzInfo, @@ -637,29 +637,21 @@ private void runRequestInterceptors( final Iterator requestInterceptorIterator = requestInterceptors.iterator(); while (requestInterceptorIterator.hasNext()) { var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo); - if (res.isDone() == false) { + if (res.isSuccess() == false) { res.addListener(new DelegatingActionListener<>(listener) { @Override public void onResponse(Void unused) { if (requestInterceptorIterator.hasNext()) { - var nestedRest = requestInterceptorIterator.next() - .intercept(requestInfo, authorizationEngine, authorizationInfo); - if (nestedRest.isDone() == false) { - nestedRest.addListener(this); - } + requestInterceptorIterator.next() + .intercept(requestInfo, authorizationEngine, authorizationInfo) + .addListener(this); } else { - listener.onResponse(null); + delegate.onResponse(null); } } }); return; } - try { - res.rawResult(); - } catch (Exception e) { - listener.onFailure(e); - return; - } } listener.onResponse(null); } @@ -887,7 +879,7 @@ private void authorizeBulkItems( authzEngine.authorizeIndexAction( bulkItemInfo, authzInfo, - () -> ListenableFuture.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), + () -> SubscribableListener.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), projectMetadata ) .addListener( diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 0f5703aaa5228..771cc4185bbed 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CachedSupplier; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -328,12 +327,12 @@ public SubscribableListener authorizeIndexAction( try { role = ensureRBAC(authorizationInfo).getRole(); } catch (Exception e) { - return ListenableFuture.newFailed(e); + return SubscribableListener.newFailed(e); } if (TransportActionProxy.isProxyAction(action) || shouldAuthorizeIndexActionNameOnly(action, request)) { // we've already validated that the request is a proxy request so we can skip that but we still // need to validate that the action is allowed and then move on - return ListenableFuture.newSucceeded( + return SubscribableListener.newSucceeded( role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED ); } else if (request instanceof IndicesRequest == false) { @@ -353,7 +352,7 @@ public SubscribableListener authorizeIndexAction( // index and if they cannot, we can fail the request early before we allow the execution of the action and in // turn the shard actions if (TransportSearchScrollAction.TYPE.name().equals(action)) { - final ListenableFuture listener = new ListenableFuture<>(); + final SubscribableListener listener = new SubscribableListener<>(); ActionRunnable.supply(listener.delegateFailureAndWrap((l, parsedScrollId) -> { if (parsedScrollId.hasLocalIndices()) { l.onResponse( @@ -373,26 +372,26 @@ public SubscribableListener authorizeIndexAction( // The DLS/FLS permissions are used inside the {@code DirectoryReader} that {@code SecurityIndexReaderWrapper} // built while handling the initial search request. In addition, for consistency, the DLS/FLS permissions from // the originating search request are attached to the thread context upon validating the scroll. - return ListenableFuture.newSucceeded(IndexAuthorizationResult.EMPTY); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.EMPTY); } } else if (isAsyncRelatedAction(action)) { if (SubmitAsyncSearchAction.NAME.equals(action)) { // authorize submit async search but don't fill in the DLS/FLS permissions // the `null` IndicesAccessControl parameter indicates that this action has *not* determined // which DLS/FLS controls should be applied to this action - return ListenableFuture.newSucceeded(IndexAuthorizationResult.EMPTY); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.EMPTY); } else { // async-search actions other than submit have a custom security layer that checks if the current user is // the same as the user that submitted the original request so no additional checks are needed here. - return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } } else if (action.equals(TransportClosePointInTimeAction.TYPE.name())) { - return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES); } else { assert false : "only scroll and async-search related requests are known indices api that don't " + "support retrieving the indices they relate to"; - return ListenableFuture.newFailed( + return SubscribableListener.newFailed( new IllegalStateException( "only scroll and async-search related requests are known indices " + "api that don't support retrieving the indices they relate to" @@ -400,15 +399,15 @@ public SubscribableListener authorizeIndexAction( ); } } else if (isChildActionAuthorizedByParentOnLocalNode(requestInfo, authorizationInfo)) { - return ListenableFuture.newSucceeded( + return SubscribableListener.newSucceeded( new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl()) ); } else if (PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authorizationInfo)) { // We only pre-authorize child actions if DLS/FLS is not configured, // hence we can allow here access for all requested indices. - return ListenableFuture.newSucceeded(new IndexAuthorizationResult(IndicesAccessControl.allowAll())); + return SubscribableListener.newSucceeded(new IndexAuthorizationResult(IndicesAccessControl.allowAll())); } else if (allowsRemoteIndices(request) || role.checkIndicesAction(action)) { - final ListenableFuture listener = new ListenableFuture<>(); + final SubscribableListener listener = new SubscribableListener<>(); indicesAsyncSupplier.getAsync().addListener(listener.delegateFailureAndWrap((delegateListener, resolvedIndices) -> { assert resolvedIndices.isEmpty() == false : "every indices request needs to have its indices set thus the resolved indices must not be empty"; @@ -446,7 +445,7 @@ public SubscribableListener authorizeIndexAction( })); return listener; } else { - return ListenableFuture.newSucceeded(IndexAuthorizationResult.DENIED); + return SubscribableListener.newSucceeded(IndexAuthorizationResult.DENIED); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java index 013b8c952b7bd..93f06efa32e93 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.support.SubscribableListener; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Tuple; import org.elasticsearch.license.XPackLicenseState; @@ -99,7 +98,7 @@ public SubscribableListener intercept( list.addAll(toMerge); return list; })); - final SubscribableListener listener = new ListenableFuture<>(); + final SubscribableListener listener = new SubscribableListener<>(); authorizationEngine.validateIndexPermissionsAreSubset( requestInfo, authorizationInfo, @@ -126,7 +125,7 @@ public SubscribableListener intercept( ); return listener; } else { - return ListenableFuture.newSucceeded(null); + return SubscribableListener.newSucceeded(null); } } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java index 7cb4bda0f232f..bde8d6ab0c273 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ElasticsearchClient; import org.elasticsearch.cluster.metadata.DataStream; @@ -31,7 +32,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexVersion; @@ -1971,7 +1971,7 @@ private void authorizeIndicesAction( final ResolvedIndices resolvedIndices = new ResolvedIndices(List.of(indices), List.of()); final TransportRequest searchRequest = new SearchRequest(indices); final RequestInfo requestInfo = createRequestInfo(searchRequest, action, parentAuthorization); - final AsyncSupplier indicesAsyncSupplier = () -> ListenableFuture.newSucceeded(resolvedIndices); + final AsyncSupplier indicesAsyncSupplier = () -> SubscribableListener.newSucceeded(resolvedIndices); Metadata.Builder metadata = Metadata.builder(); Stream.of(indices) From dfcc776ab036fa33388f37c3814889527e4857a7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 3 Mar 2025 11:23:03 +0100 Subject: [PATCH 3/4] remove redundant conditional --- .../security/authz/AuthorizationService.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 0ce9e4cbca9c9..6cd937cb2f8c7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -631,30 +631,26 @@ private void runRequestInterceptors( AuthorizationEngine authorizationEngine, ActionListener listener ) { - if (requestInterceptors.isEmpty()) { - listener.onResponse(null); - } else { - final Iterator requestInterceptorIterator = requestInterceptors.iterator(); - while (requestInterceptorIterator.hasNext()) { - var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo); - if (res.isSuccess() == false) { - res.addListener(new DelegatingActionListener<>(listener) { - @Override - public void onResponse(Void unused) { - if (requestInterceptorIterator.hasNext()) { - requestInterceptorIterator.next() - .intercept(requestInfo, authorizationEngine, authorizationInfo) - .addListener(this); - } else { - delegate.onResponse(null); - } + final Iterator requestInterceptorIterator = requestInterceptors.iterator(); + while (requestInterceptorIterator.hasNext()) { + var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo); + if (res.isSuccess() == false) { + res.addListener(new DelegatingActionListener<>(listener) { + @Override + public void onResponse(Void unused) { + if (requestInterceptorIterator.hasNext()) { + requestInterceptorIterator.next() + .intercept(requestInfo, authorizationEngine, authorizationInfo) + .addListener(this); + } else { + delegate.onResponse(null); } - }); - return; - } + } + }); + return; } - listener.onResponse(null); } + listener.onResponse(null); } // pkg-private for testing From c0e180b9e629a6e036e2d543c058f99fff923071 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 5 Mar 2025 10:24:42 +0100 Subject: [PATCH 4/4] fixup plugin --- .../example/CustomAuthorizationEngine.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java b/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java index b0db261d67e19..ea99880117f17 100644 --- a/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java +++ b/plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java @@ -10,6 +10,7 @@ package org.elasticsearch.example; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse; @@ -86,14 +87,14 @@ public void authorizeClusterAction(RequestInfo requestInfo, AuthorizationInfo au } @Override - public void authorizeIndexAction( + SubscribableListener void authorizeIndexAction( RequestInfo requestInfo, AuthorizationInfo authorizationInfo, AsyncSupplier indicesAsyncSupplier, - ProjectMetadata project, - ActionListener listener + ProjectMetadata project ) { if (isSuperuser(requestInfo.getAuthentication().getEffectiveSubject().getUser())) { + ActionListener listener = new SubscribableListener<>(); indicesAsyncSupplier.getAsync(ActionListener.wrap(resolvedIndices -> { Map indexAccessControlMap = new HashMap<>(); for (String name : resolvedIndices.getLocal()) { @@ -103,8 +104,9 @@ public void authorizeIndexAction( new IndicesAccessControl(true, Collections.unmodifiableMap(indexAccessControlMap)); listener.onResponse(new IndexAuthorizationResult(indicesAccessControl)); }, listener::onFailure)); + return listener; } else { - listener.onResponse(new IndexAuthorizationResult(IndicesAccessControl.DENIED)); + return SubscribableListener.succcess(new IndexAuthorizationResult(IndicesAccessControl.DENIED)); } }