Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ void executeRequest(
ActionListener<SearchResponse> listener,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider
) {
final SearchSourceBuilder source = original.source();
final boolean isExplain = source != null && source.explain() != null && source.explain();
if (shouldOpenPIT(source)) {
executeOpenPit(task, original, listener, searchPhaseProvider, source);
return;
}

final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider = new SearchTimeProvider(
original.getOrCreateAbsoluteStartMillis(),
Expand Down Expand Up @@ -370,187 +377,180 @@ void executeRequest(
);
frozenIndexCheck(resolvedIndices);
}

ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(rewritten);
}

if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
executeLocalSearch(
task,
timeProvider,
rewritten,
resolvedIndices,
projectState,
SearchResponse.Clusters.EMPTY,
searchPhaseProvider.apply(delegate)
);
} else {
if (delegate instanceof TelemetryListener tl) {
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
if (task.isAsync()) {
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
}
if (original.pointInTimeBuilder() != null) {
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE);
}
tl.setClient(task);
// Check if any of the index patterns are wildcard patterns
var localIndices = resolvedIndices.getLocalIndices();
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
if (resolvedIndices.getRemoteClusterIndices()
.values()
.stream()
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
Rewriteable.rewriteAndFetch(
original,
searchService.getRewriteContext(timeProvider::absoluteStartMillis, resolvedIndices, original.pointInTimeBuilder(), isExplain),
listener.delegateFailureAndWrap((delegate, rewritten) -> {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(rewritten);
}
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
if (shouldMinimizeRoundtrips(rewritten)) {
if (delegate instanceof TelemetryListener tl) {
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
}
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
&& rewritten.source().aggregations() != null
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
: null;
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
resolvedIndices.getLocalIndices(),
resolvedIndices.getRemoteClusterIndices(),
true,
remoteClusterService::isSkipUnavailable
);
if (resolvedIndices.getLocalIndices() == null) {
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
task.getProgressListener()
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
}
ccsRemoteReduce(

if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
executeLocalSearch(
task,
parentTaskId,
timeProvider,
rewritten,
resolvedIndices,
clusters,
timeProvider,
aggregationReduceContextBuilder,
remoteClusterService,
threadPool,
delegate,
(r, l) -> executeLocalSearch(
task,
timeProvider,
r,
resolvedIndices,
projectState,
clusters,
searchPhaseProvider.apply(l)
)
projectState,
SearchResponse.Clusters.EMPTY,
searchPhaseProvider.apply(delegate)
);
} else {
final SearchContextId searchContext = resolvedIndices.getSearchContextId();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
resolvedIndices.getLocalIndices(),
resolvedIndices.getRemoteClusterIndices(),
false,
remoteClusterService::isSkipUnavailable
);
executeSearchWithRemotes(task, searchPhaseProvider, delegate, rewritten, resolvedIndices, timeProvider, projectState);
}
})
);
}

// TODO: pass parentTaskId
collectSearchShards(
rewritten.indicesOptions(),
rewritten.preference(),
rewritten.routing(),
rewritten.source() != null ? rewritten.source().query() : null,
Objects.requireNonNullElse(rewritten.allowPartialSearchResults(), searchService.defaultAllowPartialSearchResults()),
searchContext,
resolvedIndices.getRemoteClusterIndices(),
clusters,
timeProvider,
transportService,
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
searchShardsResponses,
searchContext,
rewritten.pointInTimeBuilder().getKeepAlive(),
resolvedIndices.getRemoteClusterIndices()
);
} else {
remoteAliasFilters = new HashMap<>();
for (SearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
remoteAliasFilters.putAll(searchShardsResponse.getAliasFilters());
}
remoteShardIterators = getRemoteShardsIterator(
searchShardsResponses,
resolvedIndices.getRemoteClusterIndices(),
remoteAliasFilters
);
}
executeSearch(
task,
timeProvider,
rewritten,
resolvedIndices,
remoteShardIterators,
clusterNodeLookup,
projectState,
remoteAliasFilters,
clusters,
searchPhaseProvider.apply(finalDelegate)
);
})
);
private void executeOpenPit(
SearchTask task,
SearchRequest original,
ActionListener<SearchResponse> listener,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider,
SearchSourceBuilder source
) {
// disabling shard reordering for request
original.setPreFilterShardSize(Integer.MAX_VALUE);
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> {
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
var pitListener = new SearchResponseActionListener(delegate) {
@Override
public void onResponse(SearchResponse response) {
// we need to close the PIT first so we delay the release of the response to after the closing
response.incRef();
closePIT(client, original.source().pointInTimeBuilder(), () -> ActionListener.respondAndRelease(delegate, response));
}

@Override
public void onFailure(Exception e) {
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
}
};
executeRequest(task, original, pitListener, searchPhaseProvider);
}));
}

