Skip to content

Commit bf3c076

Browse files
committed
Merge branch 'main' into esql_auto_partition
2 parents c816934 + 6d86b20 commit bf3c076

File tree

7 files changed

+692
-221
lines changed

7 files changed

+692
-221
lines changed

docs/reference/elasticsearch/configuration-reference/enrich-settings.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ applies_to:
77

88
# Enrich settings [enrich_settings]
99

10-
You can configure these enrich settings in the `elasticsearch.yml` file. For more information, see [Set up an enrich processor](docs-content:///manage-data/ingest/transform-enrich/set-up-an-enrich-processor.md).
10+
You can configure these enrich settings in the `elasticsearch.yml` file. For more information, see [Set up an enrich processor](docs-content://manage-data/ingest/transform-enrich/set-up-an-enrich-processor.md).
1111

1212
`enrich.cache_size` ![logo cloud](https://doc-icons.s3.us-east-2.amazonaws.com/logo_cloud.svg "Supported on Elastic Cloud Hosted")
1313
: Maximum number of searches to cache for enriching documents. Defaults to 1000. There is a single cache for all enrich processors in the cluster. This setting determines the size of that cache.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
130130
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
131131
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
132132
);
133-
checkRemoteClusterLicenseAndFetchClusterState(
133+
doCheckRemoteClusterLicenseAndFetchClusterState(
134134
client,
135135
clusterAlias,
136136
remoteClient,
@@ -168,6 +168,12 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
168168
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
169169
? indexAbstraction.getParentDataStream()
170170
: null;
171+
// Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
172+
if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
173+
String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
174+
onFailure.accept(new IllegalArgumentException(message));
175+
return;
176+
}
171177
hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
172178
if (e == null) {
173179
fetchLeaderHistoryUUIDs(
@@ -231,6 +237,29 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
231237
}
232238
}
233239

240+
// overridable for testing
241+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
242+
final Client client,
243+
final String clusterAlias,
244+
final RemoteClusterClient remoteClient,
245+
final ClusterStateRequest request,
246+
final Consumer<Exception> onFailure,
247+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
248+
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
249+
final Function<Exception, ElasticsearchStatusException> unknownLicense
250+
) {
251+
checkRemoteClusterLicenseAndFetchClusterState(
252+
client,
253+
clusterAlias,
254+
remoteClient,
255+
request,
256+
onFailure,
257+
leaderClusterStateConsumer,
258+
nonCompliantLicense,
259+
unknownLicense
260+
);
261+
}
262+
234263
/**
235264
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
236265
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,50 @@
77

88
package org.elasticsearch.xpack.ccr;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.client.internal.Client;
1014
import org.elasticsearch.client.internal.RemoteClusterClient;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.metadata.AliasMetadata;
18+
import org.elasticsearch.cluster.metadata.DataStream;
19+
import org.elasticsearch.cluster.metadata.DataStreamAlias;
20+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
21+
import org.elasticsearch.cluster.metadata.IndexMetadata;
22+
import org.elasticsearch.cluster.metadata.Metadata;
1123
import org.elasticsearch.common.settings.Settings;
1224
import org.elasticsearch.common.util.concurrent.ThreadContext;
25+
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.index.IndexNotFoundException;
27+
import org.elasticsearch.index.IndexVersion;
28+
import org.elasticsearch.indices.IndexClosedException;
29+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1330
import org.elasticsearch.test.ESTestCase;
31+
import org.elasticsearch.threadpool.ThreadPool;
1432
import org.elasticsearch.xpack.core.security.user.User;
33+
import org.mockito.ArgumentCaptor;
1534

35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ExecutorService;
1638
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.function.BiConsumer;
40+
import java.util.function.Consumer;
41+
import java.util.function.Function;
1742

1843
import static org.hamcrest.Matchers.containsString;
44+
import static org.hamcrest.Matchers.equalTo;
1945
import static org.hamcrest.Matchers.hasToString;
2046
import static org.hamcrest.Matchers.instanceOf;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.ArgumentMatchers.eq;
2149
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.times;
51+
import static org.mockito.Mockito.verify;
52+
import static org.mockito.Mockito.verifyNoInteractions;
53+
import static org.mockito.Mockito.when;
2254

2355
public class CcrLicenseCheckerTests extends ESTestCase {
2456

@@ -46,4 +78,164 @@ User getUser(final ThreadContext threadContext) {
4678
assertTrue(invoked.get());
4779
}
4880

81+
/**
82+
* Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
83+
*/
84+
public void testRemoteIndexValidation() {
85+
// A cluster state with
86+
// - a data stream, containing a backing index and a failure index
87+
// - an alias that points to said data stream
88+
// - a standalone index
89+
// - an alias that points to said standalone index
90+
// - a closed index
91+
String indexName = "random-index";
92+
String closedIndexName = "closed-index";
93+
String dataStreamName = "logs-test-data";
94+
String aliasName = "foo-alias";
95+
String dsAliasName = "ds-alias";
96+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
97+
.putAlias(AliasMetadata.builder(aliasName))
98+
.settings(settings(IndexVersion.current()))
99+
.numberOfShards(5)
100+
.numberOfReplicas(1)
101+
.build();
102+
IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
103+
.settings(settings(IndexVersion.current()))
104+
.numberOfShards(5)
105+
.numberOfReplicas(1)
106+
.state(IndexMetadata.State.CLOSE)
107+
.build();
108+
IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
109+
IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
110+
DataStream dataStream = DataStreamTestHelper.newInstance(
111+
dataStreamName,
112+
List.of(firstBackingIndex.getIndex()),
113+
List.of(firstFailureStore.getIndex())
114+
);
115+
ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
116+
.metadata(
117+
Metadata.builder()
118+
.put(indexMetadata, false)
119+
.put(closedIndexMetadata, false)
120+
.put(firstBackingIndex, false)
121+
.put(firstFailureStore, false)
122+
.dataStreams(
123+
Map.of(dataStreamName, dataStream),
124+
Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
125+
)
126+
)
127+
.build();
128+
129+
final boolean isCcrAllowed = randomBoolean();
130+
final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
131+
@Override
132+
User getUser(ThreadContext threadContext) {
133+
return null;
134+
}
135+
136+
@Override
137+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
138+
Client client,
139+
String clusterAlias,
140+
RemoteClusterClient remoteClient,
141+
ClusterStateRequest request,
142+
Consumer<Exception> onFailure,
143+
Consumer<ClusterStateResponse> leaderClusterStateConsumer,
144+
Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
145+
Function<Exception, ElasticsearchStatusException> unknownLicense
146+
) {
147+
leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
148+
}
149+
150+
@Override
151+
public void hasPrivilegesToFollowIndices(
152+
ThreadContext threadContext,
153+
RemoteClusterClient remoteClient,
154+
String[] indices,
155+
Consumer<Exception> handler
156+
) {
157+
fail("Test case should fail before this code is called");
158+
}
159+
};
160+
161+
String clusterAlias = randomIdentifier();
162+
163+
ExecutorService mockExecutor = mock(ExecutorService.class);
164+
ThreadPool mockThreadPool = mock(ThreadPool.class);
165+
when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
166+
RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);
167+
168+
Client mockClient = mock(Client.class);
169+
when(mockClient.threadPool()).thenReturn(mockThreadPool);
170+
when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);
171+
172+
// When following an index that does not exist, throw IndexNotFoundException
173+
{
174+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
175+
assertThat(exception, instanceOf(IndexNotFoundException.class));
176+
assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
177+
}
178+
179+
// When following an alias, throw IllegalArgumentException
180+
{
181+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
182+
assertThat(exception, instanceOf(IllegalArgumentException.class));
183+
assertThat(exception.getMessage(), equalTo("cannot follow [" + aliasName + "], because it is a ALIAS"));
184+
}
185+
186+
// When following a data stream, throw IllegalArgumentException
187+
{
188+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
189+
assertThat(exception, instanceOf(IllegalArgumentException.class));
190+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dataStreamName + "], because it is a DATA_STREAM"));
191+
}
192+
193+
// When following a data stream alias, throw IllegalArgumentException
194+
{
195+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
196+
assertThat(exception, instanceOf(IllegalArgumentException.class));
197+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dsAliasName + "], because it is a ALIAS"));
198+
}
199+
200+
// When following a closed index, throw IndexClosedException
201+
{
202+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
203+
assertThat(exception, instanceOf(IndexClosedException.class));
204+
assertThat(exception.getMessage(), equalTo("closed"));
205+
}
206+
207+
// When following a failure store index, throw IllegalArgumentException
208+
{
209+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
210+
assertThat(exception, instanceOf(IllegalArgumentException.class));
211+
assertThat(
212+
exception.getMessage(),
213+
equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
214+
);
215+
}
216+
}
217+
218+
private static Exception executeExpectingException(
219+
CcrLicenseChecker checker,
220+
Client mockClient,
221+
String clusterAlias,
222+
String leaderIndex
223+
) {
224+
@SuppressWarnings("unchecked")
225+
Consumer<Exception> onFailure = mock(Consumer.class);
226+
@SuppressWarnings("unchecked")
227+
BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
228+
checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
229+
mockClient,
230+
clusterAlias,
231+
leaderIndex,
232+
onFailure,
233+
consumer
234+
);
235+
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
236+
verify(onFailure, times(1)).accept(captor.capture());
237+
verifyNoInteractions(consumer);
238+
return captor.getValue();
239+
}
240+
49241
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void testAutoFollower_dataStream() {
170170
Client client = mock(Client.class);
171171
when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));
172172

