Skip to content

Commit 124e070

Browse files
committed
Wait for the listener to complete
1 parent 188dedb commit 124e070

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ public ExchangeSourceHandler removeExchangeSourceHandler(String sessionId) {
184184
public void finishSessionEarly(String sessionId, ActionListener<Void> listener) {
185185
ExchangeSourceHandler exchangeSource = removeExchangeSourceHandler(sessionId);
186186
if (exchangeSource != null) {
187-
exchangeSource.finishEarly(true, listener);
187+
exchangeSource.finishEarly(false, listener);
188+
} else {
189+
listener.onResponse(null);
188190
}
189191
}
190192

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.node.DiscoveryNode;
1717
import org.elasticsearch.cluster.service.ClusterService;
1818
import org.elasticsearch.common.util.concurrent.EsExecutors;
19+
import org.elasticsearch.compute.EsqlRefCountingListener;
1920
import org.elasticsearch.compute.data.BlockFactory;
2021
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2122
import org.elasticsearch.core.TimeValue;
@@ -36,6 +37,7 @@
3637

3738
import java.io.IOException;
3839
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicReference;
3941

4042
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
4143

@@ -123,7 +125,15 @@ private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, Actio
123125
} catch (IOException e) {
124126
throw new ResourceNotFoundException(asyncId + " not found", e);
125127
}
126-
asyncListener.addListener(listener);
127-
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.noop());
128+
// Here we will wait for both the response to become available and for the finish operation to complete
129+
var responseHolder = new AtomicReference<EsqlQueryResponse>();
130+
ActionListener<Void> resultListener = listener.delegateFailureIgnoreResponseAndWrap(l -> l.onResponse(responseHolder.get()));
131+
try (var refs = new EsqlRefCountingListener(resultListener)) {
132+
asyncListener.addListener(refs.acquire().map(r -> {
133+
responseHolder.set(r);
134+
return null;
135+
}));
136+
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
137+
}
128138
}
129139
}

0 commit comments

Comments
 (0)