Skip to content

Commit 9cc331c

Browse files
authored
CCS with minimize_roundtrips performs incremental merges of each SearchResponse (#103134)
* CCS with minimize_roundtrips performs incremental merges of each SearchResponse. To help address the issue of slow-to-respond clusters in a cross-cluster search, async-search based CCS with minimize_roundtrips=true performs incremental merges of each SearchResponse as it comes in from each cluster (including the local cluster). This means that any time the user calls GET _async_search/:id, they will now get search hits and/or aggregation results from any clusters that have finished so far, as well as any partial aggs from the local cluster (existing functionality). The `is_running` field in the async-search response should be used to determine whether at least one cluster has not yet reported back its final results. The SearchResponses are collected by MutableSearchResponse. When a user requests an AsyncSearchResponse, if the final response (from onResponse) has not been received, then it will create a new SearchResponseMerger on the fly using the Supplier of SearchResponseMerger in the SearchTask. That Supplier is non-null only for CCS with minimize_roundtrips=true (MRT=true).
1 parent 0c98fb2 commit 9cc331c

File tree

9 files changed

+765
-26
lines changed

9 files changed

+765
-26
lines changed

docs/changelog/103134.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 103134
2+
summary: CCS with `minimize_roundtrips` performs incremental merges of each `SearchResponse`
3+
area: Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,15 @@ protected void onFetchResult(int shardIndex) {}
104104
*/
105105
protected void onFetchFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}
106106

107+
/**
108+
* Indicates that a cluster has finished a search operation. Used for CCS minimize_roundtrips=true only.
109+
*
110+
* @param clusterAlias alias of cluster that has finished a search operation and returned a SearchResponse.
111+
* The cluster alias for the local cluster is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.
112+
* @param searchResponse SearchResponse from cluster 'clusterAlias'
113+
*/
114+
protected void onClusterResponseMinimizeRoundtrips(String clusterAlias, SearchResponse searchResponse) {}
115+
107116
final void notifyListShards(
108117
List<SearchShard> shards,
109118
List<SearchShard> skippedShards,
@@ -167,6 +176,14 @@ final void notifyFetchFailure(int shardIndex, SearchShardTarget shardTarget, Exc
167176
}
168177
}
169178

