Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ public final boolean isDone() {
return isDone(state);
}

/**
* @return return {@code true} if and only if this listener is done and has been completed successfully
*/
public final boolean isSuccess() {
return state instanceof SuccessResult;
Comment on lines +272 to +273
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit LGTM, I'll leave the rest to the security team

}

/**
* @return the result with which this listener completed successfully, or throw the exception with which it failed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -75,7 +76,7 @@
* can actually impersonate the user running the request.</li>
* <li>{@link #authorizeClusterAction(RequestInfo, AuthorizationInfo, ActionListener)} if the
* request is a cluster level operation.</li>
* <li>{@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} if
* <li>{@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} if
* the request is a an index action. This method may be called multiple times for a single
* request as the request may be made up of sub-requests that also need to be authorized. The async supplier
* for resolved indices will invoke the
Expand All @@ -85,7 +86,7 @@
* <br><p>
* <em>NOTE:</em> the {@link #loadAuthorizedIndices(RequestInfo, AuthorizationInfo, Map, ActionListener)}
* method may be called prior to
* {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)}
* {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)}
* in cases where wildcards need to be expanded.
* </p><br>
* Authorization engines can be called from various threads including network threads that should
Expand Down Expand Up @@ -161,14 +162,13 @@ public interface AuthorizationEngine {
* attempting to operate on
* @param metadata a map of a string name to the cluster metadata specific to that
* alias or index
* @param listener the listener to be notified of the authorization result
* @return a listener to be notified of the authorization result
*/
void authorizeIndexAction(
SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
Copy link
Contributor

@slobodanadamovic slobodanadamovic Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change to this interface will cause compilation errors of CustomAuthorizationEngine. Can you adjust the security example plugin as well?

RequestInfo requestInfo,
AuthorizationInfo authorizationInfo,
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
ProjectMetadata metadata,
ActionListener<IndexAuthorizationResult> listener
ProjectMetadata metadata
);

/**
Expand Down Expand Up @@ -766,6 +766,6 @@ interface AsyncSupplier<V> {
* Asynchronously retrieves the value that is being supplied and notifies the listener upon
* completion.
*/
void getAsync(ActionListener<V> listener);
SubscribableListener<V> getAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -406,14 +407,13 @@ public void authorizeClusterAction(
}

