diff --git a/docs/changelog/132064.yaml b/docs/changelog/132064.yaml new file mode 100644 index 0000000000000..06c5955f35b1b --- /dev/null +++ b/docs/changelog/132064.yaml @@ -0,0 +1,5 @@ +pr: 132064 +summary: Only Allow Enabling Streams If No Conflicting Indices Exist +area: Data streams +type: enhancement +issues: [] diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index 24b651da291af..4255221db4747 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search" + include '_common', 'streams', 'bulk', 'index', 'ingest', 'indices', 'delete_by_query', 'search' } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index dc7330362e80b..73562c49ae280 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -22,12 +23,15 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -75,19 +79,45 @@ protected void masterOperation( LogsStreamsActivationToggleAction.Request request, ClusterState state, ActionListener listener - ) throws Exception { + ) { ProjectId projectId = projectResolver.getProjectId(); - StreamsMetadata streamsState = state.metadata().getProject(projectId).custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + ProjectMetadata projectMetadata = state.metadata().getProject(projectId); + StreamsMetadata streamsState = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); boolean currentlyEnabled = streamsState.isLogsEnabled(); boolean shouldEnable = request.shouldEnable(); - if (shouldEnable != currentlyEnabled) { - StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable); - String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable"); - taskQueue.submitTask(taskName, updateTask, updateTask.timeout()); - } else { + + if (shouldEnable == currentlyEnabled) { logger.debug("Logs streams are already in the requested state: {}", shouldEnable); listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + + if (shouldEnable && logsIndexExists(projectMetadata)) { + listener.onFailure( + new ElasticsearchStatusException( + "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist.", + RestStatus.CONFLICT + ) + ); + return; } + + StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable); + String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable"); + taskQueue.submitTask(taskName, updateTask, updateTask.timeout()); + } + + private boolean logsIndexExists(ProjectMetadata projectMetadata) { + String logsStreamName = StreamType.LOGS.getStreamName(); + String logsStreamPrefix = logsStreamName + "."; + + for (String name : projectMetadata.getConcreteAllIndices()) { + if (name.equals(logsStreamName) || name.startsWith(logsStreamPrefix)) { + return true; + } + } + + return false; } @Override @@ -111,7 +141,7 @@ static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { return currentState.copyAndUpdateProject( projectId, builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled)) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml index 3c1bc7d6c53c1..a1b8d9113352e 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -1,3 +1,8 @@ +--- +teardown: + - do: + streams.logs_disable: { } + --- "Basic toggle of logs state enable to disable and back": - do: @@ -24,6 +29,10 @@ streams.status: { } - is_true: logs.enabled + - do: + streams.logs_disable: { } + - is_true: acknowledged + --- "Check for repeated toggle to same state": - do: @@ -41,3 +50,35 @@ - do: streams.status: { } - is_true: logs.enabled + + - do: + streams.logs_disable: { } + - is_true: acknowledged + +--- +"Check streams can't be enabled with existing logs indices": + - do: + indices.create: + index: logs + + - do: + catch: conflict + streams.logs_enable: { } + - match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." } + + - do: + indices.delete: + index: logs + + - do: + indices.create: + index: logs.test + + - do: + catch: conflict + streams.logs_enable: { } + - match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." } + + - do: + indices.delete: + index: logs.test diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml index 621985985295a..d11245a7e7a5e 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -1,3 +1,8 @@ +--- +teardown: + - do: + streams.logs_disable: { } + --- "Check User Can't Write To Substream Directly": - do: @@ -71,6 +76,10 @@ - match: { items.0.index.error.type: "illegal_argument_exception" } - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" } + - do: + indices.delete: + index: bad-index + --- "Check Bulk Index With Script Processor To Substream Is Rejected": - do: @@ -104,6 +113,10 @@ - match: { items.0.index.error.type: "illegal_argument_exception" } - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" } + - do: + indices.delete: + index: bad-index-script + --- "Check Delete By Query Directly On Substream After Reroute Succeeds": - do: @@ -154,3 +167,7 @@ query: match_all: {} - match: { hits.total.value: 0 } + + - do: + indices.delete: + index: logs diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 90cd3c669a52c..c3d6d2f4d97df 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -213,6 +213,7 @@ exports org.elasticsearch.common.regex; exports org.elasticsearch.common.scheduler; exports org.elasticsearch.common.settings; + exports org.elasticsearch.common.streams; exports org.elasticsearch.common.text; exports org.elasticsearch.common.time; exports org.elasticsearch.common.transport;