-
Notifications
You must be signed in to change notification settings - Fork 25.6k
POC: Cross-Project Search #131168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: Cross-Project Search #131168
Changes from 76 commits
980723e
4ff5e6e
3df7a1d
c21a0a9
5266376
650dd77
9d63d77
8eec08e
fe6b696
8dee748
f9d6407
be2ab99
710d789
3587e6a
26a49c0
2a45f5b
9d29d4f
0ef2258
88104ce
035ffc1
f5cad57
fe3dea0
1153dfa
3fd532e
512222e
b867e69
08d3e4e
d36c3f6
d20a8c9
32ed394
67f5cdf
e25d8b2
137195f
ee93568
18d6bcb
2ae90af
2b092fb
d834784
7de6de0
9e53cc3
2d8a118
01ad89b
4cc41c3
254977c
e427303
efbd82e
d8103f9
1d3dc40
c03c73a
1f04a53
9f341be
7c6b1fc
91bf747
266a822
bf307e2
768affc
d1bd040
a78010b
ca5b153
aaab227
1967bfa
700af92
652caf3
3a8c4dc
24b9f7c
804069c
a61ad84
ab675dc
b11d30a
cd6511f
da74cb5
5373143
6544589
7b7aa7d
2af9134
c17b5c5
7d9a438
80bd89e
0a4046c
49814b1
01276d5
d1bb772
d5af5bc
f380e82
fd1df71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.action; | ||
|
||
import org.elasticsearch.core.Nullable; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Returns authorized projects (linked and origin), if running in a cross-project environment. | ||
* In non-cross-project environments, returns a special value indicating that cross-project does not apply at all. | ||
*/ | ||
public interface AuthorizedProjectsSupplier { | ||
    /** | ||
     * Returns the authorized projects for the current context. The {@link #DEFAULT} supplier | ||
     * returns {@link AuthorizedProjects#NOT_CROSS_PROJECT}. | ||
     */ | ||
    AuthorizedProjects get(); | ||
|
||
    /** Supplier used when cross-project does not apply; always yields the sentinel value. */ | ||
    AuthorizedProjectsSupplier DEFAULT = new Default(); | ||
|
||
    /** | ||
     * Defaults to {@code true}. | ||
     * NOTE(review): the original author marked this default with "??"; note that {@link Default} | ||
     * inherits it unchanged -- confirm the intended value for non-cross-project environments. | ||
     */ | ||
    default boolean runCps() { | ||
        return true; // ?? | ||
    } | ||
|
||
    /** Supplier that always reports {@link AuthorizedProjects#NOT_CROSS_PROJECT}. */ | ||
    class Default implements AuthorizedProjectsSupplier { | ||
        @Override | ||
        public AuthorizedProjects get() { | ||
            return AuthorizedProjects.NOT_CROSS_PROJECT; | ||
        } | ||
    } | ||
|
||
    // Note: in the final implementation these won't be strings but Project record classes with additional info | ||
    // relevant to e.g. project routing | ||
    record AuthorizedProjects(@Nullable String originProject, List<String> linkedProjects) { | ||
        /** | ||
         * Sentinel (null origin, no linked projects) meaning cross-project does not apply. | ||
         * Declared {@code final} because callers compare against it by identity, so the | ||
         * reference must never be reassigned. | ||
         */ | ||
        public static final AuthorizedProjects NOT_CROSS_PROJECT = new AuthorizedProjects(null, List.of()); | ||
|
||
        /** Returns true when an origin project is set and there are no linked projects. */ | ||
        public boolean isOriginOnly() { | ||
            return originProject != null && linkedProjects.isEmpty(); | ||
        } | ||
    } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.action; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.ElasticsearchSecurityException; | ||
import org.elasticsearch.action.support.IndicesOptions; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.transport.RemoteClusterService; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class CrossProjectSearchErrorHandler { | ||
private static final Logger logger = LogManager.getLogger(CrossProjectSearchErrorHandler.class); | ||
|
||
private final AuthorizedProjectsSupplier supplier; | ||
private final RemoteClusterService remoteClusterService; | ||
|
||
public CrossProjectSearchErrorHandler(AuthorizedProjectsSupplier supplier, RemoteClusterService remoteClusterService) { | ||
this.supplier = supplier; | ||
this.remoteClusterService = remoteClusterService; | ||
} | ||
|
||
public boolean enabled() { | ||
return supplier.get() != AuthorizedProjectsSupplier.AuthorizedProjects.NOT_CROSS_PROJECT; | ||
} | ||
|
||
public Map<String, OriginalIndices> groupIndicesForFanoutAction(IndicesRequest.Replaceable replaceable) { | ||
return remoteClusterService.groupIndices(getIndicesOptions(replaceable.indicesOptions()), replaceable.indices()); | ||
} | ||
|
||
private IndicesOptions getIndicesOptions(IndicesOptions indicesOptions) { | ||
return enabled() ? lenientIndicesOptions(indicesOptions) : indicesOptions; | ||
} | ||
|
||
private static IndicesOptions lenientIndicesOptions(IndicesOptions indicesOptions) { | ||
return IndicesOptions.builder(indicesOptions) | ||
.concreteTargetOptions(new IndicesOptions.ConcreteTargetOptions(true)) | ||
.wildcardOptions(IndicesOptions.WildcardOptions.builder(indicesOptions.wildcardOptions()).allowEmptyExpressions(true).build()) | ||
.build(); | ||
} | ||
|
||
public <T extends ResponseWithReplacedIndexExpressions> void errorHandling( | ||
IndicesRequest.Replaceable request, | ||
Map<String, T> remoteResults | ||
) { | ||
logger.info("Checking if we should throw for [{}] under CPS", request.getReplacedIndexExpressions()); | ||
// No CPS nothing to do | ||
if (false == enabled()) { | ||
logger.info("Skipping because we are not in CPS mode..."); | ||
return; | ||
} | ||
if (request.indicesOptions().allowNoIndices() && request.indicesOptions().ignoreUnavailable()) { | ||
// nothing to do since we're in lenient mode | ||
logger.info("Skipping index existence check in lenient mode"); | ||
return; | ||
} | ||
|
||
Map<String, ReplacedIndexExpression> replacedExpressions = request.getReplacedIndexExpressions().replacedExpressionMap(); | ||
assert replacedExpressions != null; | ||
logger.info("Replaced expressions to check: [{}]", replacedExpressions); | ||
for (ReplacedIndexExpression replacedIndexExpression : replacedExpressions.values()) { | ||
// TODO need to handle qualified expressions here, too | ||
String original = replacedIndexExpression.original(); | ||
List<ElasticsearchException> exceptions = new ArrayList<>(); | ||
boolean exists = replacedIndexExpression.hasLocalIndices() | ||
&& replacedIndexExpression.resolutionResult() == ReplacedIndexExpression.ResolutionResult.SUCCESS; | ||
if (exists) { | ||
logger.info("Local cluster has canonical expression for [{}], skipping remote existence check", original); | ||
continue; | ||
} | ||
if (replacedIndexExpression.authorizationError() != null) { | ||
exceptions.add(replacedIndexExpression.authorizationError()); | ||
} | ||
|
||
for (var remoteResponse : remoteResults.values()) { | ||
logger.info("Remote response resolved: [{}]", remoteResponse); | ||
Map<String, ReplacedIndexExpression> resolved = remoteResponse.getReplacedIndexExpressions().replacedExpressionMap(); | ||
assert resolved != null; | ||
var r = resolved.get(original); | ||
if (r != null | ||
&& replacedIndexExpression.resolutionResult() == ReplacedIndexExpression.ResolutionResult.SUCCESS | ||
&& resolved.get(original).replacedBy().isEmpty() == false) { | ||
logger.info("Remote cluster has resolved entries for [{}], skipping further remote existence check", original); | ||
exists = true; | ||
break; | ||
} else if (r != null && r.authorizationError() != null) { | ||
assert r.resolutionResult() == ReplacedIndexExpression.ResolutionResult.CONCRETE_RESOURCE_UNAUTHORIZED | ||
: "we should never get an error if we are authorized"; | ||
exceptions.add(resolved.get(original).authorizationError()); | ||
} | ||
} | ||
|
||
if (false == exists && false == request.indicesOptions().ignoreUnavailable()) { | ||
if (false == exceptions.isEmpty()) { | ||
// we only ever get exceptions if they are security related | ||
// back and forth on whether a mix or security and non-security (missing indices) exceptions should report | ||
// as 403 or 404 | ||
ElasticsearchSecurityException e = new ElasticsearchSecurityException( | ||
"authorization errors while resolving [" + original + "]", | ||
RestStatus.FORBIDDEN | ||
); | ||
exceptions.forEach(e::addSuppressed); | ||
throw e; | ||
} else { | ||
// TODO composite exception based on missing resources | ||
throw new IndexNotFoundException(original); | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
package org.elasticsearch.action; | ||
|
||
import org.elasticsearch.action.support.IndicesOptions; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.index.shard.ShardId; | ||
|
||
import java.util.Collection; | ||
|
@@ -48,9 +49,15 @@ interface Replaceable extends IndicesRequest { | |
*/ | ||
IndicesRequest indices(String... indices); | ||
|
||
// Default is a no-op: the supplied expressions are discarded unless an implementation overrides this. | ||
default void setReplacedIndexExpressions(ReplacedIndexExpressions replacedIndexExpressions) {} | ||
|
||
// Default returns null, so callers must null-check unless the concrete request overrides this. | ||
@Nullable | ||
default ReplacedIndexExpressions getReplacedIndexExpressions() { | ||
return null; | ||
} | ||
|
||
/** | ||
* Determines whether the request can contain indices on a remote cluster. | ||
* | ||
* NOTE in theory this method can belong to the {@link IndicesRequest} interface because whether a request | ||
* allowing remote indices has no inherent relationship to whether it is {@link Replaceable} or not. | ||
* However, we don't have an existing request that is non-replaceable but allows remote indices. | ||
|
@@ -62,6 +69,29 @@ interface Replaceable extends IndicesRequest { | |
default boolean allowsRemoteIndices() { | ||
return false; | ||
} | ||
|
||
// Could this be useful? | ||
// Reports how this request wants its indices resolved; the default is CANONICAL. | ||
default IndexResolutionMode indexResolutionMode() { | ||
return IndexResolutionMode.CANONICAL; | ||
} | ||
|
||
|
||
// Default returns null. | ||
// NOTE(review): presumably null means "no project routing specified" -- confirm with callers. | ||
@Nullable | ||
default String getProjectRouting() { | ||
return null; | ||
} | ||
} | ||
|
||
// Modes returned by Replaceable#indexResolutionMode(), whose default is CANONICAL. | ||
// NOTE(review): the exact meaning of FLAT vs CANONICAL is not defined in this file -- confirm before relying on it. | ||
enum IndexResolutionMode { | ||
FLAT, | ||
CANONICAL | ||
} | ||
|
||
// Complete marker interface, can be folded into Replaceable if necessary | ||
// Its only behavioral change is that allowsRemoteIndices() returns true (Replaceable's default returns false). | ||
interface CrossProjectSearchCapable extends Replaceable { | ||
@Override | ||
default boolean allowsRemoteIndices() { | ||
return true; | ||
} | ||
} | ||
Comment on lines
+79
to
85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah unless we want to move |
||
|
||
/** | ||
|
@@ -91,4 +121,5 @@ interface RemoteClusterShardRequest extends IndicesRequest { | |
*/ | ||
Collection<ShardId> shards(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.action; | ||
|
||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.transport.RemoteClusterAware; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
|
||
// Represent flat -> canonical CPS rewrites | ||
// Represent local index expansion | ||
|
||
public final class ReplacedIndexExpression implements Writeable { | ||
// Outcome recorded for one index expression. | ||
// SUCCESS is what the two-argument constructor assigns. | ||
// CONCRETE_RESOURCE_UNAUTHORIZED is the only state in which setAuthorizationError's assertion holds. | ||
// NOTE(review): CONCRETE_RESOURCE_MISSING presumably marks a concrete name that does not exist -- confirm at the producer. | ||
public enum ResolutionResult { | ||
SUCCESS, | ||
CONCRETE_RESOURCE_UNAUTHORIZED, | ||
CONCRETE_RESOURCE_MISSING | ||
} | ||
|
||
private final String original; | ||
private final List<String> replacedBy; | ||
private final ResolutionResult resolutionResult; | ||
@Nullable | ||
private ElasticsearchException authorizationError; | ||
|
||
// Deserializing constructor: reads the fields in the same order writeTo emits them | ||
// (original, replacedBy, resolutionResult, authorizationError). | ||
public ReplacedIndexExpression(StreamInput in) throws IOException { | ||
this.original = in.readString(); | ||
this.replacedBy = in.readCollectionAsList(StreamInput::readString); | ||
this.resolutionResult = in.readEnum(ResolutionResult.class); | ||
this.authorizationError = ElasticsearchException.readException(in); | ||
} | ||
|
||
public ReplacedIndexExpression( | ||
String original, | ||
List<String> replacedBy, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looking at the usage of this field, it does seem to make sense to split it into remote and local.
If we represent remote and local with different objects, we can also fold Or maybe the split can be even higher up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. I've pushed #134783 as a first pass on the data model. Fits quite cleanly IMO |
||
ResolutionResult resolutionResult, | ||
@Nullable ElasticsearchException exception | ||
) { | ||
this.original = original; | ||
this.replacedBy = replacedBy; | ||
this.resolutionResult = resolutionResult; | ||
this.authorizationError = exception; | ||
} | ||
|
||
// Convenience constructor: delegates with ResolutionResult.SUCCESS and a null authorization error. | ||
public ReplacedIndexExpression(String original, List<String> replacedBy) { | ||
this(original, replacedBy, ResolutionResult.SUCCESS, null); | ||
} | ||
|
||
// Overwrites the stored authorization error. With assertions enabled this fails unless the | ||
// resolution result is CONCRETE_RESOURCE_UNAUTHORIZED; without assertions the error is stored regardless. | ||
public void setAuthorizationError(ElasticsearchException error) { | ||
assert resolutionResult == ResolutionResult.CONCRETE_RESOURCE_UNAUTHORIZED : "we should never set an error if we are authorized"; | ||
this.authorizationError = error; | ||
} | ||
|
||
/** | ||
 * Returns true if at least one {@code replacedBy} entry is not a remote index name | ||
 * according to {@code RemoteClusterAware.isRemoteIndexName}. | ||
 */ | ||
public boolean hasLocalIndices() { | ||
    // Stop at the first local entry instead of materializing the whole filtered list just to test emptiness. | ||
    return replacedBy.stream().anyMatch(e -> false == RemoteClusterAware.isRemoteIndexName(e)); | ||
} | ||
|
||
// Returns a newly collected list of the replacedBy entries for which RemoteClusterAware.isRemoteIndexName is false. | ||
public List<String> getLocalIndices() { | ||
return replacedBy.stream().filter(e -> false == RemoteClusterAware.isRemoteIndexName(e)).collect(Collectors.toList()); | ||
} | ||
|
||
// Returns a newly collected list of the replacedBy entries for which RemoteClusterAware.isRemoteIndexName is true. | ||
public List<String> getRemoteIndices() { | ||
return replacedBy.stream().filter(RemoteClusterAware::isRemoteIndexName).collect(Collectors.toList()); | ||
} | ||
|
||
// Returns the original expression string given at construction or read from the stream. | ||
public String original() { | ||
return original; | ||
} | ||
|
||
// Returns the stored replacement list itself; no defensive copy is made. | ||
public List<String> replacedBy() { | ||
return replacedBy; | ||
} | ||
|
||
// Returns the resolution result recorded for this expression. | ||
public ResolutionResult resolutionResult() { | ||
return resolutionResult; | ||
} | ||
|
||
// Returns the stored authorization error, which may be null. | ||
@Nullable | ||
public ElasticsearchException authorizationError() { | ||
return authorizationError; | ||
} | ||
|
||
// Serializes the four fields; the order here must stay in sync with the StreamInput constructor, | ||
// which reads original, replacedBy, resolutionResult and authorizationError in that order. | ||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeString(original); | ||
out.writeStringCollection(replacedBy); | ||
out.writeEnum(resolutionResult); | ||
ElasticsearchException.writeException(authorizationError, out); | ||
} | ||
|
||
/** | ||
 * Two instances are equal when they are of the same class and their original expression, | ||
 * replacement list, resolution result and authorization error are all equal. | ||
 */ | ||
@Override | ||
public boolean equals(Object o) { | ||
    // Identity fast path: avoids the field-by-field comparison when comparing an instance with itself. | ||
    if (this == o) return true; | ||
    if (o == null || getClass() != o.getClass()) return false; | ||
    ReplacedIndexExpression that = (ReplacedIndexExpression) o; | ||
    // NOTE(review): authorizationError is compared with Objects.equals; unless the exception type | ||
    // overrides equals this is an identity comparison, so deserialized copies may not compare equal -- confirm. | ||
    return Objects.equals(original, that.original) | ||
        && Objects.equals(replacedBy, that.replacedBy) | ||
        && resolutionResult == that.resolutionResult | ||
        && Objects.equals(authorizationError, that.authorizationError); | ||
} | ||
|
||
// Hashes the same four fields that equals compares. | ||
@Override | ||
public int hashCode() { | ||
return Objects.hash(original, replacedBy, resolutionResult, authorizationError); | ||
} | ||
|
||
// Renders all four fields as ReplacedIndexExpression{original='...', replacedBy=..., resolutionResult=..., authorizationError=...}. | ||
@Override | ||
public String toString() { | ||
    final StringBuilder text = new StringBuilder("ReplacedIndexExpression{"); | ||
    text.append("original='").append(original).append('\''); | ||
    text.append(", replacedBy=").append(replacedBy); | ||
    text.append(", resolutionResult=").append(resolutionResult); | ||
    text.append(", authorizationError=").append(authorizationError); | ||
    return text.append('}').toString(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this class can also have different implementations and the one with actual error handling is provided by serverless.