-
Notifications
You must be signed in to change notification settings - Fork 25.5k
Cross-project search index expression rewriting #135346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 112 commits
11e955a
23ed699
ad8a9cb
ba24636
dd1dbe0
a363c19
103e044
17c50c7
8a5861c
777a8bc
0474671
cbe0bd4
8789cc4
820303b
7ba1eec
bb79a5c
aad05d6
457b45b
013b1dd
a00b8cc
df37ed6
cb346e0
665bbec
5ad94e4
845fd9f
97c6efa
62bb00f
add29c3
1d0be73
c15b0d7
34c5c06
e2a0dff
893e0f4
177a8ac
f557655
ad54ed1
df80b29
617d7d4
63366dc
e2e6236
6354928
3be77da
115efe2
339ccea
eb3e45a
cfe91ef
d39b249
46388a7
f1be6b4
515eb37
6f79ff0
cf76a8f
3b003ba
352711f
c41eebc
2311e78
ae36d55
59ae6f2
0fe6504
9be4eba
dcd3169
0338477
d58edf5
6f91def
ddc9e85
6506e48
c30054b
55dc1c9
62d420f
b63c87e
4e65668
f75d465
a3bd85a
426832c
6b8fc99
783e5a9
0390eca
5172e40
51cf6f7
33b0341
607f465
c688e0a
f4cd423
0af5abc
1be3764
45bd995
8f172a0
c9d793e
040512c
57c5f87
4454e32
ba12b8f
31ea357
43009ce
001a797
1c4ffe6
64aeef4
ee80cb7
8be6bfd
4bccf95
5cb9214
3155355
c7a5477
f57896a
5b5a7a9
407cffe
73ba8a9
7224d70
7240d89
395f3a0
39a50a4
02e4693
1303b06
732ace0
0537fa5
097a111
e9f2f4e
c7748c2
69c4a80
802671e
1044829
01155f2
2bffce3
71866b1
1260c15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ | |
|
||
package org.elasticsearch.action.admin.indices.resolve; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.TransportVersion; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionRequestValidationException; | ||
|
@@ -18,6 +20,7 @@ | |
import org.elasticsearch.action.LegacyActionRequest; | ||
import org.elasticsearch.action.OriginalIndices; | ||
import org.elasticsearch.action.RemoteClusterActionType; | ||
import org.elasticsearch.action.ResolvedIndexExpressions; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.HandledTransportAction; | ||
import org.elasticsearch.action.support.IndicesOptions; | ||
|
@@ -43,6 +46,7 @@ | |
import org.elasticsearch.index.IndexMode; | ||
import org.elasticsearch.injection.guice.Inject; | ||
import org.elasticsearch.search.SearchService; | ||
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.transport.RemoteClusterAware; | ||
import org.elasticsearch.transport.RemoteClusterService; | ||
|
@@ -69,6 +73,8 @@ | |
import java.util.stream.Stream; | ||
|
||
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; | ||
import static org.elasticsearch.search.crossproject.CrossProjectIndicesRequestHelper.crossProjectFanoutIndicesOptions; | ||
import static org.elasticsearch.search.crossproject.CrossProjectIndicesRequestHelper.shouldResolveCrossProject; | ||
|
||
public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response> { | ||
|
||
|
@@ -90,6 +96,7 @@ public static class Request extends LegacyActionRequest implements IndicesReques | |
private String[] names; | ||
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; | ||
private EnumSet<IndexMode> indexModes = EnumSet.noneOf(IndexMode.class); | ||
private ResolvedIndexExpressions resolvedIndexExpressions = null; | ||
|
||
public Request(String[] names) { | ||
this.names = names; | ||
|
@@ -168,6 +175,21 @@ public boolean allowsRemoteIndices() { | |
return true; | ||
} | ||
|
||
@Override | ||
public boolean allowsCrossProject() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) { | ||
this.resolvedIndexExpressions = expressions; | ||
} | ||
|
||
@Override | ||
public ResolvedIndexExpressions getResolvedIndexExpressions() { | ||
return resolvedIndexExpressions; | ||
} | ||
|
||
@Override | ||
public boolean includeDataStreams() { | ||
// request must allow data streams because the index name expression resolver for the action handler assumes it | ||
|
@@ -461,17 +483,34 @@ public static class Response extends ActionResponse implements ToXContentObject | |
private final List<ResolvedIndex> indices; | ||
private final List<ResolvedAlias> aliases; | ||
private final List<ResolvedDataStream> dataStreams; | ||
@Nullable | ||
private final ResolvedIndexExpressions resolvedIndexExpressions; | ||
|
||
public Response(List<ResolvedIndex> indices, List<ResolvedAlias> aliases, List<ResolvedDataStream> dataStreams) { | ||
this(indices, aliases, dataStreams, null); | ||
} | ||
|
||
public Response( | ||
List<ResolvedIndex> indices, | ||
List<ResolvedAlias> aliases, | ||
List<ResolvedDataStream> dataStreams, | ||
ResolvedIndexExpressions resolvedIndexExpressions | ||
) { | ||
this.indices = indices; | ||
this.aliases = aliases; | ||
this.dataStreams = dataStreams; | ||
this.resolvedIndexExpressions = resolvedIndexExpressions; | ||
} | ||
|
||
public Response(StreamInput in) throws IOException { | ||
this.indices = in.readCollectionAsList(ResolvedIndex::new); | ||
this.aliases = in.readCollectionAsList(ResolvedAlias::new); | ||
this.dataStreams = in.readCollectionAsList(ResolvedDataStream::new); | ||
if (in.getTransportVersion().supports(ResolvedIndexExpressions.RESOLVED_INDEX_EXPRESSIONS)) { | ||
this.resolvedIndexExpressions = in.readOptionalWriteable(ResolvedIndexExpressions::new); | ||
} else { | ||
this.resolvedIndexExpressions = null; | ||
} | ||
} | ||
|
||
public List<ResolvedIndex> getIndices() { | ||
|
@@ -491,6 +530,9 @@ public void writeTo(StreamOutput out) throws IOException { | |
out.writeCollection(indices); | ||
out.writeCollection(aliases); | ||
out.writeCollection(dataStreams); | ||
if (out.getTransportVersion().supports(ResolvedIndexExpressions.RESOLVED_INDEX_EXPRESSIONS)) { | ||
out.writeOptionalWriteable(resolvedIndexExpressions); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -515,9 +557,15 @@ public boolean equals(Object o) { | |
public int hashCode() { | ||
return Objects.hash(indices, aliases, dataStreams); | ||
} | ||
|
||
@Nullable | ||
public ResolvedIndexExpressions getResolvedIndexExpressions() { | ||
return resolvedIndexExpressions; | ||
} | ||
} | ||
|
||
public static class TransportAction extends HandledTransportAction<Request, Response> { | ||
private static final Logger logger = LogManager.getLogger(TransportAction.class); | ||
|
||
private final ClusterService clusterService; | ||
private final RemoteClusterService remoteClusterService; | ||
|
@@ -547,8 +595,10 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon | |
checkCCSVersionCompatibility(request); | ||
} | ||
final ProjectState projectState = projectResolver.getProjectState(clusterService.state()); | ||
final IndicesOptions originalIndicesOptions = request.indicesOptions(); | ||
final boolean resolveCrossProject = shouldResolveCrossProject(request); | ||
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices( | ||
request.indicesOptions(), | ||
resolveCrossProject ? crossProjectFanoutIndicesOptions(originalIndicesOptions) : originalIndicesOptions, | ||
request.indices() | ||
); | ||
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
|
@@ -557,12 +607,34 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon | |
List<ResolvedDataStream> dataStreams = new ArrayList<>(); | ||
resolveIndices(localIndices, projectState, indexNameExpressionResolver, indices, aliases, dataStreams, request.indexModes); | ||
|
||
final ResolvedIndexExpressions localResolvedIndexExpressions = request.getResolvedIndexExpressions(); | ||
if (remoteClusterIndices.size() > 0) { | ||
final int remoteRequests = remoteClusterIndices.size(); | ||
final CountDown completionCounter = new CountDown(remoteRequests); | ||
final SortedMap<String, Response> remoteResponses = Collections.synchronizedSortedMap(new TreeMap<>()); | ||
final Runnable terminalHandler = () -> { | ||
if (completionCounter.countDown()) { | ||
if (resolveCrossProject) { | ||
// TODO temporary fix: we need to properly handle the case where a remote does not return a result due to | ||
// a failure -- in the current version of resolve indices though, these are just silently ignored | ||
if (remoteRequests != remoteResponses.size()) { | ||
listener.onFailure( | ||
new IllegalStateException( | ||
"expected [" + remoteRequests + "] remote responses but got only [" + remoteResponses.size() + "]" | ||
) | ||
); | ||
return; | ||
} | ||
final Exception ex = CrossProjectIndexResolutionValidator.validate( | ||
originalIndicesOptions, | ||
localResolvedIndexExpressions, | ||
getResolvedExpressionsByRemote(remoteResponses) | ||
); | ||
if (ex != null) { | ||
listener.onFailure(ex); | ||
return; | ||
} | ||
} | ||
mergeResults(remoteResponses, indices, aliases, dataStreams, request.indexModes); | ||
listener.onResponse(new Response(indices, aliases, dataStreams)); | ||
} | ||
|
@@ -581,13 +653,38 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon | |
remoteClusterClient.execute(ResolveIndexAction.REMOTE_TYPE, remoteRequest, ActionListener.wrap(response -> { | ||
remoteResponses.put(clusterAlias, response); | ||
terminalHandler.run(); | ||
}, failure -> terminalHandler.run())); | ||
}, failure -> { | ||
logger.info("failed to resolve indices on remote cluster [" + clusterAlias + "]", failure); | ||
n1v0lg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
terminalHandler.run(); | ||
})); | ||
} | ||
} else { | ||
listener.onResponse(new Response(indices, aliases, dataStreams)); | ||
if (resolveCrossProject) { | ||
// we still need to call response validation for local results, since qualified expressions like `_origin:index` or | ||
// `<alias-pattern-matching-origin-only>:index` get deferred validation, also | ||
|
||
final Exception ex = CrossProjectIndexResolutionValidator.validate( | ||
originalIndicesOptions, | ||
localResolvedIndexExpressions, | ||
Map.of() | ||
); | ||
if (ex != null) { | ||
listener.onFailure(ex); | ||
return; | ||
} | ||
} | ||
listener.onResponse(new Response(indices, aliases, dataStreams, localResolvedIndexExpressions)); | ||
} | ||
} | ||
|
||
private Map<String, ResolvedIndexExpressions> getResolvedExpressionsByRemote(Map<String, Response> remoteResponses) { | ||
return remoteResponses.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> { | ||
final ResolvedIndexExpressions resolvedIndexExpressions = e.getValue().getResolvedIndexExpressions(); | ||
assert resolvedIndexExpressions != null | ||
: "remote response from cluster [" + e.getKey() + "] is missing resolved index expressions"; | ||
return resolvedIndexExpressions; | ||
})); | ||
} | ||
|
||
/** | ||
* Resolves the specified names and/or wildcard expressions to index abstractions. Returns results in the supplied lists. | ||
* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets defer that -- it's a HashSet because it needs to be modifiable for exclusion handling.
There are cleaner ways of doing this I'm sure but would prefer to follow it up in a separate PR.