-
Notifications
You must be signed in to change notification settings - Fork 25.6k
POC: Cross-Project Search #131168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: Cross-Project Search #131168
Changes from 70 commits
980723e
4ff5e6e
3df7a1d
c21a0a9
5266376
650dd77
9d63d77
8eec08e
fe6b696
8dee748
f9d6407
be2ab99
710d789
3587e6a
26a49c0
2a45f5b
9d29d4f
0ef2258
88104ce
035ffc1
f5cad57
fe3dea0
1153dfa
3fd532e
512222e
b867e69
08d3e4e
d36c3f6
d20a8c9
32ed394
67f5cdf
e25d8b2
137195f
ee93568
18d6bcb
2ae90af
2b092fb
d834784
7de6de0
9e53cc3
2d8a118
01ad89b
4cc41c3
254977c
e427303
efbd82e
d8103f9
1d3dc40
c03c73a
1f04a53
9f341be
7c6b1fc
91bf747
266a822
bf307e2
768affc
d1bd040
a78010b
ca5b153
aaab227
1967bfa
700af92
652caf3
3a8c4dc
24b9f7c
804069c
a61ad84
ab675dc
b11d30a
cd6511f
da74cb5
5373143
6544589
7b7aa7d
2af9134
c17b5c5
7d9a438
80bd89e
0a4046c
49814b1
01276d5
d1bb772
d5af5bc
f380e82
fd1df71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.action; | ||
|
||
import org.elasticsearch.core.Nullable; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Returns authorized projects (linked and origin), if running in a cross-project environment. | ||
* In non-cross-project environments, returns a special value indicating that cross-project does not apply at all. | ||
*/ | ||
public interface AuthorizedProjectsSupplier { | ||
AuthorizedProjects get(); | ||
|
||
class Default implements AuthorizedProjectsSupplier { | ||
@Override | ||
public AuthorizedProjects get() { | ||
return AuthorizedProjects.NOT_CROSS_PROJECT; | ||
} | ||
} | ||
|
||
record AuthorizedProjects(@Nullable String origin, List<String> projects) { | ||
public static AuthorizedProjects NOT_CROSS_PROJECT = new AuthorizedProjects(null, List.of()); | ||
|
||
public boolean isOriginOnly() { | ||
return origin != null && projects.isEmpty(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.action; | ||
|
||
import org.elasticsearch.transport.RemoteClusterAware; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public final class CrossProjectReplacedIndexExpressions extends ReplacedIndexExpressions { | ||
|
||
public CrossProjectReplacedIndexExpressions(Map<String, ReplacedIndexExpression> replacedExpressionMap) { | ||
super(replacedExpressionMap); | ||
} | ||
|
||
public static boolean isQualifiedIndexExpression(String indexExpression) { | ||
return RemoteClusterAware.isRemoteIndexName(indexExpression); | ||
} | ||
|
||
public List<String> getLocalExpressions() { | ||
return replacedExpressionMap.values() | ||
.stream() | ||
.filter(e -> hasCanonicalExpressionForOrigin(e.replacedBy())) | ||
.map(ReplacedIndexExpression::original) | ||
.toList(); | ||
} | ||
|
||
public static boolean hasCanonicalExpressionForOrigin(List<String> replacedBy) { | ||
return replacedBy.stream().anyMatch(e -> false == isQualifiedIndexExpression(e)); | ||
} | ||
|
||
public void replaceLocalExpressions(ReplacedIndexExpressions localResolved) { | ||
if (localResolved == null || localResolved.asMap() == null || localResolved.asMap().isEmpty()) { | ||
return; | ||
} | ||
|
||
for (Map.Entry<String, ReplacedIndexExpression> e : localResolved.asMap().entrySet()) { | ||
final String original = e.getKey(); | ||
final ReplacedIndexExpression local = e.getValue(); | ||
if (local == null) { | ||
continue; | ||
} | ||
|
||
final ReplacedIndexExpression current = replacedExpressionMap.get(original); | ||
if (current == null) { | ||
continue; | ||
} | ||
ywangd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
final List<String> qualified = current.replacedBy() | ||
.stream() | ||
.filter(org.elasticsearch.action.CrossProjectReplacedIndexExpressions::isQualifiedIndexExpression) | ||
.toList(); | ||
|
||
final List<String> resolvedLocal = local.replacedBy(); | ||
|
||
final List<String> combined = new ArrayList<>(resolvedLocal.size() + qualified.size()); | ||
combined.addAll(resolvedLocal); | ||
combined.addAll(qualified); | ||
|
||
replacedExpressionMap.put( | ||
original, | ||
new ReplacedIndexExpression(original, combined, local.authorized(), local.existsAndVisible(), null) | ||
|
||
); | ||
} | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "CrossProjectReplacedIndexExpressions[" + "replacedExpressionMap=" + replacedExpressionMap + ']'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,10 +9,22 @@ | |
|
||
package org.elasticsearch.action; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.ElasticsearchSecurityException; | ||
import org.elasticsearch.action.support.IndicesOptions; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.rest.RestStatus; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.elasticsearch.action.CrossProjectReplacedIndexExpressions.hasCanonicalExpressionForOrigin; | ||
|
||
/** | ||
* Needs to be implemented by all {@link org.elasticsearch.action.ActionRequest} subclasses that relate to | ||
|
@@ -48,9 +60,15 @@ interface Replaceable extends IndicesRequest { | |
*/ | ||
IndicesRequest indices(String... indices); | ||
|
||
default void setReplacedIndexExpressions(ReplacedIndexExpressions replacedIndexExpressions) {} | ||
|
||
@Nullable | ||
default ReplacedIndexExpressions getReplacedIndexExpressions() { | ||
return null; | ||
} | ||
|
||
/** | ||
* Determines whether the request can contain indices on a remote cluster. | ||
* | ||
* NOTE in theory this method can belong to the {@link IndicesRequest} interface because whether a request | ||
* allowing remote indices has no inherent relationship to whether it is {@link Replaceable} or not. | ||
* However, we don't have an existing request that is non-replaceable but allows remote indices. | ||
|
@@ -62,6 +80,90 @@ interface Replaceable extends IndicesRequest { | |
default boolean allowsRemoteIndices() { | ||
return false; | ||
} | ||
|
||
// TODO probably makes more sense on a service class as opposed to the request itself | ||
default <T extends ResponseWithReplacedIndexExpressions> void remoteFanoutErrorHandling(Map<String, T> remoteResults) {} | ||
|
||
} | ||
|
||
interface CrossProjectSearchCapable extends Replaceable { | ||
Logger logger = LogManager.getLogger(CrossProjectSearchCapable.class); | ||
|
||
@Nullable | ||
String getProjectRouting(); | ||
|
||
@Override | ||
default boolean allowsRemoteIndices() { | ||
return true; | ||
} | ||
|
||
default boolean crossProjectMode() { | ||
return getReplacedIndexExpressions() instanceof CrossProjectReplacedIndexExpressions; | ||
} | ||
|
||
@Override | ||
default <T extends ResponseWithReplacedIndexExpressions> void remoteFanoutErrorHandling(Map<String, T> remoteResults) { | ||
logger.info("Checking if we should throw in flat world for [{}]", getReplacedIndexExpressions()); | ||
// No CPS nothing to do | ||
if (false == crossProjectMode()) { | ||
logger.info("Skipping because no cross-project expressions found..."); | ||
return; | ||
} | ||
if (indicesOptions().allowNoIndices() && indicesOptions().ignoreUnavailable()) { | ||
// nothing to do since we're in lenient mode | ||
logger.info("Skipping index existence check in lenient mode"); | ||
return; | ||
} | ||
|
||
Map<String, ReplacedIndexExpression> replacedExpressions = getReplacedIndexExpressions().asMap(); | ||
assert replacedExpressions != null; | ||
logger.info("Replaced expressions to check: [{}]", replacedExpressions); | ||
for (ReplacedIndexExpression replacedIndexExpression : replacedExpressions.values()) { | ||
// TODO need to handle qualified expressions here, too | ||
String original = replacedIndexExpression.original(); | ||
List<ElasticsearchException> exceptions = new ArrayList<>(); | ||
boolean exists = hasCanonicalExpressionForOrigin(replacedIndexExpression.replacedBy()) | ||
&& replacedIndexExpression.existsAndVisible(); | ||
ywangd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
if (exists) { | ||
logger.info("Local cluster has canonical expression for [{}], skipping remote existence check", original); | ||
continue; | ||
} | ||
if (replacedIndexExpression.authorizationError() != null) { | ||
exceptions.add(replacedIndexExpression.authorizationError()); | ||
} | ||
|
||
for (var remoteResponse : remoteResults.values()) { | ||
logger.info("Remote response resolved: [{}]", remoteResponse); | ||
Map<String, ReplacedIndexExpression> resolved = remoteResponse.getReplacedIndexExpressions().asMap(); | ||
assert resolved != null; | ||
var r = resolved.get(original); | ||
if (r != null && r.existsAndVisible() && resolved.get(original).replacedBy().isEmpty() == false) { | ||
logger.info("Remote cluster has resolved entries for [{}], skipping further remote existence check", original); | ||
exists = true; | ||
break; | ||
} else if (r != null && r.authorizationError() != null) { | ||
assert r.authorized() == false : "we should never get an error if we are authorized"; | ||
exceptions.add(resolved.get(original).authorizationError()); | ||
} | ||
} | ||
|
||
if (false == exists && false == indicesOptions().ignoreUnavailable()) { | ||
if (false == exceptions.isEmpty()) { | ||
// we only ever get exceptions if they are security related | ||
// back and forth on whether a mix or security and non-security (missing indices) exceptions should report | ||
// as 403 or 404 | ||
ElasticsearchSecurityException e = new ElasticsearchSecurityException( | ||
"authorization errors while resolving [" + original + "]", | ||
RestStatus.FORBIDDEN | ||
); | ||
exceptions.forEach(e::addSuppressed); | ||
throw e; | ||
} else { | ||
// TODO composite exception based on missing resources | ||
throw new IndexNotFoundException(original); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -91,4 +193,5 @@ interface RemoteClusterShardRequest extends IndicesRequest { | |
*/ | ||
Collection<ShardId> shards(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.action; | ||
|
||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.core.Nullable; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Represents the result of replacing an original index expression with the concrete index resources it resolves to. | ||
* Also tracks whether the replacement was authorized, whether the indices exist and are visible, and any | ||
* authorization errors encountered during the process. | ||
*/ | ||
public final class ReplacedIndexExpression implements Writeable { | ||
private final String original; | ||
private final List<String> replacedBy; | ||
private final boolean authorized; | ||
private final boolean existsAndVisible; | ||
@Nullable | ||
private ElasticsearchException authorizationError; | ||
|
||
public ReplacedIndexExpression(StreamInput in) throws IOException { | ||
this.original = in.readString(); | ||
this.replacedBy = in.readCollectionAsList(StreamInput::readString); | ||
this.authorized = in.readBoolean(); | ||
this.existsAndVisible = in.readBoolean(); | ||
this.authorizationError = ElasticsearchException.readException(in); | ||
} | ||
|
||
public ReplacedIndexExpression( | ||
String original, | ||
List<String> replacedBy, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking the usage of this field, it does seem make sense to split it into remote and local.
If we represent remote and local with different objects, we can also fold Or maybe the split can be even higher up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. I've pushed #134783 as a first pass on the data model. Fits quite cleanly IMO |
||
boolean authorized, | ||
boolean existsAndVisible, | ||
@Nullable ElasticsearchException exception | ||
) { | ||
this.original = original; | ||
this.replacedBy = replacedBy; | ||
this.authorized = authorized; | ||
this.existsAndVisible = existsAndVisible; | ||
this.authorizationError = exception; | ||
} | ||
|
||
public static String[] toIndices(Map<String, ReplacedIndexExpression> replacedExpressions) { | ||
return replacedExpressions.values() | ||
.stream() | ||
.flatMap(indexExpression -> indexExpression.replacedBy().stream()) | ||
.toArray(String[]::new); | ||
} | ||
|
||
public ReplacedIndexExpression(String original, List<String> replacedBy) { | ||
this(original, replacedBy, true, true, null); | ||
} | ||
|
||
public void setAuthorizationError(ElasticsearchException error) { | ||
assert authorized == false : "we should never set an error if we are authorized"; | ||
this.authorizationError = error; | ||
} | ||
|
||
public String original() { | ||
return original; | ||
} | ||
|
||
public List<String> replacedBy() { | ||
return replacedBy; | ||
} | ||
|
||
public boolean authorized() { | ||
return authorized; | ||
} | ||
|
||
public boolean existsAndVisible() { | ||
return existsAndVisible; | ||
} | ||
|
||
@Nullable | ||
public ElasticsearchException authorizationError() { | ||
return authorizationError; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (o == null || getClass() != o.getClass()) return false; | ||
ReplacedIndexExpression that = (ReplacedIndexExpression) o; | ||
return authorized == that.authorized | ||
&& existsAndVisible == that.existsAndVisible | ||
&& Objects.equals(original, that.original) | ||
&& Objects.equals(replacedBy, that.replacedBy) | ||
&& Objects.equals(authorizationError, that.authorizationError); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(original, replacedBy, authorized, existsAndVisible, authorizationError); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "ReplacedExpression{" | ||
+ "original='" | ||
+ original | ||
+ '\'' | ||
+ ", replacedBy=" | ||
+ replacedBy | ||
+ ", authorized=" | ||
+ authorized | ||
+ ", existsAndVisible=" | ||
+ existsAndVisible | ||
+ ", authorizationError=" | ||
+ authorizationError | ||
+ '}'; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeString(original); | ||
out.writeStringCollection(replacedBy); | ||
out.writeBoolean(authorized); | ||
out.writeBoolean(existsAndVisible); | ||
ElasticsearchException.writeException(authorizationError, out); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.