173-
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
173+
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);
174174

175175
AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
176176
Map<String, AutoFollowPattern> patterns = new HashMap<>();
@@ -2562,23 +2562,53 @@ private ClusterService mockClusterService() {
25622562
}
25632563

25642564
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
2565-
return createRemoteClusterStateWithDataStream(dataStreamName, false);
2565+
return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
25662566
}
25672567

2568-
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
2568+
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
2569+
long currentTimeMillis = System.currentTimeMillis();
2570+
25692571
Settings.Builder indexSettings = settings(IndexVersion.current());
25702572
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
25712573
indexSettings.put("index.hidden", true);
25722574

2573-
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
2575+
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
25742576
.settings(indexSettings)
25752577
.numberOfShards(1)
25762578
.numberOfReplicas(0)
25772579
.system(system)
25782580
.build();
2579-
DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
2580-
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
2581-
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));
2581+
2582+
IndexMetadata failureIndexMetadata = null;
2583+
DataStream.DataStreamIndices failureStore = null;
2584+
if (withFailures) {
2585+
Settings.Builder failureIndexSettings = settings(IndexVersion.current());
2586+
failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
2587+
failureIndexSettings.put("index.hidden", true);
2588+
2589+
String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
2590+
failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
2591+
.settings(failureIndexSettings)
2592+
.numberOfShards(1)
2593+
.numberOfReplicas(0)
2594+
.system(system)
2595+
.build();
2596+
2597+
failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
2598+
}
2599+
2600+
var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
2601+
if (withFailures) {
2602+
dataStreamBuilder.setFailureIndices(failureStore);
2603+
}
2604+
DataStream dataStream = dataStreamBuilder.build();
2605+
2606+
var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
2607+
if (withFailures) {
2608+
mdBuilder.put(failureIndexMetadata, true);
2609+
}
2610+
2611+
var routingTableBuilder = RoutingTable.builder();
25822612

