Skip to content

Commit e724daf

Browse files
authored
Restrict failure stores from replicating via CCR (#126355) (#126557)
Checks to see if an index belongs to a data stream's failure store before following it. If the index is a failure index, the follow operation is rejected. Also updates the logic in the auto follower API's to exclude failure indices on data streams from being followed if their parent data stream matches the follow pattern.
1 parent 0c65592 commit e724daf

File tree

4 files changed

+276
-9
lines changed

4 files changed

+276
-9
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
130130
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
131131
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
132132
);
133-
checkRemoteClusterLicenseAndFetchClusterState(
133+
doCheckRemoteClusterLicenseAndFetchClusterState(
134134
client,
135135
clusterAlias,
136136
remoteClient,
@@ -165,6 +165,12 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
165165
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
166166
? indexAbstraction.getParentDataStream()
167167
: null;
168+
// Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
169+
if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
170+
String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
171+
onFailure.accept(new IllegalArgumentException(message));
172+
return;
173+
}
168174
hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
169175
if (e == null) {
170176
fetchLeaderHistoryUUIDs(
@@ -228,6 +234,29 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
228234
}
229235
}
230236

237+
// overridable for testing
238+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
239+
final Client client,
240+
final String clusterAlias,
241+
final RemoteClusterClient remoteClient,
242+
final ClusterStateRequest request,
243+
final Consumer<Exception> onFailure,
244+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
245+
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
246+
final Function<Exception, ElasticsearchStatusException> unknownLicense
247+
) {
248+
checkRemoteClusterLicenseAndFetchClusterState(
249+
client,
250+
clusterAlias,
251+
remoteClient,
252+
request,
253+
onFailure,
254+
leaderClusterStateConsumer,
255+
nonCompliantLicense,
256+
unknownLicense
257+
);
258+
}
259+
231260
/**
232261
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
233262
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,50 @@
77

88
package org.elasticsearch.xpack.ccr;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.client.internal.Client;
1014
import org.elasticsearch.client.internal.RemoteClusterClient;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.metadata.AliasMetadata;
18+
import org.elasticsearch.cluster.metadata.DataStream;
19+
import org.elasticsearch.cluster.metadata.DataStreamAlias;
20+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
21+
import org.elasticsearch.cluster.metadata.IndexMetadata;
22+
import org.elasticsearch.cluster.metadata.Metadata;
1123
import org.elasticsearch.common.settings.Settings;
1224
import org.elasticsearch.common.util.concurrent.ThreadContext;
25+
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.index.IndexNotFoundException;
27+
import org.elasticsearch.index.IndexVersion;
28+
import org.elasticsearch.indices.IndexClosedException;
29+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1330
import org.elasticsearch.test.ESTestCase;
31+
import org.elasticsearch.threadpool.ThreadPool;
1432
import org.elasticsearch.xpack.core.security.user.User;
33+
import org.mockito.ArgumentCaptor;
1534

35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ExecutorService;
1638
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.function.BiConsumer;
40+
import java.util.function.Consumer;
41+
import java.util.function.Function;
1742

1843
import static org.hamcrest.Matchers.containsString;
44+
import static org.hamcrest.Matchers.equalTo;
1945
import static org.hamcrest.Matchers.hasToString;
2046
import static org.hamcrest.Matchers.instanceOf;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.ArgumentMatchers.eq;
2149
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.times;
51+
import static org.mockito.Mockito.verify;
52+
import static org.mockito.Mockito.verifyNoInteractions;
53+
import static org.mockito.Mockito.when;
2254

2355
public class CcrLicenseCheckerTests extends ESTestCase {
2456

@@ -46,4 +78,164 @@ User getUser(final ThreadContext threadContext) {
4678
assertTrue(invoked.get());
4779
}
4880

81+
/**
82+
* Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
83+
*/
84+
public void testRemoteIndexValidation() {
85+
// A cluster state with
86+
// - a data stream, containing a backing index and a failure index
87+
// - an alias that points to said data stream
88+
// - a standalone index
89+
// - an alias that points to said standalone index
90+
// - a closed index
91+
String indexName = "random-index";
92+
String closedIndexName = "closed-index";
93+
String dataStreamName = "logs-test-data";
94+
String aliasName = "foo-alias";
95+
String dsAliasName = "ds-alias";
96+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
97+
.putAlias(AliasMetadata.builder(aliasName))
98+
.settings(settings(IndexVersion.current()))
99+
.numberOfShards(5)
100+
.numberOfReplicas(1)
101+
.build();
102+
IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
103+
.settings(settings(IndexVersion.current()))
104+
.numberOfShards(5)
105+
.numberOfReplicas(1)
106+
.state(IndexMetadata.State.CLOSE)
107+
.build();
108+
IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
109+
IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
110+
DataStream dataStream = DataStreamTestHelper.newInstance(
111+
dataStreamName,
112+
List.of(firstBackingIndex.getIndex()),
113+
List.of(firstFailureStore.getIndex())
114+
);
115+
ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
116+
.metadata(
117+
Metadata.builder()
118+
.put(indexMetadata, false)
119+
.put(closedIndexMetadata, false)
120+
.put(firstBackingIndex, false)
121+
.put(firstFailureStore, false)
122+
.dataStreams(
123+
Map.of(dataStreamName, dataStream),
124+
Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
125+
)
126+
)
127+
.build();
128+
129+
final boolean isCcrAllowed = randomBoolean();
130+
final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
131+
@Override
132+
User getUser(ThreadContext threadContext) {
133+
return null;
134+
}
135+
136+
@Override
137+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
138+
Client client,
139+
String clusterAlias,
140+
RemoteClusterClient remoteClient,
141+
ClusterStateRequest request,
142+
Consumer<Exception> onFailure,
143+
Consumer<ClusterStateResponse> leaderClusterStateConsumer,
144+
Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
145+
Function<Exception, ElasticsearchStatusException> unknownLicense
146+
) {
147+
leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
148+
}
149+
150+
@Override
151+
public void hasPrivilegesToFollowIndices(
152+
ThreadContext threadContext,
153+
RemoteClusterClient remoteClient,
154+
String[] indices,
155+
Consumer<Exception> handler
156+
) {
157+
fail("Test case should fail before this code is called");
158+
}
159+
};
160+
161+
String clusterAlias = randomIdentifier();
162+
163+
ExecutorService mockExecutor = mock(ExecutorService.class);
164+
ThreadPool mockThreadPool = mock(ThreadPool.class);
165+
when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
166+
RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);
167+
168+
Client mockClient = mock(Client.class);
169+
when(mockClient.threadPool()).thenReturn(mockThreadPool);
170+
when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);
171+
172+
// When following an index that does not exist, throw IndexNotFoundException
173+
{
174+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
175+
assertThat(exception, instanceOf(IndexNotFoundException.class));
176+
assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
177+
}
178+
179+
// When following an alias, throw IllegalArgumentException
180+
{
181+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
182+
assertThat(exception, instanceOf(IllegalArgumentException.class));
183+
assertThat(exception.getMessage(), equalTo("cannot follow [" + aliasName + "], because it is a ALIAS"));
184+
}
185+
186+
// When following a data stream, throw IllegalArgumentException
187+
{
188+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
189+
assertThat(exception, instanceOf(IllegalArgumentException.class));
190+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dataStreamName + "], because it is a DATA_STREAM"));
191+
}
192+
193+
// When following a data stream alias, throw IllegalArgumentException
194+
{
195+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
196+
assertThat(exception, instanceOf(IllegalArgumentException.class));
197+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dsAliasName + "], because it is a ALIAS"));
198+
}
199+
200+
// When following a closed index, throw IndexClosedException
201+
{
202+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
203+
assertThat(exception, instanceOf(IndexClosedException.class));
204+
assertThat(exception.getMessage(), equalTo("closed"));
205+
}
206+
207+
// When following a failure store index, throw IllegalArgumentException
208+
{
209+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
210+
assertThat(exception, instanceOf(IllegalArgumentException.class));
211+
assertThat(
212+
exception.getMessage(),
213+
equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
214+
);
215+
}
216+
}
217+
218+
private static Exception executeExpectingException(
219+
CcrLicenseChecker checker,
220+
Client mockClient,
221+
String clusterAlias,
222+
String leaderIndex
223+
) {
224+
@SuppressWarnings("unchecked")
225+
Consumer<Exception> onFailure = mock(Consumer.class);
226+
@SuppressWarnings("unchecked")
227+
BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
228+
checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
229+
mockClient,
230+
clusterAlias,
231+
leaderIndex,
232+
onFailure,
233+
consumer
234+
);
235+
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
236+
verify(onFailure, times(1)).accept(captor.capture());
237+
verifyNoInteractions(consumer);
238+
return captor.getValue();
239+
}
240+
49241
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void testAutoFollower_dataStream() {
169169
Client client = mock(Client.class);
170170
when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));
171171

