Skip to content

Commit 5b9a9ba

Browse files
committed
Check if the dimension is passthrough and skip it.
1 parent 8904282 commit 5b9a9ba

File tree

4 files changed

+213
-2
lines changed

4 files changed

+213
-2
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.downsample;
9+
10+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
11+
import org.elasticsearch.action.downsample.DownsampleAction;
12+
import org.elasticsearch.action.downsample.DownsampleConfig;
13+
import org.elasticsearch.action.support.SubscribableListener;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.datastreams.DataStreamsPlugin;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
19+
import org.elasticsearch.test.ClusterServiceUtils;
20+
import org.elasticsearch.xcontent.XContentBuilder;
21+
import org.elasticsearch.xcontent.XContentFactory;
22+
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
23+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
24+
25+
import java.io.IOException;
26+
import java.time.Instant;
27+
import java.util.Collection;
28+
import java.util.List;
29+
import java.util.function.Supplier;
30+
31+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
32+
import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT;
33+
34+
public class DownsampleIT extends DownsamplingIntegTestCase {
35+
36+
@Override
37+
protected Collection<Class<? extends Plugin>> nodePlugins() {
38+
return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
39+
}
40+
41+
public void testDownsampling() throws Exception {
42+
String dataStreamName = "metrics-foo";
43+
44+
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """
45+
{
46+
"properties": {
47+
"metrics": {
48+
"type": "passthrough",
49+
"priority": 10,
50+
"time_series_dimension": true
51+
},
52+
"metrics.cpu_usage": {
53+
"type": "double",
54+
"time_series_metric": "counter"
55+
}
56+
}
57+
}
58+
}
59+
""", null, null);
60+
61+
final Instant now = Instant.now();
62+
Supplier<XContentBuilder> sourceSupplier = () -> {
63+
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
64+
try {
65+
return XContentFactory.jsonBuilder()
66+
.startObject()
67+
.field("@timestamp", ts)
68+
.field("metrics.host.name", randomFrom("host1", "host2", "host3"))
69+
.field("metrics.cpu_usage", randomDouble())
70+
.endObject();
71+
} catch (IOException e) {
72+
throw new RuntimeException(e);
73+
}
74+
};
75+
bulkIndex(dataStreamName, sourceSupplier, 100);
76+
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
77+
78+
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
79+
String sourceIndex = backingIndices.get(0);
80+
String interval = "5m";
81+
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
82+
// Set the source index to read-only state
83+
assertAcked(
84+
indicesAdmin().prepareUpdateSettings(sourceIndex)
85+
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
86+
);
87+
88+
DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
89+
assertAcked(
90+
client().execute(
91+
DownsampleAction.INSTANCE,
92+
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
93+
)
94+
);
95+
96+
SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
97+
final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
98+
if (indexMetadata == null) {
99+
return false;
100+
}
101+
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
102+
return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
103+
});
104+
safeAwait(listener);
105+
106+
assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
107+
}
108+
}

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,20 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.ElasticsearchException;
1313
import org.elasticsearch.action.DocWriteRequest;
14+
import org.elasticsearch.action.admin.cluster.stats.MappingVisitor;
15+
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1416
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
1517
import org.elasticsearch.action.bulk.BulkItemResponse;
1618
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1719
import org.elasticsearch.action.bulk.BulkResponse;
1820
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
21+
import org.elasticsearch.action.downsample.DownsampleConfig;
1922
import org.elasticsearch.action.index.IndexRequest;
2023
import org.elasticsearch.action.support.WriteRequest;
2124
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2225
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
2326
import org.elasticsearch.cluster.metadata.IndexMetadata;
27+
import org.elasticsearch.cluster.metadata.Metadata;
2428
import org.elasticsearch.cluster.metadata.Template;
2529
import org.elasticsearch.common.Strings;
2630
import org.elasticsearch.common.compress.CompressedXContent;
@@ -31,6 +35,10 @@
3135
import org.elasticsearch.index.IndexMode;
3236
import org.elasticsearch.index.IndexSettings;
3337
import org.elasticsearch.index.engine.VersionConflictEngineException;
38+
import org.elasticsearch.index.mapper.DateFieldMapper;
39+
import org.elasticsearch.index.mapper.MapperService;
40+
import org.elasticsearch.index.mapper.TimeSeriesParams;
41+
import org.elasticsearch.indices.IndicesService;
3442
import org.elasticsearch.plugins.Plugin;
3543
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
3644
import org.elasticsearch.test.ESIntegTestCase;
@@ -44,12 +52,20 @@
4452
import java.time.ZoneId;
4553
import java.util.ArrayList;
4654
import java.util.Collection;
55+
import java.util.HashMap;
56+
import java.util.HashSet;
4757
import java.util.List;
4858
import java.util.Locale;
4959
import java.util.Map;
60+
import java.util.Set;
61+
import java.util.concurrent.atomic.AtomicBoolean;
5062
import java.util.function.Supplier;
5163

