Skip to content

Commit 8f42295

Browse files
committed
Restrict failure store from replicating via CCR
1 parent a6aaccd commit 8f42295

File tree

4 files changed

+272
-9
lines changed

4 files changed

+272
-9
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
130130
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
131131
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
132132
);
133-
checkRemoteClusterLicenseAndFetchClusterState(
133+
doCheckRemoteClusterLicenseAndFetchClusterState(
134134
client,
135135
clusterAlias,
136136
remoteClient,
@@ -168,6 +168,12 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
168168
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
169169
? indexAbstraction.getParentDataStream()
170170
: null;
171+
// Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
172+
if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
173+
String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
174+
onFailure.accept(new IllegalArgumentException(message));
175+
return;
176+
}
171177
hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
172178
if (e == null) {
173179
fetchLeaderHistoryUUIDs(
@@ -231,6 +237,29 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
231237
}
232238
}
233239

240+
// overridable for testing
241+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
242+
final Client client,
243+
final String clusterAlias,
244+
final RemoteClusterClient remoteClient,
245+
final ClusterStateRequest request,
246+
final Consumer<Exception> onFailure,
247+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
248+
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
249+
final Function<Exception, ElasticsearchStatusException> unknownLicense
250+
) {
251+
checkRemoteClusterLicenseAndFetchClusterState(
252+
client,
253+
clusterAlias,
254+
remoteClient,
255+
request,
256+
onFailure,
257+
leaderClusterStateConsumer,
258+
nonCompliantLicense,
259+
unknownLicense
260+
);
261+
}
262+
234263
/**
235264
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
236265
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,52 @@
77

88
package org.elasticsearch.xpack.ccr;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.client.internal.Client;
1014
import org.elasticsearch.client.internal.RemoteClusterClient;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.metadata.AliasMetadata;
18+
import org.elasticsearch.cluster.metadata.DataStream;
19+
import org.elasticsearch.cluster.metadata.DataStreamAlias;
20+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
21+
import org.elasticsearch.cluster.metadata.IndexMetadata;
22+
import org.elasticsearch.cluster.metadata.Metadata;
1123
import org.elasticsearch.common.settings.Settings;
1224
import org.elasticsearch.common.util.concurrent.ThreadContext;
25+
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.index.IndexNotFoundException;
27+
import org.elasticsearch.index.IndexVersion;
28+
import org.elasticsearch.indices.IndexClosedException;
29+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1330
import org.elasticsearch.test.ESTestCase;
31+
import org.elasticsearch.threadpool.ThreadPool;
1432
import org.elasticsearch.xpack.core.security.user.User;
33+
import org.mockito.ArgumentCaptor;
1534

35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ExecutorService;
1638
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.function.BiConsumer;
40+
import java.util.function.Consumer;
41+
import java.util.function.Function;
1742

1843
import static org.hamcrest.Matchers.containsString;
44+
import static org.hamcrest.Matchers.equalTo;
1945
import static org.hamcrest.Matchers.hasToString;
2046
import static org.hamcrest.Matchers.instanceOf;
47+
import static org.hamcrest.Matchers.matchesPattern;
48+
import static org.mockito.ArgumentMatchers.any;
49+
import static org.mockito.ArgumentMatchers.eq;
50+
import static org.mockito.ArgumentMatchers.matches;
2151
import static org.mockito.Mockito.mock;
52+
import static org.mockito.Mockito.times;
53+
import static org.mockito.Mockito.verify;
54+
import static org.mockito.Mockito.verifyNoInteractions;
55+
import static org.mockito.Mockito.when;
2256

2357
public class CcrLicenseCheckerTests extends ESTestCase {
2458

@@ -46,4 +80,158 @@ User getUser(final ThreadContext threadContext) {
4680
assertTrue(invoked.get());
4781
}
4882

83+
/**
84+
* Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
85+
*/
86+
public void testRemoteIndexValidation() {
87+
// A cluster state with
88+
// - a data stream, containing a backing index and a failure index
89+
// - an alias that points to said data stream
90+
// - a standalone index
91+
// - an alias that points to said standalone index
92+
// - a closed index
93+
String indexName = "random-index";
94+
String closedIndexName = "closed-index";
95+
String dataStreamName = "logs-test-data";
96+
String aliasName = "foo-alias";
97+
String dsAliasName = "ds-alias";
98+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
99+
.putAlias(AliasMetadata.builder(aliasName))
100+
.settings(settings(IndexVersion.current()))
101+
.numberOfShards(5)
102+
.numberOfReplicas(1)
103+
.build();
104+
IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
105+
.settings(settings(IndexVersion.current()))
106+
.numberOfShards(5)
107+
.numberOfReplicas(1)
108+
.state(IndexMetadata.State.CLOSE)
109+
.build();
110+
IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName)
111+
.build();
112+
IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
113+
DataStream dataStream = DataStreamTestHelper.newInstance(
114+
dataStreamName,
115+
List.of(firstBackingIndex.getIndex()),
116+
List.of(firstFailureStore.getIndex())
117+
);
118+
ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
119+
.metadata(Metadata.builder()
120+
.put(indexMetadata, false)
121+
.put(closedIndexMetadata, false)
122+
.put(firstBackingIndex, false)
123+
.put(firstFailureStore, false)
124+
.dataStreams(
125+
Map.of(dataStreamName, dataStream),
126+
Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
127+
))
128+
.build();
129+
130+
final boolean isCcrAllowed = randomBoolean();
131+
final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
132+
@Override
133+
User getUser(ThreadContext threadContext) {
134+
return null;
135+
}
136+
137+
@Override
138+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
139+
Client client,
140+
String clusterAlias,
141+
RemoteClusterClient remoteClient,
142+
ClusterStateRequest request,
143+
Consumer<Exception> onFailure,
144+
Consumer<ClusterStateResponse> leaderClusterStateConsumer,
145+
Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
146+
Function<Exception, ElasticsearchStatusException> unknownLicense
147+
) {
148+
leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
149+
}
150+
151+
@Override
152+
public void hasPrivilegesToFollowIndices(ThreadContext threadContext, RemoteClusterClient remoteClient, String[] indices, Consumer<Exception> handler) {
153+
fail("Test case should fail before this code is called");
154+
}
155+
};
156+
157+
String clusterAlias = randomIdentifier();
158+
159+
ExecutorService mockExecutor = mock(ExecutorService.class);
160+
ThreadPool mockThreadPool = mock(ThreadPool.class);
161+
when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
162+
RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);
163+
164+
Client mockClient = mock(Client.class);
165+
when(mockClient.threadPool()).thenReturn(mockThreadPool);
166+
when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);
167+
168+
// When following an index that does not exist, throw IndexNotFoundException
169+
{
170+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
171+
assertThat(exception, instanceOf(IndexNotFoundException.class));
172+
assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
173+
}
174+
175+
// When following an alias, throw IllegalArgumentException
176+
{
177+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
178+
assertThat(exception, instanceOf(IllegalArgumentException.class));
179+
assertThat(exception.getMessage(), equalTo("cannot follow ["+aliasName+"], because it is a ALIAS"));
180+
}
181+
182+
// When following a data stream, throw IllegalArgumentException
183+
{
184+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
185+
assertThat(exception, instanceOf(IllegalArgumentException.class));
186+
assertThat(exception.getMessage(), equalTo("cannot follow ["+dataStreamName+"], because it is a DATA_STREAM"));
187+
}
188+
189+
// When following a data stream alias, throw IllegalArgumentException
190+
{
191+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
192+
assertThat(exception, instanceOf(IllegalArgumentException.class));
193+
assertThat(exception.getMessage(), equalTo("cannot follow ["+dsAliasName+"], because it is a ALIAS"));
194+
}
195+
196+
// When following a closed index, throw IndexClosedException
197+
{
198+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
199+
assertThat(exception, instanceOf(IndexClosedException.class));
200+
assertThat(exception.getMessage(), equalTo("closed"));
201+
}
202+
203+
// When following a failure store index, throw IllegalArgumentException
204+
{
205+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
206+
assertThat(exception, instanceOf(IllegalArgumentException.class));
207+
assertThat(
208+
exception.getMessage(),
209+
equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
210+
);
211+
}
212+
}
213+
214+
private static Exception executeExpectingException(
215+
CcrLicenseChecker checker,
216+
Client mockClient,
217+
String clusterAlias,
218+
String leaderIndex
219+
) {
220+
@SuppressWarnings("unchecked")
221+
Consumer<Exception> onFailure = mock(Consumer.class);
222+
@SuppressWarnings("unchecked")
223+
BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
224+
checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
225+
mockClient,
226+
clusterAlias,
227+
leaderIndex,
228+
onFailure,
229+
consumer
230+
);
231+
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
232+
verify(onFailure, times(1)).accept(captor.capture());
233+
verifyNoInteractions(consumer);
234+
return captor.getValue();
235+
}
236+
49237
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void testAutoFollower_dataStream() {
170170
Client client = mock(Client.class);
171171
when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));
172172

