Skip to content

Commit d996355

Browse files
authored
Merge branch 'main' into octal_bug
2 parents f24ec9e + 6c6500e commit d996355

File tree

23 files changed

+1160
-323
lines changed

23 files changed

+1160
-323
lines changed

docs/changelog/126493.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126493
2+
summary: Bedrock Cohere Task Settings Support
3+
area: Machine Learning
4+
type: enhancement
5+
issues:
6+
- 126156

docs/reference/elasticsearch/configuration-reference/enrich-settings.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ applies_to:
77

88
# Enrich settings [enrich_settings]
99

10-
You can configure these enrich settings in the `elasticsearch.yml` file. For more information, see [Set up an enrich processor](docs-content:///manage-data/ingest/transform-enrich/set-up-an-enrich-processor.md).
10+
You can configure these enrich settings in the `elasticsearch.yml` file. For more information, see [Set up an enrich processor](docs-content://manage-data/ingest/transform-enrich/set-up-an-enrich-processor.md).
1111

1212
`enrich.cache_size` ![logo cloud](https://doc-icons.s3.us-east-2.amazonaws.com/logo_cloud.svg "Supported on Elastic Cloud Hosted")
1313
: Maximum number of searches to cache for enriching documents. Defaults to 1000. There is a single cache for all enrich processors in the cluster. This setting determines the size of that cache.

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ static TransportVersion def(int id) {
157157
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE_8_19 = def(8_841_0_14);
158158
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED_8_19 = def(8_841_0_15);
159159
public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16);
160+
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS_8_19 = def(8_841_0_17);
160161
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0 = def(9_000_0_00);
161162
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
162163
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);
@@ -215,6 +216,7 @@ static TransportVersion def(int id) {
215216
public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00);
216217
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0);
217218
public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0);
219+
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_00_0);
218220

