From 980723ef83556263ff2fe00fab0ae0d6af24e94a Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Tue, 24 Jun 2025 22:49:43 +0200 Subject: [PATCH 1/8] poc --- .../org/elasticsearch/FlatIndicesRequest.java | 18 +++ .../action/search/SearchRequest.java | 12 +- .../security/authz/AuthorizationEngine.java | 5 + .../core/security/SerializationDemoTests.java | 136 ++++++++++++++++++ .../authz/IndicesAndAliasesResolver.java | 53 ++++++- .../xpack/security/authz/RBACEngine.java | 14 +- 6 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/FlatIndicesRequest.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java diff --git a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java new file mode 100644 index 0000000000000..8e9785d3f82fa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch; + +import org.elasticsearch.action.IndicesRequest; + +import java.util.List; + +public interface FlatIndicesRequest extends IndicesRequest { + void indices(List indices); +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index fda2df81d3f94..60b01bda53b28 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.FlatIndicesRequest; import org.elasticsearch.TransportVersions; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; @@ -53,7 +54,11 @@ * @see Client#search(SearchRequest) * @see SearchResponse */ -public class SearchRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, Rewriteable { +public class SearchRequest extends LegacyActionRequest + implements + FlatIndicesRequest, + IndicesRequest.Replaceable, + Rewriteable { public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); @@ -853,4 +858,9 @@ public String toString() { + source + '}'; } + + @Override + public void indices(List indices) { + indices(indices.toArray(Strings.EMPTY_ARRAY)); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java index 2c831645d0e69..e2ffcf7480381 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java @@ -299,6 +299,11 @@ interface AuthorizedIndices { * Checks if an index-like resource name is authorized, for an action by a user. The resource might or might not exist. */ boolean check(String name, IndexComponentSelector selector); + + // Does not belong here + default boolean checkProject(String projectId) { + return false; + } } /** diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java new file mode 100644 index 0000000000000..caabd1d1ee0da --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.security; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.TransportVersions.PARTIAL_DATA_DEMO; +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class SerializationDemoTests extends ESTestCase { + + record SearchResult(boolean success, @Nullable List results, @Nullable List failures) + implements + Writeable, + // ToXContentFragment also exists + ToXContentObject { + + private static final ConstructingObjectParser PARSER = buildParser(); + + @SuppressWarnings("unchecked") + private static ConstructingObjectParser buildParser() { + final ConstructingObjectParser parser = new ConstructingObjectParser<>( + "search_result", + true, + a -> new SearchResult((boolean) a[0], (List) a[1], (List) a[2]) + ); + parser.declareBoolean(constructorArg(), new ParseField("success")); + parser.declareStringArray(optionalConstructorArg(), new ParseField("results")); + parser.declareStringArray(optionalConstructorArg(), new ParseField("failures")); + return parser; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(success); + out.writeOptionalCollection(results, StreamOutput::writeString); + // Elasticsearch supports rolling upgrades across 1 major version and within major versions. + // For example 7.17 needs to be able to communicate with 8.4 nodes, and 8.1 nodes need to be able to talk with 8.4 nodes. + // Serverless removed the notion of transport versions being tied cleanly to ES versions since we release to serverless + // every week and have rolling upgrades + if (out.getTransportVersion().onOrAfter(PARTIAL_DATA_DEMO)) { + out.writeOptionalCollection(failures, StreamOutput::writeString); + } + } + + SearchResult(StreamInput input) throws IOException { + this( + input.readBoolean(), + input.readOptionalCollectionAsList(StreamInput::readString), + input.getTransportVersion().onOrAfter(PARTIAL_DATA_DEMO) + ? input.readOptionalCollectionAsList(StreamInput::readString) + : List.of() + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + var xcb = builder.startObject().field("success", success); + if (results != null) { + xcb = xcb.field("results", results); + } + if (failures != null) { + xcb = xcb.field("failures", failures); + } + return xcb.endObject(); + } + + public SearchResult fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + } + + public void testRoundTripTransportSerialization() throws IOException { + var result = new SearchResult(true, List.of("hit1"), List.of()); + + try (var out = new BytesStreamOutput()) { + result.writeTo(out); + var received = new SearchResult(out.bytes().streamInput()); + + System.out.println("Original: " + result); + System.out.println("Received: " + received); + } + } + + public void testToXContent() { + var result = new SearchResult(true, List.of("hit1", "hit2"), List.of("failure1")); + + try (var builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.flush(); + String json = Strings.toString(builder); + System.out.println("JSON Output: " + json); + // test from XContent + try ( + var parser = XContentType.JSON.xContent() + .createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + new ByteArrayInputStream(json.getBytes()) + ) + ) { + var parsedResult = result.fromXContent(parser); + System.out.println("Parsed Result: " + parsedResult); + } + } catch (IOException e) { + fail("Failed to convert to XContent: " + e.getMessage()); + } + } + +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index ff39fd587dc3a..f3573d8fb6518 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -6,6 +6,9 @@ */ package org.elasticsearch.xpack.security.authz; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.FlatIndicesRequest; import org.elasticsearch.action.AliasesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -55,6 +58,8 @@ class IndicesAndAliasesResolver { + private static final Logger logger = LogManager.getLogger(IndicesAndAliasesResolver.class); + private final IndexNameExpressionResolver nameExpressionResolver; private final IndexAbstractionResolver indexAbstractionResolver; private final RemoteClusterResolver remoteClusterResolver; @@ -103,7 +108,6 @@ class IndicesAndAliasesResolver { * resolving wildcards. *

*/ - ResolvedIndices resolve( String action, TransportRequest request, @@ -124,9 +128,52 @@ ResolvedIndices resolve( if (request instanceof IndicesRequest == false) { throw new IllegalStateException("Request [" + request + "] is not an Indices request, but should be."); } + + if (request instanceof FlatIndicesRequest flatIndicesRequest) { + rewrite(flatIndicesRequest, authorizedIndices); + } + return resolveIndicesAndAliases(action, (IndicesRequest) request, projectMetadata, authorizedIndices); } + void rewrite(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices authorizedIndices) { + var clusters = remoteClusterResolver.clusters(); + logger.info("Clusters available for remote indices: {}", clusters); + // no remotes, nothing to rewrite... + if (clusters.isEmpty()) { + logger.info("Skipping..."); + return; + } + + var indices = request.indices(); + // empty indices actually means search everything so would need to also rewrite that + + var authorizedClusters = new HashSet(); + for (var cluster : clusters) { + if (authorizedIndices.checkProject(cluster)) { + logger.info("Remote cluster [{}] authorized", cluster); + authorizedClusters.add(cluster); + } + } + + // TODO do not rewrite twice + List rewrittenIndices = new ArrayList<>(indices.length); + ResolvedIndices resolved = remoteClusterResolver.splitLocalAndRemoteIndexNames(indices); + for (var local : resolved.getLocal()) { + String rewritten = local; + for (var cluster : authorizedClusters) { + rewritten += "," + RemoteClusterAware.buildRemoteIndexName(cluster, local); + rewrittenIndices.add(rewritten); + } + logger.info("Rewrote [{}] to [{}]", local, rewritten); + } + if (resolved.getRemote().isEmpty() == false) { + rewrittenIndices.addAll(resolved.getRemote()); + } + request.indices(rewrittenIndices); + // skipping mixed expressions, _local expressions and all that jazz + } + /** * Attempt to resolve requested indices without expanding any wildcards. * @return The {@link ResolvedIndices} or null if wildcard expansion must be performed. @@ -569,5 +616,9 @@ ResolvedIndices splitLocalAndRemoteIndexNames(String... indices) { .toList(); return new ResolvedIndices(local == null ? List.of() : local, remote); } + + Set clusters() { + return Collections.unmodifiableSet(clusters); + } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 1b99bd6888c4f..2ef0319e4729a 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -998,6 +998,9 @@ static AuthorizedIndices resolveAuthorizedIndicesFromRole( } // we don't support granting access to a backing index with a failure selector via the parent data stream } return predicate.test(indexAbstraction, selector); + }, name -> { + // just some bogus predicate that lets us differentiate between roles + return Arrays.asList(role.names()).contains("remote_searcher"); }); } @@ -1125,15 +1128,18 @@ static final class AuthorizedIndices implements AuthorizationEngine.AuthorizedIn private final CachedSupplier> authorizedAndAvailableDataResources; private final CachedSupplier> authorizedAndAvailableFailuresResources; private final BiPredicate isAuthorizedPredicate; + private final Predicate projectPredicate; AuthorizedIndices( Supplier> authorizedAndAvailableDataResources, Supplier> authorizedAndAvailableFailuresResources, - BiPredicate isAuthorizedPredicate + BiPredicate isAuthorizedPredicate, + Predicate projectPredicate ) { this.authorizedAndAvailableDataResources = CachedSupplier.wrap(authorizedAndAvailableDataResources); this.authorizedAndAvailableFailuresResources = CachedSupplier.wrap(authorizedAndAvailableFailuresResources); this.isAuthorizedPredicate = Objects.requireNonNull(isAuthorizedPredicate); + this.projectPredicate = projectPredicate; } @Override @@ -1149,5 +1155,11 @@ public boolean check(String name, IndexComponentSelector selector) { Objects.requireNonNull(selector, "must specify a selector for authorization check"); return isAuthorizedPredicate.test(name, selector); } + + @Override + public boolean checkProject(String name) { + Objects.requireNonNull(name, "must specify a project name for authorization check"); + return projectPredicate.test(name); + } } } From 4ff5e6eb8cbacedda35dc0a916a604624b823437 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Tue, 24 Jun 2025 22:50:15 +0200 Subject: [PATCH 2/8] poc --- .../core/security/SerializationDemoTests.java | 136 ------------------ 1 file changed, 136 deletions(-) delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java deleted file mode 100644 index caabd1d1ee0da..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/SerializationDemoTests.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.core.security; - -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xcontent.ConstructingObjectParser; -import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.ToXContent; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.List; - -import static org.elasticsearch.TransportVersions.PARTIAL_DATA_DEMO; -import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; -import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; - -public class SerializationDemoTests extends ESTestCase { - - record SearchResult(boolean success, @Nullable List results, @Nullable List failures) - implements - Writeable, - // ToXContentFragment also exists - ToXContentObject { - - private static final ConstructingObjectParser PARSER = buildParser(); - - @SuppressWarnings("unchecked") - private static ConstructingObjectParser buildParser() { - final ConstructingObjectParser parser = new ConstructingObjectParser<>( - "search_result", - true, - a -> new SearchResult((boolean) a[0], (List) a[1], (List) a[2]) - ); - parser.declareBoolean(constructorArg(), new ParseField("success")); - parser.declareStringArray(optionalConstructorArg(), new ParseField("results")); - parser.declareStringArray(optionalConstructorArg(), new ParseField("failures")); - return parser; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(success); - out.writeOptionalCollection(results, StreamOutput::writeString); - // Elasticsearch supports rolling upgrades across 1 major version and within major versions. - // For example 7.17 needs to be able to communicate with 8.4 nodes, and 8.1 nodes need to be able to talk with 8.4 nodes. - // Serverless removed the notion of transport versions being tied cleanly to ES versions since we release to serverless - // every week and have rolling upgrades - if (out.getTransportVersion().onOrAfter(PARTIAL_DATA_DEMO)) { - out.writeOptionalCollection(failures, StreamOutput::writeString); - } - } - - SearchResult(StreamInput input) throws IOException { - this( - input.readBoolean(), - input.readOptionalCollectionAsList(StreamInput::readString), - input.getTransportVersion().onOrAfter(PARTIAL_DATA_DEMO) - ? input.readOptionalCollectionAsList(StreamInput::readString) - : List.of() - ); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - var xcb = builder.startObject().field("success", success); - if (results != null) { - xcb = xcb.field("results", results); - } - if (failures != null) { - xcb = xcb.field("failures", failures); - } - return xcb.endObject(); - } - - public SearchResult fromXContent(XContentParser parser) { - return PARSER.apply(parser, null); - } - - } - - public void testRoundTripTransportSerialization() throws IOException { - var result = new SearchResult(true, List.of("hit1"), List.of()); - - try (var out = new BytesStreamOutput()) { - result.writeTo(out); - var received = new SearchResult(out.bytes().streamInput()); - - System.out.println("Original: " + result); - System.out.println("Received: " + received); - } - } - - public void testToXContent() { - var result = new SearchResult(true, List.of("hit1", "hit2"), List.of("failure1")); - - try (var builder = XContentBuilder.builder(XContentType.JSON.xContent())) { - result.toXContent(builder, ToXContent.EMPTY_PARAMS); - builder.flush(); - String json = Strings.toString(builder); - System.out.println("JSON Output: " + json); - // test from XContent - try ( - var parser = XContentType.JSON.xContent() - .createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - new ByteArrayInputStream(json.getBytes()) - ) - ) { - var parsedResult = result.fromXContent(parser); - System.out.println("Parsed Result: " + parsedResult); - } - } catch (IOException e) { - fail("Failed to convert to XContent: " + e.getMessage()); - } - } - -} From 3df7a1df89671fa0df007c889e5dfbba9922b547 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 26 Jun 2025 13:45:37 +0200 Subject: [PATCH 3/8] Fix ups --- .../java/org/elasticsearch/FlatIndicesRequest.java | 6 +++++- .../elasticsearch/action/search/SearchRequest.java | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java index 8e9785d3f82fa..2d4594c05c6f1 100644 --- a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java +++ b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java @@ -14,5 +14,9 @@ import java.util.List; public interface FlatIndicesRequest extends IndicesRequest { - void indices(List indices); + boolean requiresRewrite(); + + void indexExpressions(List indexExpressions); + + record IndexExpression(String original, List rewritten) {} } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 60b01bda53b28..cd5c0ad466bc7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -75,6 +75,9 @@ public class SearchRequest extends LegacyActionRequest private String[] indices = Strings.EMPTY_ARRAY; + @Nullable + private List indexExpressions; + @Nullable private String routing; @Nullable @@ -860,7 +863,14 @@ public String toString() { } @Override - public void indices(List indices) { - indices(indices.toArray(Strings.EMPTY_ARRAY)); + public boolean requiresRewrite() { + return indexExpressions == null; + } + + @Override + public void indexExpressions(List indexExpressions) { + assert requiresRewrite(); + this.indexExpressions = indexExpressions; + indices(indexExpressions.stream().flatMap(indexExpression -> indexExpression.rewritten().stream()).toArray(String[]::new)); } } From c21a0a9d89845009cd722ee04e00be3a77956c29 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 26 Jun 2025 13:57:33 +0200 Subject: [PATCH 4/8] The missing commit --- .../authz/IndicesAndAliasesResolver.java | 34 ++++++++++++------- .../xpack/security/authz/RBACEngine.java | 5 +-- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index f3573d8fb6518..e895c8be3ce69 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -129,7 +129,7 @@ ResolvedIndices resolve( throw new IllegalStateException("Request [" + request + "] is not an Indices request, but should be."); } - if (request instanceof FlatIndicesRequest flatIndicesRequest) { + if (request instanceof FlatIndicesRequest flatIndicesRequest && flatIndicesRequest.requiresRewrite()) { rewrite(flatIndicesRequest, authorizedIndices); } @@ -137,11 +137,13 @@ ResolvedIndices resolve( } void rewrite(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices authorizedIndices) { + assert request.requiresRewrite(); + var clusters = remoteClusterResolver.clusters(); logger.info("Clusters available for remote indices: {}", clusters); // no remotes, nothing to rewrite... if (clusters.isEmpty()) { - logger.info("Skipping..."); + logger.info("No remote clusters linked, skipping..."); return; } @@ -156,22 +158,30 @@ void rewrite(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices a } } - // TODO do not rewrite twice - List rewrittenIndices = new ArrayList<>(indices.length); + if (authorizedClusters.isEmpty()) { + logger.info("No remote clusters authorized, skipping..."); + return; + } + ResolvedIndices resolved = remoteClusterResolver.splitLocalAndRemoteIndexNames(indices); + // skip handling searches where there's both qualified and flat expressions to simplify POC + // in the real thing, we'd also rewrite these + if (resolved.getRemote().isEmpty() == false) { + return; + } + + List indexExpressions = new ArrayList<>(indices.length); for (var local : resolved.getLocal()) { - String rewritten = local; + List rewritten = new ArrayList<>(); + rewritten.add(local); for (var cluster : authorizedClusters) { - rewritten += "," + RemoteClusterAware.buildRemoteIndexName(cluster, local); - rewrittenIndices.add(rewritten); + rewritten.add(RemoteClusterAware.buildRemoteIndexName(cluster, local)); + indexExpressions.add(new FlatIndicesRequest.IndexExpression(local, rewritten)); } logger.info("Rewrote [{}] to [{}]", local, rewritten); } - if (resolved.getRemote().isEmpty() == false) { - rewrittenIndices.addAll(resolved.getRemote()); - } - request.indices(rewrittenIndices); - // skipping mixed expressions, _local expressions and all that jazz + + request.indexExpressions(indexExpressions); } /** diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 2ef0319e4729a..9c6b8c26ae312 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -999,8 +999,9 @@ static AuthorizedIndices resolveAuthorizedIndicesFromRole( } return predicate.test(indexAbstraction, selector); }, name -> { - // just some bogus predicate that lets us differentiate between roles - return Arrays.asList(role.names()).contains("remote_searcher"); + // just some bogus predicate that lets us differentiate between roles, not at all + // how this will work in the end + return Arrays.asList(role.names()).contains("_es_test_root"); }); } From 9d63d775476edcfde976118bb3bacc0a32f8a045 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 10 Jul 2025 11:07:21 +0200 Subject: [PATCH 5/8] More --- .../xpack/core/security/authz/AuthorizationEngine.java | 2 +- .../xpack/security/authz/IndicesAndAliasesResolver.java | 2 +- .../org/elasticsearch/xpack/security/authz/RBACEngine.java | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java index e2ffcf7480381..ef7446ae22ae5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java @@ -301,7 +301,7 @@ interface AuthorizedIndices { boolean check(String name, IndexComponentSelector selector); // Does not belong here - default boolean checkProject(String projectId) { + default boolean checkRemote(String remoteAlias) { return false; } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index e895c8be3ce69..5db5b5c2b8cf4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -152,7 +152,7 @@ void rewrite(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices a var authorizedClusters = new HashSet(); for (var cluster : clusters) { - if (authorizedIndices.checkProject(cluster)) { + if (authorizedIndices.checkRemote(cluster)) { logger.info("Remote cluster [{}] authorized", cluster); authorizedClusters.add(cluster); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 9c6b8c26ae312..2e9a20d994e77 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -1158,9 +1158,9 @@ public boolean check(String name, IndexComponentSelector selector) { } @Override - public boolean checkProject(String name) { - Objects.requireNonNull(name, "must specify a project name for authorization check"); - return projectPredicate.test(name); + public boolean checkRemote(String remoteAlias) { + Objects.requireNonNull(remoteAlias, "must specify a project name for authorization check"); + return projectPredicate.test(remoteAlias); } } } From 8eec08ea1ee2bfb03545dc8977e8ba7254053d16 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 10 Jul 2025 12:55:15 +0200 Subject: [PATCH 6/8] Moar --- .../main/groovy/elasticsearch.run-ccs.gradle | 1 + .../org/elasticsearch/FlatIndicesRequest.java | 3 ++ .../action/search/SearchRequest.java | 22 +++++++++++++++ .../common/settings/ClusterSettings.java | 2 ++ .../rest/action/search/RestSearchAction.java | 15 ++++++++++ .../elasticsearch/search/SearchService.java | 2 ++ .../transport/RemoteClusterAware.java | 4 +++ .../transport/RemoteClusterService.java | 28 +++++++++++++++++++ .../transport/RemoteConnectionStrategy.java | 4 +++ 9 files changed, 81 insertions(+) diff --git a/build-tools-internal/src/main/groovy/elasticsearch.run-ccs.gradle b/build-tools-internal/src/main/groovy/elasticsearch.run-ccs.gradle index 587c97d3476ea..07abe9a8e7633 100644 --- a/build-tools-internal/src/main/groovy/elasticsearch.run-ccs.gradle +++ b/build-tools-internal/src/main/groovy/elasticsearch.run-ccs.gradle @@ -48,6 +48,7 @@ tasks.register("run-ccs", RunTask) { useCluster queryingCluster doFirst { queryingCluster.get().getNodes().each { node -> + node.setting('cluster.remote.my_remote_cluster.tags', 'env-dev') if (proxyMode) { node.setting('cluster.remote.my_remote_cluster.mode', 'proxy') if (basicSecurityMode) { diff --git a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java index 2d4594c05c6f1..b4ce4dbceaffc 100644 --- a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java +++ b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java @@ -10,6 +10,7 @@ package org.elasticsearch; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.transport.RemoteClusterService; import java.util.List; @@ -18,5 +19,7 @@ public interface FlatIndicesRequest extends IndicesRequest { void indexExpressions(List indexExpressions); + boolean checkRemote(String remote, List tags); + record IndexExpression(String original, List rewritten) {} } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index cd5c0ad466bc7..9a21d2a05d77b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; @@ -74,6 +75,8 @@ public class SearchRequest extends LegacyActionRequest private SearchType searchType = SearchType.DEFAULT; private String[] indices = Strings.EMPTY_ARRAY; + // This will be a more complex thing in the real implementation -- a lucene expression instead of just a list of literals + private List routingTags = List.of(); @Nullable private List indexExpressions; @@ -408,6 +411,11 @@ public SearchRequest indices(String... indices) { return this; } + public SearchRequest routingTags(List routingTags) { + this.routingTags = routingTags; + return this; + } + private static void validateIndices(String... indices) { Objects.requireNonNull(indices, "indices must not be null"); for (String index : indices) { @@ -873,4 +881,18 @@ public void indexExpressions(List indexExpressions) { this.indexExpressions = indexExpressions; indices(indexExpressions.stream().flatMap(indexExpression -> indexExpression.rewritten().stream()).toArray(String[]::new)); } + + @Override + public boolean checkRemote(String remote, List tags) { + if (routingTags.isEmpty()) { + return true; // no routing requested, so no constraints + } + // if any tag in routingTags matches one in tags, return true + for (RemoteClusterService.RemoteTag tag : routingTags) { + if (tags.contains(tag)) { + return true; + } + } + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1fbc8993cc5aa..f21f1231c76d0 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -367,6 +367,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, + RemoteClusterService.REMOTE_CLUSTER_TAGS, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, RemoteClusterService.REMOTE_NODE_ATTRIBUTE, @@ -483,6 +484,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.ALLOW_EXPENSIVE_QUERIES, SearchService.CCS_VERSION_CHECK_SETTING, SearchService.CCS_COLLECT_TELEMETRY, + SearchService.FLAT_WORLD_ENABLED, SearchService.BATCHED_QUERY_PHASE, SearchService.PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE, MultiBucketConsumerService.MAX_BUCKET_SETTING, diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 5eda47bc32354..ee67432e6050f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -9,6 +9,8 @@ package org.elasticsearch.rest.action.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchRequest; @@ -35,6 +37,7 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.usage.SearchUsageHolder; import org.elasticsearch.xcontent.XContentParser; @@ -63,6 +66,7 @@ public class RestSearchAction extends BaseRestHandler { public static final String TYPED_KEYS_PARAM = "typed_keys"; public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score"; public static final Set RESPONSE_PARAMS = Set.of(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM); + private static final Logger log = LogManager.getLogger(RestSearchAction.class); private final SearchUsageHolder searchUsageHolder; private final Predicate clusterSupportsFeature; @@ -98,6 +102,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC client.threadPool().getThreadContext().setErrorTraceTransportHeader(request); } SearchRequest searchRequest = new SearchRequest(); + // access the BwC param, but just drop it // this might be set by old clients request.param("min_compatible_shard_node"); @@ -167,6 +172,16 @@ public static void parseSearchRequest( searchRequest.source(new SearchSourceBuilder()); } searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index"))); + + var routingTags = request.param("routing_tags", null); + if (routingTags != null) { + searchRequest.routingTags( + Arrays.stream(Strings.splitStringByCommaToArray(routingTags)).map(RemoteClusterService.RemoteTag::fromString).toList() + ); + } else { + log.info("No routing tags"); + } + if (requestContentParser != null) { if (searchUsageHolder == null) { searchRequest.source().parseXContent(requestContentParser, true, clusterSupportsFeature); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index af568c7b5d2cb..ddd39676b35cd 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -297,6 +297,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Setting.Property.NodeScope ); + public static final Setting FLAT_WORLD_ENABLED = Setting.boolSetting("search.flat_world.enabled", false, Property.NodeScope); + private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 95e507f70d7a9..763faed028b10 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -56,6 +56,10 @@ protected static Set getEnabledRemoteClusters(final Settings settings) { return RemoteConnectionStrategy.getRemoteClusters(settings); } + protected static Map> getEnabledRemoteClustersWithTags(final Settings settings) { + return RemoteConnectionStrategy.getRemoteTags(settings); + } + /** * Check whether the index expression represents remote index or not. * The index name is assumed to be individual index (no commas) but can contain `-`, wildcards, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index fdb597b47c137..0ac19dff6971c 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -97,6 +98,33 @@ public final class RemoteClusterService extends RemoteClusterAware (ns, key) -> boolSetting(key, true, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope) ); + public record RemoteTag(String key, String value) { + public static RemoteTag fromString(String tag) { + if (tag == null || tag.isEmpty()) { + throw new IllegalArgumentException("Remote tag must not be null or empty"); + } + // - as a separator to simplify search path param parsing; won't be like this in the real implementation + int idx = tag.indexOf('-'); + if (idx < 0) { + return new RemoteTag(tag, ""); + } else { + return new RemoteTag(tag.substring(0, idx), tag.substring(idx + 1)); + } + } + } + + public static final Setting.AffixSetting> REMOTE_CLUSTER_TAGS = Setting.affixKeySetting( + "cluster.remote.", + "tags", + (ns, key) -> Setting.listSetting( + key, + Collections.emptyList(), + RemoteTag::fromString, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ) + ); + public static final Setting.AffixSetting REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting( "cluster.remote.", "transport.ping_schedule", diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index a715797b97977..5b1925b0a20fa 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -189,6 +189,10 @@ static Set getRemoteClusters(Settings settings) { return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet()); } + static Map> getRemoteTags(Settings settings) { + return RemoteClusterService.REMOTE_CLUSTER_TAGS.getAsMap(settings); + } + public static boolean isConnectionEnabled(String clusterAlias, Settings settings) { ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings); if (mode.equals(ConnectionStrategy.SNIFF)) { From fe6b6967e1584f79c7f23f5d45207167512fedaa Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 10 Jul 2025 12:55:40 +0200 Subject: [PATCH 7/8] Also resolver --- .../authz/IndicesAndAliasesResolver.java | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 5db5b5c2b8cf4..7b49a2dd4a0a9 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -34,8 +34,10 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.search.SearchService; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.RemoteConnectionStrategy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine; @@ -63,11 +65,13 @@ class IndicesAndAliasesResolver { private final IndexNameExpressionResolver nameExpressionResolver; private final IndexAbstractionResolver indexAbstractionResolver; private final RemoteClusterResolver remoteClusterResolver; + private final boolean flatWorldEnabled; IndicesAndAliasesResolver(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver) { this.nameExpressionResolver = resolver; this.indexAbstractionResolver = new IndexAbstractionResolver(resolver); this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); + this.flatWorldEnabled = SearchService.FLAT_WORLD_ENABLED.get(settings); } /** @@ -129,37 +133,42 @@ ResolvedIndices resolve( throw new IllegalStateException("Request [" + request + "] is not an Indices request, but should be."); } - if (request instanceof FlatIndicesRequest flatIndicesRequest && flatIndicesRequest.requiresRewrite()) { - rewrite(flatIndicesRequest, authorizedIndices); + if (flatWorldEnabled && request instanceof FlatIndicesRequest flatIndicesRequest && flatIndicesRequest.requiresRewrite()) { + rewriteFlatIndexExpression(flatIndicesRequest, authorizedIndices); } return resolveIndicesAndAliases(action, (IndicesRequest) request, projectMetadata, authorizedIndices); } - void rewrite(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices authorizedIndices) { - assert request.requiresRewrite(); + void rewriteFlatIndexExpression(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices authorizedIndices) { + assert flatWorldEnabled && request.requiresRewrite(); - var clusters = remoteClusterResolver.clusters(); - logger.info("Clusters available for remote indices: {}", clusters); + Set remotes = remoteClusterResolver.clusters(); + Map> tags = remoteClusterResolver.tags(); + + logger.info("Remote available: {} with tags {}", remotes, tags); // no remotes, nothing to rewrite... - if (clusters.isEmpty()) { - logger.info("No remote clusters linked, skipping..."); + if (remotes.isEmpty()) { + logger.info("No remotes, skipping..."); return; } var indices = request.indices(); // empty indices actually means search everything so would need to also rewrite that - var authorizedClusters = new HashSet(); - for (var cluster : clusters) { - if (authorizedIndices.checkRemote(cluster)) { - logger.info("Remote cluster [{}] authorized", cluster); - authorizedClusters.add(cluster); + var targetRemotes = new HashSet(); + for (var remote : remotes) { + List tagsForRemote = tags.get(remote); + logger.info("Remote [{}] has tags [{}]", remote, tagsForRemote); + // TODO routing also needs to apply to local + if (authorizedIndices.checkRemote(remote) && request.checkRemote(remote, tagsForRemote)) { + logger.info("Remote [{}] authorized and matches routing", remote); + targetRemotes.add(remote); } } - if (authorizedClusters.isEmpty()) { - logger.info("No remote clusters authorized, skipping..."); + if (targetRemotes.isEmpty()) { + logger.info("No target remotes, skipping..."); return; } @@ -174,7 +183,7 @@ void rewrite(FlatIndicesRequest request, AuthorizationEngine.AuthorizedIndices a for (var local : resolved.getLocal()) { List rewritten = new ArrayList<>(); rewritten.add(local); - for (var cluster : authorizedClusters) { + for (var cluster : targetRemotes) { rewritten.add(RemoteClusterAware.buildRemoteIndexName(cluster, local)); indexExpressions.add(new FlatIndicesRequest.IndexExpression(local, rewritten)); } @@ -601,10 +610,13 @@ private static List indicesList(String[] list) { private static class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; + // TODO consolidate + private final Map> tags; private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { super(settings); clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); + tags = RemoteClusterService.getEnabledRemoteClustersWithTags(settings); listenForUpdates(clusterSettings); } @@ -630,5 +642,9 @@ ResolvedIndices splitLocalAndRemoteIndexNames(String... indices) { Set clusters() { return Collections.unmodifiableSet(clusters); } + + Map> tags() { + return Collections.unmodifiableMap(tags); + } } } From 8dee748c1dbe3b7ad451266220387c92aae917c7 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 10 Jul 2025 13:44:01 +0200 Subject: [PATCH 8/8] field caps --- .../org/elasticsearch/FlatIndicesRequest.java | 2 +- .../fieldcaps/FieldCapabilitiesRequest.java | 29 ++++++++++++++++++- .../action/search/SearchRequest.java | 2 +- .../authz/IndicesAndAliasesResolver.java | 2 +- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java index b4ce4dbceaffc..a3ca1528bd7e1 100644 --- a/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java +++ b/server/src/main/java/org/elasticsearch/FlatIndicesRequest.java @@ -19,7 +19,7 @@ public interface FlatIndicesRequest extends IndicesRequest { void indexExpressions(List indexExpressions); - boolean checkRemote(String remote, List tags); + boolean checkRemote(List tags); record IndexExpression(String original, List rewritten) {} } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java index 2e24858d9781f..ec04bb9bd67c5 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.fieldcaps; +import org.elasticsearch.FlatIndicesRequest; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; @@ -18,11 +19,13 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -30,11 +33,16 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -public final class FieldCapabilitiesRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, ToXContentObject { +public final class FieldCapabilitiesRequest extends LegacyActionRequest + implements + FlatIndicesRequest, + IndicesRequest.Replaceable, + ToXContentObject { public static final String NAME = "field_caps_request"; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); @@ -52,6 +60,8 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen private QueryBuilder indexFilter; private Map runtimeFields = Collections.emptyMap(); private Long nowInMillis; + @Nullable + private List indexExpressions; public FieldCapabilitiesRequest(StreamInput in) throws IOException { super(in); @@ -323,4 +333,21 @@ public String getDescription() { } }; } + + @Override + public boolean requiresRewrite() { + return indexExpressions == null; + } + + @Override + public void indexExpressions(List indexExpressions) { + assert requiresRewrite(); + this.indexExpressions = indexExpressions; + indices(indexExpressions.stream().flatMap(indexExpression -> indexExpression.rewritten().stream()).toArray(String[]::new)); + } + + @Override + public boolean checkRemote(List tags) { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 9a21d2a05d77b..7d890cd71140e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -883,7 +883,7 @@ public void indexExpressions(List indexExpressions) { } @Override - public boolean checkRemote(String remote, List tags) { + public boolean checkRemote(List tags) { if (routingTags.isEmpty()) { return true; // no routing requested, so no constraints } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 7b49a2dd4a0a9..6057010c641cf 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -161,7 +161,7 @@ void rewriteFlatIndexExpression(FlatIndicesRequest request, AuthorizationEngine. List tagsForRemote = tags.get(remote); logger.info("Remote [{}] has tags [{}]", remote, tagsForRemote); // TODO routing also needs to apply to local - if (authorizedIndices.checkRemote(remote) && request.checkRemote(remote, tagsForRemote)) { + if (authorizedIndices.checkRemote(remote) && request.checkRemote(tagsForRemote)) { logger.info("Remote [{}] authorized and matches routing", remote); targetRemotes.add(remote); }