diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamSettingsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamSettingsIT.java new file mode 100644 index 0000000000000..f83807bb61535 --- /dev/null +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamSettingsIT.java @@ -0,0 +1,355 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; +import org.elasticsearch.action.admin.indices.rollover.RolloverAction; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.datastreams.GetDataStreamSettingsAction; +import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class DataStreamSettingsIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class, TestPlugin.class); + } + + public void testPutDataStreamSettings() throws Exception { + String dataStreamName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + putComposableIndexTemplate("my-template", List.of(dataStreamName), indexSettings(1, 0).build()); + final var createDataStreamRequest = new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + final int numberOfShards = randomIntBetween(2, 7); + final String newLifecycleName = randomAlphanumericOfLength(20).toLowerCase(Locale.ROOT); + { + List getSettingsResponses = client().execute( + GetDataStreamSettingsAction.INSTANCE, + new GetDataStreamSettingsAction.Request(TimeValue.THIRTY_SECONDS).indices(dataStreamName) + ).actionGet().getDataStreamSettingsResponses(); + assertThat(getSettingsResponses.size(), equalTo(1)); + assertThat(getSettingsResponses.get(0).settings(), equalTo(Settings.EMPTY)); + Settings dataStreamSettings = Settings.builder() + .put("index.number_of_shards", numberOfShards) + .put("index.lifecycle.name", newLifecycleName) + .build(); + UpdateDataStreamSettingsAction.Response putSettingsResponse = client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request(dataStreamSettings, TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).indices( + dataStreamName + ) + ).actionGet(); + List dataStreamSettingsResponses = putSettingsResponse + .getDataStreamSettingsResponses(); + assertThat(dataStreamSettingsResponses.size(), equalTo(1)); + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse = dataStreamSettingsResponses.get(0); + assertThat(dataStreamSettingsResponse.dataStreamName(), equalTo(dataStreamName)); + assertThat(dataStreamSettingsResponse.dataStreamSucceeded(), equalTo(true)); + assertThat(dataStreamSettingsResponse.settings().get("index.number_of_shards"), equalTo(Integer.toString(numberOfShards))); + assertThat( + dataStreamSettingsResponse.effectiveSettings().get("index.number_of_shards"), + equalTo(Integer.toString(numberOfShards)) + ); + assertThat(dataStreamSettingsResponse.indicesSettingsResult().indexSettingErrors().size(), equalTo(0)); + assertThat(dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamOnly().size(), equalTo(1)); + assertThat( + dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamOnly().get(0), + equalTo("index.number_of_shards") + ); + assertThat(dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamAndBackingIndices().size(), equalTo(1)); + assertThat( + dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamAndBackingIndices().get(0), + equalTo("index.lifecycle.name") + ); + GetIndexResponse response = admin().indices() + .getIndex(new GetIndexRequest(TimeValue.THIRTY_SECONDS).indices(dataStreamName)) + .actionGet(); + Settings settings = response.getSettings().values().iterator().next(); + assertThat(settings.get("index.number_of_shards"), equalTo("1")); + assertThat(settings.get("index.lifecycle.name"), equalTo(newLifecycleName)); + getSettingsResponses = client().execute( + GetDataStreamSettingsAction.INSTANCE, + new GetDataStreamSettingsAction.Request(TimeValue.THIRTY_SECONDS).indices(dataStreamName) + ).actionGet().getDataStreamSettingsResponses(); + assertThat(getSettingsResponses.size(), equalTo(1)); + assertThat(getSettingsResponses.get(0).settings(), equalTo(dataStreamSettings)); + assertThat( + getSettingsResponses.get(0).effectiveSettings(), + equalTo(Settings.builder().put(dataStreamSettings).put("index.number_of_replicas", "0").build()) + ); + } + { + // Try to set an invalid value for a valid setting, and make sure the data stream is not updated + int invalidNumberOfShards = 2000; + UpdateDataStreamSettingsAction.Response putSettingsResponse = client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request( + Settings.builder().put("index.number_of_shards", invalidNumberOfShards).build(), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ).indices(dataStreamName) + ).actionGet(); + List dataStreamSettingsResponses = putSettingsResponse + .getDataStreamSettingsResponses(); + assertThat(dataStreamSettingsResponses.size(), equalTo(1)); + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse = dataStreamSettingsResponses.get(0); + assertThat(dataStreamSettingsResponse.dataStreamName(), equalTo(dataStreamName)); + assertThat(dataStreamSettingsResponse.dataStreamSucceeded(), equalTo(false)); + } + { + // Try to set an invalid setting, and make sure the data stream is not updated + UpdateDataStreamSettingsAction.Response putSettingsResponse = client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request( + Settings.builder() + .put("index.number_of_shards", numberOfShards) + .put("unknown.setting", randomAlphaOfLength(20)) + .build(), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ).indices(dataStreamName) + ).actionGet(); + List dataStreamSettingsResponses = putSettingsResponse + .getDataStreamSettingsResponses(); + assertThat(dataStreamSettingsResponses.size(), equalTo(1)); + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse = dataStreamSettingsResponses.get(0); + assertThat(dataStreamSettingsResponse.dataStreamName(), equalTo(dataStreamName)); + assertThat(dataStreamSettingsResponse.dataStreamSucceeded(), equalTo(false)); + } + } + + public void testPutMultipleDataStreamSettings() throws Exception { + List testDataStreamNames = new ArrayList<>(); + List ignoredDataStreamNames = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + String dataStreamName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + putComposableIndexTemplate("my-template-" + i, List.of(dataStreamName), indexSettings(1, 0).build()); + final var createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + dataStreamName + ); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + testDataStreamNames.add(dataStreamName); + } + for (int i = 0; i < randomIntBetween(2, 5); i++) { + String dataStreamName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + putComposableIndexTemplate("my-other-template-" + i, List.of(dataStreamName), indexSettings(1, 0).build()); + final var createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + dataStreamName + ); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + ignoredDataStreamNames.add(dataStreamName); + } + final int numberOfShards = randomIntBetween(2, 7); + final String newLifecycleName = randomAlphanumericOfLength(20).toLowerCase(Locale.ROOT); + { + { + // First, a quick check that we fetch all data streams when no data stream names are given: + UpdateDataStreamSettingsAction.Response putSettingsResponse = client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request( + Settings.builder() + .put("index.number_of_shards", numberOfShards) + .put("index.lifecycle.name", newLifecycleName) + .build(), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ) + ).actionGet(); + List dataStreamSettingsResponses = putSettingsResponse + .getDataStreamSettingsResponses(); + assertThat(dataStreamSettingsResponses.size(), equalTo(testDataStreamNames.size() + ignoredDataStreamNames.size())); + } + UpdateDataStreamSettingsAction.Response putSettingsResponse = client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request( + Settings.builder().put("index.number_of_shards", numberOfShards).put("index.lifecycle.name", newLifecycleName).build(), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ).indices(testDataStreamNames.toArray(new String[0])) + ).actionGet(); + List dataStreamSettingsResponses = putSettingsResponse + .getDataStreamSettingsResponses(); + assertThat(dataStreamSettingsResponses.size(), equalTo(testDataStreamNames.size())); + for (int i = 0; i < testDataStreamNames.size(); i++) { + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse = dataStreamSettingsResponses.get(0); + assertThat(dataStreamSettingsResponse.dataStreamSucceeded(), equalTo(true)); + assertThat(dataStreamSettingsResponse.settings().get("index.number_of_shards"), equalTo(Integer.toString(numberOfShards))); + assertThat( + dataStreamSettingsResponse.effectiveSettings().get("index.number_of_shards"), + equalTo(Integer.toString(numberOfShards)) + ); + assertThat(dataStreamSettingsResponse.indicesSettingsResult().indexSettingErrors().size(), equalTo(0)); + assertThat(dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamOnly().size(), equalTo(1)); + assertThat( + dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamOnly().get(0), + equalTo("index.number_of_shards") + ); + assertThat(dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamAndBackingIndices().size(), equalTo(1)); + assertThat( + dataStreamSettingsResponse.indicesSettingsResult().appliedToDataStreamAndBackingIndices().get(0), + equalTo("index.lifecycle.name") + ); + GetIndexResponse response = admin().indices() + .getIndex(new GetIndexRequest(TimeValue.THIRTY_SECONDS).indices(dataStreamSettingsResponse.dataStreamName())) + .actionGet(); + Settings settings = response.getSettings().values().iterator().next(); + assertThat(settings.get("index.number_of_shards"), equalTo("1")); + assertThat(settings.get("index.lifecycle.name"), equalTo(newLifecycleName)); + } + List getSettingsResponses = client().execute( + GetDataStreamSettingsAction.INSTANCE, + new GetDataStreamSettingsAction.Request(TimeValue.THIRTY_SECONDS).indices(testDataStreamNames.toArray(new String[0])) + ).actionGet().getDataStreamSettingsResponses(); + assertThat(getSettingsResponses.size(), equalTo(testDataStreamNames.size())); + } + } + + public void testRolloverWithDataStreamSettings() throws Exception { + String dataStreamName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + putComposableIndexTemplate("my-template", List.of(dataStreamName), indexSettings(1, 0).build()); + final var createDataStreamRequest = new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + final int numberOfShards = randomIntBetween(2, 7); + final String newLifecycleName = randomAlphanumericOfLength(20).toLowerCase(Locale.ROOT); + client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request( + Settings.builder().put("index.number_of_shards", numberOfShards).put("index.lifecycle.name", newLifecycleName).build(), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ).indices(dataStreamName) + ).actionGet(); + + RolloverResponse rolloverResponse = client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)) + .actionGet(); + assertThat(rolloverResponse.isRolledOver(), equalTo(true)); + String newIndexName = rolloverResponse.getNewIndex(); + GetIndexResponse response = admin().indices() + .getIndex(new GetIndexRequest(TimeValue.THIRTY_SECONDS).indices(newIndexName)) + .actionGet(); + Settings settings = response.getSettings().get(newIndexName); + assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(numberOfShards))); + assertThat(settings.get("index.lifecycle.name"), equalTo(newLifecycleName)); + } + + public void testIndexBlock() throws Exception { + /* + * This tests that if there is a block on one index, the settings changes still go through on all the other + * indices. + */ + String dataStreamName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + putComposableIndexTemplate("my-template", List.of(dataStreamName), indexSettings(1, 0).build()); + final var createDataStreamRequest = new CreateDataStreamAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + Set indexNames = new HashSet<>(); + for (int i = 0; i < randomIntBetween(1, 10); i++) { + RolloverResponse rolloverResponse = client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)) + .actionGet(); + assertThat(rolloverResponse.isRolledOver(), equalTo(true)); + indexNames.add(rolloverResponse.getOldIndex()); + } + String indexToBlock = randomFrom(indexNames); + PlainActionFuture plainActionFuture = new PlainActionFuture<>(); + indicesAdmin().addBlock(new AddIndexBlockRequest(IndexMetadata.APIBlock.METADATA, indexToBlock), plainActionFuture); + assertThat(plainActionFuture.get().isShardsAcknowledged(), equalTo(true)); + final int numberOfShards = randomIntBetween(2, 7); + final String newLifecycleName = randomAlphanumericOfLength(20).toLowerCase(Locale.ROOT); + { + UpdateDataStreamSettingsAction.Response putSettingsResponse = client().execute( + UpdateDataStreamSettingsAction.INSTANCE, + new UpdateDataStreamSettingsAction.Request( + Settings.builder().put("index.number_of_shards", numberOfShards).put("index.lifecycle.name", newLifecycleName).build(), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ).indices(dataStreamName) + ).actionGet(); + List dataStreamSettingsResponses = putSettingsResponse + .getDataStreamSettingsResponses(); + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse = dataStreamSettingsResponses.get(0); + assertThat(dataStreamSettingsResponse.dataStreamSucceeded(), equalTo(true)); + indicesAdmin().prepareUpdateSettings(indexToBlock) + .setSettings(Settings.builder().put("index.blocks.metadata", false).build()) + .get(); + GetIndexResponse response = admin().indices() + .getIndex(new GetIndexRequest(TimeValue.THIRTY_SECONDS).indices(dataStreamSettingsResponse.dataStreamName())) + .actionGet(); + for (Map.Entry indexAndsettings : response.getSettings().entrySet()) { + if (indexAndsettings.getKey().equals(indexToBlock)) { + assertThat(indexAndsettings.getValue().get("index.lifecycle.name"), equalTo(null)); + } else { + assertThat(indexAndsettings.getValue().get("index.lifecycle.name"), equalTo(newLifecycleName)); + } + } + } + } + + static void putComposableIndexTemplate(String id, List patterns, @Nullable Settings settings) throws IOException { + TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); + request.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(patterns) + .template(Template.builder().settings(settings)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build() + ); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); + } + + public static class TestPlugin extends Plugin { + /* + * index.lifecycle.name is one of the settings allowed by TransportUpdateDataStreamSettingsAction, but it is in the ilm plugin. We + * add it here so that it is available for testing. + */ + public static final Setting LIFECYCLE_SETTING = Setting.simpleString( + "index.lifecycle.name", + "", + Setting.Property.IndexScope + ); + + @Override + public List> getSettings() { + return List.of(LIFECYCLE_SETTING); + } + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 7a9b23b5095b3..d876dc016d3cf 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -14,9 +14,11 @@ import org.elasticsearch.action.datastreams.DataStreamsStatsAction; import org.elasticsearch.action.datastreams.DeleteDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.GetDataStreamSettingsAction; import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; +import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; @@ -34,10 +36,12 @@ import org.elasticsearch.datastreams.action.TransportCreateDataStreamAction; import org.elasticsearch.datastreams.action.TransportDataStreamsStatsAction; import org.elasticsearch.datastreams.action.TransportDeleteDataStreamAction; +import org.elasticsearch.datastreams.action.TransportGetDataStreamSettingsAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.action.TransportMigrateToDataStreamAction; import org.elasticsearch.datastreams.action.TransportModifyDataStreamsAction; import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction; +import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction; @@ -239,6 +243,8 @@ public List getActions() { actions.add(new ActionHandler(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class)); actions.add(new ActionHandler(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class)); actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); + actions.add(new ActionHandler(GetDataStreamSettingsAction.INSTANCE, TransportGetDataStreamSettingsAction.class)); + actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class)); return actions; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamSettingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamSettingsAction.java new file mode 100644 index 0000000000000..9dc272d137fa8 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamSettingsAction.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.datastreams.GetDataStreamSettingsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TransportGetDataStreamSettingsAction extends TransportLocalProjectMetadataAction< + GetDataStreamSettingsAction.Request, + GetDataStreamSettingsAction.Response> { + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SettingsFilter settingsFilter; + + @Inject + public TransportGetDataStreamSettingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + SettingsFilter settingsFilter, + ActionFilters actionFilters, + ProjectResolver projectResolver, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + GetSettingsAction.NAME, + actionFilters, + transportService.getTaskManager(), + clusterService, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + projectResolver + ); + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.settingsFilter = settingsFilter; + } + + @Override + protected ClusterBlockException checkBlock(GetDataStreamSettingsAction.Request request, ProjectState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void localClusterStateOperation( + Task task, + GetDataStreamSettingsAction.Request request, + ProjectState project, + ActionListener listener + ) throws Exception { + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + clusterService.state(), + IndicesOptions.DEFAULT, + request.indices() + ); + Map dataStreamMap = project.metadata().dataStreams(); + List responseList = new ArrayList<>(dataStreamNames.size()); + for (String dataStreamName : dataStreamNames) { + DataStream dataStream = dataStreamMap.get(dataStreamName); + Settings settings = settingsFilter.filter(dataStream.getSettings()); + Settings effectiveSettings = settingsFilter.filter(dataStream.getEffectiveSettings(project.metadata())); + responseList.add(new GetDataStreamSettingsAction.DataStreamSettingsResponse(dataStreamName, settings, effectiveSettings)); + } + listener.onResponse(new GetDataStreamSettingsAction.Response(responseList)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamSettingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamSettingsAction.java new file mode 100644 index 0000000000000..c008a10f93c06 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamSettingsAction.java @@ -0,0 +1,370 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.datastreams.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; +import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.common.settings.Settings.EMPTY; +import static org.elasticsearch.common.settings.Settings.builder; + +public class TransportUpdateDataStreamSettingsAction extends TransportMasterNodeAction< + UpdateDataStreamSettingsAction.Request, + UpdateDataStreamSettingsAction.Response> { + private static final Logger logger = LogManager.getLogger(TransportUpdateDataStreamSettingsAction.class); + private static final Set APPLY_TO_BACKING_INDICES = Set.of("index.lifecycle.name"); + private static final Set APPLY_TO_DATA_STREAM_ONLY = Set.of("index.number_of_shards"); + private final MetadataDataStreamsService metadataDataStreamsService; + private final MetadataUpdateSettingsService updateSettingsService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SystemIndices systemIndices; + private final ProjectResolver projectResolver; + private final SettingsFilter settingsFilter; + + @Inject + public TransportUpdateDataStreamSettingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + MetadataDataStreamsService metadataDataStreamsService, + MetadataUpdateSettingsService updateSettingsService, + IndexNameExpressionResolver indexNameExpressionResolver, + SystemIndices systemIndices, + SettingsFilter settingsFilter + ) { + super( + UpdateDataStreamSettingsAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + UpdateDataStreamSettingsAction.Request::new, + UpdateDataStreamSettingsAction.Response::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + this.metadataDataStreamsService = metadataDataStreamsService; + this.updateSettingsService = updateSettingsService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.systemIndices = systemIndices; + this.settingsFilter = settingsFilter; + } + + @Override + protected void masterOperation( + Task task, + UpdateDataStreamSettingsAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + clusterService.state(), + IndicesOptions.DEFAULT, + request.indices() + ); + List dataStreamSettingsResponse = new ArrayList<>(); + CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + listener.onResponse(new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + countDownListener.onResponse(null); + for (String dataStreamName : dataStreamNames) { + updateSingleDataStream( + dataStreamName, + request.getSettings(), + request.masterNodeTimeout(), + request.ackTimeout(), + new ActionListener<>() { + @Override + public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamResponse) { + dataStreamSettingsResponse.add(dataStreamResponse); + countDownListener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + dataStreamSettingsResponse.add( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + dataStreamName, + false, + e.getMessage(), + EMPTY, + EMPTY, + UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY + ) + ); + countDownListener.onResponse(null); + } + } + ); + } + } + + private void updateSingleDataStream( + String dataStreamName, + Settings settingsOverrides, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + logger.debug("updating settings for {}", dataStreamName); + Set settingsToReject = new HashSet<>(); + for (String settingName : settingsOverrides.keySet()) { + if (APPLY_TO_BACKING_INDICES.contains(settingName) == false && APPLY_TO_DATA_STREAM_ONLY.contains(settingName) == false) { + settingsToReject.add(settingName); + } + } + if (settingsToReject.isEmpty() == false) { + listener.onResponse( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + dataStreamName, + false, + Strings.format("Cannot set the following settings on a data stream: [%s]", String.join(",", settingsToReject)), + EMPTY, + EMPTY, + UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY + ) + ); + return; + } + + if (systemIndices.isSystemDataStream(dataStreamName)) { + listener.onResponse( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + dataStreamName, + false, + "Cannot update a system data stream", + EMPTY, + EMPTY, + UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY + ) + ); + return; + } + metadataDataStreamsService.updateSettings( + projectResolver.getProjectId(), + masterNodeTimeout, + ackTimeout, + dataStreamName, + settingsOverrides, + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + updateSettingsOnIndices(dataStreamName, settingsOverrides, masterNodeTimeout, ackTimeout, listener); + } else { + listener.onResponse( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + dataStreamName, + false, + "Updating settings not accepted for unknown reasons", + EMPTY, + EMPTY, + UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY + ) + ); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + + private void updateSettingsOnIndices( + String dataStreamName, + Settings requestSettings, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + Map settingsToApply = new HashMap<>(); + List appliedToDataStreamOnly = new ArrayList<>(); + List appliedToDataStreamAndBackingIndices = new ArrayList<>(); + for (String settingName : requestSettings.keySet()) { + if (APPLY_TO_BACKING_INDICES.contains(settingName)) { + settingsToApply.put(settingName, requestSettings.get(settingName)); + appliedToDataStreamAndBackingIndices.add(settingName); + } else if (APPLY_TO_DATA_STREAM_ONLY.contains(settingName)) { + appliedToDataStreamOnly.add(settingName); + } + } + final List concreteIndices = clusterService.state() + .projectState(projectResolver.getProjectId()) + .metadata() + .dataStreams() + .get(dataStreamName) + .getIndices(); + final List indexSettingErrors = new ArrayList<>(); + + CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() { + // Called when all indices for all settings are complete + @Override + public void onResponse(Void unused) { + DataStream dataStream = clusterService.state() + .projectState(projectResolver.getProjectId()) + .metadata() + .dataStreams() + .get(dataStreamName); + listener.onResponse( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + dataStreamName, + true, + null, + settingsFilter.filter(dataStream.getSettings()), + settingsFilter.filter( + dataStream.getEffectiveSettings(clusterService.state().projectState(projectResolver.getProjectId()).metadata()) + ), + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult( + appliedToDataStreamOnly, + appliedToDataStreamAndBackingIndices, + indexSettingErrors + ) + ) + ); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + indexCountDownListener.onResponse(null); // handles the case where there were zero indices + Settings applyToIndexSettings = builder().loadFromMap(settingsToApply).build(); + for (Index index : concreteIndices) { + updateSettingsOnSingleIndex(index, applyToIndexSettings, masterNodeTimeout, ackTimeout, new ActionListener<>() { + @Override + public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) { + if (indexSettingError != null) { + indexSettingErrors.add(indexSettingError); + } + indexCountDownListener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + indexCountDownListener.onFailure(e); + } + }); + } + } + + private void updateSettingsOnSingleIndex( + Index index, + Settings requestSettings, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + if (requestSettings.isEmpty()) { + listener.onResponse(null); + } else { + final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); + ClusterBlockException blockException = clusterService.state() + .blocks() + .indicesBlockedException(projectMetadata.id(), ClusterBlockLevel.METADATA_WRITE, new String[] { index.getName() }); + if (blockException != null) { + listener.onResponse( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError( + index.getName(), + blockException.getMessage() + ) + ); + return; + } + updateSettingsService.updateSettings( + new UpdateSettingsClusterStateUpdateRequest( + projectResolver.getProjectId(), + masterNodeTimeout, + ackTimeout, + requestSettings, + UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE, + UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES, + index + ), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error; + if (response.isAcknowledged() == false) { + error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError( + index.getName(), + "Updating settings not acknowledged for unknown reason" + ); + } else { + error = null; + } + listener.onResponse(error); + } + + @Override + public void onFailure(Exception e) { + listener.onResponse( + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage()) + ); + } + } + ); + } + + } + + @Override + protected ClusterBlockException checkBlock(UpdateDataStreamSettingsAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamSettingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamSettingsAction.java new file mode 100644 index 0000000000000..a2cee4a6e435c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamSettingsAction.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class GetDataStreamSettingsAction extends ActionType { + public static final String NAME = "indices:monitor/data_stream/settings/get"; + public static final GetDataStreamSettingsAction INSTANCE = new GetDataStreamSettingsAction(); + + public GetDataStreamSettingsAction() { + super(NAME); + } + + public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable { + private String[] dataStreamNames; + + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); + } + + @Override + public GetDataStreamSettingsAction.Request indices(String... dataStreamNames) { + this.dataStreamNames = dataStreamNames; + return this; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public String[] indices() { + return dataStreamNames; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetDataStreamSettingsAction.Request request = (GetDataStreamSettingsAction.Request) o; + return Arrays.equals(dataStreamNames, request.dataStreamNames); + } + + @Override + public int hashCode() { + return Arrays.hashCode(dataStreamNames); + } + } + + public static class Response extends ActionResponse implements ChunkedToXContentObject { + private final List dataStreamSettingsResponses; + + public Response(List dataStreamSettingsResponses) { + this.dataStreamSettingsResponses = dataStreamSettingsResponses; + } + + public List getDataStreamSettingsResponses() { + return dataStreamSettingsResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert false : "This ought to never be called because this action only runs locally"; + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, params1) -> builder.startObject().startArray("data_streams")), + dataStreamSettingsResponses.stream().map(dataStreamSettingsResponse -> (ToXContent) dataStreamSettingsResponse).iterator(), + Iterators.single((builder, params1) -> builder.endArray().endObject()) + ); + } + } + + public record DataStreamSettingsResponse(String dataStreamName, Settings settings, Settings effectiveSettings) implements ToXContent { + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + builder.startObject("settings"); + settings.toXContent(builder, params); + builder.endObject(); + builder.startObject("effective_settings"); + effectiveSettings.toXContent(builder, params); + builder.endObject(); + builder.endObject(); + return builder; + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsAction.java new file mode 100644 index 0000000000000..42ab1dd15ff38 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsAction.java @@ -0,0 +1,262 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class UpdateDataStreamSettingsAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/settings/update"; + public static final UpdateDataStreamSettingsAction INSTANCE = new UpdateDataStreamSettingsAction(); + + public UpdateDataStreamSettingsAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private final Settings settings; + private String[] dataStreamNames = Strings.EMPTY_ARRAY; + + public Request(Settings settings, TimeValue masterNodeTimeout, TimeValue ackTimeout) { + super(masterNodeTimeout, ackTimeout); + this.settings = settings; + } + + @Override + public Request indices(String... dataStreamNames) { + this.dataStreamNames = dataStreamNames; + return this; + } + + public Settings getSettings() { + return settings; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.dataStreamNames = in.readStringArray(); + this.settings = Settings.readSettingsFromStream(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(dataStreamNames); + settings.writeTo(out); + } + + @Override + public String[] indices() { + return dataStreamNames; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(dataStreamNames, request.dataStreamNames) + && settings.equals(request.settings) + && Objects.equals(masterNodeTimeout(), request.masterNodeTimeout()) + && Objects.equals(ackTimeout(), request.ackTimeout()); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(dataStreamNames), settings, masterNodeTimeout(), ackTimeout()); + } + + } + + public static class Response extends ActionResponse implements ChunkedToXContentObject { + private final List dataStreamSettingsResponses; + + public Response(List dataStreamSettingsResponses) { + this.dataStreamSettingsResponses = dataStreamSettingsResponses; + } + + public Response(StreamInput in) throws IOException { + this(in.readCollectionAsList(DataStreamSettingsResponse::new)); + } + + public List getDataStreamSettingsResponses() { + return dataStreamSettingsResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(dataStreamSettingsResponses, (out1, value) -> value.writeTo(out1)); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, params1) -> builder.startObject().startArray("data_streams")), + dataStreamSettingsResponses.stream().map(dataStreamSettingsResponse -> (ToXContent) dataStreamSettingsResponse).iterator(), + Iterators.single((builder, params1) -> builder.endArray().endObject()) + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(dataStreamSettingsResponses, response.dataStreamSettingsResponses); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreamSettingsResponses); + } + } + + public record DataStreamSettingsResponse( + String dataStreamName, + boolean dataStreamSucceeded, + String dataStreamErrorMessage, + Settings settings, + Settings effectiveSettings, + IndicesSettingsResult indicesSettingsResult + ) implements ToXContent, Writeable { + + public DataStreamSettingsResponse(StreamInput in) throws IOException { + this( + in.readString(), + in.readBoolean(), + in.readOptionalString(), + Settings.readSettingsFromStream(in), + Settings.readSettingsFromStream(in), + new IndicesSettingsResult(in) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dataStreamName); + out.writeBoolean(dataStreamSucceeded); + out.writeOptionalString(dataStreamErrorMessage); + settings.writeTo(out); + effectiveSettings.writeTo(out); + indicesSettingsResult.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + builder.field("applied_to_data_stream", dataStreamSucceeded); + if (dataStreamErrorMessage != null) { + builder.field("error", dataStreamErrorMessage); + } + builder.startObject("settings"); + settings.toXContent(builder, params); + builder.endObject(); + builder.startObject("effective_settings"); + effectiveSettings.toXContent(builder, params); + builder.endObject(); + builder.startObject("index_settings_results"); + indicesSettingsResult.toXContent(builder, params); + builder.endObject(); + builder.endObject(); + return builder; + } + + public record IndicesSettingsResult( + List appliedToDataStreamOnly, + List appliedToDataStreamAndBackingIndices, + List indexSettingErrors + ) implements ToXContent, Writeable { + + public static final IndicesSettingsResult EMPTY = new IndicesSettingsResult(List.of(), List.of(), List.of()); + + public IndicesSettingsResult(StreamInput in) throws IOException { + this(in.readStringCollectionAsList(), in.readStringCollectionAsList(), in.readCollectionAsList(IndexSettingError::new)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("applied_to_data_stream_only", appliedToDataStreamOnly); + builder.field("applied_to_data_stream_and_backing_indices", appliedToDataStreamAndBackingIndices); + if (indexSettingErrors.isEmpty() == false) { + builder.field("errors", indexSettingErrors); + } + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringCollection(appliedToDataStreamOnly); + out.writeStringCollection(appliedToDataStreamAndBackingIndices); + out.writeCollection(indexSettingErrors, (out1, value) -> value.writeTo(out1)); + } + } + + public record IndexSettingError(String indexName, String errorMessage) implements ToXContent, Writeable { + public IndexSettingError(StreamInput in) throws IOException { + this(in.readString(), in.readString()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeString(errorMessage); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("index", indexName); + builder.field("error", errorMessage); + builder.endObject(); + return builder; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 62417cbdd5863..437f8ee2dd427 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -40,10 +40,13 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream; + /** * Handles data stream modification requests. */ @@ -55,6 +58,7 @@ public class MetadataDataStreamsService { private final MasterServiceTaskQueue updateLifecycleTaskQueue; private final MasterServiceTaskQueue setRolloverOnWriteTaskQueue; private final MasterServiceTaskQueue updateOptionsTaskQueue; + private final MasterServiceTaskQueue updateSettingsTaskQueue; public MetadataDataStreamsService( ClusterService clusterService, @@ -133,6 +137,46 @@ public Tuple executeTask( } }; this.updateOptionsTaskQueue = clusterService.createTaskQueue("modify-data-stream-options", Priority.NORMAL, updateOptionsExecutor); + ClusterStateTaskExecutor updateSettingsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() { + + @Override + public Tuple executeTask( + UpdateSettingsTask updateSettingsTask, + ClusterState clusterState + ) throws Exception { + + ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateSettingsTask.projectId); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + Map dataStreamMap = projectMetadata.dataStreams(); + DataStream dataStream = dataStreamMap.get(updateSettingsTask.dataStreamName); + Settings existingSettings = dataStream.getSettings(); + + Template.Builder templateBuilder = Template.builder(); + Settings.Builder mergedSettingsBuilder = Settings.builder().put(existingSettings).put(updateSettingsTask.settingsOverrides); + Settings mergedSettings = mergedSettingsBuilder.build(); + + final ComposableIndexTemplate template = lookupTemplateForDataStream(updateSettingsTask.dataStreamName, projectMetadata); + ComposableIndexTemplate mergedTemplate = template.mergeSettings(mergedSettings); + MetadataIndexTemplateService.validateTemplate( + mergedTemplate.template().settings(), + mergedTemplate.template().mappings(), + indicesService + ); + + templateBuilder.settings(mergedSettingsBuilder); + DataStream.Builder dataStreamBuilder = dataStream.copy().setSettings(mergedSettings); + projectMetadataBuilder.removeDataStream(updateSettingsTask.dataStreamName); + projectMetadataBuilder.put(dataStreamBuilder.build()); + ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); + + return new Tuple<>(updatedClusterState, updateSettingsTask); + } + }; + this.updateSettingsTaskQueue = clusterService.createTaskQueue( + "update-data-stream-settings", + Priority.NORMAL, + updateSettingsExecutor + ); } public void modifyDataStream( @@ -362,6 +406,21 @@ public static ClusterState setRolloverOnWrite( ); } + public void updateSettings( + ProjectId projectId, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + String dataStreamName, + Settings settingsOverrides, + ActionListener listener + ) { + updateSettingsTaskQueue.submitTask( + "updating settings on data stream", + new UpdateSettingsTask(projectId, dataStreamName, settingsOverrides, ackTimeout, listener), + masterNodeTimeout + ); + } + private static void addBackingIndex( ProjectMetadata project, ProjectMetadata.Builder builder, @@ -606,4 +665,23 @@ public boolean targetFailureStore() { return targetFailureStore; } } + + static class UpdateSettingsTask extends AckedBatchedClusterStateUpdateTask { + final ProjectId projectId; + private final String dataStreamName; + private final Settings settingsOverrides; + + UpdateSettingsTask( + ProjectId projectId, + String dataStreamName, + Settings settingsOverrides, + TimeValue ackTimeout, + ActionListener listener + ) { + super(ackTimeout, listener); + this.projectId = projectId; + this.dataStreamName = dataStreamName; + this.settingsOverrides = settingsOverrides; + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 492d057c80af3..a82f9d222ba22 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -1914,8 +1914,7 @@ private static void validateCompositeTemplate( }); } - private static void validateTemplate(Settings validateSettings, CompressedXContent mappings, IndicesService indicesService) - throws Exception { + static void validateTemplate(Settings validateSettings, CompressedXContent mappings, IndicesService indicesService) throws Exception { // Hard to validate settings if they're non-existent, so used empty ones if none were provided Settings settings = validateSettings; if (settings == null) { diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamSettingsActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamSettingsActionTests.java new file mode 100644 index 0000000000000..c6b2b699be00b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamSettingsActionTests.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class GetDataStreamSettingsActionTests extends ESTestCase { + + public void testResponseToXContentEmpty() throws IOException { + List responseList = new ArrayList<>(); + GetDataStreamSettingsAction.Response response = new GetDataStreamSettingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat(xContentMap, equalTo(Map.of("data_streams", List.of()))); + } + } + + public void testResponseToXContent() throws IOException { + Map dataStream1Settings = Map.of("setting1", "value1", "setting2", "value2"); + Map dataStream1EffectiveSettings = Map.of("setting1", "value1", "setting2", "value2", "setting3", "value3"); + Map dataStream2Settings = Map.of("setting4", "value4", "setting5", "value5"); + Map dataStream2EffectiveSettings = Map.of("setting4", "value4", "setting5", "value5", "settings6", "value6"); + GetDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse1 = + new GetDataStreamSettingsAction.DataStreamSettingsResponse( + "dataStream1", + Settings.builder().loadFromMap(dataStream1Settings).build(), + Settings.builder().loadFromMap(dataStream1EffectiveSettings).build() + ); + GetDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse2 = + new GetDataStreamSettingsAction.DataStreamSettingsResponse( + "dataStream2", + Settings.builder().loadFromMap(dataStream2Settings).build(), + Settings.builder().loadFromMap(dataStream2EffectiveSettings).build() + ); + List responseList = List.of( + dataStreamSettingsResponse1, + dataStreamSettingsResponse2 + ); + GetDataStreamSettingsAction.Response response = new GetDataStreamSettingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat( + xContentMap, + equalTo( + Map.of( + "data_streams", + List.of( + Map.of( + "name", + "dataStream1", + "settings", + dataStream1Settings, + "effective_settings", + dataStream1EffectiveSettings + ), + Map.of( + "name", + "dataStream2", + "settings", + dataStream2Settings, + "effective_settings", + dataStream2EffectiveSettings + ) + ) + ) + ) + ); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsActionRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsActionRequestTests.java new file mode 100644 index 0000000000000..8ab80aee3fd80 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsActionRequestTests.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.cluster.metadata.ComponentTemplateTests; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Supplier; + +import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomSettings; + +public class UpdateDataStreamSettingsActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return UpdateDataStreamSettingsAction.Request::new; + } + + @Override + protected UpdateDataStreamSettingsAction.Request createTestInstance() { + UpdateDataStreamSettingsAction.Request request = new UpdateDataStreamSettingsAction.Request( + randomSettings(), + randomTimeValue(), + randomTimeValue() + ); + request.indices(randomIndices()); + return request; + } + + @Override + protected UpdateDataStreamSettingsAction.Request mutateInstance(UpdateDataStreamSettingsAction.Request instance) throws IOException { + String[] indices = instance.indices(); + Settings settings = instance.getSettings(); + TimeValue masterNodeTimeout = instance.masterNodeTimeout(); + TimeValue ackTimeout = instance.ackTimeout(); + switch (between(0, 3)) { + case 0 -> { + indices = randomArrayValueOtherThan(indices, this::randomIndices); + } + case 1 -> { + settings = randomValueOtherThan(settings, ComponentTemplateTests::randomSettings); + } + case 2 -> { + masterNodeTimeout = randomValueOtherThan(masterNodeTimeout, ESTestCase::randomTimeValue); + } + case 3 -> { + ackTimeout = randomValueOtherThan(ackTimeout, ESTestCase::randomTimeValue); + } + default -> throw new AssertionError("Should not be here"); + } + return new UpdateDataStreamSettingsAction.Request(settings, masterNodeTimeout, ackTimeout).indices(indices); + } + + private String[] randomIndices() { + return randomList(10, () -> randomAlphaOfLength(20)).toArray(new String[0]); + } + + public static T[] randomArrayValueOtherThan(T[] input, Supplier randomSupplier) { + return randomValueOtherThanMany(v -> Arrays.equals(input, v), randomSupplier); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsActionResponseTests.java new file mode 100644 index 0000000000000..a249703beb30b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamSettingsActionResponseTests.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError; +import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomSettings; +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class UpdateDataStreamSettingsActionResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return UpdateDataStreamSettingsAction.Response::new; + } + + public void testToXContent() throws IOException { + Map dataStream1Settings = Map.of("setting1", "value1", "setting2", "value2"); + Map dataStream1EffectiveSettings = Map.of("setting1", "value1", "setting2", "value2", "setting3", "value3"); + List dataStream1AppliedToDataStreamOnly = randomList(10, () -> randomAlphanumericOfLength(10)); + List dataStream1AppliedToBackingIndices = randomList(10, () -> randomAlphanumericOfLength(10)); + List dataStream1IndexErrors = randomList( + 10, + () -> new IndexSettingError(randomAlphanumericOfLength(10), randomAlphaOfLength(10)) + ); + Map dataStream2Settings = Map.of("setting4", "value4", "setting5", "value5"); + Map dataStream2EffectiveSettings = Map.of("setting4", "value4", "setting5", "value5", "settings6", "value6"); + List dataStream2AppliedToDataStreamOnly = randomList(10, () -> randomAlphanumericOfLength(10)); + List dataStream2AppliedToBackingIndices = randomList(10, () -> randomAlphanumericOfLength(10)); + List dataStream2IndexErrors = randomList( + 10, + () -> new IndexSettingError(randomAlphanumericOfLength(10), randomAlphaOfLength(10)) + ); + boolean dataStream1Succeeded = randomBoolean(); + String dataStream1Error = randomBoolean() ? null : randomAlphaOfLength(20); + boolean dataStream2Succeeded = randomBoolean(); + String dataStream2Error = randomBoolean() ? null : randomAlphaOfLength(20); + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse1 = + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + "dataStream1", + dataStream1Succeeded, + dataStream1Error, + Settings.builder().loadFromMap(dataStream1Settings).build(), + Settings.builder().loadFromMap(dataStream1EffectiveSettings).build(), + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult( + dataStream1AppliedToDataStreamOnly, + dataStream1AppliedToBackingIndices, + dataStream1IndexErrors + ) + ); + UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamSettingsResponse2 = + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + "dataStream2", + dataStream2Succeeded, + dataStream2Error, + Settings.builder().loadFromMap(dataStream2Settings).build(), + Settings.builder().loadFromMap(dataStream2EffectiveSettings).build(), + new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult( + dataStream2AppliedToDataStreamOnly, + dataStream2AppliedToBackingIndices, + dataStream2IndexErrors + ) + ); + List responseList = List.of( + dataStreamSettingsResponse1, + dataStreamSettingsResponse2 + ); + UpdateDataStreamSettingsAction.Response response = new UpdateDataStreamSettingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat( + xContentMap, + equalTo( + Map.of( + "data_streams", + List.of( + buildExpectedMap( + "dataStream1", + dataStream1Succeeded, + dataStream1Error, + dataStream1Settings, + dataStream1EffectiveSettings, + dataStream1AppliedToDataStreamOnly, + dataStream1AppliedToBackingIndices, + dataStream1IndexErrors + ), + buildExpectedMap( + "dataStream2", + dataStream2Succeeded, + dataStream2Error, + dataStream2Settings, + dataStream2EffectiveSettings, + dataStream2AppliedToDataStreamOnly, + dataStream2AppliedToBackingIndices, + dataStream2IndexErrors + ) + ) + ) + ) + ); + } + } + + private Map buildExpectedMap( + String name, + boolean succeeded, + String error, + Map settings, + Map effectiveSettings, + List appliedToDataStreamOnly, + List appliedToIndices, + List indexErrors + ) { + Map result = new HashMap<>(); + result.put("name", name); + result.put("applied_to_data_stream", succeeded); + if (error != null) { + result.put("error", error); + } + result.put("settings", settings); + result.put("effective_settings", effectiveSettings); + Map indexSettingsResults = new HashMap<>(); + indexSettingsResults.put("applied_to_data_stream_only", appliedToDataStreamOnly); + indexSettingsResults.put("applied_to_data_stream_and_backing_indices", appliedToIndices); + if (indexErrors.isEmpty() == false) { + indexSettingsResults.put( + "errors", + indexErrors.stream() + .map(indexSettingError -> Map.of("index", indexSettingError.indexName(), "error", indexSettingError.errorMessage())) + .toList() + ); + } + result.put("index_settings_results", indexSettingsResults); + return result; + } + + @Override + protected UpdateDataStreamSettingsAction.Response createTestInstance() { + return new UpdateDataStreamSettingsAction.Response(randomList(10, this::randomDataStreamSettingsResponse)); + } + + private UpdateDataStreamSettingsAction.DataStreamSettingsResponse randomDataStreamSettingsResponse() { + return new UpdateDataStreamSettingsAction.DataStreamSettingsResponse( + "dataStream1", + randomBoolean(), + randomBoolean() ? null : randomAlphaOfLength(20), + randomSettings(), + randomSettings(), + randomIndicesSettingsResult() + ); + } + + private UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult randomIndicesSettingsResult() { + return new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult( + randomList(10, () -> randomAlphanumericOfLength(20)), + randomList(10, () -> randomAlphanumericOfLength(20)), + randomList(10, this::randomIndexSettingError) + ); + } + + private IndexSettingError randomIndexSettingError() { + return new IndexSettingError(randomAlphanumericOfLength(20), randomAlphanumericOfLength(20)); + } + + @Override + protected UpdateDataStreamSettingsAction.Response mutateInstance(UpdateDataStreamSettingsAction.Response instance) throws IOException { + List responseList = instance.getDataStreamSettingsResponses(); + List mutatedResponseList = new ArrayList<>(responseList); + switch (between(0, 1)) { + case 0 -> { + if (responseList.isEmpty()) { + mutatedResponseList.add(randomDataStreamSettingsResponse()); + } else { + mutatedResponseList.remove(randomInt(responseList.size() - 1)); + } + } + case 1 -> { + mutatedResponseList.add(randomDataStreamSettingsResponse()); + } + default -> throw new AssertionError("Should not be here"); + } + return new UpdateDataStreamSettingsAction.Response(mutatedResponseList); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 7b7193695d21a..891a0badfd741 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -609,6 +609,7 @@ public class Constants { "indices:data/write/reindex", "indices:data/write/update", "indices:data/write/update/byquery", + "indices:monitor/data_stream/settings/get", "indices:monitor/data_stream/stats", "indices:monitor/field_usage_stats", "indices:monitor/fleet/global_checkpoints[s]", @@ -640,6 +641,7 @@ public class Constants { "indices:admin/data_stream/index/reindex", "indices:admin/data_stream/reindex", "indices:admin/data_stream/reindex_cancel", + "indices:admin/data_stream/settings/update", "indices:admin/index/create_from_source", "indices:admin/index/copy_lifecycle_index_metadata", "internal:admin/repository/verify",