Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public final boolean isDone() {
* @throws IllegalStateException if this listener is not complete yet and assertions are disabled.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
protected final T rawResult() throws Exception {
public final T rawResult() throws Exception {
final Object currentState = state;
if (currentState instanceof SuccessResult result) {
return (T) result.result();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -75,7 +76,7 @@
* can actually impersonate the user running the request.</li>
* <li>{@link #authorizeClusterAction(RequestInfo, AuthorizationInfo, ActionListener)} if the
* request is a cluster level operation.</li>
* <li>{@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} if
* <li>{@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} if
* the request is a an index action. This method may be called multiple times for a single
* request as the request may be made up of sub-requests that also need to be authorized. The async supplier
* for resolved indices will invoke the
Expand All @@ -85,7 +86,7 @@
* <br><p>
* <em>NOTE:</em> the {@link #loadAuthorizedIndices(RequestInfo, AuthorizationInfo, Map, ActionListener)}
* method may be called prior to
* {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)}
* {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)}
* in cases where wildcards need to be expanded.
* </p><br>
* Authorization engines can be called from various threads including network threads that should
Expand Down Expand Up @@ -161,14 +162,13 @@ public interface AuthorizationEngine {
* attempting to operate on
* @param metadata a map of a string name to the cluster metadata specific to that
* alias or index
* @param listener the listener to be notified of the authorization result
* @return a listener to be notified of the authorization result
*/
void authorizeIndexAction(
SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
Copy link
Contributor

@slobodanadamovic slobodanadamovic Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change to this interface will cause compilation errors of CustomAuthorizationEngine. Can you adjust the security example plugin as well?

RequestInfo requestInfo,
AuthorizationInfo authorizationInfo,
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
ProjectMetadata metadata,
ActionListener<IndexAuthorizationResult> listener
ProjectMetadata metadata
);

/**
Expand Down Expand Up @@ -766,6 +766,6 @@ interface AsyncSupplier<V> {
* Asynchronously retrieves the value that is being supplied and notifies the listener upon
* completion.
*/
void getAsync(ActionListener<V> listener);
SubscribableListener<V> getAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -406,14 +408,13 @@ public void authorizeClusterAction(
}

@Override
public void authorizeIndexAction(
public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
RequestInfo requestInfo,
AuthorizationInfo authorizationInfo,
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
ProjectMetadata metadata,
ActionListener<IndexAuthorizationResult> listener
ProjectMetadata metadata
) {
listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES);
return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -482,16 +483,16 @@ private void authorizeAction(
} else if (isIndexAction(action)) {
final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
assert projectMetadata != null;
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> {
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> {
if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) {
var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest);
resolvedIndicesListener.onResponse(resolvedIndices);
return;
return ListenableFuture.newSucceeded(resolvedIndices);
}
final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request);
if (resolvedIndices != null) {
resolvedIndicesListener.onResponse(resolvedIndices);
return ListenableFuture.newSucceeded(resolvedIndices);
} else {
final SubscribableListener<ResolvedIndices> resolvedIndicesListener = new SubscribableListener<ResolvedIndices>();
authzEngine.loadAuthorizedIndices(
requestInfo,
authzInfo,
Expand All @@ -510,33 +511,31 @@ private void authorizeAction(
}
)
);
return resolvedIndicesListener;
}
});
authzEngine.authorizeIndexAction(
requestInfo,
authzInfo,
resolvedIndicesAsyncSupplier,
projectMetadata,
wrapPreservingContext(
new AuthorizationResultListener<>(
result -> handleIndexActionAuthorizationResult(
result,
authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, projectMetadata)
.addListener(
wrapPreservingContext(
new AuthorizationResultListener<>(
result -> handleIndexActionAuthorizationResult(
result,
requestInfo,
requestId,
authzInfo,
authzEngine,
resolvedIndicesAsyncSupplier,
projectMetadata,
listener
),
listener::onFailure,
requestInfo,
requestId,
authzInfo,
authzEngine,
resolvedIndicesAsyncSupplier,
projectMetadata,
listener
authzInfo
),
listener::onFailure,
requestInfo,
requestId,
authzInfo
),
threadContext
)
);
threadContext
)
);
} else {
logger.warn("denying access for [{}] as action [{}] is not an index or cluster action", authentication, action);
auditTrail.accessDenied(requestId, authentication, action, request, authzInfo);
Expand Down Expand Up @@ -580,29 +579,30 @@ private void handleIndexActionAuthorizationResult(
TransportIndicesAliasesAction.NAME,
authzContext
);
authzEngine.authorizeIndexAction(
aliasesRequestInfo,
authzInfo,
ril -> resolvedIndicesAsyncSupplier.getAsync(ril.delegateFailureAndWrap((l, resolvedIndices) -> {
authzEngine.authorizeIndexAction(aliasesRequestInfo, authzInfo, () -> {
SubscribableListener<ResolvedIndices> ril = new SubscribableListener<>();
resolvedIndicesAsyncSupplier.getAsync().addListener(ril.delegateFailureAndWrap((l, resolvedIndices) -> {
List<String> aliasesAndIndices = new ArrayList<>(resolvedIndices.getLocal());
for (Alias alias : aliases) {
aliasesAndIndices.add(alias.name());
}
ResolvedIndices withAliases = new ResolvedIndices(aliasesAndIndices, Collections.emptyList());
l.onResponse(withAliases);
})),
projectMetadata,
wrapPreservingContext(
new AuthorizationResultListener<>(
authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener),
listener::onFailure,
aliasesRequestInfo,
requestId,
authzInfo
),
threadContext
)
);
}));
return ril;
}, projectMetadata)
.addListener(
wrapPreservingContext(
new AuthorizationResultListener<>(
authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener),
listener::onFailure,
aliasesRequestInfo,
requestId,
authzInfo
),
threadContext
)
);
}
} else if (action.equals(TransportShardBulkAction.ACTION_NAME)) {
// if this is performing multiple actions on the index, then check each of those actions.
Expand Down Expand Up @@ -635,17 +635,33 @@ private void runRequestInterceptors(
listener.onResponse(null);
} else {
final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
requestInterceptorIterator.next()
.intercept(requestInfo, authorizationEngine, authorizationInfo, new DelegatingActionListener<>(listener) {
@Override
public void onResponse(Void unused) {
if (requestInterceptorIterator.hasNext()) {
requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this);
} else {
listener.onResponse(null);
while (requestInterceptorIterator.hasNext()) {
var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo);
if (res.isDone() == false) {
res.addListener(new DelegatingActionListener<>(listener) {
@Override
public void onResponse(Void unused) {
if (requestInterceptorIterator.hasNext()) {
var nestedRest = requestInterceptorIterator.next()
.intercept(requestInfo, authorizationEngine, authorizationInfo);
if (nestedRest.isDone() == false) {
nestedRest.addListener(this);
}
} else {
listener.onResponse(null);
}
}
}
});
});
return;
}
try {
res.rawResult();
} catch (Exception e) {
listener.onFailure(e);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather add something like this to SubscribableListener over exposing rawResult():

    public boolean completeIfFailed(ActionListener<?> listener) {
        final Object currentState = state;
        if (currentState instanceof FailureResult result) {
            listener.onFailure(result.exception());
            return true;
        } else {
            return false;
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went for just an isSuccess now like other solutions have. There isn't much of a reason to be tricky for the failure path and this way we have a fast solution for the hot success path. Let me know what you think :) Combining this approach with the get-results on listenable futures would allow neat simplifications + speedups in some places I believe (by avoiding allocating new listeners).

}
}
listener.onResponse(null);
}
}

Expand Down Expand Up @@ -776,7 +792,7 @@ private void authorizeBulkItems(
final Map<String, Set<String>> actionToIndicesMap = new HashMap<>(4);
final AuditTrail auditTrail = auditTrailService.get();

resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> {
resolvedIndicesAsyncSupplier.getAsync().addListener(ActionListener.wrap(overallResolvedIndices -> {
final Set<String> localIndices = new HashSet<>(overallResolvedIndices.getLocal());
for (BulkItemRequest item : request.items()) {
final String itemAction = getAction(item);
Expand Down Expand Up @@ -871,12 +887,14 @@ private void authorizeBulkItems(
authzEngine.authorizeIndexAction(
bulkItemInfo,
authzInfo,
ril -> ril.onResponse(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
projectMetadata,
groupedActionListener.delegateFailureAndWrap(
(l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult))
)
);
() -> ListenableFuture.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
projectMetadata
)
.addListener(
groupedActionListener.delegateFailureAndWrap(
(l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult))
)
);
});
}, listener::onFailure));
}
Expand Down Expand Up @@ -1068,7 +1086,7 @@ private CachingAsyncSupplier(AsyncSupplier<V> supplier) {
}

@Override
public void getAsync(ActionListener<V> listener) {
public SubscribableListener<V> getAsync() {
if (valueFuture == null) {
boolean firstInvocation = false;
synchronized (this) {
Expand All @@ -1078,10 +1096,10 @@ public void getAsync(ActionListener<V> listener) {
}
}
if (firstInvocation) {
asyncSupplier.getAsync(valueFuture);
asyncSupplier.getAsync().addListener(valueFuture);
}
}
valueFuture.addListener(listener);
return valueFuture;
}
}

Expand Down
Loading