@@ -9,7 +9,6 @@

 package org.elasticsearch.ingest.otel;

-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.Plugin;
@@ -20,10 +19,6 @@ public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {

     @Override
     public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
-        } else {
-            return Map.of();
-        }
+        return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
     }
 }

@@ -9,7 +9,6 @@

 package org.elasticsearch.rest.streams;

-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -29,7 +28,6 @@
 import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
 import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;

-import java.util.Collections;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -53,10 +51,7 @@ public List<RestHandler> getRestHandlers(
         Supplier<DiscoveryNodes> nodesInCluster,
         Predicate<NodeFeature> clusterSupportsFeature
     ) {
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
-        }
-        return Collections.emptyList();
+        return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
     }

     @Override

@@ -35,7 +35,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.time.DateFormatters;
-import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.Nullable;
@@ -90,7 +89,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     private static final TransportVersion MAPPINGS_IN_DATA_STREAMS = TransportVersion.fromName("mappings_in_data_streams");

     public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store");
-    public static final boolean LOGS_STREAM_FEATURE_FLAG = new FeatureFlag("logs_stream").isEnabled();
     public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0;
     public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0;
     public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0;

@@ -9,7 +9,6 @@

 package org.elasticsearch.ingest;

-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.features.FeatureSpecification;
 import org.elasticsearch.features.NodeFeature;

@@ -21,11 +20,7 @@ public class IngestFeatures implements FeatureSpecification {

     @Override
     public Set<NodeFeature> getFeatures() {
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            return Set.of(IngestService.FIELD_ACCESS_PATTERN);
-        } else {
-            return Set.of();
-        }
+        return Set.of(IngestService.FIELD_ACCESS_PATTERN);
     }

     @Override

@@ -132,12 +132,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
      * @return true if the node feature can be supported in the local library code, false if it is not supported
      */
    public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            // logs_stream feature flag guard
-            return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
-        }
-        // Default to unsupported if not contained here
-        return false;
+        return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
    }

    private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;

@@ -13,7 +13,6 @@
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -79,10 +78,6 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
     @Override
     public Set<String> supportedCapabilities() {
         // pipeline_tracking info: `{created,modified}_date` system properties defined within pipeline definition.
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            return Set.of("pipeline_tracking_info", "field_access_pattern.flexible");
-        } else {
-            return Set.of("pipeline_tracking_info");
-        }
+        return Set.of("pipeline_tracking_info", "field_access_pattern.flexible");
     }
 }

@@ -10,7 +10,6 @@
 package org.elasticsearch.action.ingest;

 import org.elasticsearch.ElasticsearchParseException;
-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.CompoundProcessor;
@@ -197,7 +196,7 @@ public void testParseWithProvidedPipeline() throws Exception {
             false,
             ingestService,
             RestApiVersion.current(),
-            (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            (nodeFeature) -> true
         );
         assertThat(actualRequest.verbose(), equalTo(false));
         assertThat(actualRequest.documents().size(), equalTo(numDocs));
@@ -276,7 +275,7 @@ public void testNotValidDocs() {
                 false,
                 ingestService,
                 RestApiVersion.current(),
-                (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+                (nodeFeature) -> true
             )
         );
         assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]"));
@@ -294,7 +293,7 @@
                 false,
                 ingestService,
                 RestApiVersion.current(),
-                (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+                (nodeFeature) -> true
             )
         );
         assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object"));
@@ -310,7 +309,7 @@
                 false,
                 ingestService,
                 RestApiVersion.current(),
-                (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+                (nodeFeature) -> true
             )
         );
         assertThat(e3.getMessage(), containsString("required property is missing"));
