Skip to content

Commit ec12300

Browse files
committed
resolve indices when MRT is true
1 parent 42607fc commit ec12300

File tree

2 files changed

+134
-36
lines changed

2 files changed

+134
-36
lines changed

server/src/main/java/org/elasticsearch/action/ResolvedIndices.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action;
1111

12+
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
1213
import org.elasticsearch.action.search.SearchContextId;
1314
import org.elasticsearch.action.support.IndicesOptions;
1415
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -23,11 +24,14 @@
2324
import org.elasticsearch.transport.RemoteClusterAware;
2425
import org.elasticsearch.transport.RemoteClusterService;
2526

27+
import java.util.Collection;
2628
import java.util.Collections;
2729
import java.util.HashMap;
2830
import java.util.HashSet;
2931
import java.util.Map;
3032
import java.util.Set;
33+
import java.util.stream.Collectors;
34+
import java.util.stream.Stream;
3135

3236
/**
3337
* Container for information about results of the resolution of index expression.
@@ -253,6 +257,46 @@ public static ResolvedIndices resolveWithPIT(
253257
);
254258
}
255259

260+
public static ResolvedIndices resolveFromResponse(
261+
ResolveIndexAction.Response resolveIndexAction,
262+
IndicesOptions indicesOptions,
263+
ProjectMetadata projectMetadata,
264+
IndexNameExpressionResolver indexNameExpressionResolver,
265+
long startTimeInMillis
266+
) {
267+
var remoteClusterIndices = Stream.of(
268+
resolveIndexAction.getIndices(),
269+
resolveIndexAction.getDataStreams(),
270+
resolveIndexAction.getAliases()
271+
)
272+
.flatMap(Collection::stream)
273+
.map(ResolveIndexAction.ResolvedIndexAbstraction::getName)
274+
.map(RemoteClusterAware::splitIndexName)
275+
.collect(
276+
Collectors.groupingBy(
277+
clusterAndIndex -> clusterAndIndex[0] != null ? clusterAndIndex[0] : RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
278+
Collectors.mapping(clusterAndIndex -> clusterAndIndex[1], Collectors.toList())
279+
)
280+
)
281+
.entrySet()
282+
.stream()
283+
.collect(
284+
Collectors.toMap(Map.Entry::getKey, entry -> new OriginalIndices(entry.getValue().toArray(new String[0]), indicesOptions))
285+
);
286+
287+
var localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
288+
289+
var concreteLocalIndices = localIndices == null
290+
? Index.EMPTY_ARRAY
291+
: indexNameExpressionResolver.concreteIndices(projectMetadata, localIndices, startTimeInMillis);
292+
293+
return new ResolvedIndices(
294+
remoteClusterIndices,
295+
localIndices,
296+
resolveLocalIndexMetadata(concreteLocalIndices, projectMetadata, true)
297+
);
298+
}
299+
256300
private static Map<Index, IndexMetadata> resolveLocalIndexMetadata(
257301
Index[] concreteLocalIndices,
258302
ProjectMetadata projectMetadata,

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 90 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
3131
import org.elasticsearch.action.admin.cluster.stats.CCSUsage;
3232
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
33+
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
3334
import org.elasticsearch.action.support.ActionFilters;
3435
import org.elasticsearch.action.support.HandledTransportAction;
3536
import org.elasticsearch.action.support.IndicesOptions;
@@ -512,43 +513,58 @@ public void onFailure(Exception e) {
512513
} else {
513514
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
514515
if (shouldMinimizeRoundtrips(rewritten)) {
515-
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
516-
&& rewritten.source().aggregations() != null
517-
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
518-
: null;
519-
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
520-
resolvedIndices.getLocalIndices(),
521-
resolvedIndices.getRemoteClusterIndices(),
522-
true,
523-
(clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults())
524-
);
525-
if (resolvedIndices.getLocalIndices() == null) {
526-
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
527-
task.getProgressListener()
528-
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
529-
}
530-
ccsRemoteReduce(
531-
task,
532-
parentTaskId,
533-
rewritten,
516+
collectResolvedIndices(
517+
resolvesCrossProject,
518+
original,
534519
resolvedIndices,
535-
clusters,
536-
timeProvider,
537-
aggregationReduceContextBuilder,
538-
remoteClusterService,
539-
threadPool,
540-
searchResponseActionListener,
541-
(r, l) -> executeLocalSearch(
542-
task,
543-
timeProvider,
544-
r,
545-
resolvedIndices,
546-
projectState,
547-
clusters,
548-
searchPhaseProvider.apply(l)
549-
),
550-
transportService,
551-
forceConnectTimeoutSecs
520+
resolutionIdxOpts,
521+
projectState.metadata(),
522+
indexNameExpressionResolver,
523+
timeProvider.absoluteStartMillis(),
524+
searchResponseActionListener.delegateFailureAndWrap((searchListener, replacedIndices) -> {
525+
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
526+
&& rewritten.source().aggregations() != null
527+
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
528+
: null;
529+
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
530+
replacedIndices.getLocalIndices(),
531+
replacedIndices.getRemoteClusterIndices(),
532+
true,
533+
(clusterAlias) -> remoteClusterService.shouldSkipOnFailure(
534+
clusterAlias,
535+
rewritten.allowPartialSearchResults()
536+
)
537+
);
538+
if (replacedIndices.getLocalIndices() == null) {
539+
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local
540+
// shards)
541+
task.getProgressListener()
542+
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
543+
}
544+
ccsRemoteReduce(
545+
task,
546+
parentTaskId,
547+
rewritten,
548+
replacedIndices,
549+
clusters,
550+
timeProvider,
551+
aggregationReduceContextBuilder,
552+
remoteClusterService,
553+
threadPool,
554+
searchListener,
555+
(r, l) -> executeLocalSearch(
556+
task,
557+
timeProvider,
558+
r,
559+
replacedIndices,
560+
projectState,
561+
clusters,
562+
searchPhaseProvider.apply(l)
563+
),
564+
transportService,
565+
forceConnectTimeoutSecs
566+
);
567+
})
552568
);
553569
} else {
554570
final SearchContextId searchContext = resolvedIndices.getSearchContextId();
@@ -1047,6 +1063,44 @@ static SearchResponse.Clusters reconcileProjects(
10471063
return new SearchResponse.Clusters(reconciledMap, false);
10481064
}
10491065

1066+
void collectResolvedIndices(
1067+
boolean resolvesCrossProject,
1068+
SearchRequest original,
1069+
ResolvedIndices originalResolvedIndices,
1070+
IndicesOptions resolutionIdxOpts,
1071+
ProjectMetadata projectMetadata,
1072+
IndexNameExpressionResolver indexNameExpressionResolver,
1073+
long startTimeInMillis,
1074+
ActionListener<ResolvedIndices> listener
1075+
) {
1076+
if (resolvesCrossProject) {
1077+
final ResolveIndexAction.Request resolveIndexRequest = new ResolveIndexAction.Request(
1078+
original.indices(),
1079+
original.indicesOptions(),
1080+
null,
1081+
original.getProjectRouting()
1082+
);
1083+
1084+
client.execute(
1085+
ResolveIndexAction.INSTANCE,
1086+
resolveIndexRequest,
1087+
listener.delegateFailureAndWrap(
1088+
(l, r) -> l.onResponse(
1089+
ResolvedIndices.resolveFromResponse(
1090+
r,
1091+
resolutionIdxOpts,
1092+
projectMetadata,
1093+
indexNameExpressionResolver,
1094+
startTimeInMillis
1095+
)
1096+
)
1097+
)
1098+
);
1099+
} else {
1100+
listener.onResponse(originalResolvedIndices);
1101+
}
1102+
}
1103+
10501104
/**
10511105
* Collect remote search shards that we need to search for potential matches.
10521106
* Used for ccs_minimize_roundtrips=false

0 commit comments

Comments
 (0)