Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void testLocalClusterAlias() throws ExecutionException, InterruptedExcept
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"local",
nowInMillis,
randomBoolean()
Expand All @@ -158,6 +159,7 @@ public void testLocalClusterAlias() throws ExecutionException, InterruptedExcept
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
nowInMillis,
randomBoolean()
Expand Down Expand Up @@ -205,6 +207,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
0,
randomBoolean()
Expand All @@ -216,6 +219,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
0,
randomBoolean()
Expand All @@ -231,6 +235,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
0,
randomBoolean()
Expand Down Expand Up @@ -279,7 +284,15 @@ public void testFinalReduce() throws ExecutionException, InterruptedException {
{
SearchRequest searchRequest = randomBoolean()
? originalRequest
: SearchRequest.subSearchRequest(taskId, originalRequest, Strings.EMPTY_ARRAY, "remote", nowInMillis, true);
: SearchRequest.subSearchRequest(
taskId,
originalRequest,
Strings.EMPTY_ARRAY,
originalRequest.indicesOptions(),
"remote",
nowInMillis,
true
);
assertResponse(client().search(searchRequest), searchResponse -> {
assertEquals(2, searchResponse.getHits().getTotalHits().value());
InternalAggregations aggregations = searchResponse.getAggregations();
Expand All @@ -292,6 +305,7 @@ public void testFinalReduce() throws ExecutionException, InterruptedException {
taskId,
originalRequest,
Strings.EMPTY_ARRAY,
originalRequest.indicesOptions(),
"remote",
nowInMillis,
false
Expand Down
40 changes: 40 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ResolvedIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
* Container for information about results of the resolution of index expression.
Expand Down Expand Up @@ -253,6 +254,45 @@ public static ResolvedIndices resolveWithPIT(
);
}

public static ResolvedIndices resolveWithIndexExpressions(
ResolvedIndexExpressions localExpressions,
Map<String, ResolvedIndexExpressions> remoteExpressions,
IndicesOptions indicesOptions,
ProjectMetadata projectMetadata,
IndexNameExpressionResolver indexNameExpressionResolver,
long startTimeInMillis
) {
Function<ResolvedIndexExpressions, OriginalIndices> toIndices = resolvedIndexExpressions -> {
var indices = resolvedIndexExpressions.expressions().stream().filter(expression -> {
var resolvedExpressions = expression.localExpressions();
var successfulResolution = resolvedExpressions
.localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS;
// if the expression is a wildcard, it will be successful even if there are no indices, so filter for no indices
var hasResolvedIndices = resolvedExpressions.indices().isEmpty() == false;
return successfulResolution && hasResolvedIndices;
}).map(ResolvedIndexExpression::original).toArray(String[]::new);
return indices.length > 0 ? new OriginalIndices(indices, indicesOptions) : null;
};
Map<String, OriginalIndices> remoteClusterIndices = remoteExpressions.entrySet().stream().collect(HashMap::new, (map, entry) -> {
var indices = toIndices.apply(entry.getValue());
if (indices != null) {
map.put(entry.getKey(), indices);
}
}, Map::putAll);

var localIndices = localExpressions == null ? null : toIndices.apply(localExpressions);

var concreteLocalIndices = localIndices == null
? Index.EMPTY_ARRAY
: indexNameExpressionResolver.concreteIndices(projectMetadata, localIndices, startTimeInMillis);

return new ResolvedIndices(
remoteClusterIndices,
localIndices,
resolveLocalIndexMetadata(concreteLocalIndices, projectMetadata, true)
);
}

private static Map<Index, IndexMetadata> resolveLocalIndexMetadata(
Index[] concreteLocalIndices,
ProjectMetadata projectMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public void setProjectRouting(@Nullable String projectRouting) {
* @param parentTaskId the parent taskId of the original search request
* @param originalSearchRequest the original search request
* @param indices the indices to search against
* @param indicesOptions the indicesOptions to search with
* @param clusterAlias the alias to prefix index names with in the returned search results
* @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
* @param finalReduce whether the reduction should be final or not
Expand All @@ -204,6 +205,7 @@ static SearchRequest subSearchRequest(
TaskId parentTaskId,
SearchRequest originalSearchRequest,
String[] indices,
IndicesOptions indicesOptions,
String clusterAlias,
long absoluteStartMillis,
boolean finalReduce
Expand All @@ -217,6 +219,7 @@ static SearchRequest subSearchRequest(
}
final SearchRequest request = new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce);
request.setParentTask(parentTaskId);
request.indicesOptions(indicesOptions);
return request;
}

Expand Down Expand Up @@ -414,8 +417,8 @@ boolean isFinalReduce() {
/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], String, long, boolean)}, this method returns
* the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
* request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], IndicesOptions, String, long, boolean)},
* this method returns the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
*/
long getOrCreateAbsoluteStartMillis() {
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
Expand Down
Loading