Skip to content

Commit 973ec0a

Browse files
masseykegmjehovich
authored andcommitted
Removing the logs stream feature flag (elastic#134649)
1 parent e83b779 commit 973ec0a

File tree

10 files changed

+24
-134
lines changed

10 files changed

+24
-134
lines changed

modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.ingest.otel;
1111

12-
import org.elasticsearch.cluster.metadata.DataStream;
1312
import org.elasticsearch.ingest.Processor;
1413
import org.elasticsearch.plugins.IngestPlugin;
1514
import org.elasticsearch.plugins.Plugin;
@@ -20,10 +19,6 @@ public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {
2019

2120
@Override
2221
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
23-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
24-
return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
25-
} else {
26-
return Map.of();
27-
}
22+
return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
2823
}
2924
}

modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.rest.streams;
1111

12-
import org.elasticsearch.cluster.metadata.DataStream;
1312
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1413
import org.elasticsearch.cluster.node.DiscoveryNodes;
1514
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -29,7 +28,6 @@
2928
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
3029
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;
3130

32-
import java.util.Collections;
3331
import java.util.List;
3432
import java.util.function.Predicate;
3533
import java.util.function.Supplier;
@@ -53,10 +51,7 @@ public List<RestHandler> getRestHandlers(
5351
Supplier<DiscoveryNodes> nodesInCluster,
5452
Predicate<NodeFeature> clusterSupportsFeature
5553
) {
56-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
57-
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
58-
}
59-
return Collections.emptyList();
54+
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
6055
}
6156

6257
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.common.time.DateFormatter;
3737
import org.elasticsearch.common.time.DateFormatters;
38-
import org.elasticsearch.common.util.FeatureFlag;
3938
import org.elasticsearch.common.xcontent.XContentHelper;
4039
import org.elasticsearch.core.FixForMultiProject;
4140
import org.elasticsearch.core.Nullable;
@@ -90,7 +89,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
9089
private static final TransportVersion MAPPINGS_IN_DATA_STREAMS = TransportVersion.fromName("mappings_in_data_streams");
9190

9291
public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store");
93-
public static final boolean LOGS_STREAM_FEATURE_FLAG = new FeatureFlag("logs_stream").isEnabled();
9492
public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0;
9593
public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0;
9694
public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0;

server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.ingest;
1111

12-
import org.elasticsearch.cluster.metadata.DataStream;
1312
import org.elasticsearch.features.FeatureSpecification;
1413
import org.elasticsearch.features.NodeFeature;
1514

@@ -21,11 +20,7 @@ public class IngestFeatures implements FeatureSpecification {
2120

2221
@Override
2322
public Set<NodeFeature> getFeatures() {
24-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
25-
return Set.of(IngestService.FIELD_ACCESS_PATTERN);
26-
} else {
27-
return Set.of();
28-
}
23+
return Set.of(IngestService.FIELD_ACCESS_PATTERN);
2924
}
3025

3126
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
132132
* @return true if the node feature can be supported in the local library code, false if it is not supported
133133
*/
134134
public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
135-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
136-
// logs_stream feature flag guard
137-
return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
138-
}
139-
// Default to unsupported if not contained here
140-
return false;
135+
return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
141136
}
142137

143138
private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;

server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.ingest.PutPipelineRequest;
1414
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
1515
import org.elasticsearch.client.internal.node.NodeClient;
16-
import org.elasticsearch.cluster.metadata.DataStream;
1716
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1817
import org.elasticsearch.core.Tuple;
1918
import org.elasticsearch.rest.BaseRestHandler;
@@ -79,10 +78,6 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
7978
@Override
8079
public Set<String> supportedCapabilities() {
8180
// pipeline_tracking info: `{created,modified}_date` system properties defined within pipeline definition.
82-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
83-
return Set.of("pipeline_tracking_info", "field_access_pattern.flexible");
84-
} else {
85-
return Set.of("pipeline_tracking_info");
86-
}
81+
return Set.of("pipeline_tracking_info", "field_access_pattern.flexible");
8782
}
8883
}

server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.action.ingest;
1111

