From 0830cc783d3e24b244ed3e181405ee09d87efa00 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 17:07:59 +0100 Subject: [PATCH 01/10] New logic to check for conflicting indices before enabling streams plus tests --- modules/streams/build.gradle | 2 +- .../TransportLogsStreamsToggleActivation.java | 33 +++++++++++++++---- .../test/streams/logs/10_basic.yml | 26 +++++++++++++++ 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index fd56a627026b6..5d711abf51342 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams' + include '_common', 'streams', 'indices' } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index dc7330362e80b..5185c2cac195f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -22,16 +23,19 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Arrays; import java.util.Locale; /** @@ -77,17 +81,34 @@ protected void masterOperation( ActionListener listener ) throws Exception { ProjectId projectId = projectResolver.getProjectId(); - StreamsMetadata streamsState = state.metadata().getProject(projectId).custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + ProjectMetadata projectMetadata = state.metadata().getProject(projectId); + StreamsMetadata streamsState = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); boolean currentlyEnabled = streamsState.isLogsEnabled(); boolean shouldEnable = request.shouldEnable(); - if (shouldEnable != currentlyEnabled) { - StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable); - String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable"); - taskQueue.submitTask(taskName, updateTask, updateTask.timeout()); - } else { + + if (shouldEnable == currentlyEnabled) { logger.debug("Logs streams are already in the requested state: {}", shouldEnable); listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + + if (shouldEnable && logsIndexExists(projectMetadata)) { + listener.onFailure( + new ElasticsearchStatusException( + "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist.", + RestStatus.CONFLICT + ) + ); + return; } + + StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable); + String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable"); + taskQueue.submitTask(taskName, updateTask, updateTask.timeout()); + } + + private boolean logsIndexExists(ProjectMetadata projectMetadata) { + return Arrays.stream(projectMetadata.getConcreteAllIndices()).anyMatch(name -> name.equals("logs") || name.startsWith("logs.")); } @Override diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml index 3c1bc7d6c53c1..d24028ec1a0c2 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -41,3 +41,29 @@ - do: streams.status: { } - is_true: logs.enabled + +--- +"Check streams can't be enabled with existing logs indices": + - do: + indices.create: + index: logs + - do: + catch: conflict + streams.logs_enable: { } + - match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." } + + - do: + indices.delete: + index: logs + + - do: + indices.create: + index: logs.test + - do: + catch: conflict + streams.logs_enable: { } + - match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." } + + - do: + indices.delete: + index: logs.test From 7497dffc8eabd476a985cd639d99b208ed10fee2 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 29 Jul 2025 11:20:33 +0100 Subject: [PATCH 02/10] Update docs/changelog/132064.yaml --- docs/changelog/132064.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/132064.yaml diff --git a/docs/changelog/132064.yaml b/docs/changelog/132064.yaml new file mode 100644 index 0000000000000..83bcaca6e6498 --- /dev/null +++ b/docs/changelog/132064.yaml @@ -0,0 +1,5 @@ +pr: 132064 +summary: New logic to check for conflicting indices before enabling streams pl… +area: Data streams +type: enhancement +issues: [] From 9f8052a55bcc65612f6682cea39ed9e3f46d989d Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 30 Jul 2025 09:21:16 +0100 Subject: [PATCH 03/10] Fix tests --- modules/streams/build.gradle | 2 +- .../rest-api-spec/test/streams/logs/10_basic.yml | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index 5d711abf51342..c42528b367d4f 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams', 'indices' + include '_common', 'streams', 'indices', 'cluster.health' } } diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml index d24028ec1a0c2..1209c1217aad6 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -47,6 +47,11 @@ - do: indices.create: index: logs + + - do: + cluster.health: + wait_for_status: green + - do: catch: conflict streams.logs_enable: { } @@ -59,6 +64,11 @@ - do: indices.create: index: logs.test + + - do: + cluster.health: + wait_for_status: green + - do: catch: conflict streams.logs_enable: { } From 4500227263917cd05d1d1450dca0c4380dd62608 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 30 Jul 2025 10:10:57 +0100 Subject: [PATCH 04/10] More YAML test fixes --- modules/streams/build.gradle | 2 +- .../rest-api-spec/test/streams/logs/10_basic.yml | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index c42528b367d4f..5d711abf51342 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams', 'indices', 'cluster.health' + include '_common', 'streams', 'indices' } } diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml index 1209c1217aad6..52bced35c32c2 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -1,3 +1,8 @@ +--- +teardown: + - do: + streams.logs_disable: { } + --- "Basic toggle of logs state enable to disable and back": - do: @@ -48,10 +53,6 @@ indices.create: index: logs - - do: - cluster.health: - wait_for_status: green - - do: catch: conflict streams.logs_enable: { } @@ -65,10 +66,6 @@ indices.create: index: logs.test - - do: - cluster.health: - wait_for_status: green - - do: catch: conflict streams.logs_enable: { } From bfc4c5c8ccab8a9bd461b98a52b9fcc9dcd1d33d Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 30 Jul 2025 14:14:21 +0100 Subject: [PATCH 05/10] Update changelog entry --- docs/changelog/132064.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/132064.yaml b/docs/changelog/132064.yaml index 83bcaca6e6498..06c5955f35b1b 100644 --- a/docs/changelog/132064.yaml +++ b/docs/changelog/132064.yaml @@ -1,5 +1,5 @@ pr: 132064 -summary: New logic to check for conflicting indices before enabling streams pl… +summary: Only Allow Enabling Streams If No Conflicting Indices Exist area: Data streams type: enhancement issues: [] From dc3db564c02e78752a87ffc10dee7c6844a4444e Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 13 Aug 2025 09:50:11 +0100 Subject: [PATCH 06/10] Normalize to single quotes in gradle file and switch to enum for stream index prefix --- modules/streams/build.gradle | 2 +- .../logs/TransportLogsStreamsToggleActivation.java | 8 +++++--- server/src/main/java/module-info.java | 1 + 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index 24b651da291af..4255221db4747 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search" + include '_common', 'streams', 'bulk', 'index', 'ingest', 'indices', 'delete_by_query', 'search' } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index 5185c2cac195f..c68eca7b8e361 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; @@ -79,7 +80,7 @@ protected void masterOperation( LogsStreamsActivationToggleAction.Request request, ClusterState state, ActionListener listener - ) throws Exception { + ) { ProjectId projectId = projectResolver.getProjectId(); ProjectMetadata projectMetadata = state.metadata().getProject(projectId); StreamsMetadata streamsState = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); @@ -108,7 +109,8 @@ protected void masterOperation( } private boolean logsIndexExists(ProjectMetadata projectMetadata) { - return Arrays.stream(projectMetadata.getConcreteAllIndices()).anyMatch(name -> name.equals("logs") || name.startsWith("logs.")); + return Arrays.stream(projectMetadata.getConcreteAllIndices()) + .anyMatch(name -> name.equals(StreamType.LOGS.getStreamName()) || name.startsWith(StreamType.LOGS.getStreamName() + ".")); } @Override @@ -132,7 +134,7 @@ static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { return currentState.copyAndUpdateProject( projectId, builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled)) diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 90cd3c669a52c..c3d6d2f4d97df 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -213,6 +213,7 @@ exports org.elasticsearch.common.regex; exports org.elasticsearch.common.scheduler; exports org.elasticsearch.common.settings; + exports org.elasticsearch.common.streams; exports org.elasticsearch.common.text; exports org.elasticsearch.common.time; exports org.elasticsearch.common.transport; From 4b84c85066f2a629bd6ea8e6c5e9985829dd6e1a Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 13 Aug 2025 09:53:09 +0100 Subject: [PATCH 07/10] Speed up index checking by pre-computing stream names and prefix and not using a stream --- .../logs/TransportLogsStreamsToggleActivation.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index c68eca7b8e361..73562c49ae280 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -36,7 +36,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.Arrays; import java.util.Locale; /** @@ -109,8 +108,16 @@ protected void masterOperation( } private boolean logsIndexExists(ProjectMetadata projectMetadata) { - return Arrays.stream(projectMetadata.getConcreteAllIndices()) - .anyMatch(name -> name.equals(StreamType.LOGS.getStreamName()) || name.startsWith(StreamType.LOGS.getStreamName() + ".")); + String logsStreamName = StreamType.LOGS.getStreamName(); + String logsStreamPrefix = logsStreamName + "."; + + for (String name : projectMetadata.getConcreteAllIndices()) { + if (name.equals(logsStreamName) || name.startsWith(logsStreamPrefix)) { + return true; + } + } + + return false; } @Override From 3bb57c7d1f106af0ebed8960bfe78dd2927c2be1 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 13 Aug 2025 11:19:35 +0100 Subject: [PATCH 08/10] Fix race condition in YAML tests --- .../rest-api-spec/test/streams/logs/10_basic.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml index 52bced35c32c2..a1b8d9113352e 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -29,6 +29,10 @@ teardown: streams.status: { } - is_true: logs.enabled + - do: + streams.logs_disable: { } + - is_true: acknowledged + --- "Check for repeated toggle to same state": - do: @@ -47,6 +51,10 @@ teardown: streams.status: { } - is_true: logs.enabled + - do: + streams.logs_disable: { } + - is_true: acknowledged + --- "Check streams can't be enabled with existing logs indices": - do: From a392a28e2275d927869b5ea85503f90056367bec Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 13 Aug 2025 13:08:00 +0100 Subject: [PATCH 09/10] Fix race condition in YAML tests - Take 2 --- .../test/streams/logs/20_substream_restrictions.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml index 621985985295a..311bbe794ca3c 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -71,6 +71,10 @@ - match: { items.0.index.error.type: "illegal_argument_exception" } - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" } + - do: + indices.delete: + index: bad-index + --- "Check Bulk Index With Script Processor To Substream Is Rejected": - do: @@ -104,6 +108,10 @@ - match: { items.0.index.error.type: "illegal_argument_exception" } - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" } + - do: + indices.delete: + index: bad-index-script + --- "Check Delete By Query Directly On Substream After Reroute Succeeds": - do: @@ -154,3 +162,7 @@ query: match_all: {} - match: { hits.total.value: 0 } + + - do: + indices.delete: + index: logs From db5100c108cbe1dee84bffa13ddccb29eb11ef28 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 13 Aug 2025 14:46:37 +0100 Subject: [PATCH 10/10] Fix race condition in YAML tests - Take 3 --- .../test/streams/logs/20_substream_restrictions.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml index 311bbe794ca3c..d11245a7e7a5e 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -1,3 +1,8 @@ +--- +teardown: + - do: + streams.logs_disable: { } + --- "Check User Can't Write To Substream Directly": - do: