Skip to content

Commit afddc5f

Browse files
committed
Wait for the listener to complete
1 parent 188dedb commit afddc5f

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ public ExchangeSourceHandler removeExchangeSourceHandler(String sessionId) {
184184
public void finishSessionEarly(String sessionId, ActionListener<Void> listener) {
185185
ExchangeSourceHandler exchangeSource = removeExchangeSourceHandler(sessionId);
186186
if (exchangeSource != null) {
187-
exchangeSource.finishEarly(true, listener);
187+
exchangeSource.finishEarly(false, listener);
188+
} else {
189+
listener.onResponse(null);
188190
}
189191
}
190192

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
1313
import org.elasticsearch.action.support.ActionFilters;
1414
import org.elasticsearch.action.support.HandledTransportAction;
15+
import org.elasticsearch.action.support.RefCountingListener;
1516
import org.elasticsearch.client.internal.Client;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.service.ClusterService;
1819
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.compute.EsqlRefCountingListener;
1921
import org.elasticsearch.compute.data.BlockFactory;
2022
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2123
import org.elasticsearch.core.TimeValue;
@@ -36,6 +38,7 @@
3638

3739
import java.io.IOException;
3840
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicReference;
3942

4043
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
4144

@@ -123,7 +126,15 @@ private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, Actio
123126
} catch (IOException e) {
124127
throw new ResourceNotFoundException(asyncId + " not found", e);
125128
}
126-
asyncListener.addListener(listener);
127-
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.noop());
129+
// Here we will wait for both the response to become available and for the finish operation to complete
130+
var responseHolder = new AtomicReference<EsqlQueryResponse>();
131+
ActionListener<Void> resultListener = listener.delegateFailureIgnoreResponseAndWrap(l -> l.onResponse(responseHolder.get()));
132+
try (var refs = new EsqlRefCountingListener(resultListener)) {
133+
asyncListener.addListener(refs.acquire().map(r -> {
134+
responseHolder.set(r);
135+
return null;
136+
}));
137+
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
138+
}
128139
}
129140
}

0 commit comments

Comments
 (0)