Skip to content

Commit 29d6458

Browse files
authored
Allow setting the type in the reroute processor (elastic#122409) (elastic#122892)
* Allow setting the `type` in the reroute processor This allows configuring the `type` from within the ingest `reroute` processor. Similar to `dataset` and `namespace`, the type defaults to the value extracted from the index name. This means that documents sent to `logs-mysql.access.default` will have a default value of `logs` for the type. Resolves elastic#121553 * Update docs/changelog/122409.yaml
1 parent 61f3d57 commit 29d6458

File tree

5 files changed

+88
-28
lines changed

5 files changed

+88
-28
lines changed

docs/changelog/122409.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 122409
2+
summary: Allow setting the `type` in the reroute processor
3+
area: Ingest Node
4+
type: enhancement
5+
issues:
6+
- 121553

docs/reference/ingest/processors/reroute.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ Otherwise, the document will be rejected with a security exception which looks l
4545
|======
4646
| Name | Required | Default | Description
4747
| `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
48+
| `type` | no | `{{data_stream.type}}` a| Field references or a static value for the type part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `logs` and `metrics`.
49+
50+
Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<type>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
4851
| `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.
4952

5053
Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
2727
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
2828
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;
29+
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.TYPE_VALUE_SOURCE;
2930

3031
public final class RerouteProcessor extends AbstractProcessor {
3132

@@ -39,18 +40,25 @@ public final class RerouteProcessor extends AbstractProcessor {
3940
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
4041
private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
4142
private static final String EVENT_DATASET = "event.dataset";
43+
private final List<DataStreamValueSource> type;
4244
private final List<DataStreamValueSource> dataset;
4345
private final List<DataStreamValueSource> namespace;
4446
private final String destination;
4547

4648
RerouteProcessor(
4749
String tag,
4850
String description,
51+
List<DataStreamValueSource> type,
4952
List<DataStreamValueSource> dataset,
5053
List<DataStreamValueSource> namespace,
5154
String destination
5255
) {
5356
super(tag, description);
57+
if (type.isEmpty()) {
58+
this.type = List.of(TYPE_VALUE_SOURCE);
59+
} else {
60+
this.type = type;
61+
}
5462
if (dataset.isEmpty()) {
5563
this.dataset = List.of(DATASET_VALUE_SOURCE);
5664
} else {
@@ -71,7 +79,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
7179
return ingestDocument;
7280
}
7381
final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
74-
final String type;
82+
final String currentType;
7583
final String currentDataset;
7684
final String currentNamespace;
7785

@@ -84,10 +92,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
8492
if (indexOfSecondDash < 0) {
8593
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
8694
}
87-
type = parseDataStreamType(indexName, indexOfFirstDash);
95+
currentType = parseDataStreamType(indexName, indexOfFirstDash);
8896
currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
8997
currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);
9098

99+
String type = determineDataStreamField(ingestDocument, this.type, currentType);
91100
String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
92101
String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
93102
String newTarget = type + "-" + dataset + "-" + namespace;
@@ -168,6 +177,15 @@ public RerouteProcessor create(
168177
String description,
169178
Map<String, Object> config
170179
) throws Exception {
180+
List<DataStreamValueSource> type;
181+
try {
182+
type = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "type")
183+
.stream()
184+
.map(DataStreamValueSource::type)
185+
.toList();
186+
} catch (IllegalArgumentException e) {
187+
throw newConfigurationException(TYPE, tag, "type", e.getMessage());
188+
}
171189
List<DataStreamValueSource> dataset;
172190
try {
173191
dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
@@ -188,11 +206,11 @@ public RerouteProcessor create(
188206
}
189207

190208
String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
191-
if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
192-
throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
209+
if (destination != null && (type.isEmpty() == false || dataset.isEmpty() == false || namespace.isEmpty() == false)) {
210+
throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set");
193211
}
194212

195-
return new RerouteProcessor(tag, description, dataset, namespace, destination);
213+
return new RerouteProcessor(tag, description, type, dataset, namespace, destination);
196214
}
197215
}
198216

@@ -203,15 +221,21 @@ static final class DataStreamValueSource {
203221

204222
private static final int MAX_LENGTH = 100;
205223
private static final String REPLACEMENT = "_";
224+
private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
206225
private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
207226
private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
227+
static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}");
208228
static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
209229
static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");
210230

211231
private final String value;
212232
private final String fieldReference;
213233
private final Function<String, String> sanitizer;
214234

235+
public static DataStreamValueSource type(String type) {
236+
return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE));
237+
}
238+
215239
public static DataStreamValueSource dataset(String dataset) {
216240
return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
217241
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void testDestinationAndDataset() {
4747
ElasticsearchParseException.class,
4848
() -> create(Map.of("destination", "foo", "dataset", "bar"))
4949
);
50-
assertThat(e.getMessage(), equalTo("[destination] can only be set if dataset and namespace are not set"));
50+
assertThat(e.getMessage(), equalTo("[destination] can only be set if type, dataset, and namespace are not set"));
5151
}
5252

5353
public void testFieldReference() throws Exception {

0 commit comments

Comments
 (0)