77
88package org .elasticsearch .xpack .esql .plugin ;
99
10- import org .elasticsearch .ResourceNotFoundException ;
10+ import org .apache .logging .log4j .LogManager ;
11+ import org .apache .logging .log4j .Logger ;
1112import org .elasticsearch .action .ActionListener ;
1213import org .elasticsearch .action .ActionListenerResponseHandler ;
1314import org .elasticsearch .action .support .ActionFilters ;
1617import org .elasticsearch .cluster .node .DiscoveryNode ;
1718import org .elasticsearch .cluster .service .ClusterService ;
1819import org .elasticsearch .common .util .concurrent .EsExecutors ;
19- import org .elasticsearch .compute .EsqlRefCountingListener ;
2020import org .elasticsearch .compute .data .BlockFactory ;
2121import org .elasticsearch .compute .operator .exchange .ExchangeService ;
22- import org .elasticsearch .core .TimeValue ;
2322import org .elasticsearch .injection .guice .Inject ;
2423import org .elasticsearch .tasks .Task ;
2524import org .elasticsearch .tasks .TaskId ;
3231import org .elasticsearch .xpack .core .async .GetAsyncResultRequest ;
3332import org .elasticsearch .xpack .core .security .SecurityContext ;
3433import org .elasticsearch .xpack .esql .action .EsqlAsyncStopAction ;
34+ import org .elasticsearch .xpack .esql .action .EsqlExecutionInfo ;
3535import org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
3636import org .elasticsearch .xpack .esql .action .EsqlQueryTask ;
3737
3838import java .io .IOException ;
39- import java .util .concurrent .TimeUnit ;
40- import java .util .concurrent .atomic .AtomicReference ;
4139
4240import static org .elasticsearch .xpack .core .ClientHelper .ASYNC_SEARCH_ORIGIN ;
4341
@@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
5553 private final TransportService transportService ;
5654 private final AsyncSearchSecurity security ;
5755
56+ private static final Logger logger = LogManager .getLogger (TransportEsqlAsyncStopAction .class );
57+
5858 @ Inject
5959 public TransportEsqlAsyncStopAction (
6060 TransportService transportService ,
@@ -106,34 +106,33 @@ private String sessionID(AsyncExecutionId asyncId) {
106106
107107 private void stopQueryAndReturnResult (Task task , AsyncExecutionId asyncId , ActionListener <EsqlQueryResponse > listener ) {
108108 String asyncIdStr = asyncId .getEncoded ();
109- TransportEsqlQueryAction .EsqlQueryListener asyncListener = queryAction .getAsyncListener (asyncIdStr );
110- if (asyncListener == null ) {
109+ EsqlQueryTask asyncTask = getEsqlQueryTask (asyncId );
110+ GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest (asyncIdStr );
111+ if (asyncTask == null ) {
111112 // This should mean one of the two things: either bad request ID, or the query has already finished
112113 // In both cases, let regular async get deal with it.
113- var getAsyncResultRequest = new GetAsyncResultRequest (asyncIdStr );
114- // TODO: this should not be happening, but if the listener is not registered and the query is not finished,
115- // we give it some time to finish
116- getAsyncResultRequest .setWaitForCompletionTimeout (new TimeValue (1 , TimeUnit .SECONDS ));
114+ logger .debug ("Async stop for task {}, no task present - passing to GetAsyncResultRequest" , asyncIdStr );
117115 getResultsAction .execute (task , getAsyncResultRequest , listener );
118116 return ;
119117 }
120- try {
121- EsqlQueryTask asyncTask = AsyncTaskIndexService .getTask (taskManager , asyncId , EsqlQueryTask .class );
122- if (false == security .currentUserHasAccessToTask (asyncTask )) {
123- throw new ResourceNotFoundException (asyncId + " not found" );
118+ logger .debug ("Async stop for task {} - stopping" , asyncIdStr );
119+ final EsqlExecutionInfo esqlExecutionInfo = asyncTask .executionInfo ();
120+ if (esqlExecutionInfo != null ) {
121+ esqlExecutionInfo .markAsPartial ();
122+ }
123+ Runnable getResults = () -> getResultsAction .execute (task , getAsyncResultRequest , listener );
124+ exchangeService .finishSessionEarly (sessionID (asyncId ), ActionListener .running (() -> {
125+ if (asyncTask .addCompletionListener (() -> ActionListener .running (getResults )) == false ) {
126+ getResults .run ();
124127 }
128+ }));
129+ }
130+
131+ private EsqlQueryTask getEsqlQueryTask (AsyncExecutionId asyncId ) {
132+ try {
133+ return AsyncTaskIndexService .getTaskAndCheckAuthentication (taskManager , security , asyncId , EsqlQueryTask .class );
125134 } catch (IOException e ) {
126- throw new ResourceNotFoundException (asyncId + " not found" , e );
127- }
128- // Here we will wait for both the response to become available and for the finish operation to complete
129- var responseHolder = new AtomicReference <EsqlQueryResponse >();
130- try (var refs = new EsqlRefCountingListener (listener .map (unused -> responseHolder .get ()))) {
131- asyncListener .addListener (refs .acquire ().map (r -> {
132- responseHolder .set (r );
133- return null ;
134- }));
135- asyncListener .markAsPartial ();
136- exchangeService .finishSessionEarly (sessionID (asyncId ), refs .acquire ());
135+ return null ;
137136 }
138137 }
139138}
0 commit comments