Skip to content

Commit 5e3ea03

Browse files
Use SearchPlanningPhaseResolutionResult
1 parent d350c14 commit 5e3ea03

File tree

1 file changed

+26
-16
lines changed

1 file changed

+26
-16
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.search.SearchService;
5353
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
5454
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
55+
import org.elasticsearch.search.crossproject.SearchPlanningPhaseResolutionResult;
5556
import org.elasticsearch.tasks.Task;
5657
import org.elasticsearch.transport.RemoteClusterAware;
5758
import org.elasticsearch.transport.RemoteClusterService;
@@ -66,9 +67,9 @@
6667
import java.util.ArrayList;
6768
import java.util.Arrays;
6869
import java.util.Collection;
69-
import java.util.Collections;
7070
import java.util.Comparator;
7171
import java.util.EnumSet;
72+
import java.util.HashMap;
7273
import java.util.List;
7374
import java.util.Locale;
7475
import java.util.Map;
@@ -636,10 +637,17 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
636637

637638
final ResolvedIndexExpressions localResolvedIndexExpressions = request.getResolvedIndexExpressions();
638639
if (remoteClusterIndices.size() > 0) {
639-
ActionListener<Collection<Map.Entry<String, Response>>> responsesListener = listener.delegateFailureAndWrap(
640-
(l, responses) -> {
641-
Map<String, Response> linkedProjectsResponses = responses.stream()
642-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
640+
ActionListener<Collection<Map.Entry<String, SearchPlanningPhaseResolutionResult>>> responsesListener = listener
641+
.delegateFailureAndWrap((l, responses) -> {
642+
Map<String, Response> linkedProjectsResponses = new HashMap<>();
643+
for (Map.Entry<String, SearchPlanningPhaseResolutionResult> entry : responses) {
644+
String projectName = entry.getKey();
645+
SearchPlanningPhaseResolutionResult result = entry.getValue();
646+
647+
if (result.response() instanceof ResolveIndexAction.Response response) {
648+
linkedProjectsResponses.put(projectName, response);
649+
}
650+
}
643651

644652
if (resolveCrossProject) {
645653
final Exception ex = CrossProjectIndexResolutionValidator.validate(
@@ -657,10 +665,9 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
657665

658666
mergeResults(linkedProjectsResponses, indices, aliases, dataStreams, request.indexModes);
659667
listener.onResponse(new Response(indices, aliases, dataStreams));
660-
}
661-
);
668+
});
662669

663-
ActionListener<Map.Entry<String, Response>> gal = new GroupedActionListener<>(
670+
ActionListener<Map.Entry<String, SearchPlanningPhaseResolutionResult>> gal = new GroupedActionListener<>(
664671
remoteClusterIndices.size(),
665672
responsesListener
666673
);
@@ -683,8 +690,8 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
683690
}
684691

685692
connectionListener.addListener(gal.delegateResponse((l, failure) -> {
686-
logger.info("failed to resolve indices on remote cluster [" + clusterAlias + "]", failure);
687-
l.onResponse(null);
693+
logger.debug("failed to resolve indices on remote cluster [{}]: {}", clusterAlias, failure);
694+
l.onResponse(Map.entry(clusterAlias, new SearchPlanningPhaseResolutionResult(null, failure)));
688695
})
689696
.delegateFailure(
690697
(ignored, connection) -> transportService.sendRequest(
@@ -693,12 +700,15 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
693700
remoteRequest,
694701
TransportRequestOptions.EMPTY,
695702
new ActionListenerResponseHandler<>(gal.delegateResponse((l, failure) -> {
696-
logger.info("Error occurred on remote cluster [" + clusterAlias + "]", failure);
697-
l.onResponse(null);
698-
}).map(resolveIndexResponse -> Map.entry(clusterAlias, resolveIndexResponse)),
699-
Response::new,
700-
EsExecutors.DIRECT_EXECUTOR_SERVICE
701-
)
703+
logger.debug("Error occurred on remote cluster [{}]: {}", clusterAlias, failure);
704+
l.onResponse(Map.entry(clusterAlias, new SearchPlanningPhaseResolutionResult(null, failure)));
705+
})
706+
.map(
707+
resolveIndexResponse -> Map.entry(
708+
clusterAlias,
709+
new SearchPlanningPhaseResolutionResult(resolveIndexResponse, null)
710+
)
711+
), Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
702712
)
703713
));
704714

0 commit comments

Comments
 (0)