219221
/*
220222
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
130130
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
131131
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
132132
);
133-
checkRemoteClusterLicenseAndFetchClusterState(
133+
doCheckRemoteClusterLicenseAndFetchClusterState(
134134
client,
135135
clusterAlias,
136136
remoteClient,
@@ -168,6 +168,12 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
168168
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
169169
? indexAbstraction.getParentDataStream()
170170
: null;
171+
// Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
172+
if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
173+
String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
174+
onFailure.accept(new IllegalArgumentException(message));
175+
return;
176+
}
171177
hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
172178
if (e == null) {
173179
fetchLeaderHistoryUUIDs(
@@ -231,6 +237,29 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
231237
}
232238
}
233239

240+
// overridable for testing
241+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
242+
final Client client,
243+
final String clusterAlias,
244+
final RemoteClusterClient remoteClient,
245+
final ClusterStateRequest request,
246+
final Consumer<Exception> onFailure,
247+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
248+
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
249+
final Function<Exception, ElasticsearchStatusException> unknownLicense
250+
) {
251+
checkRemoteClusterLicenseAndFetchClusterState(
252+
client,
253+
clusterAlias,
254+
remoteClient,
255+
request,
256+
onFailure,
257+
leaderClusterStateConsumer,
258+
nonCompliantLicense,
259+
unknownLicense
260+
);
261+
}
262+
234263
/**
235264
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
236265
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,50 @@
77

88
package org.elasticsearch.xpack.ccr;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.client.internal.Client;
1014
import org.elasticsearch.client.internal.RemoteClusterClient;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.metadata.AliasMetadata;
18+
import org.elasticsearch.cluster.metadata.DataStream;
19+
import org.elasticsearch.cluster.metadata.DataStreamAlias;
20+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
21+
import org.elasticsearch.cluster.metadata.IndexMetadata;
22+
import org.elasticsearch.cluster.metadata.Metadata;
1123
import org.elasticsearch.common.settings.Settings;
1224
import org.elasticsearch.common.util.concurrent.ThreadContext;
25+
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.index.IndexNotFoundException;
27+
import org.elasticsearch.index.IndexVersion;
28+
import org.elasticsearch.indices.IndexClosedException;
29+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1330
import org.elasticsearch.test.ESTestCase;
31+
import org.elasticsearch.threadpool.ThreadPool;
1432
import org.elasticsearch.xpack.core.security.user.User;
33+
import org.mockito.ArgumentCaptor;
1534

35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ExecutorService;
1638
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.function.BiConsumer;
40+
import java.util.function.Consumer;
41+
import java.util.function.Function;
1742

1843
import static org.hamcrest.Matchers.containsString;
44+
import static org.hamcrest.Matchers.equalTo;
1945
import static org.hamcrest.Matchers.hasToString;
2046
import static org.hamcrest.Matchers.instanceOf;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.ArgumentMatchers.eq;
2149
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.times;
51+
import static org.mockito.Mockito.verify;
52+
import static org.mockito.Mockito.verifyNoInteractions;
53+
import static org.mockito.Mockito.when;
2254

2355
public class CcrLicenseCheckerTests extends ESTestCase {
2456

@@ -46,4 +78,164 @@ User getUser(final ThreadContext threadContext) {
4678
assertTrue(invoked.get());
4779
}
4880

81+
/**
82+
* Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
83+
*/
84+
public void testRemoteIndexValidation() {
85+
// A cluster state with
86+
// - a data stream, containing a backing index and a failure index
87+
// - an alias that points to said data stream
88+
// - a standalone index
89+
// - an alias that points to said standalone index
90+
// - a closed index
91+
String indexName = "random-index";
92+
String closedIndexName = "closed-index";
93+
String dataStreamName = "logs-test-data";
94+
String aliasName = "foo-alias";
95+
String dsAliasName = "ds-alias";
96+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
97+
.putAlias(AliasMetadata.builder(aliasName))
98+
.settings(settings(IndexVersion.current()))
99+
.numberOfShards(5)
100+
.numberOfReplicas(1)
101+
.build();
102+
IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
103+
.settings(settings(IndexVersion.current()))
104+
.numberOfShards(5)
105+
.numberOfReplicas(1)
106+
.state(IndexMetadata.State.CLOSE)
107+
.build();
108+
IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
109+
IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
110+
DataStream dataStream = DataStreamTestHelper.newInstance(
111+
dataStreamName,
112+
List.of(firstBackingIndex.getIndex()),
113+
List.of(firstFailureStore.getIndex())
114+
);
115+
ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
116+
.metadata(
117+
Metadata.builder()
118+
.put(indexMetadata, false)
119+
.put(closedIndexMetadata, false)
120+
.put(firstBackingIndex, false)
121+
.put(firstFailureStore, false)
122+
.dataStreams(
123+
Map.of(dataStreamName, dataStream),
124+
Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
125+
)
126+
)
127+
.build();
128+
129+
final boolean isCcrAllowed = randomBoolean();
130+
final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
131+
@Override
132+
User getUser(ThreadContext threadContext) {
133+
return null;
134+
}
135+
136+
@Override
137+
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
138+
Client client,
139+
String clusterAlias,
140+
RemoteClusterClient remoteClient,
141+
ClusterStateRequest request,
142+
Consumer<Exception> onFailure,
143+
Consumer<ClusterStateResponse> leaderClusterStateConsumer,
144+
Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
145+
Function<Exception, ElasticsearchStatusException> unknownLicense
146+
) {
147+
leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
148+
}
149+
150+
@Override
151+
public void hasPrivilegesToFollowIndices(
152+
ThreadContext threadContext,
153+
RemoteClusterClient remoteClient,
154+
String[] indices,
155+
Consumer<Exception> handler
156+
) {
157+
fail("Test case should fail before this code is called");
158+
}
159+
};
160+
161+
String clusterAlias = randomIdentifier();
162+
163+
ExecutorService mockExecutor = mock(ExecutorService.class);
164+
ThreadPool mockThreadPool = mock(ThreadPool.class);
165+
when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
166+
RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);
167+
168+
Client mockClient = mock(Client.class);
169+
when(mockClient.threadPool()).thenReturn(mockThreadPool);
170+
when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);
171+
172+
// When following an index that does not exist, throw IndexNotFoundException
173+
{
174+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
175+
assertThat(exception, instanceOf(IndexNotFoundException.class));
176+
assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
177+
}
178+
179+
// When following an alias, throw IllegalArgumentException
180+
{
181+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
182+
assertThat(exception, instanceOf(IllegalArgumentException.class));
183+
assertThat(exception.getMessage(), equalTo("cannot follow [" + aliasName + "], because it is a ALIAS"));
184+
}
185+
186+
// When following a data stream, throw IllegalArgumentException
187+
{
188+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
189+
assertThat(exception, instanceOf(IllegalArgumentException.class));
190+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dataStreamName + "], because it is a DATA_STREAM"));
191+
}
192+
193+
// When following a data stream alias, throw IllegalArgumentException
194+
{
195+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
196+
assertThat(exception, instanceOf(IllegalArgumentException.class));
197+
assertThat(exception.getMessage(), equalTo("cannot follow [" + dsAliasName + "], because it is a ALIAS"));
198+
}
199+
200+
// When following a closed index, throw IndexClosedException
201+
{
202+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
203+
assertThat(exception, instanceOf(IndexClosedException.class));
204+
assertThat(exception.getMessage(), equalTo("closed"));
205+
}
206+
207+
// When following a failure store index, throw IllegalArgumentException
208+
{
209+
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
210+
assertThat(exception, instanceOf(IllegalArgumentException.class));
211+
assertThat(
212+
exception.getMessage(),
213+
equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
214+
);
215+
}
216+
}
217+
218+
private static Exception executeExpectingException(
219+
CcrLicenseChecker checker,
220+
Client mockClient,
221+
String clusterAlias,
222+
String leaderIndex
223+
) {
224+
@SuppressWarnings("unchecked")
225+
Consumer<Exception> onFailure = mock(Consumer.class);
226+
@SuppressWarnings("unchecked")
227+
BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
228+
checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
229+
mockClient,
230+
clusterAlias,
231+
leaderIndex,
232+
onFailure,
233+
consumer
234+
);
235+
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
236+
verify(onFailure, times(1)).accept(captor.capture());
237+
verifyNoInteractions(consumer);
238+
return captor.getValue();
239+
}
240+
49241
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void testAutoFollower_dataStream() {
170170
Client client = mock(Client.class);
171171
when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));
172172

