Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/121727.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 121727
summary: Copy metrics and `default_metric` properties when downsampling `aggregate_metric_double`
area: Downsampling
type: bug
issues:
- 119696
- 96076
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ public class DataStreamFeatures implements FeatureSpecification {

public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");

public static final NodeFeature DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX = new NodeFeature(
"data_stream.downsample.default_aggregate_metric_fix"
);

@Override
public Set<NodeFeature> getFeatures() {
return Set.of();
}

@Override
public Set<NodeFeature> getTestFeatures() {
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"downsample aggregate field":
- requires:
cluster_features: ["data_stream.downsample.default_aggregate_metric_fix"]
reason: "#119696 fixed"

- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
index:
mode: time_series
routing_path: [sensor_id]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
sensor_id:
type: keyword
time_series_dimension: true
temperature:
type: aggregate_metric_double
metrics: [min, sum, value_count]
default_metric: sum
time_series_metric: gauge
- do:
bulk:
refresh: true
index: test
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:00:00Z", "sensor_id": "1", "temperature": {"min": 24.7, "sum": 50.2, "value_count": 2}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:30:00Z", "sensor_id": "1", "temperature": {"min": 24.2, "sum": 73.8, "value_count": 3}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:00:00Z", "sensor_id": "1", "temperature": {"min": 25.1, "sum": 51.0, "value_count": 2}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:30:00Z", "sensor_id": "1", "temperature": {"min": 24.8, "sum": 24.8, "value_count": 1}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:00:00Z", "sensor_id": "1", "temperature": {"min": 24.6, "sum": 49.1, "value_count": 2}}'

- do:
indices.put_settings:
index: test
body:
index.blocks.write: true

- do:
indices.downsample:
index: test
target_index: test-downsample
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

- do:
search:
index: test-downsample
body:
size: 0

- match:
hits.total.value: 3

- do:
indices.get_mapping:
index: test-downsample
- match:
test-downsample.mappings.properties.temperature:
type: aggregate_metric_double
metrics: [min, sum, value_count]
default_metric: sum
time_series_metric: gauge
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -739,6 +740,39 @@ private static void addTimestampField(
.endObject();
}

// public for testing
public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, List<String> supportedMetrics) {}

// public for testing
public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
final TimeSeriesParams.MetricType metricType,
final Map<String, ?> fieldProperties
) {
boolean sourceIsAggregate = fieldProperties.get("type").equals(AggregateMetricDoubleFieldMapper.CONTENT_TYPE);
List<String> supportedAggs = List.of(metricType.supportedAggs());

if (sourceIsAggregate) {
@SuppressWarnings("unchecked")
List<String> currentAggs = (List<String>) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.METRICS);
supportedAggs = supportedAggs.stream().filter(currentAggs::contains).toList();
}

assert supportedAggs.size() > 0;

String defaultMetric = "max";
if (supportedAggs.contains(defaultMetric) == false) {
defaultMetric = supportedAggs.get(0);
}
if (sourceIsAggregate) {
defaultMetric = Objects.requireNonNullElse(
(String) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC),
defaultMetric
);
}

return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs);
}

private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
throws IOException {
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
Expand All @@ -752,12 +786,11 @@ private static void addMetricFieldMapping(final XContentBuilder builder, final S
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
}
} else {
final String[] supportedAggsArray = metricType.supportedAggs();
// We choose max as the default metric
final String defaultMetric = List.of(supportedAggsArray).contains("max") ? "max" : supportedAggsArray[0];
var supported = getSupportedMetrics(metricType, fieldProperties);

builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
.array(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedAggsArray)
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, defaultMetric)
.stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics)
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric)
.field(TIME_SERIES_METRIC_PARAM, metricType);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;

import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.Matchers.is;

public class TransportDownsampleActionTests extends ESTestCase {
public void testCopyIndexMetadata() {
// GIVEN
Expand Down Expand Up @@ -107,4 +111,25 @@ private static void assertTargetSettings(final IndexMetadata indexMetadata, fina
settings.get(IndexMetadata.SETTING_CREATION_DATE)
);
}

public void testGetSupportedMetrics() {
TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.GAUGE;
Map<String, Object> fieldProperties = Map.of(
"type",
"aggregate_metric_double",
"metrics",
List.of("max", "sum"),
"default_metric",
"sum"
);

var supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
assertThat(supported.defaultMetric(), is("sum"));
assertThat(supported.supportedMetrics(), is(List.of("max", "sum")));

fieldProperties = Map.of("type", "integer");
supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
assertThat(supported.defaultMetric(), is("max"));
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
}
}