Skip to content

Commit 6d86b20

Browse files
authored
Restrict failure stores from replicating via CCR (elastic#126355)
Checks to see if an index belongs to a data stream's failure store before following it. If the index is a failure index, the follow operation is rejected. Also updates the logic in the auto follower API's to exclude failure indices on data streams from being followed if their parent data stream matches the follow pattern.
1 parent 1b021c5 commit 6d86b20

File tree

4 files changed

+276
-9
lines changed

4 files changed

+276
-9
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
130130
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
131131
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
132132
);
133-
checkRemoteClusterLicenseAndFetchClusterState(
133+
doCheckRemoteClusterLicenseAndFetchClusterState(
134134
client,
135135
clusterAlias,
136136
remoteClient,
@@ -168,6 +168,12 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
168168
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
169169
? indexAbstraction.getParentDataStream()
170170
: null;
171+
// Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
172+
if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
173+
String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
174+
onFailure.accept(new IllegalArgumentException(message));
175+
return;
176+
}
171177
hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
172178
if (e == null) {
173179
fetchLeaderHistoryUUIDs(
@@ -231,6 +237,29 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
231237
}
232238
}
233239

240+
// overridable for testing
241+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
242+
final Client client,
243+
final String clusterAlias,
244+
final RemoteClusterClient remoteClient,
245+
final ClusterStateRequest request,
246+
final Consumer<Exception> onFailure,
247+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
248+
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
249+
final Function<Exception, ElasticsearchStatusException> unknownLicense
250+
) {
251+
checkRemoteClusterLicenseAndFetchClusterState(
252+
client,
253+
clusterAlias,
254+
remoteClient,
255+
request,
256+
onFailure,
257+
leaderClusterStateConsumer,
258+
nonCompliantLicense,
259+
unknownLicense
260+
);
261+
}
262+
234263
/**
235264
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
236265
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,50 @@
77

88
package org.elasticsearch.xpack.ccr;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.client.internal.Client;
1014
import org.elasticsearch.client.internal.RemoteClusterClient;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.metadata.AliasMetadata;
18+
import org.elasticsearch.cluster.metadata.DataStream;
19+
import org.elasticsearch.cluster.metadata.DataStreamAlias;
20+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
21+
import org.elasticsearch.cluster.metadata.IndexMetadata;
22+
import org.elasticsearch.cluster.metadata.Metadata;
1123
import org.elasticsearch.common.settings.Settings;
1224
import org.elasticsearch.common.util.concurrent.ThreadContext;
25+
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.index.IndexNotFoundException;
27+
import org.elasticsearch.index.IndexVersion;
28+
import org.elasticsearch.indices.IndexClosedException;
29+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1330
import org.elasticsearch.test.ESTestCase;
31+
import org.elasticsearch.threadpool.ThreadPool;
1432
import org.elasticsearch.xpack.core.security.user.User;
33+
import org.mockito.ArgumentCaptor;
1534

35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ExecutorService;
1638
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.function.BiConsumer;
40+
import java.util.function.Consumer;
41+
import java.util.function.Function;
1742

1843
import static org.hamcrest.Matchers.containsString;
44+
import static org.hamcrest.Matchers.equalTo;
1945
import static org.hamcrest.Matchers.hasToString;
2046
import static org.hamcrest.Matchers.instanceOf;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.ArgumentMatchers.eq;
2149
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.times;
51+
import static org.mockito.Mockito.verify;
52+
import static org.mockito.Mockito.verifyNoInteractions;
53+
import static org.mockito.Mockito.when;
2254

2355
public class CcrLicenseCheckerTests extends ESTestCase {
2456

@@ -46,4 +78,164 @@ User getUser(final ThreadContext threadContext) {
4678
assertTrue(invoked.get());
4779
}
4880

81+
/**
82+
* Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
83+
*/
84+
public void testRemoteIndexValidation() {
85+
// A cluster state with
86+
// - a data stream, containing a backing index and a failure index
87+
// - an alias that points to said data stream
88+
// - a standalone index
89+
// - an alias that points to said standalone index
90+
// - a closed index
91+
String indexName = "random-index";
92+
String closedIndexName = "closed-index";
93+
String dataStreamName = "logs-test-data";
94+
String aliasName = "foo-alias";
95+
String dsAliasName = "ds-alias";
96+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
97+
.putAlias(AliasMetadata.builder(aliasName))
98+
.settings(settings(IndexVersion.current()))
99+
.numberOfShards(5)
100+
.numberOfReplicas(1)
101+
.build();
102+
IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
103+
.settings(settings(IndexVersion.current()))
104+
.numberOfShards(5)
105+
.numberOfReplicas(1)
106+
.state(IndexMetadata.State.CLOSE)
107+
.build();
108+
IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
109+
IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
110+
DataStream dataStream = DataStreamTestHelper.newInstance(
111+
dataStreamName,
112+
List.of(firstBackingIndex.getIndex()),
113+
List.of(firstFailureStore.getIndex())
114+
);
115+
ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
116+
.metadata(
117+
Metadata.builder()
118+
.put(indexMetadata, false)
119+
.put(closedIndexMetadata, false)
120+
.put(firstBackingIndex, false)
121+
.put(firstFailureStore, false)
122+
.dataStreams(
123+
Map.of(dataStreamName, dataStream),
124+
Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
125+
)
126+
)
127+
.build();
128+
129+
final boolean isCcrAllowed = randomBoolean();
130+
final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
131+
@Override
132+
User getUser(ThreadContext threadContext) {
133+
return null;
134+
}
135+
136+
@Override
137+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
138+
Client client,
139+
String clusterAlias,
140+
RemoteClusterClient remoteClient,
141+
ClusterStateRequest request,
142+
Consumer<Exception> onFailure,
143+
Consumer<ClusterStateResponse> leaderClusterStateConsumer,
144+
Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
145+
Function<Exception, ElasticsearchStatusException> unknownLicense
146+
) {
147+
leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
148+
}
149+
150+
@Override
151+
public void hasPrivilegesToFollowIndices(
152+
ThreadContext threadContext,
153+
RemoteClusterClient remoteClient,
154+
String[] indices,
155+
Consumer<Exception> handler
156+
) {
157+
fail("Test case should fail before this code is called");
158+
}
159+
};
160+
161+
String clusterAlias = randomIdentifier();
162+
163+
ExecutorService mockExecutor = mock(ExecutorService.class);
164+
ThreadPool mockThreadPool = mock(ThreadPool.class);
165+
when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
166+
RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);
167+
168+
Client mockClient = mock(Client.class);
169+
when(mockClient.threadPool()).thenReturn(mockThreadPool);
170+
when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);
171+
172+
// When following an index that does not exist, throw IndexNotFoundException
173+
{
174+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
175+
assertThat(exception, instanceOf(IndexNotFoundException.class));
176+
assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
177+
}
178+
179+
// When following an alias, throw IllegalArgumentException
180+
{
181+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
182+
assertThat(exception, instanceOf(IllegalArgumentException.class));
183+
assertThat(exception.getMessage(), equalTo("cannot follow [" + aliasName + "], because it is a ALIAS"));
184+
}
185+
186+
// When following a data stream, throw IllegalArgumentException
187+
{
188+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
189+
assertThat(exception, instanceOf(IllegalArgumentException.class));
190+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dataStreamName + "], because it is a DATA_STREAM"));
191+
}
192+
193+
// When following a data stream alias, throw IllegalArgumentException
194+
{
195+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
196+
assertThat(exception, instanceOf(IllegalArgumentException.class));
197+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dsAliasName + "], because it is a ALIAS"));
198+
}
199+
200+
// When following a closed index, throw IndexClosedException
201+
{
202+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
203+
assertThat(exception, instanceOf(IndexClosedException.class));
204+
assertThat(exception.getMessage(), equalTo("closed"));
205+
}
206+
207+
// When following a failure store index, throw IllegalArgumentException
208+
{
209+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
210+
assertThat(exception, instanceOf(IllegalArgumentException.class));
211+
assertThat(
212+
exception.getMessage(),
213+
equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
214+
);
215+
}
216+
}
217+
218+
private static Exception executeExpectingException(
219+
CcrLicenseChecker checker,
220+
Client mockClient,
221+
String clusterAlias,
222+
String leaderIndex
223+
) {
224+
@SuppressWarnings("unchecked")
225+
Consumer<Exception> onFailure = mock(Consumer.class);
226+
@SuppressWarnings("unchecked")
227+
BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
228+
checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
229+
mockClient,
230+
clusterAlias,
231+
leaderIndex,
232+
onFailure,
233+
consumer
234+
);
235+
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
236+
verify(onFailure, times(1)).accept(captor.capture());
237+
verifyNoInteractions(consumer);
238+
return captor.getValue();
239+
}
240+
49241
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void testAutoFollower_dataStream() {
170170
Client client = mock(Client.class);
171171
when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));
172172