25832613
ShardRouting shardRouting = TestShardRouting.newShardRouting(
25842614
new ShardId(indexMetadata.getIndex(), 0),
@@ -2587,7 +2617,22 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
25872617
ShardRoutingState.INITIALIZING
25882618
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
25892619
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
2590-
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
2620+
routingTableBuilder.add(indexRoutingTable);
2621+
2622+
if (withFailures) {
2623+
ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
2624+
new ShardId(failureIndexMetadata.getIndex(), 0),
2625+
"1",
2626+
true,
2627+
ShardRoutingState.INITIALIZING
2628+
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
2629+
IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
2630+
.addShard(failureShardRouting)
2631+
.build();
2632+
routingTableBuilder.add(failureIndexRoutingTable);
2633+
}
2634+
2635+
return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
25912636
}
25922637

25932638
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ public static boolean match(
302302
final DataStream parentDataStream = indexAbstraction.getParentDataStream();
303303
return parentDataStream != null
304304
&& parentDataStream.isSystem() == false
305+
&& parentDataStream.isFailureStoreIndex(indexAbstraction.getName()) == false
305306
&& Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false
306307
&& Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
307308
}

x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityFailureStoreRestIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,14 @@ protected void setupTestDataStreamOnFulfillingCluster() throws IOException {
137137
}
138138
}
139139

140-
protected Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
140+
protected static Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
141141
request.setOptions(
142142
RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS))
143143
);
144144
return client().performRequest(request);
145145
}
146146

147-
protected Response performRequestWithUser(final String user, final Request request) throws IOException {
147+
protected static Response performRequestWithUser(final String user, final Request request) throws IOException {
148148
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(user, PASS)));
149149
return client().performRequest(request);
150150
}

0 commit comments

Comments
 (0)