private void executeSearchWithRemotes(
SearchTask task,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider,
ActionListener<SearchResponse> delegate,
SearchRequest rewritten,
ResolvedIndices resolvedIndices,
SearchTimeProvider timeProvider,
ProjectState projectState
) {
if (delegate instanceof TelemetryListener tl) {
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
if (task.isAsync()) {
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
}
});
if (rewritten.pointInTimeBuilder() != null) {
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE);
}
tl.setClient(task);
// Check if any of the index patterns are wildcard patterns
var localIndices = resolvedIndices.getLocalIndices();
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
if (resolvedIndices.getRemoteClusterIndices()
.values()
.stream()
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
}
if (shouldMinimizeRoundtrips(rewritten)) {
if (delegate instanceof TelemetryListener tl) {
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
}
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
&& rewritten.source().aggregations() != null
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
: null;
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
resolvedIndices.getLocalIndices(),
resolvedIndices.getRemoteClusterIndices(),
true,
remoteClusterService::isSkipUnavailable
);
if (resolvedIndices.getLocalIndices() == null) {
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local
// shards)
task.getProgressListener()
.notifyListShards(Collections.emptyList(), Collections.emptyList(), clusters, false, timeProvider);
}
ccsRemoteReduce(
task,
task.taskInfo(clusterService.localNode().getId(), false).taskId(),
rewritten,
resolvedIndices,
clusters,
timeProvider,
aggregationReduceContextBuilder,
remoteClusterService,
threadPool,
delegate,
(r, l) -> executeLocalSearch(task, timeProvider, r, resolvedIndices, projectState, clusters, searchPhaseProvider.apply(l))
);
} else {
final SearchContextId searchContext = resolvedIndices.getSearchContextId();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(
resolvedIndices.getLocalIndices(),
resolvedIndices.getRemoteClusterIndices(),
false,
remoteClusterService::isSkipUnavailable
);

final SearchSourceBuilder source = original.source();
final boolean isExplain = source != null && source.explain() != null && source.explain();
if (shouldOpenPIT(source)) {
// disabling shard reordering for request
original.setPreFilterShardSize(Integer.MAX_VALUE);
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> {
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
var pitListener = new SearchResponseActionListener(delegate) {
@Override
public void onResponse(SearchResponse response) {
// we need to close the PIT first so we delay the release of the response to after the closing
response.incRef();
closePIT(
client,
original.source().pointInTimeBuilder(),
() -> ActionListener.respondAndRelease(delegate, response)
// TODO: pass parentTaskId
collectSearchShards(
rewritten.indicesOptions(),
rewritten.preference(),
rewritten.routing(),
rewritten.source() != null ? rewritten.source().query() : null,
Objects.requireNonNullElse(rewritten.allowPartialSearchResults(), searchService.defaultAllowPartialSearchResults()),
searchContext,
resolvedIndices.getRemoteClusterIndices(),
clusters,
timeProvider,
transportService,
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
searchShardsResponses,
searchContext,
rewritten.pointInTimeBuilder().getKeepAlive(),
resolvedIndices.getRemoteClusterIndices()
);
} else {
remoteAliasFilters = new HashMap<>();
for (SearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
remoteAliasFilters.putAll(searchShardsResponse.getAliasFilters());
}
remoteShardIterators = getRemoteShardsIterator(
searchShardsResponses,
resolvedIndices.getRemoteClusterIndices(),
remoteAliasFilters
);
}

@Override
public void onFailure(Exception e) {
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
}
};
executeRequest(task, original, pitListener, searchPhaseProvider);
}));
} else {
Rewriteable.rewriteAndFetch(
original,
searchService.getRewriteContext(
timeProvider::absoluteStartMillis,
resolvedIndices,
original.pointInTimeBuilder(),
isExplain
),
rewriteListener
executeSearch(
task,
timeProvider,
rewritten,
resolvedIndices,
remoteShardIterators,
getRemoteClusterNodeLookup(searchShardsResponses),
projectState,
remoteAliasFilters,
clusters,
searchPhaseProvider.apply(finalDelegate)
);
})
);
}
}
Expand Down
Loading