172-
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
172+
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);
173173

174174
AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
175175
Map<String, AutoFollowPattern> patterns = new HashMap<>();
@@ -2616,23 +2616,53 @@ private ClusterService mockClusterService() {
26162616
}
26172617

26182618
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
2619-
return createRemoteClusterStateWithDataStream(dataStreamName, false);
2619+
return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
26202620
}
26212621

2622-
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
2622+
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
2623+
long currentTimeMillis = System.currentTimeMillis();
2624+
26232625
Settings.Builder indexSettings = settings(IndexVersion.current());
26242626
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
26252627
indexSettings.put("index.hidden", true);
26262628

2627-
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
2629+
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
26282630
.settings(indexSettings)
26292631
.numberOfShards(1)
26302632
.numberOfReplicas(0)
26312633
.system(system)
26322634
.build();
2633-
DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
2634-
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
2635-
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));
2635+
2636+
IndexMetadata failureIndexMetadata = null;
2637+
DataStream.DataStreamIndices failureStore = null;
2638+
if (withFailures) {
2639+
Settings.Builder failureIndexSettings = settings(IndexVersion.current());
2640+
failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
2641+
failureIndexSettings.put("index.hidden", true);
2642+
2643+
String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
2644+
failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
2645+
.settings(failureIndexSettings)
2646+
.numberOfShards(1)
2647+
.numberOfReplicas(0)
2648+
.system(system)
2649+
.build();
2650+
2651+
failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
2652+
}
2653+
2654+
var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
2655+
if (withFailures) {
2656+
dataStreamBuilder.setFailureIndices(failureStore);
2657+
}
2658+
DataStream dataStream = dataStreamBuilder.build();
2659+
2660+
var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
2661+
if (withFailures) {
2662+
mdBuilder.put(failureIndexMetadata, true);
2663+
}
2664+
2665+
var routingTableBuilder = RoutingTable.builder();
26362666

26372667
ShardRouting shardRouting = TestShardRouting.newShardRouting(
26382668
new ShardId(indexMetadata.getIndex(), 0),
@@ -2641,7 +2671,22 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
26412671
ShardRoutingState.INITIALIZING
26422672
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
26432673
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
2644-
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
2674+
routingTableBuilder.add(indexRoutingTable);
2675+
2676+
if (withFailures) {
2677+
ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
2678+
new ShardId(failureIndexMetadata.getIndex(), 0),
2679+
"1",
2680+
true,
2681+
ShardRoutingState.INITIALIZING
2682+
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
2683+
IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
2684+
.addShard(failureShardRouting)
2685+
.build();
2686+
routingTableBuilder.add(failureIndexRoutingTable);
2687+
}
2688+
2689+
return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
26452690
}
26462691

26472692
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ public static boolean match(
313313
final DataStream parentDataStream = indexAbstraction.getParentDataStream();
314314
return parentDataStream != null
315315
&& parentDataStream.isSystem() == false
316+
&& parentDataStream.isFailureStoreIndex(indexAbstraction.getName()) == false
316317
&& Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false
317318
&& Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
318319
}

0 commit comments

Comments
 (0)