Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6b98987
OTLP: store units in mappings
felixbarny Sep 15, 2025
a091f7b
Update docs/changelog/134709.yaml
felixbarny Sep 15, 2025
d3da0f7
[CI] Auto commit changes from spotless
Sep 15, 2025
b56bb3e
Merge branch 'main' into bulk-meta
felixbarny Sep 15, 2025
ca07ebb
Use double braces for template values to avoid ambiguity
felixbarny Sep 16, 2025
730082f
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Sep 16, 2025
c779487
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Sep 25, 2025
d540e7a
Fix issues after merge
felixbarny Sep 25, 2025
7ab8702
Add transport version file
felixbarny Sep 25, 2025
8664681
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Oct 21, 2025
fa15fb4
Update transport version
felixbarny Oct 21, 2025
5149afd
Bump OTel template version
felixbarny Oct 21, 2025
b6206a9
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Oct 24, 2025
2693bef
Bump transport version
felixbarny Oct 24, 2025
343e92d
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Oct 24, 2025
535a874
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Nov 25, 2025
eb4be7e
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Nov 25, 2025
2f55ef4
Fix high watermark indexing tests
felixbarny Nov 26, 2025
5c065f8
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Nov 26, 2025
cb4ca3a
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Nov 26, 2025
d7c4d9e
Address review comments
felixbarny Nov 27, 2025
588b7c5
Merge remote-tracking branch 'origin/main' into bulk-meta
felixbarny Nov 27, 2025
b3b7b6e
Fix replacement logic
felixbarny Nov 27, 2025
d967c35
Merge branch 'main' into bulk-meta
felixbarny Nov 27, 2025
c949f3e
Rename `dynamic_templates_params` to `dynamic_template_params`
felixbarny Nov 27, 2025
3906494
Merge remote-tracking branch 'felixbarny/bulk-meta' into bulk-meta
felixbarny Nov 27, 2025
f315989
Update server/src/test/java/org/elasticsearch/action/index/IndexReque…
felixbarny Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134709.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134709
summary: "OTLP: store units in mappings"
area: Mapping
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
AtomicBoolean nextPage = new AtomicBoolean(false);

ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
for (int i = 0; i < 4; ++i) {
for (int i = 0; i < 5; ++i) {
ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
add512BRequests(requests, index);
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
Expand All @@ -230,6 +230,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
add512BRequests(requestsThrottle, index);
add512BRequests(requestsThrottle, index);
// Ensure we'll be above SPLIT_BULK_HIGH_WATERMARK
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() + 1024, greaterThan(4096L));

CountDownLatch finishLatch = new CountDownLatch(1);
blockWriteCoordinationPool(threadPool, finishLatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
AtomicBoolean nextPage = new AtomicBoolean(false);

ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
for (int i = 0; i < 4; ++i) {
for (int i = 0; i < 5; ++i) {
ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
add512BRequests(requests, index);
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
Expand Down Expand Up @@ -838,6 +838,8 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
add512BRequests(requestsThrottle, index);
add512BRequests(requestsThrottle, index);
// Ensure we'll be above SPLIT_BULK_HIGH_WATERMARK
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() + 1024, greaterThan(4096L));

CountDownLatch finishLatch = new CountDownLatch(1);
blockWriteCoordinationPool(threadPool, finishLatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class BulkRequestParser {
private static final ParseField REQUIRE_DATA_STREAM = new ParseField(DocWriteRequest.REQUIRE_DATA_STREAM);
private static final ParseField LIST_EXECUTED_PIPELINES = new ParseField(DocWriteRequest.LIST_EXECUTED_PIPELINES);
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");
private static final ParseField DYNAMIC_TEMPLATE_PARAMS = new ParseField("dynamic_template_params");

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
// for CompatibleApi V7 this means to deprecate on type, for V8+ it means to throw an error
Expand Down Expand Up @@ -359,6 +360,7 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
Map<String, String> dynamicTemplates = Map.of();
Map<String, Map<String, String>> dynamicTemplatesParms = Map.of();

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
Expand Down Expand Up @@ -427,19 +429,22 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
&& DYNAMIC_TEMPLATES.match(currentFieldName, parser.getDeprecationHandler())) {
dynamicTemplates = parser.mapStrings();
} else if (token == XContentParser.Token.START_OBJECT
&& SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (token != XContentParser.Token.VALUE_NULL) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected a simple value for field ["
+ currentFieldName
+ "] but found ["
+ token
+ "]"
);
}
&& DYNAMIC_TEMPLATE_PARAMS.match(currentFieldName, parser.getDeprecationHandler())) {
dynamicTemplatesParms = parser.map(HashMap::new, XContentParser::mapStrings);
} else if (token == XContentParser.Token.START_OBJECT
&& SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (token != XContentParser.Token.VALUE_NULL) {
throw new IllegalArgumentException(
"Malformed action/metadata line ["
+ line
+ "], expected a simple value for field ["
+ currentFieldName
+ "] but found ["
+ token
+ "]"
);
}
}
} else if (token != XContentParser.Token.END_OBJECT) {
throw new IllegalArgumentException(
Expand All @@ -462,6 +467,11 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
"Delete request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
);
}
if (dynamicTemplatesParms.isEmpty() == false) {
throw new IllegalArgumentException(
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATE_PARAMS.getPreferredName()
);
}
currentRequest = new DeleteRequest(index).id(id)
.routing(routing)
.version(version)
Expand All @@ -480,6 +490,7 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.setDynamicTemplates(dynamicTemplates)
.setDynamicTemplateParams(dynamicTemplatesParms)
.setRequireAlias(requireAlias)
.setRequireDataStream(requireDataStream)
.setListExecutedPipelines(currentListExecutedPipelines)
Expand Down Expand Up @@ -508,6 +519,11 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
);
}
if (dynamicTemplatesParms.isEmpty() == false) {
throw new IllegalArgumentException(
"Update request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATE_PARAMS.getPreferredName()
);
}
UpdateRequest updateRequest = new UpdateRequest().index(index)
.id(id)
.routing(routing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ static boolean executeBulkItemRequest(
request.getContentType(),
request.routing(),
request.getDynamicTemplates(),
request.getDynamicTemplateParams(),
request.getIncludeSourceOnError(),
meteringParserDecorator,
request.tsid()
Expand Down Expand Up @@ -755,6 +756,7 @@ private static Engine.Result performOpOnReplica(
indexRequest.getContentType(),
indexRequest.routing(),
Map.of(),
Map.of(),
true,
XContentMeteringParserDecorator.NOOP,
indexRequest.tsid()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ private ValidationResult validateMappings(
request.getContentType(),
request.routing(),
request.getDynamicTemplates(),
request.getDynamicTemplateParams(),
request.getIncludeSourceOnError(),
XContentMeteringParserDecorator.NOOP,
request.tsid()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X;
private static final TransportVersion INDEX_REQUEST_INCLUDE_TSID = TransportVersion.fromName("index_request_include_tsid");
private static final TransportVersion INDEX_SOURCE = TransportVersion.fromName("index_source");
static final TransportVersion INGEST_REQUEST_DYNAMIC_TEMPLATE_PARAMS = TransportVersion.fromName(
"ingest_request_dynamic_template_params"
);

private static final Supplier<String> ID_GENERATOR = UUIDs::base64UUID;

Expand Down Expand Up @@ -147,6 +150,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;

private Map<String, String> dynamicTemplates = Map.of();
private Map<String, Map<String, String>> dynamicTemplateParams = Map.of();

/**
* rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
Expand Down Expand Up @@ -234,6 +238,10 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
if (in.getTransportVersion().supports(INDEX_REQUEST_INCLUDE_TSID)) {
tsid = in.readBytesRefOrNullIfEmpty();
}

if (in.getTransportVersion().supports(INGEST_REQUEST_DYNAMIC_TEMPLATE_PARAMS)) {
dynamicTemplateParams = in.readMap(StreamInput::readString, i -> i.readMap(StreamInput::readString));
}
}

public IndexRequest() {
Expand Down Expand Up @@ -823,6 +831,9 @@ private void writeBody(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(INDEX_REQUEST_INCLUDE_TSID)) {
out.writeBytesRef(tsid);
}
if (out.getTransportVersion().supports(INGEST_REQUEST_DYNAMIC_TEMPLATE_PARAMS)) {
out.writeMap(dynamicTemplateParams, StreamOutput::writeString, (o, v) -> o.writeMap(v, StreamOutput::writeString));
}
}

@Override
Expand Down Expand Up @@ -982,6 +993,15 @@ public Map<String, String> getDynamicTemplates() {
return dynamicTemplates;
}

public IndexRequest setDynamicTemplateParams(Map<String, Map<String, String>> dynamicTemplateParams) {
this.dynamicTemplateParams = dynamicTemplateParams;
return this;
}

public Map<String, Map<String, String>> getDynamicTemplateParams() {
return dynamicTemplateParams;
}

public Object getRawTimestamp() {
return rawTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,28 @@ public final DynamicTemplate findDynamicTemplate(String fieldName, DynamicTempla
return null;
}

/**
* Provides parameters for the {@link DynamicTemplate} returned by
* {@link #findDynamicTemplate(String, DynamicTemplate.XContentFieldType)}.
* The {@link DynamicTemplate} can use placeholders in its definition that
* will be replaced by the values returned by this method.
* For example, a dynamic template may contain a snippet like this:
* <pre>
* "meta": {
* "unit": "{{unit}}"
* }
* </pre>
* When applying the dynamic template to a field, the placeholder <code>{{unit}}</code>
* will be replaced by the value returned by this method for the key <code>unit</code>.
*
* @param fieldName the name of the field
* @return a map of parameter names to values; empty map if no parameters exist
*/
public final Map<String, String> getDynamicTemplateParams(String fieldName) {
final String pathAsString = path().pathAsText(fieldName);
return sourceToParse.dynamicTemplateParams().getOrDefault(pathAsString, Map.of());
}

// XContentParser that wraps an existing parser positioned on a value,
// and a field name, and returns a stream that looks like { 'field' : 'value' }
private static class CopyToParser extends FilterXContentParserWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private static boolean applyMatchingTemplate(
String dynamicType = dynamicTemplate.isRuntimeMapping() ? matchType.defaultRuntimeMappingType() : matchType.defaultMappingType();

String mappingType = dynamicTemplate.mappingType(dynamicType);
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType);
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType, context.getDynamicTemplateParams(name));
if (dynamicTemplate.isRuntimeMapping()) {
MappingParserContext parserContext = context.dynamicTemplateParserContext(dateFormatter);
RuntimeField.Parser parser = parserContext.runtimeFieldParser(mappingType);
Expand All @@ -262,7 +262,7 @@ private static Mapper.Builder findTemplateBuilderForObject(DocumentParserContext
}
String dynamicType = matchType.defaultMappingType();
String mappingType = dynamicTemplate.mappingType(dynamicType);
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType);
Map<String, Object> mapping = dynamicTemplate.mappingForName(name, dynamicType, context.getDynamicTemplateParams(name));
return parseDynamicTemplateMapping(name, mappingType, mapping, null, context);
}

Expand Down
Loading