173-
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
173+
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);
174174

175175
AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
176176
Map<String, AutoFollowPattern> patterns = new HashMap<>();
@@ -2562,23 +2562,53 @@ private ClusterService mockClusterService() {
25622562
}
25632563

25642564
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
2565-
return createRemoteClusterStateWithDataStream(dataStreamName, false);
2565+
return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
25662566
}
25672567

2568-
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
2568+
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
2569+
long currentTimeMillis = System.currentTimeMillis();
2570+
25692571
Settings.Builder indexSettings = settings(IndexVersion.current());
25702572
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
25712573
indexSettings.put("index.hidden", true);
25722574

2573-
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
2575+
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
25742576
.settings(indexSettings)
25752577
.numberOfShards(1)
25762578
.numberOfReplicas(0)
25772579
.system(system)
25782580
.build();
2579-
DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
2580-
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
2581-
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));
2581+
2582+
IndexMetadata failureIndexMetadata = null;
2583+
DataStream.DataStreamIndices failureStore = null;
2584+
if (withFailures) {
2585+
Settings.Builder failureIndexSettings = settings(IndexVersion.current());
2586+
failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
2587+
failureIndexSettings.put("index.hidden", true);
2588+
2589+
String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
2590+
failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
2591+
.settings(failureIndexSettings)
2592+
.numberOfShards(1)
2593+
.numberOfReplicas(0)
2594+
.system(system)
2595+
.build();
2596+
2597+
failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
2598+
}
2599+
2600+
var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
2601+
if (withFailures) {
2602+
dataStreamBuilder.setFailureIndices(failureStore);
2603+
}
2604+
DataStream dataStream = dataStreamBuilder.build();
2605+
2606+
var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
2607+
if (withFailures) {
2608+
mdBuilder.put(failureIndexMetadata, true);
2609+
}
2610+
2611+
var routingTableBuilder = RoutingTable.builder();
25822612

25832613
ShardRouting shardRouting = TestShardRouting.newShardRouting(
25842614
new ShardId(indexMetadata.getIndex(), 0),
@@ -2587,7 +2617,22 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
25872617
ShardRoutingState.INITIALIZING
25882618
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
25892619
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
2590-
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
2620+
routingTableBuilder.add(indexRoutingTable);
2621+
2622+
if (withFailures) {
2623+
ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
2624+
new ShardId(failureIndexMetadata.getIndex(), 0),
2625+
"1",
2626+
true,
2627+
ShardRoutingState.INITIALIZING
2628+
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
2629+
IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
2630+
.addShard(failureShardRouting)
2631+
.build();
2632+
routingTableBuilder.add(failureIndexRoutingTable);
2633+
}
2634+
2635+
return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
25912636
}
25922637

25932638
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ public static boolean match(
302302
final DataStream parentDataStream = indexAbstraction.getParentDataStream();
303303
return parentDataStream != null
304304
&& parentDataStream.isSystem() == false
305+
&& parentDataStream.isFailureStoreIndex(indexAbstraction.getName()) == false
305306
&& Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false
306307
&& Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
307308
}

0 commit comments

Comments
 (0)