|
45 | 45 | import org.elasticsearch.cluster.service.MasterServiceTaskQueue; |
46 | 46 | import org.elasticsearch.common.Priority; |
47 | 47 | import org.elasticsearch.common.Strings; |
48 | | -import org.elasticsearch.common.TriConsumer; |
49 | 48 | import org.elasticsearch.common.compress.CompressedXContent; |
50 | 49 | import org.elasticsearch.common.settings.IndexScopedSettings; |
51 | 50 | import org.elasticsearch.common.settings.Setting; |
|
96 | 95 | import java.util.concurrent.atomic.AtomicInteger; |
97 | 96 | import java.util.function.Predicate; |
98 | 97 |
|
| 98 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.FIELD_TYPE; |
| 99 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.MULTI_FIELDS; |
| 100 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.PROPERTIES; |
| 101 | +import static org.elasticsearch.action.admin.cluster.stats.MappingVisitor.visitAndCopyMapping; |
99 | 102 | import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; |
100 | 103 | import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX; |
101 | 104 |
|
|
108 | 111 | public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAction<DownsampleAction.Request> { |
109 | 112 |
|
110 | 113 | private static final Logger logger = LogManager.getLogger(TransportDownsampleAction.class); |
111 | | - private static final String MAPPING_PROPERTIES = "properties"; |
112 | 114 | private static final String MAPPING_DYNAMIC_TEMPLATES = "dynamic_templates"; |
113 | | - private static final String FIELD_TYPE = "type"; |
114 | | - private static final String MULTI_FIELDS = "fields"; |
115 | 115 | private final Client client; |
116 | 116 | private final IndicesService indicesService; |
117 | 117 | private final MasterServiceTaskQueue<DownsampleClusterStateUpdateTask> taskQueue; |
@@ -694,96 +694,67 @@ static String createDownsampleIndexMapping( |
694 | 694 | @SuppressWarnings("unchecked") |
695 | 695 | List<Object> downsampledDynamicTemplates = (List<Object>) downsampledMapping.get(MAPPING_DYNAMIC_TEMPLATES); |
696 | 696 | downsampledDynamicTemplates.addAll(sourceDynamicTemplates); |
697 | | - } else if (entry.getKey().equals(MAPPING_PROPERTIES) == false) { |
| 697 | + } else if (entry.getKey().equals(MappingVisitor.PROPERTIES) == false) { |
698 | 698 | downsampledMapping.put(entry.getKey(), entry.getValue()); |
699 | 699 | } |
700 | 700 | } |
701 | | - populateMappingProperties(sourceIndexMappings, downsampledMapping, (fieldName, sourceMapping, updatedMapping) -> { |
| 701 | + visitAndCopyMapping(sourceIndexMappings, downsampledMapping, (fieldName, sourceMapping, updatedMapping) -> { |
702 | 702 | if (timestampField.equals(fieldName)) { |
703 | | - final String timestampType = String.valueOf(sourceMapping.get(FIELD_TYPE)); |
704 | | - updatedMapping.put(FIELD_TYPE, timestampType != null ? timestampType : DateFieldMapper.CONTENT_TYPE); |
705 | | - if (sourceMapping.get("format") != null) { |
706 | | - updatedMapping.put("format", sourceMapping.get("format")); |
707 | | - } |
708 | | - if (sourceMapping.get("ignore_malformed") != null) { |
709 | | - updatedMapping.put("ignore_malformed", sourceMapping.get("ignore_malformed")); |
710 | | - } |
711 | | - updatedMapping.put("meta", Map.of(dateIntervalType, dateInterval, DownsampleConfig.TIME_ZONE, timezone)); |
| 703 | + updateTimestampField(sourceMapping, updatedMapping, dateIntervalType, dateInterval, timezone); |
712 | 704 | return; |
713 | | - } |
714 | | - if (helper.isTimeSeriesMetric(fieldName, sourceMapping)) { |
715 | | - final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString( |
716 | | - sourceMapping.get(TIME_SERIES_METRIC_PARAM).toString() |
717 | | - ); |
718 | | - if (metricType == TimeSeriesParams.MetricType.GAUGE |
719 | | - && AggregateMetricDoubleFieldMapper.CONTENT_TYPE.equals(sourceMapping.get(FIELD_TYPE)) == false) { |
720 | | - var supportedMetrics = getSupportedMetrics(metricType, sourceMapping); |
721 | | - |
722 | | - updatedMapping.put(TIME_SERIES_METRIC_PARAM, metricType.toString()); |
723 | | - updatedMapping.put(FIELD_TYPE, AggregateMetricDoubleFieldMapper.CONTENT_TYPE); |
724 | | - updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedMetrics.supportedMetrics); |
725 | | - updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supportedMetrics.defaultMetric); |
726 | | - return; |
727 | | - } |
728 | | - } |
729 | | - for (String f : sourceMapping.keySet()) { |
730 | | - if (f.equals(MAPPING_PROPERTIES) || f.equals(MULTI_FIELDS)) { |
731 | | - continue; |
732 | | - } |
733 | | - updatedMapping.put(f, sourceMapping.get(f)); |
| 705 | + } else if (helper.isTimeSeriesMetric(fieldName, sourceMapping)) { |
| 706 | + processMetricField(sourceMapping, updatedMapping); |
| 707 | + } else { |
| 708 | + copyMapping(sourceMapping, updatedMapping); |
734 | 709 | } |
735 | 710 | }); |
736 | 711 |
|
737 | 712 | return new CompressedXContent(downsampledMapping).uncompressed().utf8ToString(); |
738 | 713 | } |
739 | 714 |
|
740 | | - private static void populateMappingProperties( |
741 | | - final Map<String, ?> sourceMapping, |
742 | | - final Map<String, Object> destMapping, |
743 | | - TriConsumer<String, Map<String, ?>, Map<String, Object>> fieldProcessor |
744 | | - ) { |
745 | | - Object sourceProperties0 = sourceMapping.get(MAPPING_PROPERTIES); |
746 | | - if (sourceProperties0 instanceof Map) { |
747 | | - @SuppressWarnings("unchecked") |
748 | | - Map<String, ?> sourceProperties = (Map<String, ?>) sourceProperties0; |
749 | | - var destProperties = new HashMap<>(sourceProperties.size()); |
750 | | - destMapping.put(MAPPING_PROPERTIES, destProperties); |
751 | | - for (Map.Entry<String, ?> entry : sourceProperties.entrySet()) { |
752 | | - String fieldName = entry.getKey(); |
753 | | - final Object v = entry.getValue(); |
754 | | - if (v instanceof Map) { |
755 | | - @SuppressWarnings("unchecked") |
756 | | - Map<String, ?> sourceFieldMapping = (Map<String, ?>) v; |
757 | | - var destFieldMapping = new HashMap<String, Object>(sourceFieldMapping.size()); |
758 | | - destProperties.put(fieldName, destFieldMapping); |
759 | | - // Process the field from properties and multi-fields. |
760 | | - fieldProcessor.apply(entry.getKey(), sourceFieldMapping, destFieldMapping); |
761 | | - populateMappingProperties(sourceFieldMapping, destFieldMapping, fieldProcessor); |
762 | | - |
763 | | - // Multi fields |
764 | | - Object sourceFieldsO = sourceFieldMapping.get(MULTI_FIELDS); |
765 | | - if (sourceFieldsO instanceof Map) { |
766 | | - @SuppressWarnings("unchecked") |
767 | | - Map<String, ?> sourceFields = (Map<String, ?>) sourceFieldsO; |
768 | | - var destFields = new HashMap<String, Object>(sourceFields.size()); |
769 | | - destFieldMapping.put(MULTI_FIELDS, destFields); |
770 | | - for (Map.Entry<String, ?> multiFieldEntry : sourceFields.entrySet()) { |
771 | | - Object v2 = multiFieldEntry.getValue(); |
772 | | - if (v2 instanceof Map) { |
773 | | - String multiFieldName = multiFieldEntry.getKey(); |
774 | | - @SuppressWarnings("unchecked") |
775 | | - Map<String, ?> sourceMultiFieldMapping = (Map<String, ?>) v2; |
776 | | - Map<String, Object> destMultiFieldMapping = new HashMap<>(sourceMultiFieldMapping.size()); |
777 | | - destFields.put(multiFieldName, destMultiFieldMapping); |
778 | | - fieldProcessor.apply(multiFieldName, sourceMultiFieldMapping, destMultiFieldMapping); |
779 | | - } |
780 | | - } |
781 | | - } |
782 | | - } |
| 715 | + private static void processMetricField(Map<String, ?> sourceMapping, Map<String, Object> updatedMapping) { |
| 716 | + final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString( |
| 717 | + sourceMapping.get(TIME_SERIES_METRIC_PARAM).toString() |
| 718 | + ); |
| 719 | + if (metricType == TimeSeriesParams.MetricType.GAUGE |
| 720 | + && AggregateMetricDoubleFieldMapper.CONTENT_TYPE.equals(sourceMapping.get(FIELD_TYPE)) == false) { |
| 721 | + var supportedMetrics = getSupportedMetrics(metricType, sourceMapping); |
| 722 | + |
| 723 | + updatedMapping.put(TIME_SERIES_METRIC_PARAM, metricType.toString()); |
| 724 | + updatedMapping.put(FIELD_TYPE, AggregateMetricDoubleFieldMapper.CONTENT_TYPE); |
| 725 | + updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedMetrics.supportedMetrics); |
| 726 | + updatedMapping.put(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supportedMetrics.defaultMetric); |
| 727 | + } else { |
| 728 | + copyMapping(sourceMapping, updatedMapping); |
| 729 | + } |
| 730 | + } |
| 731 | + |
| 732 | + private static void copyMapping(Map<String, ?> sourceMapping, Map<String, Object> updatedMapping) { |
| 733 | + for (String f : sourceMapping.keySet()) { |
| 734 | + if (f.equals(PROPERTIES) == false && f.equals(MULTI_FIELDS) == false) { |
| 735 | + updatedMapping.put(f, sourceMapping.get(f)); |
783 | 736 | } |
784 | 737 | } |
785 | 738 | } |
786 | 739 |
|
| 740 | + private static void updateTimestampField( |
| 741 | + Map<String, ?> sourceMapping, |
| 742 | + Map<String, Object> updatedMapping, |
| 743 | + String dateIntervalType, |
| 744 | + String dateInterval, |
| 745 | + String timezone |
| 746 | + ) { |
| 747 | + final String timestampType = String.valueOf(sourceMapping.get(FIELD_TYPE)); |
| 748 | + updatedMapping.put(FIELD_TYPE, timestampType != null ? timestampType : DateFieldMapper.CONTENT_TYPE); |
| 749 | + if (sourceMapping.get("format") != null) { |
| 750 | + updatedMapping.put("format", sourceMapping.get("format")); |
| 751 | + } |
| 752 | + if (sourceMapping.get("ignore_malformed") != null) { |
| 753 | + updatedMapping.put("ignore_malformed", sourceMapping.get("ignore_malformed")); |
| 754 | + } |
| 755 | + updatedMapping.put("meta", Map.of(dateIntervalType, dateInterval, DownsampleConfig.TIME_ZONE, timezone)); |
| 756 | + } |
| 757 | + |
787 | 758 | // public for testing |
788 | 759 | public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, List<String> supportedMetrics) {} |
789 | 760 |
|
|
0 commit comments