77
88package org .elasticsearch .xpack .esql .plugin ;
99
10+ import org .elasticsearch .ResourceNotFoundException ;
1011import org .elasticsearch .action .ActionListener ;
1112import org .elasticsearch .action .ActionListenerResponseHandler ;
1213import org .elasticsearch .action .support .ActionFilters ;
1314import org .elasticsearch .action .support .HandledTransportAction ;
15+ import org .elasticsearch .client .internal .Client ;
1416import org .elasticsearch .cluster .node .DiscoveryNode ;
1517import org .elasticsearch .cluster .service .ClusterService ;
1618import org .elasticsearch .common .util .concurrent .EsExecutors ;
1921import org .elasticsearch .injection .guice .Inject ;
2022import org .elasticsearch .tasks .Task ;
2123import org .elasticsearch .transport .TransportService ;
24+ import org .elasticsearch .xpack .core .XPackPlugin ;
2225import org .elasticsearch .xpack .core .async .AsyncExecutionId ;
26+ import org .elasticsearch .xpack .core .async .AsyncSearchSecurity ;
2327import org .elasticsearch .xpack .core .async .AsyncStopRequest ;
2428import org .elasticsearch .xpack .core .async .GetAsyncResultRequest ;
29+ import org .elasticsearch .xpack .core .security .SecurityContext ;
2530import org .elasticsearch .xpack .esql .action .EsqlAsyncStopAction ;
2631import org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
32+ import org .elasticsearch .xpack .esql .action .EsqlQueryTask ;
2733
34+ import java .io .IOException ;
2835import java .util .concurrent .TimeUnit ;
2936
37+ import static org .elasticsearch .xpack .core .ClientHelper .ASYNC_SEARCH_ORIGIN ;
38+ import static org .elasticsearch .xpack .core .async .AsyncTaskIndexService .getTask ;
39+
3040/**
3141 * This action will stop running async request and collect the results.
3242 * If the request is already finished, it will do the same thing as the regular async get.
@@ -38,6 +48,7 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
3848 private final BlockFactory blockFactory ;
3949 private final ClusterService clusterService ;
4050 private final TransportService transportService ;
51+ private final AsyncSearchSecurity security ;
4152
4253 @ Inject
4354 public TransportEsqlAsyncStopAction (
@@ -46,6 +57,7 @@ public TransportEsqlAsyncStopAction(
4657 ActionFilters actionFilters ,
4758 TransportEsqlQueryAction queryAction ,
4859 TransportEsqlAsyncGetResultsAction getResultsAction ,
60+ Client client ,
4961 BlockFactory blockFactory
5062 ) {
5163 super (EsqlAsyncStopAction .NAME , transportService , actionFilters , AsyncStopRequest ::new , EsExecutors .DIRECT_EXECUTOR_SERVICE );
@@ -54,6 +66,12 @@ public TransportEsqlAsyncStopAction(
5466 this .blockFactory = blockFactory ;
5567 this .transportService = transportService ;
5668 this .clusterService = clusterService ;
69+ this .security = new AsyncSearchSecurity (
70+ XPackPlugin .ASYNC_RESULTS_INDEX ,
71+ new SecurityContext (clusterService .getSettings (), client .threadPool ().getThreadContext ()),
72+ client ,
73+ ASYNC_SEARCH_ORIGIN
74+ );
5775 }
5876
5977 @ Override
@@ -63,7 +81,7 @@ protected void doExecute(Task task, AsyncStopRequest request, ActionListener<Esq
6381 if (clusterService .localNode ().getId ().equals (searchId .getTaskId ().getNodeId ()) || node == null ) {
6482 // Don't use original request ID here because base64 decoding may not need some padding, but we want to match the original ID
6583 // for the map lookup
66- stopQueryAndReturnResult (task , searchId . getEncoded () , listener );
84+ stopQueryAndReturnResult (task , searchId , listener );
6785 } else {
6886 transportService .sendRequest (
6987 node ,
@@ -74,18 +92,27 @@ protected void doExecute(Task task, AsyncStopRequest request, ActionListener<Esq
7492 }
7593 }
7694
77- private void stopQueryAndReturnResult (Task task , String asyncId , ActionListener <EsqlQueryResponse > listener ) {
78- var asyncListener = queryAction .getAsyncListener (asyncId );
95+ private void stopQueryAndReturnResult (Task task , AsyncExecutionId asyncId , ActionListener <EsqlQueryResponse > listener ) {
96+ String asyncIdStr = asyncId .getEncoded ();
97+ var asyncListener = queryAction .getAsyncListener (asyncIdStr );
7998 if (asyncListener == null ) {
8099 // This should mean one of the two things: either bad request ID, or the query has already finished
81100 // In both cases, let regular async get deal with it.
82- var getAsyncResultRequest = new GetAsyncResultRequest (asyncId );
101+ var getAsyncResultRequest = new GetAsyncResultRequest (asyncIdStr );
83102 // TODO: this should not be happening, but if the listener is not registered and the query is not finished,
84103 // we give it some time to finish
85104 getAsyncResultRequest .setWaitForCompletionTimeout (new TimeValue (1 , TimeUnit .SECONDS ));
86105 getResultsAction .execute (task , getAsyncResultRequest , listener );
87106 return ;
88107 }
108+ try {
109+ EsqlQueryTask asyncTask = getTask (taskManager , asyncId , EsqlQueryTask .class );
110+ if (false == security .currentUserHasAccessToTask (asyncTask )) {
111+ throw new ResourceNotFoundException (asyncId + " not found" );
112+ }
113+ } catch (IOException e ) {
114+ throw new ResourceNotFoundException (asyncId + " not found" , e );
115+ }
89116 asyncListener .addListener (listener );
90117 // TODO: send the finish signal to the source
91118 }
0 commit comments