Skip to content

Commit e4f7eea

Browse files
authored
Make ESQL EnrichPolicyResolver try to do proper connection before sending requests (#114870) (#114937)
* Make ESQL EnrichPolicyResolver try to do proper connection before sending requests * Make encureConnected be !skipUnavailable (cherry picked from commit f99927e)
1 parent 0648ab8 commit e4f7eea

File tree

2 files changed

+27
-22
lines changed

2 files changed

+27
-22
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.tasks.Task;
2626
import org.elasticsearch.threadpool.ThreadPool;
2727
import org.elasticsearch.transport.RemoteClusterAware;
28+
import org.elasticsearch.transport.RemoteClusterService;
2829
import org.elasticsearch.transport.Transport;
2930
import org.elasticsearch.transport.TransportChannel;
3031
import org.elasticsearch.transport.TransportRequest;
@@ -72,12 +73,14 @@ public class EnrichPolicyResolver {
7273
private final IndexResolver indexResolver;
7374
private final TransportService transportService;
7475
private final ThreadPool threadPool;
76+
private final RemoteClusterService remoteClusterService;
7577

7678
public EnrichPolicyResolver(ClusterService clusterService, TransportService transportService, IndexResolver indexResolver) {
7779
this.clusterService = clusterService;
7880
this.transportService = transportService;
7981
this.indexResolver = indexResolver;
8082
this.threadPool = transportService.getThreadPool();
83+
this.remoteClusterService = transportService.getRemoteClusterService();
8184
transportService.registerRequestHandler(
8285
RESOLVE_ACTION_NAME,
8386
threadPool.executor(ThreadPool.Names.SEARCH),
@@ -257,22 +260,21 @@ private void lookupPolicies(
257260
// remote clusters
258261
if (remotePolicies.isEmpty() == false) {
259262
for (String cluster : remoteClusters) {
260-
final Transport.Connection connection;
261-
try {
262-
connection = getRemoteConnection(cluster);
263-
} catch (Exception e) {
264-
refs.acquire().onFailure(e);
265-
return;
266-
}
267-
transportService.sendRequest(
268-
connection,
269-
RESOLVE_ACTION_NAME,
270-
new LookupRequest(cluster, remotePolicies),
271-
TransportRequestOptions.EMPTY,
272-
new ActionListenerResponseHandler<>(
273-
refs.acquire(resp -> lookupResponses.put(cluster, resp)),
274-
LookupResponse::new,
275-
threadPool.executor(ThreadPool.Names.SEARCH)
263+
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
264+
getRemoteConnection(
265+
cluster,
266+
lookupListener.delegateFailureAndWrap(
267+
(delegate, connection) -> transportService.sendRequest(
268+
connection,
269+
RESOLVE_ACTION_NAME,
270+
new LookupRequest(cluster, remotePolicies),
271+
TransportRequestOptions.EMPTY,
272+
new ActionListenerResponseHandler<>(
273+
delegate,
274+
LookupResponse::new,
275+
threadPool.executor(ThreadPool.Names.SEARCH)
276+
)
277+
)
276278
)
277279
);
278280
}
@@ -389,13 +391,16 @@ protected Map<String, EnrichPolicy> availablePolicies() {
389391
return metadata == null ? Map.of() : metadata.getPolicies();
390392
}
391393

392-
protected Transport.Connection getRemoteConnection(String cluster) {
393-
return transportService.getRemoteClusterService().getConnection(cluster);
394+
protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {
395+
remoteClusterService.maybeEnsureConnectedAndGetConnection(
396+
cluster,
397+
remoteClusterService.isSkipUnavailable(cluster) == false,
398+
listener
399+
);
394400
}
395401

396402
public Map<String, List<String>> groupIndicesPerCluster(String[] indices) {
397-
return transportService.getRemoteClusterService()
398-
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
403+
return remoteClusterService.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
399404
.entrySet()
400405
.stream()
401406
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,9 +446,9 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
446446
}
447447

448448
@Override
449-
protected Transport.Connection getRemoteConnection(String remoteCluster) {
449+
protected void getRemoteConnection(String remoteCluster, ActionListener<Transport.Connection> listener) {
450450
assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY));
451-
return transports.get("").getConnection(transports.get(remoteCluster).getLocalNode());
451+
listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()));
452452
}
453453

454454
static ClusterService mockClusterService(Map<String, EnrichPolicy> policies) {

0 commit comments

Comments
 (0)