@@ -391,7 +390,7 @@ public void testIngestPipelineWithDocumentsWithType() throws Exception {
             false,
             ingestService,
             RestApiVersion.V_8,
-            (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            (nodeFeature) -> true
         );
         assertThat(actualRequest.verbose(), equalTo(false));
         assertThat(actualRequest.documents().size(), equalTo(numDocs));

@@ -9,13 +9,11 @@

 package org.elasticsearch.ingest;

-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentType;
 import org.hamcrest.Matchers;
-import org.junit.Assume;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;

@@ -2108,8 +2106,6 @@ public void testSourceHashMapIsNotCopied() {
      * restore the previous pipeline's access pattern.
      */
     public void testNestedAccessPatternPropagation() {
-        Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG);
-
         Map<String, Object> source = new HashMap<>(Map.of("foo", 1));
         IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);


@@ -10,7 +10,6 @@
 package org.elasticsearch.ingest;

 import org.elasticsearch.ElasticsearchParseException;
-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.script.ScriptService;
@@ -24,7 +23,6 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;

 public class PipelineFactoryTests extends ESTestCase {
@@ -49,19 +47,10 @@ public void testCreate() throws Exception {
         pipelineConfig.put(Pipeline.DEPRECATED_KEY, deprecated);
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1)));
         IngestPipelineFieldAccessPattern expectedAccessPattern = IngestPipelineFieldAccessPattern.CLASSIC;
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
-            pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey());
-        }
+        expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
+        pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey());
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
-        Pipeline pipeline = Pipeline.create(
-            "_id",
-            pipelineConfig,
-            processorRegistry,
-            scriptService,
-            null,
-            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-        );
+        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -82,7 +71,7 @@ public void testCreateWithNoProcessorsField() throws Exception {
             pipelineConfig.put(Pipeline.META_KEY, metadata);
         }
         try {
-            Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG);
+            Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> true);
             fail("should fail, missing required [processors] field");
         } catch (ElasticsearchParseException e) {
             assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@@ -97,14 +86,7 @@ public void testCreateWithEmptyProcessorsField() throws Exception {
             pipelineConfig.put(Pipeline.META_KEY, metadata);
         }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of());
-        Pipeline pipeline = Pipeline.create(
-            "_id",
-            pipelineConfig,
-            null,
-            scriptService,
-            null,
-            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-        );
+        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService, null, nodeFeature -> true);
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -122,14 +104,7 @@ public void testCreateWithPipelineOnFailure() throws Exception {
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
         pipelineConfig.put(Pipeline.ON_FAILURE_KEY, List.of(Map.of("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
-        Pipeline pipeline = Pipeline.create(
-            "_id",
-            pipelineConfig,
-            processorRegistry,
-            scriptService,
-            null,
-            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-        );
+        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -152,14 +127,7 @@ public void testCreateWithPipelineEmptyOnFailure() throws Exception {
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
         Exception e = expectThrows(
             ElasticsearchParseException.class,
-            () -> Pipeline.create(
-                "_id",
-                pipelineConfig,
-                processorRegistry,
-                scriptService,
-                null,
-                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-            )
+            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
         );
         assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
     }
@@ -177,14 +145,7 @@ public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception {
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
         Exception e = expectThrows(
             ElasticsearchParseException.class,
-            () -> Pipeline.create(
-                "_id",
-                pipelineConfig,
-                processorRegistry,
-                scriptService,
-                null,
-                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-            )
+            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
         );
         assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
     }
@@ -202,14 +163,7 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception {
         }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));

-        Pipeline pipeline = Pipeline.create(
-            "_id",
-            pipelineConfig,
-            processorRegistry,
-            scriptService,
-            null,
-            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-        );
+        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -222,15 +176,12 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception {
     }

     public void testCreateUnsupportedFieldAccessPattern() throws Exception {
-        assumeTrue("Test is only valid if the logs stream feature flag is enabled", DataStream.LOGS_STREAM_FEATURE_FLAG);
         Map<String, Object> processorConfig = new HashMap<>();
         processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor");
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
-        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
-            pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random");
-        }
+        pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random");
         if (metadata != null) {
             pipelineConfig.put(Pipeline.META_KEY, metadata);
         }
@@ -239,14 +190,7 @@ public void testCreateUnsupportedFieldAccessPattern() throws Exception {
         Exception e = expectThrows(
             ElasticsearchParseException.class,
             // All node features disabled
-            () -> Pipeline.create(
-                "_id",
-                pipelineConfig,
-                processorRegistry,
-                scriptService,
-                null,
-                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-            )
+            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
         );
         assertThat(e.getMessage(), equalTo("pipeline [_id] doesn't support value of [random] for parameter [field_access_pattern]"));
     }
@@ -287,14 +231,7 @@ public void testCreateUnusedProcessorOptions() throws Exception {
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
         Exception e = expectThrows(
             ElasticsearchParseException.class,
-            () -> Pipeline.create(
-                "_id",
-                pipelineConfig,
-                processorRegistry,
-                scriptService,
-                null,
-                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-            )
+            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true)
         );
         assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
     }
@@ -311,14 +248,7 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception {
         }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
-        Pipeline pipeline = Pipeline.create(
-            "_id",
-            pipelineConfig,
-            processorRegistry,
-            scriptService,
-            null,
-            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
-        );
+        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> true);
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));