173-
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
173+
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);
174174

175175
AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
176176
Map<String, AutoFollowPattern> patterns = new HashMap<>();
@@ -2562,23 +2562,53 @@ private ClusterService mockClusterService() {
25622562
}
25632563

25642564
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
2565-
return createRemoteClusterStateWithDataStream(dataStreamName, false);
2565+
return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
25662566
}
25672567

2568-
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
2568+
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
2569+
long currentTimeMillis = System.currentTimeMillis();
2570+
25692571
Settings.Builder indexSettings = settings(IndexVersion.current());
25702572
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
25712573
indexSettings.put("index.hidden", true);
25722574

2573-
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
2575+
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
25742576
.settings(indexSettings)
25752577
.numberOfShards(1)
25762578
.numberOfReplicas(0)
25772579
.system(system)
25782580
.build();
2579-
DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
2580-
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
2581-
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));
2581+
2582+
IndexMetadata failureIndexMetadata = null;
2583+
DataStream.DataStreamIndices failureStore = null;
2584+
if (withFailures) {
2585+
Settings.Builder failureIndexSettings = settings(IndexVersion.current());
2586+
failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
2587+
failureIndexSettings.put("index.hidden", true);
2588+
2589+
String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
2590+
failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
2591+
.settings(failureIndexSettings)
2592+
.numberOfShards(1)
2593+
.numberOfReplicas(0)
2594+
.system(system)
2595+
.build();
2596+
2597+
failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
2598+
}
2599+
2600+
var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
2601+
if (withFailures) {
2602+
dataStreamBuilder.setFailureIndices(failureStore);
2603+
}
2604+
DataStream dataStream = dataStreamBuilder.build();
2605+
2606+
var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
2607+
if (withFailures) {
2608+
mdBuilder.put(failureIndexMetadata, true);
2609+
}
2610+
2611+
var routingTableBuilder = RoutingTable.builder();
25822612

25832613
ShardRouting shardRouting = TestShardRouting.newShardRouting(
25842614
new ShardId(indexMetadata.getIndex(), 0),
@@ -2587,7 +2617,22 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
25872617
ShardRoutingState.INITIALIZING
25882618
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
25892619
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
2590-
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
2620+
routingTableBuilder.add(indexRoutingTable);
2621+
2622+
if (withFailures) {
2623+
ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
2624+
new ShardId(failureIndexMetadata.getIndex(), 0),
2625+
"1",
2626+
true,
2627+
ShardRoutingState.INITIALIZING
2628+
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
2629+
IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
2630+
.addShard(failureShardRouting)
2631+
.build();
2632+
routingTableBuilder.add(failureIndexRoutingTable);
2633+
}
2634+
2635+
return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
25912636
}
25922637

25932638
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ public static boolean match(
302302
final DataStream parentDataStream = indexAbstraction.getParentDataStream();
303303
return parentDataStream != null
304304
&& parentDataStream.isSystem() == false
305+
&& parentDataStream.isFailureStoreIndex(indexAbstraction.getName()) == false
305306
&& Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false
306307
&& Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
307308
}

0 commit comments

Comments
 (0)