|
45 | 45 | import org.elasticsearch.cluster.service.MasterServiceTaskQueue; |
46 | 46 | import org.elasticsearch.common.Priority; |
47 | 47 | import org.elasticsearch.common.Strings; |
48 | | -import org.elasticsearch.common.bytes.BytesReference; |
49 | 48 | import org.elasticsearch.common.compress.CompressedXContent; |
50 | 49 | import org.elasticsearch.common.settings.IndexScopedSettings; |
51 | 50 | import org.elasticsearch.common.settings.Setting; |
52 | 51 | import org.elasticsearch.common.settings.Settings; |
53 | 52 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
54 | | -import org.elasticsearch.common.xcontent.XContentHelper; |
55 | 53 | import org.elasticsearch.core.TimeValue; |
56 | 54 | import org.elasticsearch.core.Tuple; |
57 | 55 | import org.elasticsearch.index.Index; |
|
75 | 73 | import org.elasticsearch.threadpool.ThreadPool; |
76 | 74 | import org.elasticsearch.transport.TransportService; |
77 | 75 | import org.elasticsearch.xcontent.XContentBuilder; |
78 | | -import org.elasticsearch.xcontent.XContentFactory; |
79 | | -import org.elasticsearch.xcontent.XContentType; |
80 | 76 | import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateMetricDoubleFieldMapper; |
81 | 77 | import org.elasticsearch.xpack.core.ClientHelper; |
82 | 78 | import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; |
|
90 | 86 | import java.time.OffsetDateTime; |
91 | 87 | import java.time.format.DateTimeFormatter; |
92 | 88 | import java.util.ArrayList; |
| 89 | +import java.util.HashMap; |
93 | 90 | import java.util.List; |
94 | 91 | import java.util.Map; |
95 | 92 | import java.util.Objects; |
|
99 | 96 | import java.util.function.Predicate; |
100 | 97 | import java.util.function.Supplier; |
101 | 98 |
|
| 99 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.FIELD_TYPE; |
| 100 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.MULTI_FIELDS; |
| 101 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.PROPERTIES; |
| 102 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.visitAndCopyMapping; |
102 | 103 | import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; |
103 | 104 | import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX; |
104 | 105 |
|
@@ -378,7 +379,7 @@ protected void masterOperation( |
378 | 379 |
|
379 | 380 | final String mapping; |
380 | 381 | try { |
381 | | - mapping = createDownsampleIndexMapping(helper, request.getDownsampleConfig(), mapperService, sourceIndexMappings); |
| 382 | + mapping = createDownsampleIndexMapping(helper, request.getDownsampleConfig(), sourceIndexMappings); |
382 | 383 | } catch (IOException e) { |
383 | 384 | recordFailureMetrics(startTime); |
384 | 385 | delegate.onFailure(e); |
@@ -716,88 +717,81 @@ protected ClusterBlockException checkBlock(DownsampleAction.Request request, Clu |
716 | 717 | public static String createDownsampleIndexMapping( |
717 | 718 | final TimeseriesFieldTypeHelper helper, |
718 | 719 | final DownsampleConfig config, |
719 | | - final MapperService mapperService, |
720 | 720 | final Map<String, Object> sourceIndexMappings |
721 | 721 | ) throws IOException { |
722 | | - final XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); |
723 | 722 |
|
724 | | - addDynamicTemplates(builder); |
725 | | - |
726 | | - builder.startObject("properties"); |
727 | | - |
728 | | - addTimestampField(config, sourceIndexMappings, builder); |
729 | | - addMetricFieldOverwrites(config, helper, sourceIndexMappings, builder); |
730 | | - |
731 | | - builder.endObject(); // match initial startObject |
732 | | - builder.endObject(); // match startObject("properties") |
733 | | - |
734 | | - final CompressedXContent mappingDiffXContent = CompressedXContent.fromJSON( |
735 | | - XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON) |
736 | | - ); |
737 | | - return mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mappingDiffXContent, MapperService.MergeReason.INDEX_TEMPLATE) |
738 | | - .mappingSource() |
739 | | - .uncompressed() |
740 | | - .utf8ToString(); |
| 723 | + final String timestampField = config.getTimestampField(); |
| 724 | + final String dateIntervalType = config.getIntervalType(); |
| 725 | + final String dateInterval = config.getInterval().toString(); |
| 726 | + final String timezone = config.getTimeZone(); |
| 727 | + Map<String, Object> downsampledMapping = new HashMap<>(); |
| 728 | + for (Map.Entry<String, Object> entry : sourceIndexMappings.entrySet()) { |
| 729 | + if (entry.getKey().equals(PROPERTIES) == false) { |
| 730 | + downsampledMapping.put(entry.getKey(), entry.getValue()); |
| 731 | + } |
| 732 | + } |
| 733 | + visitAndCopyMapping(sourceIndexMappings, downsampledMapping, (fieldName, sourceMapping, updatedMapping) -> { |
| 734 | + if (timestampField.equals(fieldName)) { |
| 735 | + updateTimestampField(sourceMapping, updatedMapping, dateIntervalType, dateInterval, timezone); |
| 736 | + } else if (helper.isTimeSeriesMetric(fieldName, sourceMapping)) { |
| 737 | + processMetricField(sourceMapping, updatedMapping, config.getSamplingMethodOrDefault()); |
| 738 | + } else { |
| 739 | + copyMapping(sourceMapping, updatedMapping); |
| 740 | + } |
| 741 | + }); |
| 742 | + return new CompressedXContent(downsampledMapping).uncompressed().utf8ToString(); |
741 | 743 | } |
742 | 744 |
|
743 | 745 | /** |
744 | 746 | * Adds metric mapping overwrites. When downsampling certain metrics change their mapping type. For example, |
745 | 747 | * when we are using the aggregate sampling method, the mapping of a gauge metric becomes an aggregate_metric_double. |
746 | 748 | */ |
747 | | - private static void addMetricFieldOverwrites( |
748 | | - final DownsampleConfig config, |
749 | | - final TimeseriesFieldTypeHelper helper, |
750 | | - final Map<String, Object> sourceIndexMappings, |
751 | | - final XContentBuilder builder |
| 749 | + private static void processMetricField( |
| 750 | + Map<String, ?> sourceMapping, |
| 751 | + Map<String, Object> updatedMapping, |
| 752 | + DownsampleConfig.SamplingMethod samplingMethod |
752 | 753 | ) { |
753 | | - // The last value sampling method preserves the source mapping. |
754 | | - if (config.getSamplingMethodOrDefault() == DownsampleConfig.SamplingMethod.LAST_VALUE) { |
755 | | - return; |
| 754 | + final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString( |
| 755 | + sourceMapping.get(TIME_SERIES_METRIC_PARAM).toString() |
| 756 | + ); |
| 757 | + if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE |
| 758 | + && metricType == TimeSeriesParams.MetricType.GAUGE |
| 759 | + && AggregateMetricDoubleFieldMapper.CONTENT_TYPE.equals(sourceMapping.get(FIELD_TYPE)) == false) { |
| 760 | + var supportedMetrics = getSupportedMetrics(metricType, sourceMapping); |
| 761 | + |
| 762 | + updatedMapping.put(TIME_SERIES_METRIC_PARAM, metricType.toString()); |
| 763 | + updatedMapping.put(FIELD_TYPE, AggregateMetricDoubleFieldMapper.CONTENT_TYPE); |
| 764 | + updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedMetrics.supportedMetrics); |
| 765 | + updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supportedMetrics.defaultMetric); |
| 766 | + } else { |
| 767 | + copyMapping(sourceMapping, updatedMapping); |
756 | 768 | } |
757 | | - MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> { |
758 | | - if (helper.isTimeSeriesMetric(field, mapping)) { |
759 | | - try { |
760 | | - addMetricFieldMapping(builder, field, mapping); |
761 | | - } catch (IOException e) { |
762 | | - throw new ElasticsearchException("Error while adding metric for field [" + field + "]"); |
763 | | - } |
764 | | - } |
765 | | - }); |
766 | 769 | } |
767 | 770 |
|
768 | | - private static void addTimestampField( |
769 | | - final DownsampleConfig config, |
770 | | - Map<String, Object> sourceIndexMappings, |
771 | | - final XContentBuilder builder |
772 | | - ) throws IOException { |
773 | | - final String timestampField = config.getTimestampField(); |
774 | | - final String dateIntervalType = config.getIntervalType(); |
775 | | - final String dateInterval = config.getInterval().toString(); |
776 | | - final String timezone = config.getTimeZone(); |
777 | | - builder.startObject(timestampField); |
778 | | - |
779 | | - MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> { |
780 | | - try { |
781 | | - if (timestampField.equals(field)) { |
782 | | - final String timestampType = String.valueOf(mapping.get("type")); |
783 | | - builder.field("type", timestampType != null ? timestampType : DateFieldMapper.CONTENT_TYPE); |
784 | | - if (mapping.get("format") != null) { |
785 | | - builder.field("format", mapping.get("format")); |
786 | | - } |
787 | | - if (mapping.get("ignore_malformed") != null) { |
788 | | - builder.field("ignore_malformed", mapping.get("ignore_malformed")); |
789 | | - } |
790 | | - } |
791 | | - } catch (IOException e) { |
792 | | - throw new ElasticsearchException("Unable to create timestamp field mapping for field [" + timestampField + "]", e); |
| 771 | + private static void copyMapping(Map<String, ?> sourceMapping, Map<String, Object> updatedMapping) { |
| 772 | + for (String f : sourceMapping.keySet()) { |
| 773 | + if (f.equals(PROPERTIES) == false && f.equals(MULTI_FIELDS) == false) { |
| 774 | + updatedMapping.put(f, sourceMapping.get(f)); |
793 | 775 | } |
794 | | - }); |
| 776 | + } |
| 777 | + } |
795 | 778 |
|
796 | | - builder.startObject("meta") |
797 | | - .field(dateIntervalType, dateInterval) |
798 | | - .field(DownsampleConfig.TIME_ZONE, timezone) |
799 | | - .endObject() |
800 | | - .endObject(); |
| 779 | + private static void updateTimestampField( |
| 780 | + Map<String, ?> sourceMapping, |
| 781 | + Map<String, Object> updatedMapping, |
| 782 | + String dateIntervalType, |
| 783 | + String dateInterval, |
| 784 | + String timezone |
| 785 | + ) { |
| 786 | + final String timestampType = String.valueOf(sourceMapping.get(FIELD_TYPE)); |
| 787 | + updatedMapping.put(FIELD_TYPE, timestampType != null ? timestampType : DateFieldMapper.CONTENT_TYPE); |
| 788 | + if (sourceMapping.get("format") != null) { |
| 789 | + updatedMapping.put("format", sourceMapping.get("format")); |
| 790 | + } |
| 791 | + if (sourceMapping.get("ignore_malformed") != null) { |
| 792 | + updatedMapping.put("ignore_malformed", sourceMapping.get("ignore_malformed")); |
| 793 | + } |
| 794 | + updatedMapping.put("meta", Map.of(dateIntervalType, dateInterval, DownsampleConfig.TIME_ZONE, timezone)); |
801 | 795 | } |
802 | 796 |
|
803 | 797 | // public for testing |
@@ -833,29 +827,6 @@ public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics( |
833 | 827 | return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs); |
834 | 828 | } |
835 | 829 |
|
836 | | - private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties) |
837 | | - throws IOException { |
838 | | - final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString( |
839 | | - fieldProperties.get(TIME_SERIES_METRIC_PARAM).toString() |
840 | | - ); |
841 | | - builder.startObject(field); |
842 | | - if (metricType == TimeSeriesParams.MetricType.COUNTER) { |
843 | | - // For counters, we keep the same field type, because they store |
844 | | - // only one value (the last value of the counter) |
845 | | - for (String fieldProperty : fieldProperties.keySet()) { |
846 | | - builder.field(fieldProperty, fieldProperties.get(fieldProperty)); |
847 | | - } |
848 | | - } else { |
849 | | - var supported = getSupportedMetrics(metricType, fieldProperties); |
850 | | - |
851 | | - builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE) |
852 | | - .stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics) |
853 | | - .field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric) |
854 | | - .field(TIME_SERIES_METRIC_PARAM, metricType); |
855 | | - } |
856 | | - builder.endObject(); |
857 | | - } |
858 | | - |
859 | 830 | private static void validateDownsamplingConfiguration( |
860 | 831 | MapperService mapperService, |
861 | 832 | DownsampleConfig config, |
|
0 commit comments