1212
import org.elasticsearch.ElasticsearchParseException;
13-
import org.elasticsearch.cluster.metadata.DataStream;
1413
import org.elasticsearch.core.RestApiVersion;
1514
import org.elasticsearch.index.VersionType;
1615
import org.elasticsearch.ingest.CompoundProcessor;
@@ -197,7 +196,7 @@ public void testParseWithProvidedPipeline() throws Exception {
197196
false,
198197
ingestService,
199198
RestApiVersion.current(),
200-
(nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
199+
(nodeFeature) -> true
201200
);
202201
assertThat(actualRequest.verbose(), equalTo(false));
203202
assertThat(actualRequest.documents().size(), equalTo(numDocs));
@@ -276,7 +275,7 @@ public void testNotValidDocs() {
276275
false,
277276
ingestService,
278277
RestApiVersion.current(),
279-
(nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
278+
(nodeFeature) -> true
280279
)
281280
);
282281
assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]"));
@@ -294,7 +293,7 @@ public void testNotValidDocs() {
294293
false,
295294
ingestService,
296295
RestApiVersion.current(),
297-
(nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
296+
(nodeFeature) -> true
298297
)
299298
);
300299
assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object"));
@@ -310,7 +309,7 @@ public void testNotValidDocs() {
310309
false,
311310
ingestService,
312311
RestApiVersion.current(),
313-
(nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
312+
(nodeFeature) -> true
314313
)
315314
);
316315
assertThat(e3.getMessage(), containsString("required property is missing"));
@@ -391,7 +390,7 @@ public void testIngestPipelineWithDocumentsWithType() throws Exception {
391390
false,
392391
ingestService,
393392
RestApiVersion.V_8,
394-
(nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
393+
(nodeFeature) -> true
395394
);
396395
assertThat(actualRequest.verbose(), equalTo(false));
397396
assertThat(actualRequest.documents().size(), equalTo(numDocs));

server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
package org.elasticsearch.ingest;
1111

12-
import org.elasticsearch.cluster.metadata.DataStream;
1312
import org.elasticsearch.common.bytes.BytesArray;
1413
import org.elasticsearch.common.xcontent.XContentHelper;
1514
import org.elasticsearch.test.ESTestCase;
1615
import org.elasticsearch.xcontent.XContentType;
1716
import org.hamcrest.Matchers;
18-
import org.junit.Assume;
1917
import org.junit.Before;
2018
import org.mockito.ArgumentCaptor;
2119

@@ -2108,8 +2106,6 @@ public void testSourceHashMapIsNotCopied() {
21082106
* restore the previous pipeline's access pattern.
21092107
*/
21102108
public void testNestedAccessPatternPropagation() {
2111-
Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG);
2112-
21132109
Map<String, Object> source = new HashMap<>(Map.of("foo", 1));
21142110
IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
21152111

server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java

Lines changed: 13 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.ingest;
1111

1212
import org.elasticsearch.ElasticsearchParseException;
13-
import org.elasticsearch.cluster.metadata.DataStream;
1413
import org.elasticsearch.cluster.metadata.ProjectId;
1514
import org.elasticsearch.core.Tuple;
1615
import org.elasticsearch.script.ScriptService;
@@ -24,7 +23,6 @@
2423
import static org.hamcrest.Matchers.equalTo;
2524
import static org.hamcrest.Matchers.is;
2625
import static org.hamcrest.Matchers.nullValue;
27-
import static org.junit.Assume.assumeTrue;
2826
import static org.mockito.Mockito.mock;
2927

3028
public class PipelineFactoryTests extends ESTestCase {
@@ -49,19 +47,10 @@ public void testCreate() throws Exception {
4947
pipelineConfig.put(Pipeline.DEPRECATED_KEY, deprecated);
5048
pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1)));
5149
IngestPipelineFieldAccessPattern expectedAccessPattern = IngestPipelineFieldAccessPattern.CLASSIC;
52-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
53-
expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
54-
pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey());
55-
}
50+
expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
51+
pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey());
5652
Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
57-
Pipeline pipeline = Pipeline.create(
58-
"_id",
59-
pipelineConfig,
60-
processorRegistry,
61-
scriptService,
62-
null,
63-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
64-
);
53+
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
6554
assertThat(pipeline.getId(), equalTo("_id"));
6655
assertThat(pipeline.getDescription(), equalTo("_description"));
6756
assertThat(pipeline.getVersion(), equalTo(version));
@@ -82,7 +71,7 @@ public void testCreateWithNoProcessorsField() throws Exception {
8271
pipelineConfig.put(Pipeline.META_KEY, metadata);
8372
}
8473
try {
85-
Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG);
74+
Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> true);
8675
fail("should fail, missing required [processors] field");
8776
} catch (ElasticsearchParseException e) {
8877
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@@ -97,14 +86,7 @@ public void testCreateWithEmptyProcessorsField() throws Exception {
9786
pipelineConfig.put(Pipeline.META_KEY, metadata);
9887
}
9988
pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of());
100-
Pipeline pipeline = Pipeline.create(
101-
"_id",
102-
pipelineConfig,
103-
null,
104-
scriptService,
105-
null,
106-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
107-
);
89+
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService, null, nodeFeature -> true);
10890
assertThat(pipeline.getId(), equalTo("_id"));
10991
assertThat(pipeline.getDescription(), equalTo("_description"));
11092
assertThat(pipeline.getVersion(), equalTo(version));
@@ -122,14 +104,7 @@ public void testCreateWithPipelineOnFailure() throws Exception {
122104
pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
123105
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, List.of(Map.of("test", processorConfig)));
124106
Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
125-
Pipeline pipeline = Pipeline.create(
126-
"_id",
127-
pipelineConfig,
128-
processorRegistry,
129-
scriptService,
130-
null,
131-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
132-
);
107+
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
133108
assertThat(pipeline.getId(), equalTo("_id"));
134109
assertThat(pipeline.getDescription(), equalTo("_description"));
135110
assertThat(pipeline.getVersion(), equalTo(version));
@@ -152,14 +127,7 @@ public void testCreateWithPipelineEmptyOnFailure() throws Exception {
152127
Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
153128
Exception e = expectThrows(
154129
ElasticsearchParseException.class,
155-
() -> Pipeline.create(
156-
"_id",
157-
pipelineConfig,
158-
processorRegistry,
159-
scriptService,
160-
null,
161-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
162-
)
130+
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
163131
);
164132
assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
165133
}
@@ -177,14 +145,7 @@ public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception {
177145
Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
178146
Exception e = expectThrows(
179147
ElasticsearchParseException.class,
180-
() -> Pipeline.create(
181-
"_id",
182-
pipelineConfig,
183-
processorRegistry,
184-
scriptService,
185-
null,
186-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
187-
)
148+
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
188149
);
189150
assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
190151
}
@@ -202,14 +163,7 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception {
202163
}
203164
pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
204165

