77
88package  org .elasticsearch .xpack .esql .plugin ;
99
10- import  org .elasticsearch .ResourceNotFoundException ;
10+ import  org .apache .logging .log4j .LogManager ;
11+ import  org .apache .logging .log4j .Logger ;
1112import  org .elasticsearch .action .ActionListener ;
1213import  org .elasticsearch .action .ActionListenerResponseHandler ;
1314import  org .elasticsearch .action .support .ActionFilters ;
1617import  org .elasticsearch .cluster .node .DiscoveryNode ;
1718import  org .elasticsearch .cluster .service .ClusterService ;
1819import  org .elasticsearch .common .util .concurrent .EsExecutors ;
19- import  org .elasticsearch .compute .EsqlRefCountingListener ;
2020import  org .elasticsearch .compute .data .BlockFactory ;
2121import  org .elasticsearch .compute .operator .exchange .ExchangeService ;
22- import  org .elasticsearch .core .TimeValue ;
2322import  org .elasticsearch .injection .guice .Inject ;
2423import  org .elasticsearch .tasks .Task ;
2524import  org .elasticsearch .tasks .TaskId ;
3231import  org .elasticsearch .xpack .core .async .GetAsyncResultRequest ;
3332import  org .elasticsearch .xpack .core .security .SecurityContext ;
3433import  org .elasticsearch .xpack .esql .action .EsqlAsyncStopAction ;
34+ import  org .elasticsearch .xpack .esql .action .EsqlExecutionInfo ;
3535import  org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
3636import  org .elasticsearch .xpack .esql .action .EsqlQueryTask ;
3737
3838import  java .io .IOException ;
39- import  java .util .concurrent .TimeUnit ;
40- import  java .util .concurrent .atomic .AtomicReference ;
4139
4240import  static  org .elasticsearch .xpack .core .ClientHelper .ASYNC_SEARCH_ORIGIN ;
4341
@@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
5553    private  final  TransportService  transportService ;
5654    private  final  AsyncSearchSecurity  security ;
5755
56+     private  static  final  Logger  logger  = LogManager .getLogger (TransportEsqlAsyncStopAction .class );
57+ 
5858    @ Inject 
5959    public  TransportEsqlAsyncStopAction (
6060        TransportService  transportService ,
@@ -106,34 +106,33 @@ private String sessionID(AsyncExecutionId asyncId) {
106106
107107    private  void  stopQueryAndReturnResult (Task  task , AsyncExecutionId  asyncId , ActionListener <EsqlQueryResponse > listener ) {
108108        String  asyncIdStr  = asyncId .getEncoded ();
109-         TransportEsqlQueryAction .EsqlQueryListener  asyncListener  = queryAction .getAsyncListener (asyncIdStr );
110-         if  (asyncListener  == null ) {
109+         EsqlQueryTask  asyncTask  = getEsqlQueryTask (asyncId );
110+         GetAsyncResultRequest  getAsyncResultRequest  = new  GetAsyncResultRequest (asyncIdStr );
111+         if  (asyncTask  == null ) {
111112            // This should mean one of the two things: either bad request ID, or the query has already finished 
112113            // In both cases, let regular async get deal with it. 
113-             var  getAsyncResultRequest  = new  GetAsyncResultRequest (asyncIdStr );
114-             // TODO: this should not be happening, but if the listener is not registered and the query is not finished, 
115-             // we give it some time to finish 
116-             getAsyncResultRequest .setWaitForCompletionTimeout (new  TimeValue (1 , TimeUnit .SECONDS ));
114+             logger .debug ("Async stop for task {}, no task present - passing to GetAsyncResultRequest" , asyncIdStr );
117115            getResultsAction .execute (task , getAsyncResultRequest , listener );
118116            return ;
119117        }
120-         try  {
121-             EsqlQueryTask  asyncTask  = AsyncTaskIndexService .getTask (taskManager , asyncId , EsqlQueryTask .class );
122-             if  (false  == security .currentUserHasAccessToTask (asyncTask )) {
123-                 throw  new  ResourceNotFoundException (asyncId  + " not found" );
118+         logger .debug ("Async stop for task {} - stopping" , asyncIdStr );
119+         final  EsqlExecutionInfo  esqlExecutionInfo  = asyncTask .executionInfo ();
120+         if  (esqlExecutionInfo  != null ) {
121+             esqlExecutionInfo .markAsPartial ();
122+         }
123+         Runnable  getResults  = () -> getResultsAction .execute (task , getAsyncResultRequest , listener );
124+         exchangeService .finishSessionEarly (sessionID (asyncId ), ActionListener .running (() -> {
125+             if  (asyncTask .addCompletionListener (() -> ActionListener .running (getResults )) == false ) {
126+                 getResults .run ();
124127            }
128+         }));
129+     }
130+ 
131+     private  EsqlQueryTask  getEsqlQueryTask (AsyncExecutionId  asyncId ) {
132+         try  {
133+             return  AsyncTaskIndexService .getTaskAndCheckAuthentication (taskManager , security , asyncId , EsqlQueryTask .class );
125134        } catch  (IOException  e ) {
126-             throw  new  ResourceNotFoundException (asyncId  + " not found" , e );
127-         }
128-         // Here we will wait for both the response to become available and for the finish operation to complete 
129-         var  responseHolder  = new  AtomicReference <EsqlQueryResponse >();
130-         try  (var  refs  = new  EsqlRefCountingListener (listener .map (unused  -> responseHolder .get ()))) {
131-             asyncListener .addListener (refs .acquire ().map (r  -> {
132-                 responseHolder .set (r );
133-                 return  null ;
134-             }));
135-             asyncListener .markAsPartial ();
136-             exchangeService .finishSessionEarly (sessionID (asyncId ), refs .acquire ());
135+             return  null ;
137136        }
138137    }
139138}
0 commit comments