diff --git a/docs/changelog/122409.yaml b/docs/changelog/122409.yaml new file mode 100644 index 0000000000000..d54be44f02f8b --- /dev/null +++ b/docs/changelog/122409.yaml @@ -0,0 +1,6 @@ +pr: 122409 +summary: Allow setting the `type` in the reroute processor +area: Ingest Node +type: enhancement +issues: + - 121553 diff --git a/docs/reference/ingest/processors/reroute.asciidoc b/docs/reference/ingest/processors/reroute.asciidoc index 482ff3b1cc116..03fc62dc0b138 100644 --- a/docs/reference/ingest/processors/reroute.asciidoc +++ b/docs/reference/ingest/processors/reroute.asciidoc @@ -45,6 +45,9 @@ Otherwise, the document will be rejected with a security exception which looks l |====== | Name | Required | Default | Description | `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set. +| `type` | no | `{{data_stream.type}}` a| Field references or a static value for the type part of the data stream name. In addition to the criteria for <>, cannot contain `-` and must be no longer than 100 characters. Example values are `logs` and `metrics`. + +Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value. | `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`. Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value. diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java index 52e0aae1116cc..57479749e1794 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java @@ -26,6 +26,7 @@ import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE; import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE; +import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.TYPE_VALUE_SOURCE; public final class RerouteProcessor extends AbstractProcessor { @@ -39,6 +40,7 @@ public final class RerouteProcessor extends AbstractProcessor { private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset"; private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace"; private static final String EVENT_DATASET = "event.dataset"; + private final List type; private final List dataset; private final List namespace; private final String destination; @@ -46,11 +48,17 @@ public final class RerouteProcessor extends AbstractProcessor { RerouteProcessor( String tag, String description, + List type, List dataset, List namespace, String destination ) { super(tag, description); + if (type.isEmpty()) { + this.type = List.of(TYPE_VALUE_SOURCE); + } else { + this.type = type; + } if (dataset.isEmpty()) { this.dataset = List.of(DATASET_VALUE_SOURCE); } else { @@ -71,7 +79,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument; } final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class); - final String type; + final String currentType; final String currentDataset; final String currentNamespace; @@ -84,10 +92,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (indexOfSecondDash < 0) { throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName)); } - type = parseDataStreamType(indexName, indexOfFirstDash); + currentType = parseDataStreamType(indexName, indexOfFirstDash); currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash); currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash); + String type = determineDataStreamField(ingestDocument, this.type, currentType); String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset); String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace); String newTarget = type + "-" + dataset + "-" + namespace; @@ -168,6 +177,15 @@ public RerouteProcessor create( String description, Map config ) throws Exception { + List type; + try { + type = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "type") + .stream() + .map(DataStreamValueSource::type) + .toList(); + } catch (IllegalArgumentException e) { + throw newConfigurationException(TYPE, tag, "type", e.getMessage()); + } List dataset; try { dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset") @@ -188,11 +206,11 @@ public RerouteProcessor create( } String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination"); - if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) { - throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set"); + if (destination != null && (type.isEmpty() == false || dataset.isEmpty() == false || namespace.isEmpty() == false)) { + throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set"); } - return new RerouteProcessor(tag, description, dataset, namespace, destination); + return new RerouteProcessor(tag, description, type, dataset, namespace, destination); } } @@ -203,8 +221,10 @@ static final class DataStreamValueSource { private static final int MAX_LENGTH = 100; private static final String REPLACEMENT = "_"; + private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]"); private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]"); private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]"); + static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}"); static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}"); static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}"); @@ -212,6 +232,10 @@ static final class DataStreamValueSource { private final String fieldReference; private final Function sanitizer; + public static DataStreamValueSource type(String type) { + return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE)); + } + public static DataStreamValueSource dataset(String dataset) { return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET)); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java index 07e8ba405736b..e83093809cf01 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java @@ -47,7 +47,7 @@ public void testDestinationAndDataset() { ElasticsearchParseException.class, () -> create(Map.of("destination", "foo", "dataset", "bar")) ); - assertThat(e.getMessage(), equalTo("[destination] can only be set if dataset and namespace are not set")); + assertThat(e.getMessage(), equalTo("[destination] can only be set if type, dataset, and namespace are not set")); } public void testFieldReference() throws Exception { diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java index 084be1c2b227d..55eebbc160486 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java @@ -27,16 +27,25 @@ public class RerouteProcessorTests extends ESTestCase { public void testDefaults() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); - RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "generic", "default"); } + public void testRouteOnType() throws Exception { + IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); + ingestDocument.setFieldValue("event.type", "foo"); + + RerouteProcessor processor = createRerouteProcessor(List.of("{{event.type}}"), List.of(), List.of()); + processor.execute(ingestDocument); + assertDataSetFields(ingestDocument, "foo", "generic", "default"); + } + public void testEventDataset() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); ingestDocument.setFieldValue("event.dataset", "foo"); - RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset }}"), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{event.dataset }}"), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "default"); assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo")); @@ -46,7 +55,7 @@ public void testEventDatasetDottedFieldName() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); ingestDocument.getCtxMap().put("event.dataset", "foo"); - RerouteProcessor processor = createRerouteProcessor(List.of("{{ event.dataset}}"), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{ event.dataset}}"), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "default"); assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo")); @@ -57,7 +66,7 @@ public void testNoDataset() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); ingestDocument.setFieldValue("ds", "foo"); - RerouteProcessor processor = createRerouteProcessor(List.of("{{ ds }}"), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{ ds }}"), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "default"); assertFalse(ingestDocument.hasField("event.dataset")); @@ -66,8 +75,8 @@ public void testNoDataset() throws Exception { public void testSkipFirstProcessor() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); - RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of()); - RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of()); + RerouteProcessor skippedProcessor = createRerouteProcessor(List.of(), List.of("skip"), List.of()); + RerouteProcessor executedProcessor = createRerouteProcessor(List.of(), List.of("executed"), List.of()); CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "executed", "default"); @@ -76,8 +85,8 @@ public void testSkipFirstProcessor() throws Exception { public void testSkipLastProcessor() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); - RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of()); - RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of()); + RerouteProcessor executedProcessor = createRerouteProcessor(List.of(), List.of("executed"), List.of()); + RerouteProcessor skippedProcessor = createRerouteProcessor(List.of(), List.of("skip"), List.of()); CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "executed", "default"); @@ -85,23 +94,24 @@ public void testSkipLastProcessor() throws Exception { public void testDataStreamFieldsFromDocument() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); + ingestDocument.setFieldValue("data_stream.type", "eggplant"); ingestDocument.setFieldValue("data_stream.dataset", "foo"); ingestDocument.setFieldValue("data_stream.namespace", "bar"); - RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of()); processor.execute(ingestDocument); - assertDataSetFields(ingestDocument, "logs", "foo", "bar"); + assertDataSetFields(ingestDocument, "eggplant", "foo", "bar"); } public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); - ingestDocument.getCtxMap().put("data_stream.type", "logs"); + ingestDocument.getCtxMap().put("data_stream.type", "eggplant"); ingestDocument.getCtxMap().put("data_stream.dataset", "foo"); ingestDocument.getCtxMap().put("data_stream.namespace", "bar"); - RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of()); processor.execute(ingestDocument); - assertDataSetFields(ingestDocument, "logs", "foo", "bar"); + assertDataSetFields(ingestDocument, "eggplant", "foo", "bar"); } public void testInvalidDataStreamFieldsFromDocument() throws Exception { @@ -109,7 +119,7 @@ public void testInvalidDataStreamFieldsFromDocument() throws Exception { ingestDocument.setFieldValue("data_stream.dataset", "foo-bar"); ingestDocument.setFieldValue("data_stream.namespace", "baz#qux"); - RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux"); } @@ -128,7 +138,7 @@ public void testFieldReference() throws Exception { ingestDocument.setFieldValue("service.name", "opbeans-java"); ingestDocument.setFieldValue("service.environment", "dev"); - RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}")); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{service.name}}"), List.of("{{service.environment}}")); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev"); } @@ -136,7 +146,7 @@ public void testFieldReference() throws Exception { public void testRerouteToCurrentTarget() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); - RerouteProcessor reroute = createRerouteProcessor(List.of("generic"), List.of("default")); + RerouteProcessor reroute = createRerouteProcessor(List.of(), List.of("generic"), List.of("default")); CompoundProcessor processor = new CompoundProcessor( reroute, new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true)) @@ -149,7 +159,7 @@ public void testRerouteToCurrentTarget() throws Exception { public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); - RerouteProcessor reroute = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}")); + RerouteProcessor reroute = createRerouteProcessor(List.of(), List.of("{{service.name}}"), List.of("{{service.environment}}")); CompoundProcessor processor = new CompoundProcessor( reroute, new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true)) @@ -166,6 +176,7 @@ public void testDataStreamFieldReference() throws Exception { ingestDocument.setFieldValue("data_stream.namespace", "namespace_from_doc"); RerouteProcessor processor = createRerouteProcessor( + List.of(), List.of("{{{data_stream.dataset}}}", "fallback"), List.of("{{data_stream.namespace}}", "fallback") ); @@ -177,6 +188,7 @@ public void testDatasetFieldReferenceMissingValue() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); RerouteProcessor processor = createRerouteProcessor( + List.of(), List.of("{{data_stream.dataset}}", "fallback"), List.of("{{data_stream.namespace}}", "fallback") ); @@ -190,6 +202,7 @@ public void testDatasetFieldReference() throws Exception { ingestDocument.setFieldValue("data_stream.namespace", "default"); RerouteProcessor processor = createRerouteProcessor( + List.of(), List.of("{{data_stream.dataset}}", "fallback"), List.of("{{{data_stream.namespace}}}", "fallback") ); @@ -202,7 +215,7 @@ public void testFallbackToValuesFrom_index() throws Exception { ingestDocument.setFieldValue("data_stream.dataset", "foo"); ingestDocument.setFieldValue("data_stream.namespace", "bar"); - RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}")); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{foo}}"), List.of("{{bar}}")); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "generic", "default"); } @@ -210,7 +223,7 @@ public void testFallbackToValuesFrom_index() throws Exception { public void testInvalidDataStreamName() throws Exception { { IngestDocument ingestDocument = createIngestDocument("foo"); - RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of()); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme --")); } @@ -227,11 +240,16 @@ public void testInvalidDataStreamName() throws Exception { public void testRouteOnNonStringFieldFails() { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); ingestDocument.setFieldValue("numeric_field", 42); - RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of()); + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{numeric_field}}"), List.of()); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } + public void testTypeSanitization() { + assertTypeSanitization("\\/*?\"<>| ,#:-", "_____________"); + assertTypeSanitization("foo*bar", "foo_bar"); + } + public void testDatasetSanitization() { assertDatasetSanitization("\\/*?\"<>| ,#:-", "_____________"); assertDatasetSanitization("foo*bar", "foo_bar"); @@ -242,6 +260,14 @@ public void testNamespaceSanitization() { assertNamespaceSanitization("foo*bar", "foo_bar"); } + private static void assertTypeSanitization(String type, String sanitizedType) { + assertThat( + RerouteProcessor.DataStreamValueSource.type("{{foo}}") + .resolve(RandomDocumentPicks.randomIngestDocument(random(), Map.of("foo", type))), + equalTo(sanitizedType) + ); + } + private static void assertDatasetSanitization(String dataset, String sanitizedDataset) { assertThat( RerouteProcessor.DataStreamValueSource.dataset("{{foo}}") @@ -258,10 +284,11 @@ private static void assertNamespaceSanitization(String namespace, String sanitiz ); } - private RerouteProcessor createRerouteProcessor(List dataset, List namespace) { + private RerouteProcessor createRerouteProcessor(List type, List dataset, List namespace) { return new RerouteProcessor( null, null, + type.stream().map(RerouteProcessor.DataStreamValueSource::type).toList(), dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(), namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(), null @@ -269,7 +296,7 @@ private RerouteProcessor createRerouteProcessor(List dataset, List