Skip to content

Commit e0b7b04

Browse files
committed
Update the downsampled mapping in place
1 parent 0581d82 commit e0b7b04

File tree

3 files changed

+25
-94
lines changed

3 files changed

+25
-94
lines changed

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void testDownsamplingPassthroughMetrics() throws Exception {
134134
"properties": {
135135
"cpu_usage": {
136136
"type": "double",
137-
"time_series_metric": "counter"
137+
"time_series_metric": "gauge"
138138
}
139139
}
140140
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimeseriesFieldTypeHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public boolean isTimeSeriesDimension(final String unused, final Map<String, ?> f
5252
return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)) && isPassthroughField(fieldMapping) == false;
5353
}
5454

55-
public static boolean isPassthroughField(final Map<String, ?> fieldMapping) {
55+
public boolean isPassthroughField(final Map<String, ?> fieldMapping) {
5656
return PassThroughObjectMapper.CONTENT_TYPE.equals(fieldMapping.get(ContextMapping.FIELD_TYPE));
5757
}
5858

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 23 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,12 @@
4545
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
4646
import org.elasticsearch.common.Priority;
4747
import org.elasticsearch.common.Strings;
48-
import org.elasticsearch.common.bytes.BytesReference;
4948
import org.elasticsearch.common.compress.CompressedXContent;
5049
import org.elasticsearch.common.settings.IndexScopedSettings;
5150
import org.elasticsearch.common.settings.Setting;
5251
import org.elasticsearch.common.settings.Settings;
5352
import org.elasticsearch.common.util.concurrent.EsExecutors;
5453
import org.elasticsearch.common.util.concurrent.ThreadContext;
55-
import org.elasticsearch.common.xcontent.XContentHelper;
5654
import org.elasticsearch.core.TimeValue;
5755
import org.elasticsearch.core.Tuple;
5856
import org.elasticsearch.index.Index;
@@ -76,8 +74,6 @@
7674
import org.elasticsearch.threadpool.ThreadPool;
7775
import org.elasticsearch.transport.TransportService;
7876
import org.elasticsearch.xcontent.XContentBuilder;
79-
import org.elasticsearch.xcontent.XContentFactory;
80-
import org.elasticsearch.xcontent.XContentType;
8177
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateMetricDoubleFieldMapper;
8278
import org.elasticsearch.xpack.core.ClientHelper;
8379
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
@@ -342,7 +338,7 @@ protected void masterOperation(
342338

343339
final String mapping;
344340
try {
345-
mapping = createDownsampleIndexMapping(helper, request.getDownsampleConfig(), mapperService, sourceIndexMappings);
341+
mapping = createDownsampleIndexMapping(helper, request.getDownsampleConfig(), sourceIndexMappings);
346342
} catch (IOException e) {
347343
recordFailureMetrics(startTime);
348344
delegate.onFailure(e);
@@ -677,82 +673,40 @@ protected ClusterBlockException checkBlock(DownsampleAction.Request request, Clu
677673
* @param sourceIndexMappings a map with the source index mapping
678674
* @return the mapping of the downsample index
679675
*/
680-
public static String createDownsampleIndexMapping(
676+
static String createDownsampleIndexMapping(
681677
final TimeseriesFieldTypeHelper helper,
682678
final DownsampleConfig config,
683-
final MapperService mapperService,
684679
final Map<String, Object> sourceIndexMappings
685680
) throws IOException {
686-
final XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
687-
688-
addDynamicTemplates(builder);
689-
690-
builder.startObject("properties");
691-
692-
addTimestampField(config, sourceIndexMappings, builder);
693-
addMetricFields(helper, sourceIndexMappings, builder);
694-
695-
builder.endObject(); // match initial startObject
696-
builder.endObject(); // match startObject("properties")
697-
698-
final CompressedXContent mappingDiffXContent = CompressedXContent.fromJSON(
699-
XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON)
700-
);
701-
return mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mappingDiffXContent, MapperService.MergeReason.INDEX_TEMPLATE)
702-
.mappingSource()
703-
.uncompressed()
704-
.utf8ToString();
705-
}
706-
707-
private static void addMetricFields(
708-
final TimeseriesFieldTypeHelper helper,
709-
final Map<String, Object> sourceIndexMappings,
710-
final XContentBuilder builder
711-
) {
712-
MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> {
713-
if (helper.isTimeSeriesMetric(field, mapping)) {
714-
try {
715-
addMetricFieldMapping(builder, field, mapping);
716-
} catch (IOException e) {
717-
throw new ElasticsearchException("Error while adding metric for field [" + field + "]");
718-
}
719-
}
720-
});
721-
}
722-
723-
private static void addTimestampField(
724-
final DownsampleConfig config,
725-
Map<String, Object> sourceIndexMappings,
726-
final XContentBuilder builder
727-
) throws IOException {
681+
// TODO deep copy the map
728682
final String timestampField = config.getTimestampField();
729683
final String dateIntervalType = config.getIntervalType();
730684
final String dateInterval = config.getInterval().toString();
731685
final String timezone = config.getTimeZone();
732-
builder.startObject(timestampField);
733-
734686
MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> {
735-
try {
736-
if (timestampField.equals(field)) {
737-
final String timestampType = String.valueOf(mapping.get("type"));
738-
builder.field("type", timestampType != null ? timestampType : DateFieldMapper.CONTENT_TYPE);
739-
if (mapping.get("format") != null) {
740-
builder.field("format", mapping.get("format"));
741-
}
742-
if (mapping.get("ignore_malformed") != null) {
743-
builder.field("ignore_malformed", mapping.get("ignore_malformed"));
744-
}
687+
@SuppressWarnings("unchecked")
688+
Map<String, Object> updatedMapping = (Map<String, Object>) mapping;
689+
if (timestampField.equals(field)) {
690+
final String timestampType = String.valueOf(mapping.get("type"));
691+
updatedMapping.put("type", timestampType != null ? timestampType : DateFieldMapper.CONTENT_TYPE);
692+
updatedMapping.put("meta", Map.of(dateIntervalType, dateInterval, DownsampleConfig.TIME_ZONE, timezone));
693+
} else if (helper.isTimeSeriesMetric(field, mapping)) {
694+
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
695+
mapping.get(TIME_SERIES_METRIC_PARAM).toString()
696+
);
697+
if (metricType == TimeSeriesParams.MetricType.GAUGE
698+
&& AggregateMetricDoubleFieldMapper.CONTENT_TYPE.equals(mapping.get("type")) == false) {
699+
var supportedMetrics = getSupportedMetrics(metricType, mapping);
700+
701+
updatedMapping.clear();
702+
updatedMapping.put(TIME_SERIES_METRIC_PARAM, metricType.toString());
703+
updatedMapping.put("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE);
704+
updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedMetrics.supportedMetrics);
705+
updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supportedMetrics.defaultMetric);
745706
}
746-
} catch (IOException e) {
747-
throw new ElasticsearchException("Unable to create timestamp field mapping for field [" + timestampField + "]", e);
748707
}
749708
});
750-
751-
builder.startObject("meta")
752-
.field(dateIntervalType, dateInterval)
753-
.field(DownsampleConfig.TIME_ZONE, timezone)
754-
.endObject()
755-
.endObject();
709+
return new CompressedXContent(sourceIndexMappings).uncompressed().utf8ToString();
756710
}
757711

758712
// public for testing
@@ -788,29 +742,6 @@ public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
788742
return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs);
789743
}
790744

791-
private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
792-
throws IOException {
793-
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
794-
fieldProperties.get(TIME_SERIES_METRIC_PARAM).toString()
795-
);
796-
builder.startObject(field);
797-
if (metricType == TimeSeriesParams.MetricType.COUNTER) {
798-
// For counters, we keep the same field type, because they store
799-
// only one value (the last value of the counter)
800-
for (String fieldProperty : fieldProperties.keySet()) {
801-
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
802-
}
803-
} else {
804-
var supported = getSupportedMetrics(metricType, fieldProperties);
805-
806-
builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
807-
.stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics)
808-
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric)
809-
.field(TIME_SERIES_METRIC_PARAM, metricType);
810-
}
811-
builder.endObject();
812-
}
813-
814745
private static void validateDownsamplingInterval(MapperService mapperService, DownsampleConfig config) {
815746
MappedFieldType timestampFieldType = mapperService.fieldType(config.getTimestampField());
816747
assert timestampFieldType != null : "Cannot find timestamp field [" + config.getTimestampField() + "] in the mapping";

0 commit comments

Comments
 (0)