173-
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
173+
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);
174174

175175
AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
176176
Map<String, AutoFollowPattern> patterns = new HashMap<>();
@@ -2562,23 +2562,53 @@ private ClusterService mockClusterService() {
25622562
}
25632563

25642564
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
2565-
return createRemoteClusterStateWithDataStream(dataStreamName, false);
2565+
return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
25662566
}
25672567

2568-
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
2568+
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
2569+
long currentTimeMillis = System.currentTimeMillis();
2570+
25692571
Settings.Builder indexSettings = settings(IndexVersion.current());
25702572
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
25712573
indexSettings.put("index.hidden", true);
25722574

2573-
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
2575+
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
25742576
.settings(indexSettings)
25752577
.numberOfShards(1)
25762578
.numberOfReplicas(0)
25772579
.system(system)
25782580
.build();
2579-
DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
2580-
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
2581-
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));
2581+
2582+
IndexMetadata failureIndexMetadata = null;
2583+
DataStream.DataStreamIndices failureStore = null;
2584+
if (withFailures) {
2585+
Settings.Builder failureIndexSettings = settings(IndexVersion.current());
2586+
failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
2587+
failureIndexSettings.put("index.hidden", true);
2588+
2589+
String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
2590+
failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
2591+
.settings(failureIndexSettings)
2592+
.numberOfShards(1)
2593+
.numberOfReplicas(0)
2594+
.system(system)
2595+
.build();
2596+
2597+
failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
2598+
}
2599+
2600+
var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
2601+
if (withFailures) {
2602+
dataStreamBuilder.setFailureIndices(failureStore);
2603+
}
2604+
DataStream dataStream = dataStreamBuilder.build();
2605+
2606+
var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
2607+
if (withFailures) {
2608+
mdBuilder.put(failureIndexMetadata, true);
2609+
}
2610+
2611+
var routingTableBuilder = RoutingTable.builder();
25822612

25832613
ShardRouting shardRouting = TestShardRouting.newShardRouting(
25842614
new ShardId(indexMetadata.getIndex(), 0),
@@ -2587,7 +2617,22 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
25872617
ShardRoutingState.INITIALIZING
25882618
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
25892619
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
2590-
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
2620+
routingTableBuilder.add(indexRoutingTable);
2621+
2622+
if (withFailures) {
2623+
ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
2624+
new ShardId(failureIndexMetadata.getIndex(), 0),
2625+
"1",
2626+
true,
2627+
ShardRoutingState.INITIALIZING
2628+
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
2629+
IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
2630+
.addShard(failureShardRouting)
2631+
.build();
2632+
routingTableBuilder.add(failureIndexRoutingTable);
2633+
}
2634+
2635+
return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
25912636
}
25922637

25932638
}

0 commit comments

Comments
 (0)