Skip to content
44 changes: 44 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ResolvedIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action;

import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
import org.elasticsearch.action.search.SearchContextId;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -23,11 +24,14 @@
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Container for information about results of the resolution of index expression.
Expand Down Expand Up @@ -253,6 +257,46 @@ public static ResolvedIndices resolveWithPIT(
);
}

public static ResolvedIndices resolveFromResponse(
ResolveIndexAction.Response resolveIndexAction,
IndicesOptions indicesOptions,
ProjectMetadata projectMetadata,
IndexNameExpressionResolver indexNameExpressionResolver,
long startTimeInMillis
) {
var remoteClusterIndices = Stream.of(
resolveIndexAction.getIndices(),
resolveIndexAction.getDataStreams(),
resolveIndexAction.getAliases()
)
.flatMap(Collection::stream)
.map(ResolveIndexAction.ResolvedIndexAbstraction::getName)
.map(RemoteClusterAware::splitIndexName)
.collect(
Collectors.groupingBy(
clusterAndIndex -> clusterAndIndex[0] != null ? clusterAndIndex[0] : RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
Collectors.mapping(clusterAndIndex -> clusterAndIndex[1], Collectors.toList())
)
)
.entrySet()
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, entry -> new OriginalIndices(entry.getValue().toArray(new String[0]), indicesOptions))
);

var localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

var concreteLocalIndices = localIndices == null
? Index.EMPTY_ARRAY
: indexNameExpressionResolver.concreteIndices(projectMetadata, localIndices, startTimeInMillis);

return new ResolvedIndices(
remoteClusterIndices,
localIndices,
resolveLocalIndexMetadata(concreteLocalIndices, projectMetadata, true)
);
}

private static Map<Index, IndexMetadata> resolveLocalIndexMetadata(
Index[] concreteLocalIndices,
ProjectMetadata projectMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.stats.CCSUsage;
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -512,43 +513,58 @@ public void onFailure(Exception e) {
} else {
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
if (shouldMinimizeRoundtrips(rewritten)) {
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
&& rewritten.source().aggregations() != null
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
: null;
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
resolvedIndices.getLocalIndices(),
resolvedIndices.getRemoteClusterIndices(),
true,
(clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults())
);
if (resolvedIndices.getLocalIndices() == null) {
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
task.getProgressListener()
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
}
ccsRemoteReduce(
task,
parentTaskId,
rewritten,
collectResolvedIndices(
resolvesCrossProject,
original,
resolvedIndices,
clusters,
timeProvider,
aggregationReduceContextBuilder,
remoteClusterService,
threadPool,
searchResponseActionListener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
resolvedIndices,
projectState,
clusters,
searchPhaseProvider.apply(l)
),
transportService,
forceConnectTimeoutSecs
resolutionIdxOpts,
projectState.metadata(),
indexNameExpressionResolver,
timeProvider.absoluteStartMillis(),
searchResponseActionListener.delegateFailureAndWrap((searchListener, replacedIndices) -> {
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
&& rewritten.source().aggregations() != null
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
: null;
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
replacedIndices.getLocalIndices(),
replacedIndices.getRemoteClusterIndices(),
true,
(clusterAlias) -> remoteClusterService.shouldSkipOnFailure(
clusterAlias,
rewritten.allowPartialSearchResults()
)
);
if (replacedIndices.getLocalIndices() == null) {
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local
// shards)
task.getProgressListener()
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
}
ccsRemoteReduce(
task,
parentTaskId,
rewritten,
replacedIndices,
clusters,
timeProvider,
aggregationReduceContextBuilder,
remoteClusterService,
threadPool,
searchListener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
replacedIndices,
projectState,
clusters,
searchPhaseProvider.apply(l)
),
transportService,
forceConnectTimeoutSecs
);
})
);
} else {
final SearchContextId searchContext = resolvedIndices.getSearchContextId();
Expand Down Expand Up @@ -1047,6 +1063,44 @@ static SearchResponse.Clusters reconcileProjects(
return new SearchResponse.Clusters(reconciledMap, false);
}

void collectResolvedIndices(
boolean resolvesCrossProject,
SearchRequest original,
ResolvedIndices originalResolvedIndices,
IndicesOptions resolutionIdxOpts,
ProjectMetadata projectMetadata,
IndexNameExpressionResolver indexNameExpressionResolver,
long startTimeInMillis,
ActionListener<ResolvedIndices> listener
) {
if (resolvesCrossProject) {
final ResolveIndexAction.Request resolveIndexRequest = new ResolveIndexAction.Request(
original.indices(),
original.indicesOptions(),
null,
original.getProjectRouting()
);

client.execute(
ResolveIndexAction.INSTANCE,
resolveIndexRequest,
listener.delegateFailureAndWrap(
(l, r) -> l.onResponse(
ResolvedIndices.resolveFromResponse(
r,
resolutionIdxOpts,
projectMetadata,
indexNameExpressionResolver,
startTimeInMillis
)
)
)
);
} else {
listener.onResponse(originalResolvedIndices);
}
}

/**
* Collect remote search shards that we need to search for potential matches.
* Used for ccs_minimize_roundtrips=false
Expand Down