2828import org .elasticsearch .action .index .TransportIndexAction ;
2929import org .elasticsearch .action .search .SearchRequest ;
3030import org .elasticsearch .action .support .GroupedActionListener ;
31+ import org .elasticsearch .action .support .SubscribableListener ;
3132import org .elasticsearch .action .support .replication .TransportReplicationAction .ConcreteShardRequest ;
3233import org .elasticsearch .action .update .TransportUpdateAction ;
3334import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
@@ -482,16 +483,16 @@ private void authorizeAction(
482483 } else if (isIndexAction (action )) {
483484 final ProjectMetadata projectMetadata = projectResolver .getProjectMetadata (clusterService .state ());
484485 assert projectMetadata != null ;
485- final AsyncSupplier <ResolvedIndices > resolvedIndicesAsyncSupplier = new CachingAsyncSupplier <>(resolvedIndicesListener -> {
486+ final AsyncSupplier <ResolvedIndices > resolvedIndicesAsyncSupplier = new CachingAsyncSupplier <>(() -> {
486487 if (request instanceof SearchRequest searchRequest && searchRequest .pointInTimeBuilder () != null ) {
487488 var resolvedIndices = indicesAndAliasesResolver .resolvePITIndices (searchRequest );
488- resolvedIndicesListener .onResponse (resolvedIndices );
489- return ;
489+ return ListenableFuture .newSucceeded (resolvedIndices );
490490 }
491491 final ResolvedIndices resolvedIndices = indicesAndAliasesResolver .tryResolveWithoutWildcards (action , request );
492492 if (resolvedIndices != null ) {
493- resolvedIndicesListener . onResponse (resolvedIndices );
493+ return ListenableFuture . newSucceeded (resolvedIndices );
494494 } else {
495+ final SubscribableListener <ResolvedIndices > resolvedIndicesListener = new SubscribableListener <ResolvedIndices >();
495496 authzEngine .loadAuthorizedIndices (
496497 requestInfo ,
497498 authzInfo ,
@@ -510,33 +511,31 @@ private void authorizeAction(
510511 }
511512 )
512513 );
514+ return resolvedIndicesListener ;
513515 }
514516 });
515- authzEngine .authorizeIndexAction (
516- requestInfo ,
517- authzInfo ,
518- resolvedIndicesAsyncSupplier ,
519- projectMetadata ,
520- wrapPreservingContext (
521- new AuthorizationResultListener <>(
522- result -> handleIndexActionAuthorizationResult (
523- result ,
517+ authzEngine .authorizeIndexAction (requestInfo , authzInfo , resolvedIndicesAsyncSupplier , projectMetadata )
518+ .addListener (
519+ wrapPreservingContext (
520+ new AuthorizationResultListener <>(
521+ result -> handleIndexActionAuthorizationResult (
522+ result ,
523+ requestInfo ,
524+ requestId ,
525+ authzInfo ,
526+ authzEngine ,
527+ resolvedIndicesAsyncSupplier ,
528+ projectMetadata ,
529+ listener
530+ ),
531+ listener ::onFailure ,
524532 requestInfo ,
525533 requestId ,
526- authzInfo ,
527- authzEngine ,
528- resolvedIndicesAsyncSupplier ,
529- projectMetadata ,
530- listener
534+ authzInfo
531535 ),
532- listener ::onFailure ,
533- requestInfo ,
534- requestId ,
535- authzInfo
536- ),
537- threadContext
538- )
539- );
536+ threadContext
537+ )
538+ );
540539 } else {
541540 logger .warn ("denying access for [{}] as action [{}] is not an index or cluster action" , authentication , action );
542541 auditTrail .accessDenied (requestId , authentication , action , request , authzInfo );
@@ -580,29 +579,30 @@ private void handleIndexActionAuthorizationResult(
580579 TransportIndicesAliasesAction .NAME ,
581580 authzContext
582581 );
583- authzEngine .authorizeIndexAction (
584- aliasesRequestInfo ,
585- authzInfo ,
586- ril -> resolvedIndicesAsyncSupplier .getAsync (ril .delegateFailureAndWrap ((l , resolvedIndices ) -> {
582+ authzEngine .authorizeIndexAction (aliasesRequestInfo , authzInfo , () -> {
583+ SubscribableListener <ResolvedIndices > ril = new SubscribableListener <>();
584+ resolvedIndicesAsyncSupplier .getAsync ().addListener (ril .delegateFailureAndWrap ((l , resolvedIndices ) -> {
587585 List <String > aliasesAndIndices = new ArrayList <>(resolvedIndices .getLocal ());
588586 for (Alias alias : aliases ) {
589587 aliasesAndIndices .add (alias .name ());
590588 }
591589 ResolvedIndices withAliases = new ResolvedIndices (aliasesAndIndices , Collections .emptyList ());
592590 l .onResponse (withAliases );
593- })),
594- projectMetadata ,
595- wrapPreservingContext (
596- new AuthorizationResultListener <>(
597- authorizationResult -> runRequestInterceptors (requestInfo , authzInfo , authorizationEngine , listener ),
598- listener ::onFailure ,
599- aliasesRequestInfo ,
600- requestId ,
601- authzInfo
602- ),
603- threadContext
604- )
605- );
591+ }));
592+ return ril ;
593+ }, projectMetadata )
594+ .addListener (
595+ wrapPreservingContext (
596+ new AuthorizationResultListener <>(
597+ authorizationResult -> runRequestInterceptors (requestInfo , authzInfo , authorizationEngine , listener ),
598+ listener ::onFailure ,
599+ aliasesRequestInfo ,
600+ requestId ,
601+ authzInfo
602+ ),
603+ threadContext
604+ )
605+ );
606606 }
607607 } else if (action .equals (TransportShardBulkAction .ACTION_NAME )) {
608608 // if this is performing multiple actions on the index, then check each of those actions.
@@ -635,17 +635,33 @@ private void runRequestInterceptors(
635635 listener .onResponse (null );
636636 } else {
637637 final Iterator <RequestInterceptor > requestInterceptorIterator = requestInterceptors .iterator ();
638- requestInterceptorIterator .next ()
639- .intercept (requestInfo , authorizationEngine , authorizationInfo , new DelegatingActionListener <>(listener ) {
640- @ Override
641- public void onResponse (Void unused ) {
642- if (requestInterceptorIterator .hasNext ()) {
643- requestInterceptorIterator .next ().intercept (requestInfo , authorizationEngine , authorizationInfo , this );
644- } else {
645- listener .onResponse (null );
638+ while (requestInterceptorIterator .hasNext ()) {
639+ var res = requestInterceptorIterator .next ().intercept (requestInfo , authorizationEngine , authorizationInfo );
640+ if (res .isDone () == false ) {
641+ res .addListener (new DelegatingActionListener <>(listener ) {
642+ @ Override
643+ public void onResponse (Void unused ) {
644+ if (requestInterceptorIterator .hasNext ()) {
645+ var nestedRest = requestInterceptorIterator .next ()
646+ .intercept (requestInfo , authorizationEngine , authorizationInfo );
647+ if (nestedRest .isDone () == false ) {
648+ nestedRest .addListener (this );
649+ }
650+ } else {
651+ listener .onResponse (null );
652+ }
646653 }
647- }
648- });
654+ });
655+ return ;
656+ }
657+ try {
658+ res .rawResult ();
659+ } catch (Exception e ) {
660+ listener .onFailure (e );
661+ return ;
662+ }
663+ }
664+ listener .onResponse (null );
649665 }
650666 }
651667
@@ -776,7 +792,7 @@ private void authorizeBulkItems(
776792 final Map <String , Set <String >> actionToIndicesMap = new HashMap <>(4 );
777793 final AuditTrail auditTrail = auditTrailService .get ();
778794
779- resolvedIndicesAsyncSupplier .getAsync (ActionListener .wrap (overallResolvedIndices -> {
795+ resolvedIndicesAsyncSupplier .getAsync (). addListener ( ActionListener .wrap (overallResolvedIndices -> {
780796 final Set <String > localIndices = new HashSet <>(overallResolvedIndices .getLocal ());
781797 for (BulkItemRequest item : request .items ()) {
782798 final String itemAction = getAction (item );
@@ -871,12 +887,14 @@ private void authorizeBulkItems(
871887 authzEngine .authorizeIndexAction (
872888 bulkItemInfo ,
873889 authzInfo ,
874- ril -> ril .onResponse (new ResolvedIndices (new ArrayList <>(indices ), Collections .emptyList ())),
875- projectMetadata ,
876- groupedActionListener .delegateFailureAndWrap (
877- (l , indexAuthorizationResult ) -> l .onResponse (new Tuple <>(bulkItemAction , indexAuthorizationResult ))
878- )
879- );
890+ () -> ListenableFuture .newSucceeded (new ResolvedIndices (new ArrayList <>(indices ), Collections .emptyList ())),
891+ projectMetadata
892+ )
893+ .addListener (
894+ groupedActionListener .delegateFailureAndWrap (
895+ (l , indexAuthorizationResult ) -> l .onResponse (new Tuple <>(bulkItemAction , indexAuthorizationResult ))
896+ )
897+ );
880898 });
881899 }, listener ::onFailure ));
882900 }
@@ -1068,7 +1086,7 @@ private CachingAsyncSupplier(AsyncSupplier<V> supplier) {
10681086 }
10691087
10701088 @ Override
1071- public void getAsync ( ActionListener <V > listener ) {
1089+ public SubscribableListener <V > getAsync ( ) {
10721090 if (valueFuture == null ) {
10731091 boolean firstInvocation = false ;
10741092 synchronized (this ) {
@@ -1078,10 +1096,10 @@ public void getAsync(ActionListener<V> listener) {
10781096 }
10791097 }
10801098 if (firstInvocation ) {
1081- asyncSupplier .getAsync (valueFuture );
1099+ asyncSupplier .getAsync (). addListener ( valueFuture );
10821100 }
10831101 }
1084- valueFuture . addListener ( listener ) ;
1102+ return valueFuture ;
10851103 }
10861104 }
10871105
0 commit comments