Skip to content
Closed

poc #129968

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tasks.register("run-ccs", RunTask) {
useCluster queryingCluster
doFirst {
queryingCluster.get().getNodes().each { node ->
node.setting('cluster.remote.my_remote_cluster.tags', 'env-dev')
if (proxyMode) {
node.setting('cluster.remote.my_remote_cluster.mode', 'proxy')
if (basicSecurityMode) {
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/elasticsearch/FlatIndicesRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch;

import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.transport.RemoteClusterService;

import java.util.List;

public interface FlatIndicesRequest extends IndicesRequest {
boolean requiresRewrite();

void indexExpressions(List<IndexExpression> indexExpressions);

boolean checkRemote(List<RemoteClusterService.RemoteTag> tags);

record IndexExpression(String original, List<String> rewritten) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.FlatIndicesRequest;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
Expand All @@ -18,23 +19,30 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public final class FieldCapabilitiesRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, ToXContentObject {
public final class FieldCapabilitiesRequest extends LegacyActionRequest
implements
FlatIndicesRequest,
IndicesRequest.Replaceable,
ToXContentObject {
public static final String NAME = "field_caps_request";
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();

Expand All @@ -52,6 +60,8 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private QueryBuilder indexFilter;
private Map<String, Object> runtimeFields = Collections.emptyMap();
private Long nowInMillis;
@Nullable
private List<IndexExpression> indexExpressions;

public FieldCapabilitiesRequest(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -323,4 +333,21 @@ public String getDescription() {
}
};
}

@Override
public boolean requiresRewrite() {
return indexExpressions == null;
}

@Override
public void indexExpressions(List<IndexExpression> indexExpressions) {
assert requiresRewrite();
this.indexExpressions = indexExpressions;
indices(indexExpressions.stream().flatMap(indexExpression -> indexExpression.rewritten().stream()).toArray(String[]::new));
}

@Override
public boolean checkRemote(List<RemoteClusterService.RemoteTag> tags) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.FlatIndicesRequest;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
Expand All @@ -53,7 +55,11 @@
* @see Client#search(SearchRequest)
* @see SearchResponse
*/
public class SearchRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, Rewriteable<SearchRequest> {
public class SearchRequest extends LegacyActionRequest
implements
FlatIndicesRequest,
IndicesRequest.Replaceable,
Rewriteable<SearchRequest> {

public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

Expand All @@ -69,6 +75,11 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest
private SearchType searchType = SearchType.DEFAULT;

private String[] indices = Strings.EMPTY_ARRAY;
// This will be a more complex thing in the real implementation -- a lucene expression instead of just a list of literals
private List<RemoteClusterService.RemoteTag> routingTags = List.of();

@Nullable
private List<IndexExpression> indexExpressions;

@Nullable
private String routing;
Expand Down Expand Up @@ -400,6 +411,11 @@ public SearchRequest indices(String... indices) {
return this;
}

public SearchRequest routingTags(List<RemoteClusterService.RemoteTag> routingTags) {
this.routingTags = routingTags;
return this;
}

private static void validateIndices(String... indices) {
Objects.requireNonNull(indices, "indices must not be null");
for (String index : indices) {
Expand Down Expand Up @@ -853,4 +869,30 @@ public String toString() {
+ source
+ '}';
}

@Override
public boolean requiresRewrite() {
return indexExpressions == null;
}

@Override
public void indexExpressions(List<IndexExpression> indexExpressions) {
assert requiresRewrite();
this.indexExpressions = indexExpressions;
indices(indexExpressions.stream().flatMap(indexExpression -> indexExpression.rewritten().stream()).toArray(String[]::new));
}

@Override
public boolean checkRemote(List<RemoteClusterService.RemoteTag> tags) {
if (routingTags.isEmpty()) {
return true; // no routing requested, so no constraints
}
// if any tag in routingTags matches one in tags, return true
for (RemoteClusterService.RemoteTag tag : routingTags) {
if (tags.contains(tag)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CLUSTER_TAGS,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
Expand Down Expand Up @@ -483,6 +484,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.ALLOW_EXPENSIVE_QUERIES,
SearchService.CCS_VERSION_CHECK_SETTING,
SearchService.CCS_COLLECT_TELEMETRY,
SearchService.FLAT_WORLD_ENABLED,
SearchService.BATCHED_QUERY_PHASE,
SearchService.PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.rest.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
Expand All @@ -35,6 +37,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.usage.SearchUsageHolder;
import org.elasticsearch.xcontent.XContentParser;

Expand Down Expand Up @@ -63,6 +66,7 @@ public class RestSearchAction extends BaseRestHandler {
public static final String TYPED_KEYS_PARAM = "typed_keys";
public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score";
public static final Set<String> RESPONSE_PARAMS = Set.of(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM);
private static final Logger log = LogManager.getLogger(RestSearchAction.class);

private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
Expand Down Expand Up @@ -98,6 +102,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
client.threadPool().getThreadContext().setErrorTraceTransportHeader(request);
}
SearchRequest searchRequest = new SearchRequest();

// access the BwC param, but just drop it
// this might be set by old clients
request.param("min_compatible_shard_node");
Expand Down Expand Up @@ -167,6 +172,16 @@ public static void parseSearchRequest(
searchRequest.source(new SearchSourceBuilder());
}
searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));

var routingTags = request.param("routing_tags", null);
if (routingTags != null) {
searchRequest.routingTags(
Arrays.stream(Strings.splitStringByCommaToArray(routingTags)).map(RemoteClusterService.RemoteTag::fromString).toList()
);
} else {
log.info("No routing tags");
}

if (requestContentParser != null) {
if (searchUsageHolder == null) {
searchRequest.source().parseXContent(requestContentParser, true, clusterSupportsFeature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Setting.Property.NodeScope
);

public static final Setting<Boolean> FLAT_WORLD_ENABLED = Setting.boolSetting("search.flat_world.enabled", false, Property.NodeScope);

private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
return RemoteConnectionStrategy.getRemoteClusters(settings);
}

protected static Map<String, List<RemoteClusterService.RemoteTag>> getEnabledRemoteClustersWithTags(final Settings settings) {
return RemoteConnectionStrategy.getRemoteTags(settings);
}

/**
* Check whether the index expression represents remote index or not.
* The index name is assumed to be individual index (no commas) but can contain `-`, wildcards,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -97,6 +98,33 @@ public final class RemoteClusterService extends RemoteClusterAware
(ns, key) -> boolSetting(key, true, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope)
);

public record RemoteTag(String key, String value) {
public static RemoteTag fromString(String tag) {
if (tag == null || tag.isEmpty()) {
throw new IllegalArgumentException("Remote tag must not be null or empty");
}
// - as a separator to simplify search path param parsing; won't be like this in the real implementation
int idx = tag.indexOf('-');
if (idx < 0) {
return new RemoteTag(tag, "");
} else {
return new RemoteTag(tag.substring(0, idx), tag.substring(idx + 1));
}
}
}

public static final Setting.AffixSetting<List<RemoteTag>> REMOTE_CLUSTER_TAGS = Setting.affixKeySetting(
"cluster.remote.",
"tags",
(ns, key) -> Setting.listSetting(
key,
Collections.emptyList(),
RemoteTag::fromString,
Setting.Property.Dynamic,
Setting.Property.NodeScope
)
);

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ static Set<String> getRemoteClusters(Settings settings) {
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
}

static Map<String, List<RemoteClusterService.RemoteTag>> getRemoteTags(Settings settings) {
return RemoteClusterService.REMOTE_CLUSTER_TAGS.getAsMap(settings);
}

public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
if (mode.equals(ConnectionStrategy.SNIFF)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ interface AuthorizedIndices {
* Checks if an index-like resource name is authorized, for an action by a user. The resource might or might not exist.
*/
boolean check(String name, IndexComponentSelector selector);

// Does not belong here
default boolean checkRemote(String remoteAlias) {
return false;
}
}

/**
Expand Down
Loading