Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class DataStreamFeatures implements FeatureSpecification {

public static final NodeFeature FAILURE_STORE_IN_LOG_DATA_STREAMS = new NodeFeature("logs_data_streams.failure_store.enabled");

public static final NodeFeature DOWNSAMPLE_INDEX_LEVEL_DEFAULT_METRIC = new NodeFeature(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is a downsample feature. I think we should add this feature to the downsample module? It doesn't seem to have a class extending FeatureSpecification yet, but I think we can add that.

"data_stream.downsample.index_level_default_metric"
);

@Override
public Set<NodeFeature> getFeatures() {
return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
Expand All @@ -41,7 +45,8 @@ public Set<NodeFeature> getTestFeatures() {
DATA_STREAM_FAILURE_STORE_TSDB_FIX,
DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX,
LOGS_STREAM_FEATURE,
FAILURE_STORE_IN_LOG_DATA_STREAMS
FAILURE_STORE_IN_LOG_DATA_STREAMS,
DOWNSAMPLE_INDEX_LEVEL_DEFAULT_METRIC
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
// TSDB index settings
IndexSettings.MODE,
IndexMetadata.INDEX_ROUTING_PATH,
IndexSettings.TIME_SERIES_DEFAULT_METRIC,
IndexSettings.TIME_SERIES_START_TIME,
IndexSettings.TIME_SERIES_END_TIME,
IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING,
Expand Down
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,26 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

/**
 * The metric that an {@code aggregate_metric_double} field produced by downsampling
 * should report by default when queried without an explicit metric.
 * NOTE(review): this duplicates AggregateMetricDoubleFieldMapper.Metric (which lives
 * in the mapper-aggregate-metric plug-in and cannot be referenced from server code)
 * and reportedly a similar enum on the ES|QL side — consider consolidating into a
 * single shared enum if a suitable home can be found.
 */
public enum AggregateMetricDoubleDefaultMetric {
    MIN,
    MAX,
    SUM,
    VALUE_COUNT;
}
Comment on lines +650 to +655
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It feels a bit smelly to have this enum duplicate the other enum AggregateMetricDoubleFieldMapper.Metric. It makes sense since that second enum only exists in the mapper-aggregate-metric plug-in and so can't be referenced here, but I wonder if you could instead create one enum somewhere and have both classes refer to that one enum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we have a few of these enums....there's similarly one in the ES|QL side too and I would love to have just one enum but I'm not sure where it would live


public static final Setting<AggregateMetricDoubleDefaultMetric> TIME_SERIES_DEFAULT_METRIC = Setting.enumSetting(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this setting be moved to the xpack downsample module? We can just parse it in TimeseriesFieldTypeHelper and keep it around as a field there? (The setting can then be registered in Downsample plugin class by overwriting the getSettings() method)

If that works then there is no need to define the enum that Jordan added a comment about, given that I think we can just use AggregateMetricDoubleFieldMapper.Metric directly in the downsample module (seems to have a compile dependency on mapper-aggregate-metric module).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try it out..!

AggregateMetricDoubleDefaultMetric.class,
"index.time_series.default_metric",
AggregateMetricDoubleDefaultMetric.MAX,
Property.IndexScope,
Property.Final,
Property.ServerlessPublic
);

/**
 * Returns the value of the {@code index.time_series.default_metric} setting for
 * this index, i.e. the default metric to use for aggregate_metric_double fields
 * produced by downsampling.
 */
public AggregateMetricDoubleDefaultMetric getDefaultMetric() {
    return defaultMetric;
}

/**
* Returns <code>true</code> if TSDB encoding is enabled. The default is <code>true</code>
*/
Expand Down Expand Up @@ -896,6 +916,7 @@ private static String getIgnoreAboveDefaultValue(final Settings settings) {
private final boolean softDeleteEnabled;
private volatile long softDeleteRetentionOperations;
private final boolean es87TSDBCodecEnabled;
private final AggregateMetricDoubleDefaultMetric defaultMetric;
private final boolean logsdbRouteOnSortFields;
private final boolean logsdbSortOnHostName;
private final boolean logsdbAddHostNameField;
Expand Down Expand Up @@ -1116,6 +1137,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
sourceKeepMode = scopedSettings.get(Mapper.SYNTHETIC_SOURCE_KEEP_INDEX_SETTING);
es87TSDBCodecEnabled = scopedSettings.get(TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING);
defaultMetric = scopedSettings.get(TIME_SERIES_DEFAULT_METRIC);
logsdbRouteOnSortFields = scopedSettings.get(LOGSDB_ROUTE_ON_SORT_FIELDS);
logsdbSortOnHostName = scopedSettings.get(LOGSDB_SORT_ON_HOST_NAME);
logsdbAddHostNameField = scopedSettings.get(LOGSDB_ADD_HOST_NAME_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public final class TimeSeriesParams {

// Mapping parameter name marking a field as a time-series metric.
public static final String TIME_SERIES_METRIC_PARAM = "time_series_metric";
// Mapping parameter name marking a field as a time-series dimension.
public static final String TIME_SERIES_DIMENSION_PARAM = "time_series_dimension";
// Mapping parameter name selecting the default metric of an aggregate_metric_double field.
public static final String TIME_SERIES_DEFAULT_METRIC_PARAM = "default_metric";

// Constants-only holder; not instantiable.
private TimeSeriesParams() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,91 @@
metrics: [min, sum, value_count]
default_metric: sum
time_series_metric: gauge

---
# Verifies the index-level "index.time_series.default_metric" setting: a plain
# numeric field (temperature) downsampled into an aggregate_metric_double must
# pick up the index-level default_metric (value_count here), while a field that
# is already an aggregate_metric_double (agg_metric) keeps its own explicitly
# configured default_metric (sum).
"downsample numeric field":
- requires:
cluster_features: ["data_stream.downsample.index_level_default_metric"]
reason: "Specify default metric to be used in an index when downsampling numerics"

# Create a TSDB index with the index-level default metric set to value_count.
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
index:
mode: time_series
routing_path: [sensor_id]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
default_metric: value_count
mappings:
properties:
"@timestamp":
type: date
sensor_id:
type: keyword
time_series_dimension: true
temperature:
type: double
time_series_metric: gauge
agg_metric:
type: aggregate_metric_double
metrics: [ min, sum, value_count ]
default_metric: sum
time_series_metric: gauge

# Index three documents within a single 30m bucket for one time series.
- do:
bulk:
refresh: true
index: test
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T17:34:00Z", "sensor_id": "1", "temperature": 24.1, "agg_metric": {"min": -1, "sum": 20, "value_count": 5}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T17:39:00Z", "sensor_id": "1", "temperature": 25.3, "agg_metric": {"min": 3, "sum": 50, "value_count": 7}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T17:45:00Z", "sensor_id": "1", "temperature": 24.8, "agg_metric": {"min": -5, "sum": 33, "value_count": 9}}'

# Downsampling requires the source index to be read-only.
- do:
indices.put_settings:
index: test
body:
index.blocks.write: true

- do:
indices.downsample:
index: test
target_index: test-downsample
body: >
{
"fixed_interval": "30m"
}
- is_true: acknowledged

# All three docs fall in one 30m interval for one time series -> one rollup doc.
- do:
search:
index: test-downsample
body:
size: 0

- match:
hits.total.value: 1

# temperature gets the index-level default_metric (value_count);
# agg_metric keeps its explicit default_metric (sum) and metric list.
- do:
indices.get_mapping:
index: test-downsample
- match:
test-downsample.mappings.properties.temperature:
type: aggregate_metric_double
metrics: [min, max, sum, value_count]
default_metric: value_count
time_series_metric: gauge
- match:
test-downsample.mappings.properties.agg_metric:
type: aggregate_metric_double
metrics: [min, sum, value_count]
default_metric: sum
time_series_metric: gauge
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
Expand Down Expand Up @@ -56,6 +57,10 @@ public static boolean isPassthroughField(final Map<String, ?> fieldMapping) {
return PassThroughObjectMapper.CONTENT_TYPE.equals(fieldMapping.get(ContextMapping.FIELD_TYPE));
}

/**
 * Returns the index-level default metric ({@code index.time_series.default_metric})
 * of the index backing this helper's MapperService; used when downsampling numeric
 * fields into aggregate_metric_double fields.
 */
public IndexSettings.AggregateMetricDoubleDefaultMetric getDefaultMetric() {
    return mapperService.getIndexSettings().getDefaultMetric();
}

public List<String> extractFlattenedDimensions(final String field, final Map<String, ?> fieldMapping) {
var mapper = mapperService.mappingLookup().getMapper(field);
if (mapper instanceof FlattenedFieldMapper == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -712,7 +713,7 @@ private static void addMetricFields(
MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> {
if (helper.isTimeSeriesMetric(field, mapping)) {
try {
addMetricFieldMapping(builder, field, mapping);
addMetricFieldMapping(builder, field, mapping, helper.getDefaultMetric());
} catch (IOException e) {
throw new ElasticsearchException("Error while adding metric for field [" + field + "]");
}
Expand Down Expand Up @@ -761,7 +762,8 @@ public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, L
// public for testing
public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
final TimeSeriesParams.MetricType metricType,
final Map<String, ?> fieldProperties
final Map<String, ?> fieldProperties,
final IndexSettings.AggregateMetricDoubleDefaultMetric indexLevelDefaultMetric
) {
boolean sourceIsAggregate = fieldProperties.get("type").equals(AggregateMetricDoubleFieldMapper.CONTENT_TYPE);
List<String> supportedAggs = List.of(metricType.supportedAggs());
Expand All @@ -783,13 +785,19 @@ public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
(String) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC),
defaultMetric
);
} else {
defaultMetric = indexLevelDefaultMetric.name().toLowerCase(Locale.ROOT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading this correctly, index.time_series.default_metric will only apply to non-aggregate number fields that are being downsampled into aggregate metrics, and existing aggregate metrics with no specified default_metric will still fall back to "max".

Would it make sense to instead have index.time_series.default_metric specify the default metric in both cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this does make sense. Maybe the defaultMetric variable should just be initialized by indexLevelDefaultMetric (on line 777 in main branch)? Given that the default of the new index setting is also max.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure I understand the situation.
I had an assumption that if you don't specify the default metric for an aggregate metric double field, then it's automatically assigned one (which would be max if max is present), which would mean all existing aggregate metric doubles have a default metric - is that not the case?
I was thinking I want to avoid a situation where someone already has an aggregate metric double, and then this setting gets introduced and it overwrites their old aggregate metric double's default setting

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right--in most cases, there will already be a default metric specified on the aggregate metric, since aggregate metrics are usually created by downsampling which adds a default metric using this logic.

But it is technically possible to manually create an aggregate metric field that has no default metric specified. In this case, the logic here is hard-coded to default to using "max" as the default metric. But it might make more sense to instead use the new index-level index.time_series.default_metric here.

}

return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs);
}

private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
throws IOException {
private static void addMetricFieldMapping(
final XContentBuilder builder,
final String field,
final Map<String, ?> fieldProperties,
IndexSettings.AggregateMetricDoubleDefaultMetric defaultMetric
) throws IOException {
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
fieldProperties.get(TIME_SERIES_METRIC_PARAM).toString()
);
Expand All @@ -801,7 +809,7 @@ private static void addMetricFieldMapping(final XContentBuilder builder, final S
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
}
} else {
var supported = getSupportedMetrics(metricType, fieldProperties);
var supported = getSupportedMetrics(metricType, fieldProperties, defaultMetric);

builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
.stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics)
Expand Down Expand Up @@ -942,6 +950,10 @@ private void createDownsampleIndex(
.put(
IndexSettings.TIME_SERIES_END_TIME.getKey(),
sourceIndexMetadata.getSettings().get(IndexSettings.TIME_SERIES_END_TIME.getKey())
)
.put(
IndexSettings.TIME_SERIES_DEFAULT_METRIC.getKey(),
sourceIndexMetadata.getSettings().get(IndexSettings.TIME_SERIES_DEFAULT_METRIC.getKey())
);
if (sourceIndexMetadata.getSettings().hasValue(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())) {
builder.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,20 @@ public void testGetSupportedMetrics() {
"sum"
);

var supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
var supported = TransportDownsampleAction.getSupportedMetrics(
metricType,
fieldProperties,
IndexSettings.AggregateMetricDoubleDefaultMetric.MAX
);
assertThat(supported.defaultMetric(), is("sum"));
assertThat(supported.supportedMetrics(), is(List.of("max", "sum")));

fieldProperties = Map.of("type", "integer");
supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
supported = TransportDownsampleAction.getSupportedMetrics(
metricType,
fieldProperties,
IndexSettings.AggregateMetricDoubleDefaultMetric.MAX
);
assertThat(supported.defaultMetric(), is("max"));
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
}
Expand Down