Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/122409.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 122409
summary: Allow setting the `type` in the reroute processor
area: Ingest Node
type: enhancement
issues:
- 121553
3 changes: 3 additions & 0 deletions docs/reference/ingest/processors/reroute.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ Otherwise, the document will be rejected with a security exception which looks l
|======
| Name | Required | Default | Description
| `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
| `type` | no | `{{data_stream.type}}` a| Field references or a static value for the type part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `logs` and `metrics`.

Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<type>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
| `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.

Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.TYPE_VALUE_SOURCE;

public final class RerouteProcessor extends AbstractProcessor {

Expand All @@ -39,18 +40,25 @@ public final class RerouteProcessor extends AbstractProcessor {
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
private static final String EVENT_DATASET = "event.dataset";
private final List<DataStreamValueSource> type;
private final List<DataStreamValueSource> dataset;
private final List<DataStreamValueSource> namespace;
private final String destination;

RerouteProcessor(
String tag,
String description,
List<DataStreamValueSource> type,
List<DataStreamValueSource> dataset,
List<DataStreamValueSource> namespace,
String destination
) {
super(tag, description);
if (type.isEmpty()) {
this.type = List.of(TYPE_VALUE_SOURCE);
} else {
this.type = type;
}
if (dataset.isEmpty()) {
this.dataset = List.of(DATASET_VALUE_SOURCE);
} else {
Expand All @@ -71,7 +79,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}
final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
final String type;
final String currentType;
final String currentDataset;
final String currentNamespace;

Expand All @@ -84,10 +92,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
if (indexOfSecondDash < 0) {
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
type = parseDataStreamType(indexName, indexOfFirstDash);
currentType = parseDataStreamType(indexName, indexOfFirstDash);
currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);

String type = determineDataStreamField(ingestDocument, this.type, currentType);
String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
String newTarget = type + "-" + dataset + "-" + namespace;
Expand Down Expand Up @@ -168,6 +177,15 @@ public RerouteProcessor create(
String description,
Map<String, Object> config
) throws Exception {
List<DataStreamValueSource> type;
try {
type = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "type")
.stream()
.map(DataStreamValueSource::type)
.toList();
} catch (IllegalArgumentException e) {
throw newConfigurationException(TYPE, tag, "type", e.getMessage());
}
List<DataStreamValueSource> dataset;
try {
dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
Expand All @@ -188,11 +206,11 @@ public RerouteProcessor create(
}

String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
if (destination != null && (type.isEmpty() == false || dataset.isEmpty() == false || namespace.isEmpty() == false)) {
throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set");
}

return new RerouteProcessor(tag, description, dataset, namespace, destination);
return new RerouteProcessor(tag, description, type, dataset, namespace, destination);
}
}

Expand All @@ -203,15 +221,21 @@ static final class DataStreamValueSource {

private static final int MAX_LENGTH = 100;
private static final String REPLACEMENT = "_";
private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}");
static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");

private final String value;
private final String fieldReference;
private final Function<String, String> sanitizer;

public static DataStreamValueSource type(String type) {
return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE));
}

public static DataStreamValueSource dataset(String dataset) {
return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testDestinationAndDataset() {
ElasticsearchParseException.class,
() -> create(Map.of("destination", "foo", "dataset", "bar"))
);
assertThat(e.getMessage(), equalTo("[destination] can only be set if dataset and namespace are not set"));
assertThat(e.getMessage(), equalTo("[destination] can only be set if type, dataset, and namespace are not set"));
}

public void testFieldReference() throws Exception {
Expand Down
Loading
Loading