docs/changelog/121727.yaml (7 additions, 0 deletions)
@@ -0,0 +1,7 @@
pr: 121727
summary: Copy metrics and `default_metric` properties when downsampling `aggregate_metric_double`
area: Downsampling
type: bug
issues:
- 119696
- 96076
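
Before this change, downsampling rewrote an aggregate_metric_double field with every aggregation the metric type supports and a hard-coded default_metric of max; with the fix, the source field's own metrics and default_metric are carried over. A minimal standalone sketch of that selection rule follows, assuming illustrative inputs; DefaultMetricSelectionSketch and pickDefaultMetric are hypothetical names, while the real logic lives in TransportDownsampleAction.getSupportedMetrics in the diff below.

import java.util.List;
import java.util.Map;

// Hypothetical illustration, not part of the PR.
class DefaultMetricSelectionSketch {
    // Prefer "max", fall back to the first supported aggregation, and keep the
    // source field's own default_metric when it is already an aggregate_metric_double.
    static String pickDefaultMetric(List<String> supportedAggs, Map<String, ?> sourceFieldProperties) {
        String defaultMetric = supportedAggs.contains("max") ? "max" : supportedAggs.get(0);
        Object sourceDefault = sourceFieldProperties.get("default_metric");
        return sourceDefault instanceof String s ? s : defaultMetric;
    }

    public static void main(String[] args) {
        // Source field from the YAML test below: metrics [min, sum, value_count], default_metric sum.
        String picked = pickDefaultMetric(
            List.of("min", "sum", "value_count"),
            Map.of("type", "aggregate_metric_double", "default_metric", "sum")
        );
        System.out.println(picked); // prints "sum"; before the fix the downsampled mapping always got "max"
    }
}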
@@ -21,13 +21,17 @@ public class DataStreamFeatures implements FeatureSpecification {

public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");

public static final NodeFeature DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX = new NodeFeature(
"data_stream.downsample.default_aggregate_metric_fix"
);

@Override
public Set<NodeFeature> getFeatures() {
return Set.of();
}

@Override
public Set<NodeFeature> getTestFeatures() {
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX);
}
}
@@ -0,0 +1,79 @@
"downsample aggregate field":
- requires:
cluster_features: ["data_stream.downsample.default_aggregate_metric_fix"]
reason: "#119696 fixed"

- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
index:
mode: time_series
routing_path: [sensor_id]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
sensor_id:
type: keyword
time_series_dimension: true
temperature:
type: aggregate_metric_double
metrics: [min, sum, value_count]
default_metric: sum
time_series_metric: gauge
- do:
bulk:
refresh: true
index: test
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:00:00Z", "sensor_id": "1", "temperature": {"min": 24.7, "sum": 50.2, "value_count": 2}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:30:00Z", "sensor_id": "1", "temperature": {"min": 24.2, "sum": 73.8, "value_count": 3}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:00:00Z", "sensor_id": "1", "temperature": {"min": 25.1, "sum": 51.0, "value_count": 2}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:30:00Z", "sensor_id": "1", "temperature": {"min": 24.8, "sum": 24.8, "value_count": 1}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:00:00Z", "sensor_id": "1", "temperature": {"min": 24.6, "sum": 49.1, "value_count": 2}}'

- do:
indices.put_settings:
index: test
body:
index.blocks.write: true

- do:
indices.downsample:
index: test
target_index: test-downsample
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

- do:
search:
index: test-downsample
body:
size: 0

- match:
hits.total.value: 3

- do:
indices.get_mapping:
index: test-downsample
- match:
test-downsample.mappings.properties.temperature:
type: aggregate_metric_double
metrics: [min, sum, value_count]
default_metric: sum
time_series_metric: gauge
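
The five documents above fall into three hourly buckets (18:00, 19:00 and 20:00), which is why the search expects hits.total.value of 3. For the 18:00 bucket the downsampled temperature should combine to min 24.2, sum 124.0 and value_count 5, and the get_mapping assertion verifies that the metrics list and default_metric sum are carried over to the downsample target unchanged.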
@@ -91,6 +91,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -739,6 +740,39 @@ private static void addTimestampField(
.endObject();
}

// public for testing
public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, List<String> supportedMetrics) {}

// public for testing
public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
final TimeSeriesParams.MetricType metricType,
final Map<String, ?> fieldProperties
) {
boolean sourceIsAggregate = fieldProperties.get("type").equals(AggregateMetricDoubleFieldMapper.CONTENT_TYPE);
List<String> supportedAggs = List.of(metricType.supportedAggs());

if (sourceIsAggregate) {
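// Keep only the metrics that the source aggregate_metric_double field actually stores.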
@SuppressWarnings("unchecked")
List<String> currentAggs = (List<String>) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.METRICS);
supportedAggs = supportedAggs.stream().filter(currentAggs::contains).toList();
}

assert supportedAggs.size() > 0;

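// Prefer "max" as the default when it is supported; otherwise fall back to the first
// supported metric, and for aggregate_metric_double sources keep the source's own
// default_metric when one is set.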
String defaultMetric = "max";
if (supportedAggs.contains(defaultMetric) == false) {
defaultMetric = supportedAggs.get(0);
}
if (sourceIsAggregate) {
defaultMetric = Objects.requireNonNullElse(
(String) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC),
defaultMetric
);
}

return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs);
}

private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
throws IOException {
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
@@ -752,12 +786,11 @@ private static void addMetricFieldMapping(final XContentBuilder builder, final S
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
}
} else {
final String[] supportedAggsArray = metricType.supportedAggs();
// We choose max as the default metric
final String defaultMetric = List.of(supportedAggsArray).contains("max") ? "max" : supportedAggsArray[0];
var supported = getSupportedMetrics(metricType, fieldProperties);

builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
.array(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedAggsArray)
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, defaultMetric)
.stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics())
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric())
.field(TIME_SERIES_METRIC_PARAM, metricType);
}
builder.endObject();
@@ -13,12 +13,16 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;

import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.Matchers.is;

public class TransportDownsampleActionTests extends ESTestCase {
public void testCopyIndexMetadata() {
// GIVEN
@@ -107,4 +111,25 @@ private static void assertTargetSettings(final IndexMetadata indexMetadata, fina
settings.get(IndexMetadata.SETTING_CREATION_DATE)
);
}

public void testGetSupportedMetrics() {
TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.GAUGE;
Map<String, Object> fieldProperties = Map.of(
"type",
"aggregate_metric_double",
"metrics",
List.of("max", "sum"),
"default_metric",
"sum"
);

var supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
assertThat(supported.defaultMetric(), is("sum"));
assertThat(supported.supportedMetrics(), is(List.of("max", "sum")));

fieldProperties = Map.of("type", "integer");
supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
assertThat(supported.defaultMetric(), is("max"));
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
}
}
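
A further illustrative case, not covered by the test above: when the source aggregate_metric_double does not include max among its metrics, the default cannot simply stay max, so getSupportedMetrics uses the source's own default_metric, or the first remaining metric if none is set. A hedged sketch in the same style as the test, assuming the gauge metric type supports min and value_count:

TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.GAUGE;
var supported = TransportDownsampleAction.getSupportedMetrics(
    metricType,
    Map.of("type", "aggregate_metric_double", "metrics", List.of("min", "value_count"), "default_metric", "value_count")
);
// The intersection keeps only min and value_count, and the source's default_metric wins.
assertThat(supported.defaultMetric(), is("value_count"));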