Skip to content

Commit 8ef56eb

Browse files
committed
Refactor the listener code
1 parent 027b156 commit 8ef56eb

File tree

3 files changed

+27
-24
lines changed

3 files changed

+27
-24
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,12 @@ void startComputeOnRemoteCluster(
9292
childSessionId,
9393
queryPragmas.exchangeBufferSize(),
9494
esqlExecutor,
95-
ActionListener.wrap(unused -> {
95+
EsqlCCSUtils.skipUnavailableListener(
96+
openExchangeListener,
97+
executionInfo,
98+
clusterAlias,
99+
EsqlExecutionInfo.Cluster.Status.SKIPPED
100+
).delegateFailureAndWrap((l, unused) -> {
96101
var listenerGroup = new RemoteListenerGroup(
97102
transportService,
98103
rootTask,
@@ -129,18 +134,6 @@ void startComputeOnRemoteCluster(
129134
TransportRequestOptions.EMPTY,
130135
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
131136
);
132-
}, e -> {
133-
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
134-
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
135-
executionInfo,
136-
clusterAlias,
137-
EsqlExecutionInfo.Cluster.Status.SKIPPED,
138-
e
139-
);
140-
openExchangeListener.onResponse(null);
141-
} else {
142-
openExchangeListener.onFailure(e);
143-
}
144137
})
145138
);
146139
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,8 @@ class RemoteListenerGroup {
7474
private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
7575
return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
7676
taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
77-
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
78-
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
79-
executionInfo,
80-
clusterAlias,
81-
EsqlExecutionInfo.Cluster.Status.PARTIAL,
82-
e
83-
);
84-
delegate.onResponse(null);
85-
} else {
86-
delegate.onFailure(e);
87-
}
77+
EsqlCCSUtils.skipUnavailableListener(delegate, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL)
78+
.onFailure(e);
8879
}));
8980
}), finishGroup);
9081
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,4 +369,23 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo,
369369

370370
return ExceptionsHelper.isRemoteUnavailableException(e);
371371
}
372+
373+
/**
374+
* Wrap a listener so that it will skip errors that are ignorable
375+
*/
376+
public static <T> ActionListener<T> skipUnavailableListener(
377+
ActionListener<T> delegate,
378+
EsqlExecutionInfo executionInfo,
379+
String clusterAlias,
380+
EsqlExecutionInfo.Cluster.Status status
381+
) {
382+
return delegate.delegateResponse((l, e) -> {
383+
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
384+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e);
385+
l.onResponse(null);
386+
} else {
387+
l.onFailure(e);
388+
}
389+
});
390+
}
372391
}

0 commit comments

Comments
 (0)