64+
import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM;
65+
import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
5266
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
67+
import static org.hamcrest.Matchers.arrayContaining;
68+
import static org.hamcrest.Matchers.equalTo;
5369

5470
/**
5571
* Base test case for downsampling integration tests. It provides helper methods to:
@@ -237,4 +253,85 @@ String randomDateForInterval(final DateHistogramInterval interval, final long st
237253
String randomDateForRange(long start, long end) {
238254
return DATE_FORMATTER.formatMillis(randomLongBetween(start, end));
239255
}
256+
257+
/**
258+
* Currently we assert the correctness of metrics and dimensions. The assertions can be extended when needed.
259+
*/
260+
@SuppressWarnings("unchecked")
261+
void assertDownsampleIndexFieldsAndDimensions(String sourceIndex, String downsampleIndex, DownsampleConfig config) throws Exception {
262+
GetIndexResponse getIndexResponse = indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT)
263+
.setIndices(sourceIndex, downsampleIndex)
264+
.get();
265+
assertThat(getIndexResponse.indices(), arrayContaining(sourceIndex, downsampleIndex));
266+
267+
// Retrieve field information for the metric fields
268+
final Map<String, Object> sourceIndexMappings = getIndexResponse.mappings().get(sourceIndex).getSourceAsMap();
269+
final Map<String, Object> downsampleIndexMappings = getIndexResponse.mappings().get(downsampleIndex).getSourceAsMap();
270+
271+
final MapperService mapperService = getMapperServiceForIndex(sourceIndex);
272+
final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings);
273+
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE);
274+
275+
// Collect expected mappings for fields and dimensions
276+
Map<String, TimeSeriesParams.MetricType> metricFields = new HashMap<>();
277+
Map<String, String> dimensionFields = new HashMap<>();
278+
MappingVisitor.visitMapping(sourceIndexMappings, (field, fieldMapping) -> {
279+
if (isTimeSeriesMetric(fieldMapping)) {
280+
metricFields.put(field, TimeSeriesParams.MetricType.fromString(fieldMapping.get(TIME_SERIES_METRIC_PARAM).toString()));
281+
} else if (hasTimeSeriesDimensionTrue(fieldMapping)) {
282+
// This includes passthrough objects
283+
dimensionFields.put(field, fieldMapping.get("type").toString());
284+
}
285+
});
286+
287+
AtomicBoolean encounteredTimestamp = new AtomicBoolean(false);
288+
Set<String> encounteredMetrics = new HashSet<>();
289+
Set<String> encounteredDimensions = new HashSet<>();
290+
MappingVisitor.visitMapping(downsampleIndexMappings, (field, fieldMapping) -> {
291+
if (field.equals(config.getTimestampField())) {
292+
encounteredTimestamp.set(true);
293+
assertThat(fieldMapping.get("type"), equalTo(DateFieldMapper.CONTENT_TYPE));
294+
Map<String, Object> dateTimeMeta = (Map<String, Object>) fieldMapping.get("meta");
295+
assertThat(dateTimeMeta.get("time_zone"), equalTo(config.getTimeZone()));
296+
assertThat(dateTimeMeta.get(config.getIntervalType()), equalTo(config.getInterval().toString()));
297+
} else if (metricFields.containsKey(field)) {
298+
encounteredMetrics.add(field);
299+
TimeSeriesParams.MetricType metricType = metricFields.get(field);
300+
switch (metricType) {
301+
case COUNTER -> assertThat(fieldMapping.get("type"), equalTo("double"));
302+
case GAUGE -> assertThat(fieldMapping.get("type"), equalTo("aggregate_metric_double"));
303+
default -> fail("Unsupported field type");
304+
}
305+
assertThat(fieldMapping.get("time_series_metric"), equalTo(metricType.toString()));
306+
} else if (dimensionFields.containsKey(field)) {
307+
encounteredDimensions.add(field);
308+
assertThat(fieldMapping.get("type"), equalTo(dimensionFields.get(field)));
309+
assertThat(fieldMapping.get("time_series_dimension"), equalTo(true));
310+
}
311+
});
312+
assertThat(encounteredTimestamp.get(), equalTo(true));
313+
assertThat(encounteredMetrics, equalTo(metricFields.keySet()));
314+
assertThat(encounteredDimensions, equalTo(dimensionFields.keySet()));
315+
}
316+
317+
private static MapperService getMapperServiceForIndex(String sourceIndex) throws IOException {
318+
final IndexMetadata indexMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)
319+
.get()
320+
.getState()
321+
.getMetadata()
322+
.getProject(Metadata.DEFAULT_PROJECT_ID)
323+
.index(sourceIndex);
324+
final IndicesService indicesService = internalCluster().getAnyMasterNodeInstance(IndicesService.class);
325+
return indicesService.createIndexMapperServiceForValidation(indexMetadata);
326+
}
327+
328+
boolean isTimeSeriesMetric(final Map<String, ?> fieldMapping) {
329+
final String metricType = (String) fieldMapping.get(TIME_SERIES_METRIC_PARAM);
330+
return metricType != null
331+
&& List.of(TimeSeriesParams.MetricType.values()).contains(TimeSeriesParams.MetricType.fromString(metricType));
332+
}
333+
334+
private static boolean hasTimeSeriesDimensionTrue(Map<String, ?> fieldMapping) {
335+
return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM));
336+
}
240337
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ static List<FieldValueFetcher> create(final SearchExecutionContext context, fina
4040
List<FieldValueFetcher> fetchers = new ArrayList<>();
4141
for (String dimension : dimensions) {
4242
MappedFieldType fieldType = context.getFieldType(dimension);
43-
assert fieldType != null : "Unknown dimension field type for dimension field: [" + dimension + "]";
43+
assert fieldType != null : "Unknown type for dimension field: [" + dimension + "]";
4444

4545
if (context.fieldExistsInIndex(fieldType.name())) {
4646
final IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimeseriesFieldTypeHelper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import org.elasticsearch.index.mapper.MappedFieldType;
1111
import org.elasticsearch.index.mapper.MapperService;
1212
import org.elasticsearch.index.mapper.MappingLookup;
13+
import org.elasticsearch.index.mapper.PassThroughObjectMapper;
1314
import org.elasticsearch.index.mapper.TimeSeriesParams;
1415
import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
16+
import org.elasticsearch.search.suggest.completion.context.ContextMapping;
1517

1618
import java.io.IOException;
1719
import java.util.List;
@@ -47,7 +49,11 @@ public boolean isTimeSeriesMetric(final String unused, final Map<String, ?> fiel
4749
}
4850

4951
public boolean isTimeSeriesDimension(final String unused, final Map<String, ?> fieldMapping) {
50-
return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM));
52+
return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)) && isPassthroughField(fieldMapping) == false;
53+
}
54+
55+
public static boolean isPassthroughField(final Map<String, ?> fieldMapping) {
56+
return PassThroughObjectMapper.CONTENT_TYPE.equals(fieldMapping.get(ContextMapping.FIELD_TYPE));
5157
}
5258

5359
public List<String> extractFlattenedDimensions(final String field, final Map<String, ?> fieldMapping) {

0 commit comments

Comments
 (0)