179+
final void notifyClusterResponseMinimizeRoundtrips(String clusterAlias, SearchResponse searchResponse) {
180+
try {
181+
onClusterResponseMinimizeRoundtrips(clusterAlias, searchResponse);
182+
} catch (Exception e) {
183+
logger.warn(() -> "[" + clusterAlias + "] Failed to execute progress listener onResponseMinimizeRoundtrips", e);
184+
}
185+
}
186+
170187
static List<SearchShard> buildSearchShards(List<? extends SearchPhaseResult> results) {
171188
return results.stream()
172189
.filter(Objects::nonNull)

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
// TODO it may make sense to integrate the remote clusters responses as a shard response in the initial search phase and ignore hits coming
6666
// from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote
6767
// cluster response would have the fetch results.
68-
final class SearchResponseMerger implements Releasable {
68+
public final class SearchResponseMerger implements Releasable {
6969
final int from;
7070
final int size;
7171
final int trackTotalHitsUpTo;
@@ -98,7 +98,7 @@ final class SearchResponseMerger implements Releasable {
9898
* Merges currently happen at once when all responses are available and {@link #getMergedResponse(Clusters)} is called.
9999
* That may change in the future as it's possible to introduce incremental merges as responses come in if necessary.
100100
*/
101-
void add(SearchResponse searchResponse) {
101+
public void add(SearchResponse searchResponse) {
102102
assert searchResponse.getScrollId() == null : "merging scroll results is not supported";
103103
searchResponse.mustIncRef();
104104
searchResponses.add(searchResponse);
@@ -109,10 +109,13 @@ int numResponses() {
109109
}
110110

111111
/**
112-
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
113-
* so that all responses are merged into a single one.
112+
* Returns the merged response of all SearchResponses received so far. Can be called at any point,
113+
* including when only some clusters have finished, in order to get "incremental" partial results.
114+
* @param clusters The Clusters object for the search to report on the status of each cluster
115+
* involved in the cross-cluster search
116+
* @return merged response
114117
*/
115-
SearchResponse getMergedResponse(Clusters clusters) {
118+
public SearchResponse getMergedResponse(Clusters clusters) {
116119
// if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
117120
// we end up calling merge without anything to merge, we just return an empty search response
118121
if (searchResponses.size() == 0) {

server/src/main/java/org/elasticsearch/action/search/SearchTask.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class SearchTask extends CancellableTask {
2121
// generating description in a lazy way since source can be quite big
2222
private final Supplier<String> descriptionSupplier;
2323
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
24+
private Supplier<SearchResponseMerger> searchResponseMergerSupplier; // used for CCS minimize_roundtrips=true
2425

2526
public SearchTask(
2627
long id,
@@ -53,4 +54,19 @@ public final SearchProgressListener getProgressListener() {
5354
return progressListener;
5455
}
5556

57+
/**
58+
* @return the Supplier of {@link SearchResponseMerger} attached to this task. Will be null
59+
* for local-only search and cross-cluster searches with minimize_roundtrips=false.
60+
*/
61+
public Supplier<SearchResponseMerger> getSearchResponseMergerSupplier() {
62+
return searchResponseMergerSupplier;
63+
}
64+
65+
/**
66+
* @param supplier Attach a Supplier of {@link SearchResponseMerger} to this task.
67+
* For use with CCS minimize_roundtrips=true
68+
*/
69+
public void setSearchResponseMergerSupplier(Supplier<SearchResponseMerger> supplier) {
70+
this.searchResponseMergerSupplier = supplier;
71+
}
5672
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ void executeRequest(
362362
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
363363
}
364364
ccsRemoteReduce(
365+
task,
365366
parentTaskId,
366367
rewritten,
367368
localIndices,
@@ -496,6 +497,7 @@ public static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
496497
* Handles ccs_minimize_roundtrips=true
497498
*/
498499
static void ccsRemoteReduce(
500+
SearchTask task,
499501
TaskId parentTaskId,
500502
SearchRequest searchRequest,
501503
OriginalIndices localIndices,
@@ -532,7 +534,6 @@ static void ccsRemoteReduce(
532534
remoteClusterClient.search(ccsSearchRequest, new ActionListener<>() {
533535
@Override
534536
public void onResponse(SearchResponse searchResponse) {
535-
// TODO: in CCS fail fast ticket we may need to fail the query if the cluster is marked as FAILED
536537
// overwrite the existing cluster entry with the updated one
537538
ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, skipUnavailable);
538539
Map<String, SearchProfileShardResult> profileResults = searchResponse.getProfileResults();
@@ -580,6 +581,9 @@ public void onFailure(Exception e) {
580581
timeProvider,
581582
aggReduceContextBuilder
582583
);
584+
task.setSearchResponseMergerSupplier(
585+
() -> createSearchResponseMerger(searchRequest.source(), timeProvider, aggReduceContextBuilder)
586+
);
583587
final AtomicReference<Exception> exceptions = new AtomicReference<>();
584588
int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1);
585589
final CountDown countDown = new CountDown(totalClusters);
@@ -602,6 +606,7 @@ public void onFailure(Exception e) {
602606
exceptions,
603607
searchResponseMerger,
604608
clusters,
609+
task.getProgressListener(),
605610
listener
606611
);
607612
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
@@ -619,6 +624,7 @@ public void onFailure(Exception e) {
619624
exceptions,
620625
searchResponseMerger,
621626
clusters,
627+
task.getProgressListener(),
622628
listener
623629
);
624630
SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest(
@@ -759,6 +765,7 @@ private static ActionListener<SearchResponse> createCCSListener(
759765
AtomicReference<Exception> exceptions,
760766
SearchResponseMerger searchResponseMerger,
761767
SearchResponse.Clusters clusters,
768+
SearchProgressListener progressListener,
762769
ActionListener<SearchResponse> originalListener
763770
) {
764771
return new CCSActionListener<>(
@@ -771,9 +778,9 @@ private static ActionListener<SearchResponse> createCCSListener(
771778
) {
772779
@Override
773780
void innerOnResponse(SearchResponse searchResponse) {
774-
// TODO: in CCS fail fast ticket we may need to fail the query if the cluster gets marked as FAILED
775781
ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, skipUnavailable);
776782
searchResponseMerger.add(searchResponse);
783+
progressListener.notifyClusterResponseMinimizeRoundtrips(clusterAlias, searchResponse);
777784
}
778785

779786
@Override
@@ -1494,7 +1501,6 @@ public final void onFailure(Exception e) {
14941501
if (cluster != null) {
14951502
ccsClusterInfoUpdate(f, clusters, clusterAlias, true);
14961503
}
1497-
// skippedClusters.incrementAndGet();
14981504
} else {
14991505
if (cluster != null) {
15001506
ccsClusterInfoUpdate(f, clusters, clusterAlias, false);

0 commit comments

Comments
 (0)