@Override
public void authorizeIndexAction(
public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
RequestInfo requestInfo,
AuthorizationInfo authorizationInfo,
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
ProjectMetadata metadata,
ActionListener<IndexAuthorizationResult> listener
ProjectMetadata metadata
) {
listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES);
return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -482,16 +483,16 @@ private void authorizeAction(
} else if (isIndexAction(action)) {
final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
assert projectMetadata != null;
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> {
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> {
if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) {
var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest);
resolvedIndicesListener.onResponse(resolvedIndices);
return;
return SubscribableListener.newSucceeded(resolvedIndices);
}
final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request);
if (resolvedIndices != null) {
resolvedIndicesListener.onResponse(resolvedIndices);
return SubscribableListener.newSucceeded(resolvedIndices);
} else {
final SubscribableListener<ResolvedIndices> resolvedIndicesListener = new SubscribableListener<>();
authzEngine.loadAuthorizedIndices(
requestInfo,
authzInfo,
Expand All @@ -510,33 +511,31 @@ private void authorizeAction(
}
)
);
return resolvedIndicesListener;
}
});
authzEngine.authorizeIndexAction(
requestInfo,
authzInfo,
resolvedIndicesAsyncSupplier,
projectMetadata,
wrapPreservingContext(
new AuthorizationResultListener<>(
result -> handleIndexActionAuthorizationResult(
result,
authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, projectMetadata)
.addListener(
wrapPreservingContext(
new AuthorizationResultListener<>(
result -> handleIndexActionAuthorizationResult(
result,
requestInfo,
requestId,
authzInfo,
authzEngine,
resolvedIndicesAsyncSupplier,
projectMetadata,
listener
),
listener::onFailure,
requestInfo,
requestId,
authzInfo,
authzEngine,
resolvedIndicesAsyncSupplier,
projectMetadata,
listener
authzInfo
),
listener::onFailure,
requestInfo,
requestId,
authzInfo
),
threadContext
)
);
threadContext
)
);
} else {
logger.warn("denying access for [{}] as action [{}] is not an index or cluster action", authentication, action);
auditTrail.accessDenied(requestId, authentication, action, request, authzInfo);
Expand Down Expand Up @@ -580,29 +579,30 @@ private void handleIndexActionAuthorizationResult(
TransportIndicesAliasesAction.NAME,
authzContext
);
authzEngine.authorizeIndexAction(
aliasesRequestInfo,
authzInfo,
ril -> resolvedIndicesAsyncSupplier.getAsync(ril.delegateFailureAndWrap((l, resolvedIndices) -> {
authzEngine.authorizeIndexAction(aliasesRequestInfo, authzInfo, () -> {
SubscribableListener<ResolvedIndices> ril = new SubscribableListener<>();
resolvedIndicesAsyncSupplier.getAsync().addListener(ril.delegateFailureAndWrap((l, resolvedIndices) -> {
List<String> aliasesAndIndices = new ArrayList<>(resolvedIndices.getLocal());
for (Alias alias : aliases) {
aliasesAndIndices.add(alias.name());
}
ResolvedIndices withAliases = new ResolvedIndices(aliasesAndIndices, Collections.emptyList());
l.onResponse(withAliases);
})),
projectMetadata,
wrapPreservingContext(
new AuthorizationResultListener<>(
authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener),
listener::onFailure,
aliasesRequestInfo,
requestId,
authzInfo
),
threadContext
)
);
}));
return ril;
}, projectMetadata)
.addListener(
wrapPreservingContext(
new AuthorizationResultListener<>(
authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener),
listener::onFailure,
aliasesRequestInfo,
requestId,
authzInfo
),
threadContext
)
);
}
} else if (action.equals(TransportShardBulkAction.ACTION_NAME)) {
// if this is performing multiple actions on the index, then check each of those actions.
Expand Down Expand Up @@ -631,22 +631,26 @@ private void runRequestInterceptors(
AuthorizationEngine authorizationEngine,
ActionListener<Void> listener
) {
if (requestInterceptors.isEmpty()) {
listener.onResponse(null);
} else {
final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
requestInterceptorIterator.next()
.intercept(requestInfo, authorizationEngine, authorizationInfo, new DelegatingActionListener<>(listener) {
final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
while (requestInterceptorIterator.hasNext()) {
var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo);
if (res.isSuccess() == false) {
res.addListener(new DelegatingActionListener<>(listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you think if we changed the request interceptors to be fully sync and not return a promise?

public interface RequestInterceptor {

    void intercept(RequestInfo requestInfo, AuthorizationEngine authorizationEngine, AuthorizationInfo authorizationInfo)
        throws ElasticsearchSecurityException;
}

then runRequestInterceptors would look like this

private void runRequestInterceptors(
    RequestInfo requestInfo,
    AuthorizationInfo authorizationInfo,
    AuthorizationEngine authorizationEngine,
    ActionListener<Void> listener
) {
    for (RequestInterceptor requestInterceptor : requestInterceptors) {
        try {
            requestInterceptor.intercept(requestInfo, authorizationEngine, authorizationInfo);
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
    }
    listener.onResponse(null);
}

@Override
public void onResponse(Void unused) {
if (requestInterceptorIterator.hasNext()) {
requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this);
requestInterceptorIterator.next()
.intercept(requestInfo, authorizationEngine, authorizationInfo)
.addListener(this);
} else {
listener.onResponse(null);
delegate.onResponse(null);
}
}
});
return;
}
}
listener.onResponse(null);
}

// pkg-private for testing
Expand Down Expand Up @@ -776,7 +780,7 @@ private void authorizeBulkItems(
final Map<String, Set<String>> actionToIndicesMap = new HashMap<>(4);
final AuditTrail auditTrail = auditTrailService.get();

resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> {
resolvedIndicesAsyncSupplier.getAsync().addListener(ActionListener.wrap(overallResolvedIndices -> {
final Set<String> localIndices = new HashSet<>(overallResolvedIndices.getLocal());
for (BulkItemRequest item : request.items()) {
final String itemAction = getAction(item);
Expand Down Expand Up @@ -871,12 +875,14 @@ private void authorizeBulkItems(
authzEngine.authorizeIndexAction(
bulkItemInfo,
authzInfo,
ril -> ril.onResponse(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
projectMetadata,
groupedActionListener.delegateFailureAndWrap(
(l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult))
)
);
() -> SubscribableListener.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
projectMetadata
)
.addListener(
groupedActionListener.delegateFailureAndWrap(
(l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult))
)
);
});
}, listener::onFailure));
}
Expand Down Expand Up @@ -1068,7 +1074,7 @@ private CachingAsyncSupplier(AsyncSupplier<V> supplier) {
}

@Override
public void getAsync(ActionListener<V> listener) {
public SubscribableListener<V> getAsync() {
if (valueFuture == null) {
boolean firstInvocation = false;
synchronized (this) {
Expand All @@ -1078,10 +1084,10 @@ public void getAsync(ActionListener<V> listener) {
}
}
if (firstInvocation) {
asyncSupplier.getAsync(valueFuture);
asyncSupplier.getAsync().addListener(valueFuture);
}
}
valueFuture.addListener(listener);
return valueFuture;
}
}

Expand Down
Loading