Skip to content

Commit e3e474c

Browse files
Copy metrics and default_metric properties when downsampling aggregate_metric_double (#121727)
Fixes #119696 and #96076
1 parent 56cac1b commit e3e474c

File tree

5 files changed

+154
-6
lines changed

5 files changed

+154
-6
lines changed

docs/changelog/121727.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 121727
2+
summary: Copy metrics and `default_metric` properties when downsampling `aggregate_metric_double`
3+
area: Downsampling
4+
type: bug
5+
issues:
6+
- 119696
7+
- 96076

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ public class DataStreamFeatures implements FeatureSpecification {
2121

2222
public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");
2323

24+
public static final NodeFeature DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX = new NodeFeature(
25+
"data_stream.downsample.default_aggregate_metric_fix"
26+
);
27+
2428
@Override
2529
public Set<NodeFeature> getFeatures() {
2630
return Set.of();
2731
}
2832

2933
@Override
3034
public Set<NodeFeature> getTestFeatures() {
31-
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
35+
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX);
3236
}
3337
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"downsample aggregate field":
2+
- requires:
3+
cluster_features: ["data_stream.downsample.default_aggregate_metric_fix"]
4+
reason: "#119696 fixed"
5+
6+
- do:
7+
indices.create:
8+
index: test
9+
body:
10+
settings:
11+
number_of_shards: 1
12+
index:
13+
mode: time_series
14+
routing_path: [sensor_id]
15+
time_series:
16+
start_time: 2021-04-28T00:00:00Z
17+
end_time: 2021-04-29T00:00:00Z
18+
mappings:
19+
properties:
20+
"@timestamp":
21+
type: date
22+
sensor_id:
23+
type: keyword
24+
time_series_dimension: true
25+
temperature:
26+
type: aggregate_metric_double
27+
metrics: [min, sum, value_count]
28+
default_metric: sum
29+
time_series_metric: gauge
30+
- do:
31+
bulk:
32+
refresh: true
33+
index: test
34+
body:
35+
- '{"index": {}}'
36+
- '{"@timestamp": "2021-04-28T18:00:00Z", "sensor_id": "1", "temperature": {"min": 24.7, "sum": 50.2, "value_count": 2}}'
37+
- '{"index": {}}'
38+
- '{"@timestamp": "2021-04-28T18:30:00Z", "sensor_id": "1", "temperature": {"min": 24.2, "sum": 73.8, "value_count": 3}}'
39+
- '{"index": {}}'
40+
- '{"@timestamp": "2021-04-28T19:00:00Z", "sensor_id": "1", "temperature": {"min": 25.1, "sum": 51.0, "value_count": 2}}'
41+
- '{"index": {}}'
42+
- '{"@timestamp": "2021-04-28T19:30:00Z", "sensor_id": "1", "temperature": {"min": 24.8, "sum": 24.8, "value_count": 1}}'
43+
- '{"index": {}}'
44+
- '{"@timestamp": "2021-04-28T20:00:00Z", "sensor_id": "1", "temperature": {"min": 24.6, "sum": 49.1, "value_count": 2}}'
45+
46+
- do:
47+
indices.put_settings:
48+
index: test
49+
body:
50+
index.blocks.write: true
51+
52+
- do:
53+
indices.downsample:
54+
index: test
55+
target_index: test-downsample
56+
body: >
57+
{
58+
"fixed_interval": "1h"
59+
}
60+
- is_true: acknowledged
61+
62+
- do:
63+
search:
64+
index: test-downsample
65+
body:
66+
size: 0
67+
68+
- match:
69+
hits.total.value: 3
70+
71+
- do:
72+
indices.get_mapping:
73+
index: test-downsample
74+
- match:
75+
test-downsample.mappings.properties.temperature:
76+
type: aggregate_metric_double
77+
metrics: [min, sum, value_count]
78+
default_metric: sum
79+
time_series_metric: gauge

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import java.util.ArrayList;
9292
import java.util.List;
9393
import java.util.Map;
94+
import java.util.Objects;
9495
import java.util.Set;
9596
import java.util.concurrent.atomic.AtomicBoolean;
9697
import java.util.concurrent.atomic.AtomicInteger;
@@ -739,6 +740,39 @@ private static void addTimestampField(
739740
.endObject();
740741
}
741742

743+
// public for testing
744+
public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, List<String> supportedMetrics) {}
745+
746+
// public for testing
747+
public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
748+
final TimeSeriesParams.MetricType metricType,
749+
final Map<String, ?> fieldProperties
750+
) {
751+
boolean sourceIsAggregate = fieldProperties.get("type").equals(AggregateMetricDoubleFieldMapper.CONTENT_TYPE);
752+
List<String> supportedAggs = List.of(metricType.supportedAggs());
753+
754+
if (sourceIsAggregate) {
755+
@SuppressWarnings("unchecked")
756+
List<String> currentAggs = (List<String>) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.METRICS);
757+
supportedAggs = supportedAggs.stream().filter(currentAggs::contains).toList();
758+
}
759+
760+
assert supportedAggs.size() > 0;
761+
762+
String defaultMetric = "max";
763+
if (supportedAggs.contains(defaultMetric) == false) {
764+
defaultMetric = supportedAggs.get(0);
765+
}
766+
if (sourceIsAggregate) {
767+
defaultMetric = Objects.requireNonNullElse(
768+
(String) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC),
769+
defaultMetric
770+
);
771+
}
772+
773+
return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs);
774+
}
775+
742776
private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
743777
throws IOException {
744778
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
@@ -752,12 +786,11 @@ private static void addMetricFieldMapping(final XContentBuilder builder, final S
752786
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
753787
}
754788
} else {
755-
final String[] supportedAggsArray = metricType.supportedAggs();
756-
// We choose max as the default metric
757-
final String defaultMetric = List.of(supportedAggsArray).contains("max") ? "max" : supportedAggsArray[0];
789+
var supported = getSupportedMetrics(metricType, fieldProperties);
790+
758791
builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
759-
.array(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedAggsArray)
760-
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, defaultMetric)
792+
.stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics)
793+
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric)
761794
.field(TIME_SERIES_METRIC_PARAM, metricType);
762795
}
763796
builder.endObject();

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.index.IndexSettings;
1515
import org.elasticsearch.index.IndexVersion;
16+
import org.elasticsearch.index.mapper.TimeSeriesParams;
1617
import org.elasticsearch.test.ESTestCase;
1718
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
1819

1920
import java.util.List;
21+
import java.util.Map;
2022
import java.util.UUID;
2123

24+
import static org.hamcrest.Matchers.is;
25+
2226
public class TransportDownsampleActionTests extends ESTestCase {
2327
public void testCopyIndexMetadata() {
2428
// GIVEN
@@ -107,4 +111,25 @@ private static void assertTargetSettings(final IndexMetadata indexMetadata, fina
107111
settings.get(IndexMetadata.SETTING_CREATION_DATE)
108112
);
109113
}
114+
115+
public void testGetSupportedMetrics() {
116+
TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.GAUGE;
117+
Map<String, Object> fieldProperties = Map.of(
118+
"type",
119+
"aggregate_metric_double",
120+
"metrics",
121+
List.of("max", "sum"),
122+
"default_metric",
123+
"sum"
124+
);
125+
126+
var supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
127+
assertThat(supported.defaultMetric(), is("sum"));
128+
assertThat(supported.supportedMetrics(), is(List.of("max", "sum")));
129+
130+
fieldProperties = Map.of("type", "integer");
131+
supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
132+
assertThat(supported.defaultMetric(), is("max"));
133+
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
134+
}
110135
}

0 commit comments

Comments
 (0)