Skip to content

Commit ed33fdd

Browse files
authored
Adjust interception of requests for specific shard IDs (#101656)
Some index requests target shard IDs specifically, which may not match the indices that the request targets as given by `IndicesRequest#indices()`, which requires a different interception strategy in order to make sure those requests are handled correctly in all cases and that any malformed messages are caught early to aid in troubleshooting. This PR adds and interface allowing requests to report the shard IDs they target as well as the index names, and adjusts the interception of those requests as appropriate to handle those shard IDs in the cases where they are relevant.
1 parent 75e67f0 commit ed33fdd

File tree

11 files changed

+403
-5
lines changed

11 files changed

+403
-5
lines changed

docs/changelog/101656.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 101656
2+
summary: Adjust interception of requests for specific shard IDs
3+
area: Authorization
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/IndicesRequest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.elasticsearch.action;
1010

1111
import org.elasticsearch.action.support.IndicesOptions;
12+
import org.elasticsearch.index.shard.ShardId;
13+
14+
import java.util.Collection;
1215

1316
/**
1417
* Needs to be implemented by all {@link org.elasticsearch.action.ActionRequest} subclasses that relate to
@@ -72,4 +75,19 @@ default boolean allowsRemoteIndices() {
7275
return true;
7376
}
7477
}
78+
79+
/**
80+
* This subtype of request is for requests which may travel to remote clusters. These requests may need to provide additional
81+
* information to the system on top of the indices the action relates to in order to be handled correctly in all cases.
82+
*/
83+
interface RemoteClusterShardRequest extends IndicesRequest {
84+
/**
85+
* Returns the shards this action is targeting directly, which may not obviously align with the indices returned by
86+
* {@code indices()}. This is mostly used by requests which fan out to a number of shards for the those fan-out requests.
87+
*
88+
* A default is intentionally not provided for this method. It is critical that this method be implemented correctly for all
89+
* remote cluster requests,
90+
*/
91+
Collection<ShardId> shards();
92+
}
7593
}

server/src/main/java/org/elasticsearch/action/get/GetRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
6464

6565
public GetRequest() {}
6666

