Skip to content

Commit 49858d1

Browse files
authored
OTLP: store units in mappings (#134709)
This is enabled by adding a new `dynamic_template_params` option to the bulk request header. It can be used in combination with the `dynamic_templates` option to provide parameters that can be used within a dynamic template. The dynamic template can control which parameters it accepts and can provide a default value in case clients don't provide the parameters in the bulk request. If no default value is provided, and the bulk request also doesn't specify the parameter, the value is kept as it is defined in the template (including braces). For example, the dynamic template can define a `unit` placeholder with an empty string as the default value. ``` { "dynamic_templates": [ { "gauge_double": { "mapping": { "type": "double", "time_series_metric": "gauge", "meta": { "unit": "{unit:}" } } } } ] } ``` Clients can then set provide a value for the unit in the bulk request header like so: ``` {"create": {"dynamic_templates":{"metrics.foo.bar":"gauge_double"},"dynamic_template_params":{"metrics.foo.bar":{"unit": "By"}}}} ```
1 parent 40e5ea3 commit 49858d1

File tree

27 files changed

+430
-56
lines changed

27 files changed

+430
-56
lines changed

docs/changelog/134709.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134709
2+
summary: "OTLP: store units in mappings"
3+
area: Mapping
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
203203
AtomicBoolean nextPage = new AtomicBoolean(false);
204204

205205
ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
206-
for (int i = 0; i < 4; ++i) {
206+
for (int i = 0; i < 5; ++i) {
207207
ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
208208
add512BRequests(requests, index);
209209
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
@@ -230,6 +230,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
230230
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
231231
add512BRequests(requestsThrottle, index);
232232
add512BRequests(requestsThrottle, index);
233+
// Ensure we'll be above SPLIT_BULK_HIGH_WATERMARK
234+
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() + 1024, greaterThan(4096L));
233235

234236
CountDownLatch finishLatch = new CountDownLatch(1);
235237
blockWriteCoordinationPool(threadPool, finishLatch);

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
796796
AtomicBoolean nextPage = new AtomicBoolean(false);
797797

798798
ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
799-
for (int i = 0; i < 4; ++i) {
799+
for (int i = 0; i < 5; ++i) {
800800
ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
801801
add512BRequests(requests, index);
802802
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
@@ -838,6 +838,8 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
838838
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
839839
add512BRequests(requestsThrottle, index);
840840
add512BRequests(requestsThrottle, index);
841+
// Ensure we'll be above SPLIT_BULK_HIGH_WATERMARK
842+
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() + 1024, greaterThan(4096L));
841843

842844
CountDownLatch finishLatch = new CountDownLatch(1);
843845
blockWriteCoordinationPool(threadPool, finishLatch);

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public final class BulkRequestParser {
6868
private static final ParseField REQUIRE_DATA_STREAM = new ParseField(DocWriteRequest.REQUIRE_DATA_STREAM);
6969
private static final ParseField LIST_EXECUTED_PIPELINES = new ParseField(DocWriteRequest.LIST_EXECUTED_PIPELINES);
7070
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");
71+
private static final ParseField DYNAMIC_TEMPLATE_PARAMS = new ParseField("dynamic_template_params");
7172

7273
// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
7374
// for CompatibleApi V7 this means to deprecate on type, for V8+ it means to throw an error
@@ -359,6 +360,7 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
359360
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
360361
boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
361362
Map<String, String> dynamicTemplates = Map.of();
363+
Map<String, Map<String, String>> dynamicTemplatesParms = Map.of();
362364

363365
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
364366
// or START_OBJECT which will have another set of parameters
@@ -427,19 +429,22 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
427429
&& DYNAMIC_TEMPLATES.match(currentFieldName, parser.getDeprecationHandler())) {
428430
dynamicTemplates = parser.mapStrings();
429431
} else if (token == XContentParser.Token.START_OBJECT
430-
&& SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
431-
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
432-
} else if (token != XContentParser.Token.VALUE_NULL) {
433-
throw new IllegalArgumentException(
434-
"Malformed action/metadata line ["
435-
+ line
436-
+ "], expected a simple value for field ["
437-
+ currentFieldName
438-
+ "] but found ["
439-
+ token
440-
+ "]"
441-
);
442-
}
432+
&& DYNAMIC_TEMPLATE_PARAMS.match(currentFieldName, parser.getDeprecationHandler())) {
433+
dynamicTemplatesParms = parser.map(HashMap::new, XContentParser::mapStrings);
434+
} else if (token == XContentParser.Token.START_OBJECT
435+
&& SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
436+
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
437+
} else if (token != XContentParser.Token.VALUE_NULL) {
438+
throw new IllegalArgumentException(
439+
"Malformed action/metadata line ["
440+
+ line
441+
+ "], expected a simple value for field ["
442+
+ currentFieldName
443+
+ "] but found ["
444+
+ token
445+
+ "]"
446+
);
447+
}
443448
}
444449
} else if (token != XContentParser.Token.END_OBJECT) {
445450
throw new IllegalArgumentException(
@@ -462,6 +467,11 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
462467
"Delete request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
463468
);
464469
}
470+
if (dynamicTemplatesParms.isEmpty() == false) {
471+
throw new IllegalArgumentException(
472+
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATE_PARAMS.getPreferredName()
473+
);
474+
}
465475
currentRequest = new DeleteRequest(index).id(id)
466476
.routing(routing)
467477
.version(version)
@@ -480,6 +490,7 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
480490
.setIfSeqNo(ifSeqNo)
481491
.setIfPrimaryTerm(ifPrimaryTerm)
482492
.setDynamicTemplates(dynamicTemplates)
493+
.setDynamicTemplateParams(dynamicTemplatesParms)
483494
.setRequireAlias(requireAlias)
484495
.setRequireDataStream(requireDataStream)
485496
.setListExecutedPipelines(currentListExecutedPipelines)
@@ -508,6 +519,11 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
508519
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
509520
);
510521
}
522+
if (dynamicTemplatesParms.isEmpty() == false) {
523+
throw new IllegalArgumentException(
524+
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATE_PARAMS.getPreferredName()
525+
);
526+
}
511527
UpdateRequest updateRequest = new UpdateRequest().index(index)
512528
.id(id)
513529
.routing(routing)

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ static boolean executeBulkItemRequest(
433433
request.getContentType(),
434434
request.routing(),
435435
request.getDynamicTemplates(),
436+
request.getDynamicTemplateParams(),
436437
request.getIncludeSourceOnError(),
437438
meteringParserDecorator,
438439
request.tsid()
@@ -755,6 +756,7 @@ private static Engine.Result performOpOnReplica(
755756
indexRequest.getContentType(),
756757
indexRequest.routing(),
757758
Map.of(),
759+
Map.of(),
758760
true,
759761
XContentMeteringParserDecorator.NOOP,
760762
indexRequest.tsid()

server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ private ValidationResult validateMappings(
206206
request.getContentType(),
207207
request.routing(),
208208
request.getDynamicTemplates(),
209+
request.getDynamicTemplateParams(),
209210
request.getIncludeSourceOnError(),
210211
XContentMeteringParserDecorator.NOOP,
211212
request.tsid()

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
7979
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X;
8080
private static final TransportVersion INDEX_REQUEST_INCLUDE_TSID = TransportVersion.fromName("index_request_include_tsid");
8181
private static final TransportVersion INDEX_SOURCE = TransportVersion.fromName("index_source");
82+
static final TransportVersion INGEST_REQUEST_DYNAMIC_TEMPLATE_PARAMS = TransportVersion.fromName(
83+
"ingest_request_dynamic_template_params"
84+
);
8285

8386
private static final Supplier<String> ID_GENERATOR = UUIDs::base64UUID;
8487

@@ -147,6 +150,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
147150
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
148151

149152
private Map<String, String> dynamicTemplates = Map.of();
153+
private Map<String, Map<String, String>> dynamicTemplateParams = Map.of();
150154

151155
/**
152156
* rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
@@ -234,6 +238,10 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
234238
if (in.getTransportVersion().supports(INDEX_REQUEST_INCLUDE_TSID)) {
235239
tsid = in.readBytesRefOrNullIfEmpty();
236240
}
241+
242+
if (in.getTransportVersion().supports(INGEST_REQUEST_DYNAMIC_TEMPLATE_PARAMS)) {
243+
dynamicTemplateParams = in.readMap(StreamInput::readString, i -> i.readMap(StreamInput::readString));
244+
}
237245
}
238246

239247
public IndexRequest() {
@@ -823,6 +831,9 @@ private void writeBody(StreamOutput out) throws IOException {
823831
if (out.getTransportVersion().supports(INDEX_REQUEST_INCLUDE_TSID)) {
824832
out.writeBytesRef(tsid);
825833
}
834+
if (out.getTransportVersion().supports(INGEST_REQUEST_DYNAMIC_TEMPLATE_PARAMS)) {
835+
out.writeMap(dynamicTemplateParams, StreamOutput::writeString, (o, v) -> o.writeMap(v, StreamOutput::writeString));
836+
}
826837
}
827838

828839
@Override
@@ -982,6 +993,15 @@ public Map<String, String> getDynamicTemplates() {
982993
return dynamicTemplates;
983994
}
984995

996+
public IndexRequest setDynamicTemplateParams(Map<String, Map<String, String>> dynamicTemplateParams) {
997+
this.dynamicTemplateParams = dynamicTemplateParams;
998+
return this;
999+
}
1000+
1001+
public Map<String, Map<String, String>> getDynamicTemplateParams() {
1002+
return dynamicTemplateParams;
1003+
}
1004+
9851005
public Object getRawTimestamp() {
9861006
return rawTimestamp;
9871007
}

server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,28 @@ public final DynamicTemplate findDynamicTemplate(String fieldName, DynamicTempla
922922
return null;
923923
}
924924

925+
/**
926+
* Provides parameters for the {@link DynamicTemplate} returned by
927+
* {@link #findDynamicTemplate(String, DynamicTemplate.XContentFieldType)}.
928+
* The {@link DynamicTemplate} can use placeholders in its definition that
929+
* will be replaced by the values returned by this method.
930+
* For example, a dynamic template may contain a snippet like this:
931+
* <pre>
932+
* "meta": {
933+
* "unit": "{{unit}}"
934+
* }
935+
* </pre>
936+
* When applying the dynamic template to a field, the placeholder <code>{{unit}}</code>
937+
* will be replaced by the value returned by this method for the key <code>unit</code>.
938+
*
939+
* @param fieldName the name of the field
940+
* @return a map of parameter names to values; empty map if no parameters exist
941+
*/
942+
public final Map<String, String> getDynamicTemplateParams(String fieldName) {
943+
final String pathAsString = path().pathAsText(fieldName);
944+
return sourceToParse.dynamicTemplateParams().getOrDefault(pathAsString, Map.of());
945+
}
946+
925947
// XContentParser that wraps an existing parser positioned on a value,
926948
// and a field name, and returns a stream that looks like { 'field' : 'value' }
927949
private static class CopyToParser extends FilterXContentParserWrapper {

server/src/main/java/org/elasticsearch/index/mapper/DynamicFieldsBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ private static boolean applyMatchingTemplate(
237237
String dynamicType = dynamicTemplate.isRuntimeMapping() ? matchType.defaultRuntimeMappingType() : matchType.defaultMappingType();
238238

239239
String mappingType = dynamicTemplate.mappingType(dynamicType);
240-
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType);
240+
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType, context.getDynamicTemplateParams(name));
241241
if (dynamicTemplate.isRuntimeMapping()) {
242242
MappingParserContext parserContext = context.dynamicTemplateParserContext(dateFormatter);
243243
RuntimeField.Parser parser = parserContext.runtimeFieldParser(mappingType);
@@ -262,7 +262,7 @@ private static Mapper.Builder findTemplateBuilderForObject(DocumentParserContext
262262
}
263263
String dynamicType = matchType.defaultMappingType();
264264
String mappingType = dynamicTemplate.mappingType(dynamicType);
265-
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType);
265+
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType, context.getDynamicTemplateParams(name));
266266
return parseDynamicTemplateMapping(name, mappingType, mapping, null, context);
267267
}
268268

0 commit comments

Comments
 (0)