Skip to content

Commit 20d2330

Browse files
authored
Fix listener usage in remote clusters (#122629) (#122647)
We should notify the listener provided in delegateResponse/delegateFailure, not the original listener.
1 parent ff0932f commit 20d2330

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,14 @@ void startComputeOnRemoteCluster(
8585
var resp = finalResponse.get();
8686
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
8787
}))) {
88-
var openExchangeListener = computeListener.acquireAvoid();
8988
ExchangeService.openExchange(
9089
transportService,
9190
cluster.connection,
9291
childSessionId,
9392
queryPragmas.exchangeBufferSize(),
9493
esqlExecutor,
9594
EsqlCCSUtils.skipUnavailableListener(
96-
openExchangeListener,
95+
computeListener.acquireAvoid(),
9796
executionInfo,
9897
clusterAlias,
9998
EsqlExecutionInfo.Cluster.Status.SKIPPED
@@ -104,7 +103,7 @@ void startComputeOnRemoteCluster(
104103
computeListener,
105104
clusterAlias,
106105
executionInfo,
107-
openExchangeListener
106+
l
108107
);
109108

110109
var remoteSink = exchangeService.newRemoteSink(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class RemoteListenerGroup {
7474
private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
7575
return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
7676
taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
77-
EsqlCCSUtils.skipUnavailableListener(delegate, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL)
77+
EsqlCCSUtils.skipUnavailableListener(inner, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL)
7878
.onFailure(e);
7979
}));
8080
}), finishGroup);

0 commit comments

Comments
 (0)