Skip to content

Commit b1c75d1

Browse files
Move some security APIs to using promises in place of callbacks (#123812)
We have some incredibly deep callstacks in security that seem to visibly raise context switch costs, make profiling more complicated and generally make the code rather hard to follow. Since the methods adjusted here return a result synchronously we can both save overhead and make things a little easier to follow by using promises as returns in place of consuming callbacks.
1 parent cfa98f3 commit b1c75d1

File tree

18 files changed

+162
-141
lines changed

18 files changed

+162
-141
lines changed

plugins/examples/security-authorization-engine/src/main/java/org/elasticsearch/example/CustomAuthorizationEngine.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.example;
1111

1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.SubscribableListener;
1314
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1415
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1516
import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse;
@@ -86,14 +87,14 @@ public void authorizeClusterAction(RequestInfo requestInfo, AuthorizationInfo au
8687
}
8788

8889
@Override
89-
public void authorizeIndexAction(
90+
SubscribableListener<IndexAuthorizationResult> void authorizeIndexAction(
9091
RequestInfo requestInfo,
9192
AuthorizationInfo authorizationInfo,
9293
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
93-
ProjectMetadata project,
94-
ActionListener<IndexAuthorizationResult> listener
94+
ProjectMetadata project
9595
) {
9696
if (isSuperuser(requestInfo.getAuthentication().getEffectiveSubject().getUser())) {
97+
ActionListener<IndexAuthorizationResult> listener = new SubscribableListener<>();
9798
indicesAsyncSupplier.getAsync(ActionListener.wrap(resolvedIndices -> {
9899
Map<String, IndexAccessControl> indexAccessControlMap = new HashMap<>();
99100
for (String name : resolvedIndices.getLocal()) {
@@ -103,8 +104,9 @@ public void authorizeIndexAction(
103104
new IndicesAccessControl(true, Collections.unmodifiableMap(indexAccessControlMap));
104105
listener.onResponse(new IndexAuthorizationResult(indicesAccessControl));
105106
}, listener::onFailure));
107+
return listener;
106108
} else {
107-
listener.onResponse(new IndexAuthorizationResult(IndicesAccessControl.DENIED));
109+
return SubscribableListener.succcess(new IndexAuthorizationResult(IndicesAccessControl.DENIED));
108110
}
109111
}
110112

server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,13 @@ public final boolean isDone() {
266266
return isDone(state);
267267
}
268268

269+
/**
270+
* @return return {@code true} if and only if this listener is done and has been completed successfully
271+
*/
272+
public final boolean isSuccess() {
273+
return state instanceof SuccessResult;
274+
}
275+
269276
/**
270277
* @return the result with which this listener completed successfully, or throw the exception with which it failed.
271278
*

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionRequestValidationException;
1313
import org.elasticsearch.action.IndicesRequest;
14+
import org.elasticsearch.action.support.SubscribableListener;
1415
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1516
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1617
import org.elasticsearch.common.bytes.BytesReference;
@@ -75,7 +76,7 @@
7576
* can actually impersonate the user running the request.</li>
7677
* <li>{@link #authorizeClusterAction(RequestInfo, AuthorizationInfo, ActionListener)} if the
7778
* request is a cluster level operation.</li>
78-
* <li>{@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)} if
79+
* <li>{@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)} if
7980
* the request is a an index action. This method may be called multiple times for a single
8081
* request as the request may be made up of sub-requests that also need to be authorized. The async supplier
8182
* for resolved indices will invoke the
@@ -85,7 +86,7 @@
8586
* <br><p>
8687
* <em>NOTE:</em> the {@link #loadAuthorizedIndices(RequestInfo, AuthorizationInfo, Map, ActionListener)}
8788
* method may be called prior to
88-
* {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata, ActionListener)}
89+
* {@link #authorizeIndexAction(RequestInfo, AuthorizationInfo, AsyncSupplier, ProjectMetadata)}
8990
* in cases where wildcards need to be expanded.
9091
* </p><br>
9192
* Authorization engines can be called from various threads including network threads that should
@@ -161,14 +162,13 @@ public interface AuthorizationEngine {
161162
* attempting to operate on
162163
* @param metadata a map of a string name to the cluster metadata specific to that
163164
* alias or index
164-
* @param listener the listener to be notified of the authorization result
165+
* @return a listener to be notified of the authorization result
165166
*/
166-
void authorizeIndexAction(
167+
SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
167168
RequestInfo requestInfo,
168169
AuthorizationInfo authorizationInfo,
169170
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
170-
ProjectMetadata metadata,
171-
ActionListener<IndexAuthorizationResult> listener
171+
ProjectMetadata metadata
172172
);
173173

174174
/**
@@ -766,6 +766,6 @@ interface AsyncSupplier<V> {
766766
* Asynchronously retrieves the value that is being supplied and notifies the listener upon
767767
* completion.
768768
*/
769-
void getAsync(ActionListener<V> listener);
769+
SubscribableListener<V> getAsync();
770770
}
771771
}

x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.search.TransportSearchAction;
14+
import org.elasticsearch.action.support.SubscribableListener;
1415
import org.elasticsearch.client.Cancellable;
1516
import org.elasticsearch.client.Request;
1617
import org.elasticsearch.client.RequestOptions;
@@ -406,14 +407,13 @@ public void authorizeClusterAction(
406407
}
407408

408409
@Override
409-
public void authorizeIndexAction(
410+
public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
410411
RequestInfo requestInfo,
411412
AuthorizationInfo authorizationInfo,
412413
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
413-
ProjectMetadata metadata,
414-
ActionListener<IndexAuthorizationResult> listener
414+
ProjectMetadata metadata
415415
) {
416-
listener.onResponse(IndexAuthorizationResult.ALLOW_NO_INDICES);
416+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
417417
}
418418

419419
@Override

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java

Lines changed: 67 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.action.index.TransportIndexAction;
2929
import org.elasticsearch.action.search.SearchRequest;
3030
import org.elasticsearch.action.support.GroupedActionListener;
31+
import org.elasticsearch.action.support.SubscribableListener;
3132
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
3233
import org.elasticsearch.action.update.TransportUpdateAction;
3334
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -482,16 +483,16 @@ private void authorizeAction(
482483
} else if (isIndexAction(action)) {
483484
final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
484485
assert projectMetadata != null;
485-
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> {
486+
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> {
486487
if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) {
487488
var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest);
488-
resolvedIndicesListener.onResponse(resolvedIndices);
489-
return;
489+
return SubscribableListener.newSucceeded(resolvedIndices);
490490
}
491491
final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request);
492492
if (resolvedIndices != null) {
493-
resolvedIndicesListener.onResponse(resolvedIndices);
493+
return SubscribableListener.newSucceeded(resolvedIndices);
494494
} else {
495+
final SubscribableListener<ResolvedIndices> resolvedIndicesListener = new SubscribableListener<>();
495496
authzEngine.loadAuthorizedIndices(
496497
requestInfo,
497498
authzInfo,
@@ -510,33 +511,31 @@ private void authorizeAction(
510511
}
511512
)
512513
);
514+
return resolvedIndicesListener;
513515
}
514516
});
515-
authzEngine.authorizeIndexAction(
516-
requestInfo,
517-
authzInfo,
518-
resolvedIndicesAsyncSupplier,
519-
projectMetadata,
520-
wrapPreservingContext(
521-
new AuthorizationResultListener<>(
522-
result -> handleIndexActionAuthorizationResult(
523-
result,
517+
authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, projectMetadata)
518+
.addListener(
519+
wrapPreservingContext(
520+
new AuthorizationResultListener<>(
521+
result -> handleIndexActionAuthorizationResult(
522+
result,
523+
requestInfo,
524+
requestId,
525+
authzInfo,
526+
authzEngine,
527+
resolvedIndicesAsyncSupplier,
528+
projectMetadata,
529+
listener
530+
),
531+
listener::onFailure,
524532
requestInfo,
525533
requestId,
526-
authzInfo,
527-
authzEngine,
528-
resolvedIndicesAsyncSupplier,
529-
projectMetadata,
530-
listener
534+
authzInfo
531535
),
532-
listener::onFailure,
533-
requestInfo,
534-
requestId,
535-
authzInfo
536-
),
537-
threadContext
538-
)
539-
);
536+
threadContext
537+
)
538+
);
540539
} else {
541540
logger.warn("denying access for [{}] as action [{}] is not an index or cluster action", authentication, action);
542541
auditTrail.accessDenied(requestId, authentication, action, request, authzInfo);
@@ -580,29 +579,30 @@ private void handleIndexActionAuthorizationResult(
580579
TransportIndicesAliasesAction.NAME,
581580
authzContext
582581
);
583-
authzEngine.authorizeIndexAction(
584-
aliasesRequestInfo,
585-
authzInfo,
586-
ril -> resolvedIndicesAsyncSupplier.getAsync(ril.delegateFailureAndWrap((l, resolvedIndices) -> {
582+
authzEngine.authorizeIndexAction(aliasesRequestInfo, authzInfo, () -> {
583+
SubscribableListener<ResolvedIndices> ril = new SubscribableListener<>();
584+
resolvedIndicesAsyncSupplier.getAsync().addListener(ril.delegateFailureAndWrap((l, resolvedIndices) -> {
587585
List<String> aliasesAndIndices = new ArrayList<>(resolvedIndices.getLocal());
588586
for (Alias alias : aliases) {
589587
aliasesAndIndices.add(alias.name());
590588
}
591589
ResolvedIndices withAliases = new ResolvedIndices(aliasesAndIndices, Collections.emptyList());
592590
l.onResponse(withAliases);
593-
})),
594-
projectMetadata,
595-
wrapPreservingContext(
596-
new AuthorizationResultListener<>(
597-
authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener),
598-
listener::onFailure,
599-
aliasesRequestInfo,
600-
requestId,
601-
authzInfo
602-
),
603-
threadContext
604-
)
605-
);
591+
}));
592+
return ril;
593+
}, projectMetadata)
594+
.addListener(
595+
wrapPreservingContext(
596+
new AuthorizationResultListener<>(
597+
authorizationResult -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener),
598+
listener::onFailure,
599+
aliasesRequestInfo,
600+
requestId,
601+
authzInfo
602+
),
603+
threadContext
604+
)
605+
);
606606
}
607607
} else if (action.equals(TransportShardBulkAction.ACTION_NAME)) {
608608
// if this is performing multiple actions on the index, then check each of those actions.
@@ -631,22 +631,26 @@ private void runRequestInterceptors(
631631
AuthorizationEngine authorizationEngine,
632632
ActionListener<Void> listener
633633
) {
634-
if (requestInterceptors.isEmpty()) {
635-
listener.onResponse(null);
636-
} else {
637-
final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
638-
requestInterceptorIterator.next()
639-
.intercept(requestInfo, authorizationEngine, authorizationInfo, new DelegatingActionListener<>(listener) {
634+
final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
635+
while (requestInterceptorIterator.hasNext()) {
636+
var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo);
637+
if (res.isSuccess() == false) {
638+
res.addListener(new DelegatingActionListener<>(listener) {
640639
@Override
641640
public void onResponse(Void unused) {
642641
if (requestInterceptorIterator.hasNext()) {
643-
requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this);
642+
requestInterceptorIterator.next()
643+
.intercept(requestInfo, authorizationEngine, authorizationInfo)
644+
.addListener(this);
644645
} else {
645-
listener.onResponse(null);
646+
delegate.onResponse(null);
646647
}
647648
}
648649
});
650+
return;
651+
}
649652
}
653+
listener.onResponse(null);
650654
}
651655

652656
// pkg-private for testing
@@ -776,7 +780,7 @@ private void authorizeBulkItems(
776780
final Map<String, Set<String>> actionToIndicesMap = new HashMap<>(4);
777781
final AuditTrail auditTrail = auditTrailService.get();
778782

779-
resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> {
783+
resolvedIndicesAsyncSupplier.getAsync().addListener(ActionListener.wrap(overallResolvedIndices -> {
780784
final Set<String> localIndices = new HashSet<>(overallResolvedIndices.getLocal());
781785
for (BulkItemRequest item : request.items()) {
782786
final String itemAction = getAction(item);
@@ -871,12 +875,14 @@ private void authorizeBulkItems(
871875
authzEngine.authorizeIndexAction(
872876
bulkItemInfo,
873877
authzInfo,
874-
ril -> ril.onResponse(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
875-
projectMetadata,
876-
groupedActionListener.delegateFailureAndWrap(
877-
(l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult))
878-
)
879-
);
878+
() -> SubscribableListener.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
879+
projectMetadata
880+
)
881+
.addListener(
882+
groupedActionListener.delegateFailureAndWrap(
883+
(l, indexAuthorizationResult) -> l.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult))
884+
)
885+
);
880886
});
881887
}, listener::onFailure));
882888
}
@@ -1068,7 +1074,7 @@ private CachingAsyncSupplier(AsyncSupplier<V> supplier) {
10681074
}
10691075

10701076
@Override
1071-
public void getAsync(ActionListener<V> listener) {
1077+
public SubscribableListener<V> getAsync() {
10721078
if (valueFuture == null) {
10731079
boolean firstInvocation = false;
10741080
synchronized (this) {
@@ -1078,10 +1084,10 @@ public void getAsync(ActionListener<V> listener) {
10781084
}
10791085
}
10801086
if (firstInvocation) {
1081-
asyncSupplier.getAsync(valueFuture);
1087+
asyncSupplier.getAsync().addListener(valueFuture);
10821088
}
10831089
}
1084-
valueFuture.addListener(listener);
1090+
return valueFuture;
10851091
}
10861092
}
10871093

0 commit comments

Comments
 (0)