67-
GetRequest(StreamInput in) throws IOException {
67+
public GetRequest(StreamInput in) throws IOException {
6868
super(in);
6969
if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
7070
in.readString();

server/src/main/java/org/elasticsearch/action/get/GetResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class GetResponse extends ActionResponse implements Iterable<DocumentFiel
3434

3535
GetResult getResult;
3636

37-
GetResponse(StreamInput in) throws IOException {
37+
public GetResponse(StreamInput in) throws IOException {
3838
super(in);
3939
getResult = new GetResult(in);
4040
}

server/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardRequest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import org.elasticsearch.index.shard.ShardId;
2020

2121
import java.io.IOException;
22+
import java.util.Collections;
23+
import java.util.List;
2224

23-
public abstract class SingleShardRequest<Request extends SingleShardRequest<Request>> extends ActionRequest implements IndicesRequest {
25+
public abstract class SingleShardRequest<Request extends SingleShardRequest<Request>> extends ActionRequest
26+
implements
27+
IndicesRequest.RemoteClusterShardRequest {
2428

2529
public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
2630

@@ -85,6 +89,11 @@ public String[] indices() {
8589
return new String[] { index };
8690
}
8791

92+
@Override
93+
public List<ShardId> shards() {
94+
return Collections.singletonList(this.internalShardId);
95+
}
96+
8897
@Override
8998
public IndicesOptions indicesOptions() {
9099
return INDICES_OPTIONS;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ public DownsampleConfig config() {
9898
return config;
9999
}
100100

101+
public ShardId shardId() {
102+
return shardId;
103+
}
104+
101105
public long getTotalShardDocCount() {
102106
return totalShardDocCount;
103107
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import java.util.Arrays;
5151
import java.util.Collection;
52+
import java.util.Collections;
5253
import java.util.Map;
5354
import java.util.Objects;
5455
import java.util.concurrent.Executor;
@@ -260,7 +261,7 @@ private DelegatingAction() {
260261
super(NAME);
261262
}
262263

263-
public static class Request extends ActionRequest implements IndicesRequest {
264+
public static class Request extends ActionRequest implements IndicesRequest.RemoteClusterShardRequest {
264265

265266
private final DownsampleShardTask task;
266267
private final BytesRef lastDownsampleTsid;
@@ -291,6 +292,11 @@ public IndicesOptions indicesOptions() {
291292
public void writeTo(StreamOutput out) {
292293
throw new IllegalStateException("request should stay local");
293294
}
295+
296+
@Override
297+
public Collection<ShardId> shards() {
298+
return Collections.singletonList(task.shardId());
299+
}
294300
}
295301

296302
public static class TA extends TransportAction<Request, ActionResponse.Empty> {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apply plugin: 'elasticsearch.standalone-test'
2+
3+
dependencies {
4+
5+
testImplementation(testArtifact(project(xpackModule('core'))))
6+
testImplementation project(path: ':modules:ingest-common')
7+
testImplementation project(path: ':modules:data-streams')
8+
testImplementation project(path: ':modules:lang-mustache')
9+
testImplementation project(path: ':modules:rank-eval')
10+
testImplementation project(path: ':modules:reindex')
11+
testImplementation project(path: xpackModule('analytics'))
12+
testImplementation project(path: xpackModule('async-search'))
13+
testImplementation project(path: xpackModule('autoscaling'))
14+
testImplementation project(path: xpackModule('ccr'))
15+
testImplementation project(path: xpackModule('downsample'))
16+
testImplementation project(path: xpackModule('eql'))
17+
testImplementation project(path: xpackModule('esql'))
18+
testImplementation project(path: xpackModule('frozen-indices'))
19+
testImplementation project(path: xpackModule('graph'))
20+
testImplementation project(path: xpackModule('ilm'))
21+
testImplementation project(path: xpackModule('inference'))
22+
testImplementation project(path: xpackModule('profiling'))
23+
testImplementation project(path: xpackModule('rollup'))
24+
testImplementation project(path: xpackModule('slm'))
25+
testImplementation project(path: xpackModule('sql'))
26+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.security;
9+
10+
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
11+
import org.elasticsearch.action.search.TransportSearchShardsAction;
12+
import org.elasticsearch.action.support.TransportAction;
13+
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
14+
import org.elasticsearch.common.inject.Binding;
15+
import org.elasticsearch.common.inject.TypeLiteral;
16+
import org.elasticsearch.datastreams.DataStreamsPlugin;
17+
import org.elasticsearch.index.rankeval.RankEvalPlugin;
18+
import org.elasticsearch.ingest.IngestTestPlugin;
19+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
20+
import org.elasticsearch.node.Node;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.reindex.ReindexPlugin;
23+
import org.elasticsearch.script.mustache.MustachePlugin;
24+
import org.elasticsearch.test.ESSingleNodeTestCase;
25+
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
26+
import org.elasticsearch.xpack.autoscaling.Autoscaling;
27+
import org.elasticsearch.xpack.ccr.Ccr;
28+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
29+
import org.elasticsearch.xpack.core.security.action.apikey.CrossClusterApiKeyRoleDescriptorBuilder;
30+
import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege;
31+
import org.elasticsearch.xpack.downsample.Downsample;
32+
import org.elasticsearch.xpack.downsample.DownsampleShardPersistentTaskExecutor;
33+
import org.elasticsearch.xpack.eql.plugin.EqlPlugin;
34+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
35+
import org.elasticsearch.xpack.frozen.FrozenIndices;
36+
import org.elasticsearch.xpack.graph.Graph;
37+
import org.elasticsearch.xpack.ilm.IndexLifecycle;
38+
import org.elasticsearch.xpack.inference.InferencePlugin;
39+
import org.elasticsearch.xpack.profiling.ProfilingPlugin;
40+
import org.elasticsearch.xpack.rollup.Rollup;
41+
import org.elasticsearch.xpack.search.AsyncSearch;
42+
import org.elasticsearch.xpack.slm.SnapshotLifecycle;
43+
import org.elasticsearch.xpack.sql.plugin.SqlPlugin;
44+
45+
import java.util.ArrayList;
46+
import java.util.Collection;
47+
import java.util.HashSet;
48+
import java.util.List;
49+
import java.util.Locale;
50+
import java.util.Set;
51+
import java.util.function.Predicate;
52+
53+
public class CrossClusterShardTests extends ESSingleNodeTestCase {
54+
55+
Set<String> MANUALLY_CHECKED_SHARD_ACTIONS = Set.of(
56+
// The request types for these actions are all subtypes of SingleShardRequest, and have been evaluated to make sure their
57+
// `shards()` methods return the correct thing.
58+
TransportSearchShardsAction.NAME,
59+
60+
// These types have had the interface implemented manually.
61+
DownsampleShardPersistentTaskExecutor.DelegatingAction.NAME,
62+
63+
// These actions do not have any references to shard IDs in their requests.
64+
ClusterSearchShardsAction.NAME
65+
);
66+
67+
Set<Class<?>> CHECKED_ABSTRACT_CLASSES = Set.of(
68+
// This abstract class implements the interface so we can assume all of its subtypes do so properly as well.
69+
TransportSingleShardAction.class
70+
);
71+
72+
@Override
73+
protected Collection<Class<? extends Plugin>> getPlugins() {
74+
final ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
75+
plugins.addAll(
76+
List.of(
77+
LocalStateCompositeXPackPlugin.class,
78+
AnalyticsPlugin.class,
79+
AsyncSearch.class,
80+
Autoscaling.class,
81+
Ccr.class,
82+
DataStreamsPlugin.class,
83+
Downsample.class,
84+
EqlPlugin.class,
85+
EsqlPlugin.class,
86+
FrozenIndices.class,
87+
Graph.class,
88+
IndexLifecycle.class,
89+
InferencePlugin.class,
90+
IngestCommonPlugin.class,
91+
IngestTestPlugin.class,
92+
MustachePlugin.class,
93+
ProfilingPlugin.class,
94+
RankEvalPlugin.class,
95+
ReindexPlugin.class,
96+
Rollup.class,
97+
SnapshotLifecycle.class,
98+
SqlPlugin.class
99+
)
100+
);
101+
return plugins;
102+
}
103+
104+
@SuppressWarnings("rawtypes")
105+
public void testCheckForNewShardLevelTransportActions() throws Exception {
106+
Node node = node();
107+
List<Binding<TransportAction>> transportActionBindings = node.injector().findBindingsByType(TypeLiteral.get(TransportAction.class));
108+
Set<String> crossClusterPrivilegeNames = new HashSet<>();
109+
crossClusterPrivilegeNames.addAll(List.of(CrossClusterApiKeyRoleDescriptorBuilder.CCS_INDICES_PRIVILEGE_NAMES));
110+
crossClusterPrivilegeNames.addAll(List.of(CrossClusterApiKeyRoleDescriptorBuilder.CCR_INDICES_PRIVILEGE_NAMES));
111+
112+
List<String> shardActions = transportActionBindings.stream()
113+
.map(binding -> binding.getProvider().get())
114+
.filter(action -> IndexPrivilege.get(crossClusterPrivilegeNames).predicate().test(action.actionName))
115+
.filter(this::actionIsLikelyShardAction)
116+
.map(action -> action.actionName)
117+
.toList();
118+
119+
List<String> actionsNotOnAllowlist = shardActions.stream().filter(Predicate.not(MANUALLY_CHECKED_SHARD_ACTIONS::contains)).toList();
120+
if (actionsNotOnAllowlist.isEmpty() == false) {
121+
fail("""
122+
If this test fails, you likely just added a transport action, probably with `shard` in the name. Transport actions which
123+
operate on shards directly and can be used across clusters must meet some additional requirements in order to be
124+
handled correctly by all Elasticsearch infrastructure, so please make sure you have read the javadoc on the
125+
IndicesRequest.RemoteClusterShardRequest interface and implemented it if appropriate and not already appropriately
126+
implemented by a supertype, then add the name (as in "indices:data/read/get") of your new transport action to
127+
MANUALLY_CHECKED_SHARD_ACTIONS above. Found actions not in allowlist:
128+
""" + actionsNotOnAllowlist);
129+
}
130+
131+
// Also make sure the allowlist stays up to date and doesn't have any unnecessary entries.
132+
List<String> actionsOnAllowlistNotFound = MANUALLY_CHECKED_SHARD_ACTIONS.stream()
133+
.filter(Predicate.not(shardActions::contains))
134+
.toList();
135+
if (actionsOnAllowlistNotFound.isEmpty() == false) {
136+
fail(
137+
"Some actions were on the allowlist but not found in the list of cross-cluster capable transport actions, please remove "
138+
+ "these from MANUALLY_CHECKED_SHARD_ACTIONS if they have been removed from Elasticsearch: "
139+
+ actionsOnAllowlistNotFound
140+
);
141+
}
142+
}
143+
144+
/**
145+
* Getting to the actual request classes themselves is made difficult by the design of Elasticsearch's transport
146+
* protocol infrastructure combined with JVM type erasure. Therefore, we resort to a crude heuristic here.
147+
* @param transportAction The transportport action to be checked.
148+
* @return True if the action is suspected of being an action which may operate on shards directly.
149+
*/
150+
private boolean actionIsLikelyShardAction(TransportAction<?, ?> transportAction) {
151+
Class<?> clazz = transportAction.getClass();
152+
Set<Class<?>> classHeirarchy = new HashSet<>();
153+
while (clazz != TransportAction.class) {
154+
classHeirarchy.add(clazz);
155+
clazz = clazz.getSuperclass();
156+
}
157+
boolean hasCheckedSuperclass = classHeirarchy.stream().anyMatch(clz -> CHECKED_ABSTRACT_CLASSES.contains(clz));
158+
boolean shardInClassName = classHeirarchy.stream().anyMatch(clz -> clz.getName().toLowerCase(Locale.ROOT).contains("shard"));
159+
return hasCheckedSuperclass == false
160+
&& (shardInClassName
161+
|| transportAction.actionName.toLowerCase(Locale.ROOT).contains("shard")
162+
|| transportAction.actionName.toLowerCase(Locale.ROOT).contains("[s]"));
163+
}
164+
165+
}

0 commit comments

Comments
 (0)