205-
Pipeline pipeline = Pipeline.create(
206-
"_id",
207-
pipelineConfig,
208-
processorRegistry,
209-
scriptService,
210-
null,
211-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
212-
);
166+
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
213167
assertThat(pipeline.getId(), equalTo("_id"));
214168
assertThat(pipeline.getDescription(), equalTo("_description"));
215169
assertThat(pipeline.getVersion(), equalTo(version));
@@ -222,15 +176,12 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception {
222176
}
223177

224178
public void testCreateUnsupportedFieldAccessPattern() throws Exception {
225-
assumeTrue("Test is only valid if the logs stream feature flag is enabled", DataStream.LOGS_STREAM_FEATURE_FLAG);
226179
Map<String, Object> processorConfig = new HashMap<>();
227180
processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor");
228181
Map<String, Object> pipelineConfig = new HashMap<>();
229182
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
230183
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
231-
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
232-
pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random");
233-
}
184+
pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random");
234185
if (metadata != null) {
235186
pipelineConfig.put(Pipeline.META_KEY, metadata);
236187
}
@@ -239,14 +190,7 @@ public void testCreateUnsupportedFieldAccessPattern() throws Exception {
239190
Exception e = expectThrows(
240191
ElasticsearchParseException.class,
241192
// All node features disabled
242-
() -> Pipeline.create(
243-
"_id",
244-
pipelineConfig,
245-
processorRegistry,
246-
scriptService,
247-
null,
248-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
249-
)
193+
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
250194
);
251195
assertThat(e.getMessage(), equalTo("pipeline [_id] doesn't support value of [random] for parameter [field_access_pattern]"));
252196
}
@@ -287,14 +231,7 @@ public void testCreateUnusedProcessorOptions() throws Exception {
287231
Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
288232
Exception e = expectThrows(
289233
ElasticsearchParseException.class,
290-
() -> Pipeline.create(
291-
"_id",
292-
pipelineConfig,
293-
processorRegistry,
294-
scriptService,
295-
null,
296-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
297-
)
234+
() -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
298235
);
299236
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
300237
}
@@ -311,14 +248,7 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception {
311248
}
312249
pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
313250
Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
314-
Pipeline pipeline = Pipeline.create(
315-
"_id",
316-
pipelineConfig,
317-
processorRegistry,
318-
scriptService,
319-
null,
320-
nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
321-
);
251+
Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
322252
assertThat(pipeline.getId(), equalTo("_id"));
323253
assertThat(pipeline.getDescription(), equalTo("_description"));
324254
assertThat(pipeline.getVersion(), equalTo(version));

0 commit comments

Comments
 (0)