diff --git a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java index 1404d7c15e55a..bd88603407ea5 100644 --- a/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java +++ b/modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java @@ -9,7 +9,6 @@ package org.elasticsearch.ingest.otel; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; @@ -20,10 +19,6 @@ public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin { @Override public Map getProcessors(Processor.Parameters parameters) { - if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory()); - } else { - return Map.of(); - } + return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory()); } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java index 4e3a531e20d56..3ba1bc1d751b1 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -9,7 +9,6 @@ package org.elasticsearch.rest.streams; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -29,7 +28,6 @@ import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation; import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction; -import java.util.Collections; import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; @@ -53,10 +51,7 @@ public List getRestHandlers( Supplier nodesInCluster, Predicate clusterSupportsFeature ) { - if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction()); - } - return Collections.emptyList(); + return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction()); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 6c71abcf23207..e1e0b95ac659e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.DateFormatters; -import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; @@ -90,7 +89,6 @@ public final class DataStream implements SimpleDiffable, ToXContentO private static final TransportVersion MAPPINGS_IN_DATA_STREAMS = TransportVersion.fromName("mappings_in_data_streams"); public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store"); - public static final boolean LOGS_STREAM_FEATURE_FLAG = new FeatureFlag("logs_stream").isEnabled(); public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0; public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0; public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java b/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java index 38ec1c401148c..9063df9253b28 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java @@ -9,7 +9,6 @@ package org.elasticsearch.ingest; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.features.FeatureSpecification; import org.elasticsearch.features.NodeFeature; @@ -21,11 +20,7 @@ public class IngestFeatures implements FeatureSpecification { @Override public Set getFeatures() { - if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - return Set.of(IngestService.FIELD_ACCESS_PATTERN); - } else { - return Set.of(); - } + return Set.of(IngestService.FIELD_ACCESS_PATTERN); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2aa7090d706a7..2c0359e367086 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -132,12 +132,7 @@ public class IngestService implements ClusterStateApplier, ReportingService taskQueue; diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index 90e50842ef553..3e91de7513a6b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.core.Tuple; import org.elasticsearch.rest.BaseRestHandler; @@ -79,10 +78,6 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl @Override public Set supportedCapabilities() { // pipeline_tracking info: `{created,modified}_date` system properties defined within pipeline definition. - if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - return Set.of("pipeline_tracking_info", "field_access_pattern.flexible"); - } else { - return Set.of("pipeline_tracking_info"); - } + return Set.of("pipeline_tracking_info", "field_access_pattern.flexible"); } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index b9a7607563bd0..6460aff934b64 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.CompoundProcessor; @@ -197,7 +196,7 @@ public void testParseWithProvidedPipeline() throws Exception { false, ingestService, RestApiVersion.current(), - (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + (nodeFeature) -> true ); assertThat(actualRequest.verbose(), equalTo(false)); assertThat(actualRequest.documents().size(), equalTo(numDocs)); @@ -276,7 +275,7 @@ public void testNotValidDocs() { false, ingestService, RestApiVersion.current(), - (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + (nodeFeature) -> true ) ); assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]")); @@ -294,7 +293,7 @@ public void testNotValidDocs() { false, ingestService, RestApiVersion.current(), - (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + (nodeFeature) -> true ) ); assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object")); @@ -310,7 +309,7 @@ public void testNotValidDocs() { false, ingestService, RestApiVersion.current(), - (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + (nodeFeature) -> true ) ); assertThat(e3.getMessage(), containsString("required property is missing")); @@ -391,7 +390,7 @@ public void testIngestPipelineWithDocumentsWithType() throws Exception { false, ingestService, RestApiVersion.V_8, - (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + (nodeFeature) -> true ); assertThat(actualRequest.verbose(), equalTo(false)); assertThat(actualRequest.documents().size(), equalTo(numDocs)); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java index 4d09ed8cae9ea..b5d1940c09d55 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java @@ -9,13 +9,11 @@ package org.elasticsearch.ingest; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentType; import org.hamcrest.Matchers; -import org.junit.Assume; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -2108,8 +2106,6 @@ public void testSourceHashMapIsNotCopied() { * restore the previous pipeline's access pattern. */ public void testNestedAccessPatternPropagation() { - Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG); - Map source = new HashMap<>(Map.of("foo", 1)); IngestDocument document = new IngestDocument("index", "id", 1, null, null, source); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 3bfcd48b5f13e..f267cef75955b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.core.Tuple; import org.elasticsearch.script.ScriptService; @@ -24,7 +23,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.mock; public class PipelineFactoryTests extends ESTestCase { @@ -49,19 +47,10 @@ public void testCreate() throws Exception { pipelineConfig.put(Pipeline.DEPRECATED_KEY, deprecated); pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1))); IngestPipelineFieldAccessPattern expectedAccessPattern = IngestPipelineFieldAccessPattern.CLASSIC; - if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values()); - pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey()); - } + expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values()); + pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey()); Map processorRegistry = Map.of("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -82,7 +71,7 @@ public void testCreateWithNoProcessorsField() throws Exception { pipelineConfig.put(Pipeline.META_KEY, metadata); } try { - Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG); + Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> true); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -97,14 +86,7 @@ public void testCreateWithEmptyProcessorsField() throws Exception { pipelineConfig.put(Pipeline.META_KEY, metadata); } pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of()); - Pipeline pipeline = Pipeline.create( - "_id", - pipelineConfig, - null, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService, null, nodeFeature -> true); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -122,14 +104,7 @@ public void testCreateWithPipelineOnFailure() throws Exception { pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, List.of(Map.of("test", processorConfig))); Map processorRegistry = Map.of("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -152,14 +127,7 @@ public void testCreateWithPipelineEmptyOnFailure() throws Exception { Map processorRegistry = Map.of("test", new TestProcessor.Factory()); Exception e = expectThrows( ElasticsearchParseException.class, - () -> Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ) + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true) ); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -177,14 +145,7 @@ public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception { Map processorRegistry = Map.of("test", new TestProcessor.Factory()); Exception e = expectThrows( ElasticsearchParseException.class, - () -> Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ) + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true) ); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -202,14 +163,7 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception { } pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); - Pipeline pipeline = Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -222,15 +176,12 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception { } public void testCreateUnsupportedFieldAccessPattern() throws Exception { - assumeTrue("Test is only valid if the logs stream feature flag is enabled", DataStream.LOGS_STREAM_FEATURE_FLAG); Map processorConfig = new HashMap<>(); processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor"); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); - if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random"); - } + pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random"); if (metadata != null) { pipelineConfig.put(Pipeline.META_KEY, metadata); } @@ -239,14 +190,7 @@ public void testCreateUnsupportedFieldAccessPattern() throws Exception { Exception e = expectThrows( ElasticsearchParseException.class, // All node features disabled - () -> Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ) + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true) ); assertThat(e.getMessage(), equalTo("pipeline [_id] doesn't support value of [random] for parameter [field_access_pattern]")); } @@ -287,14 +231,7 @@ public void testCreateUnusedProcessorOptions() throws Exception { Map processorRegistry = Map.of("test", new TestProcessor.Factory()); Exception e = expectThrows( ElasticsearchParseException.class, - () -> Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ) + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true) ); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -311,14 +248,7 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception { } pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); Map processorRegistry = Map.of("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create( - "_id", - pipelineConfig, - processorRegistry, - scriptService, - null, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java index 960e3e0a64507..8ba3fe338caa1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -601,14 +600,7 @@ public void testCheck_GivenMissingDestPipeline() throws Exception { ); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); var projectId = randomProjectIdOrDefault(); - Pipeline pipeline = Pipeline.create( - "missing-pipeline", - pipelineConfig, - processorRegistry, - null, - projectId, - nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG - ); + Pipeline pipeline = Pipeline.create("missing-pipeline", pipelineConfig, processorRegistry, null, projectId, nodeFeature -> true); when(ingestService.getPipeline("missing-pipeline")).thenReturn(pipeline); assertValidation(