Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void testLocalClusterAlias() throws ExecutionException, InterruptedExcept
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"local",
nowInMillis,
randomBoolean()
Expand All @@ -158,6 +159,7 @@ public void testLocalClusterAlias() throws ExecutionException, InterruptedExcept
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
nowInMillis,
randomBoolean()
Expand Down Expand Up @@ -205,6 +207,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
0,
randomBoolean()
Expand All @@ -216,6 +219,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
0,
randomBoolean()
Expand All @@ -231,6 +235,7 @@ public void testAbsoluteStartMillis() throws ExecutionException, InterruptedExce
parentTaskId,
new SearchRequest(),
Strings.EMPTY_ARRAY,
SearchRequest.DEFAULT_INDICES_OPTIONS,
"",
0,
randomBoolean()
Expand Down Expand Up @@ -279,7 +284,15 @@ public void testFinalReduce() throws ExecutionException, InterruptedException {
{
SearchRequest searchRequest = randomBoolean()
? originalRequest
: SearchRequest.subSearchRequest(taskId, originalRequest, Strings.EMPTY_ARRAY, "remote", nowInMillis, true);
: SearchRequest.subSearchRequest(
taskId,
originalRequest,
Strings.EMPTY_ARRAY,
originalRequest.indicesOptions(),
"remote",
nowInMillis,
true
);
assertResponse(client().search(searchRequest), searchResponse -> {
assertEquals(2, searchResponse.getHits().getTotalHits().value());
InternalAggregations aggregations = searchResponse.getAggregations();
Expand All @@ -292,6 +305,7 @@ public void testFinalReduce() throws ExecutionException, InterruptedException {
taskId,
originalRequest,
Strings.EMPTY_ARRAY,
originalRequest.indicesOptions(),
"remote",
nowInMillis,
false
Expand Down
38 changes: 38 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ResolvedIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,44 @@ public static ResolvedIndices resolveWithPIT(
);
}

/**
* Create a new {@link ResolvedIndices} instance from a Map of Projects to {@link ResolvedIndexExpressions}. This is intended to be
* used for Cross-Project Search (CPS).
*
* @param localIndices this value is set as-is in the resulting ResolvedIndices.
* @param localIndexMetadata this value is set as-is in the resulting ResolvedIndices.
* @param remoteExpressions the map of project names to {@link ResolvedIndexExpressions}. This map is used to create the
* {@link ResolvedIndices#getRemoteClusterIndices()} for the resulting ResolvedIndices. Each project keyed
* in the map is guaranteed to have at least one index for the index expression provided by the user.
* The resulting {@link ResolvedIndices#getRemoteClusterIndices()} will map to the original index expression
* provided by the user. For example, if the user requested "logs" and "project-1" resolved that to "logs-1",
* then the result will map "project-1" to "logs". We rely on the remote search request to expand "logs" back
* to "logs-1".
* @param indicesOptions this value is set as-is in the resulting ResolvedIndices.
*/
public static ResolvedIndices resolveWithIndexExpressions(
OriginalIndices localIndices,
Map<Index, IndexMetadata> localIndexMetadata,
Map<String, ResolvedIndexExpressions> remoteExpressions,
IndicesOptions indicesOptions
) {
Map<String, OriginalIndices> remoteIndices = remoteExpressions.entrySet().stream().collect(HashMap::new, (map, entry) -> {
var indices = entry.getValue().expressions().stream().filter(expression -> {
var resolvedExpressions = expression.localExpressions();
var successfulResolution = resolvedExpressions
.localIndexResolutionResult() == ResolvedIndexExpression.LocalIndexResolutionResult.SUCCESS;
// if the expression is a wildcard, it will be successful even if there are no indices, so filter for no indices
var hasResolvedIndices = resolvedExpressions.indices().isEmpty() == false;
return successfulResolution && hasResolvedIndices;
}).map(ResolvedIndexExpression::original).toArray(String[]::new);
if (indices.length > 0) {
map.put(entry.getKey(), new OriginalIndices(indices, indicesOptions));
}
}, Map::putAll);

return new ResolvedIndices(remoteIndices, localIndices, localIndexMetadata);
}

private static Map<Index, IndexMetadata> resolveLocalIndexMetadata(
Index[] concreteLocalIndices,
ProjectMetadata projectMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public void setProjectRouting(@Nullable String projectRouting) {
* @param parentTaskId the parent taskId of the original search request
* @param originalSearchRequest the original search request
* @param indices the indices to search against
* @param indicesOptions the indicesOptions to search with
* @param clusterAlias the alias to prefix index names with in the returned search results
* @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
* @param finalReduce whether the reduction should be final or not
Expand All @@ -204,6 +205,7 @@ static SearchRequest subSearchRequest(
TaskId parentTaskId,
SearchRequest originalSearchRequest,
String[] indices,
IndicesOptions indicesOptions,
String clusterAlias,
long absoluteStartMillis,
boolean finalReduce
Expand All @@ -217,6 +219,7 @@ static SearchRequest subSearchRequest(
}
final SearchRequest request = new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce);
request.setParentTask(parentTaskId);
request.indicesOptions(indicesOptions);
return request;
}

Expand Down Expand Up @@ -414,8 +417,8 @@ boolean isFinalReduce() {
/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], String, long, boolean)}, this method returns
* the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
* request. When created through {@link #subSearchRequest(TaskId, SearchRequest, String[], IndicesOptions, String, long, boolean)},
* this method returns the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
*/
long getOrCreateAbsoluteStartMillis() {
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
Expand Down
Loading