Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions x-pack/plugin/downsample/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ dependencies {
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(xpackModule('esql'))
testImplementation project(xpackModule('esql-core'))
}

addQaCheckDependencies(project)
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,35 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class DownsampleIT extends DownsamplingIntegTestCase {

Expand Down Expand Up @@ -96,4 +107,157 @@ public void testDownsamplingPassthroughDimensions() throws Exception {

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
}

public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
String dataStreamName = "metrics-foo";
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """
{
"properties": {
"host": {
"type": "keyword",
"time_series_dimension": true
},
"cluster" : {
"type": "keyword",
"time_series_dimension": true
},
"cpu": {
"type": "double",
"time_series_metric": "gauge"
}
}
}
""", null, null);

// Create data stream by indexing documents
final Instant now = Instant.now();
Supplier<XContentBuilder> sourceSupplier = () -> {
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("host", randomFrom("host1", "host2", "host3"))
.field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
.field("cpu", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
bulkIndex(dataStreamName, sourceSupplier, 100);

// Rollover to ensure the index we will downsample is not the write index
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String sourceIndex = backingIndices.get(0);
String interval = "5m";
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
// Set the source index to read-only state
assertAcked(
indicesAdmin().prepareUpdateSettings(sourceIndex)
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
);

DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
assertAcked(
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
)
);

// Wait for downsampling to complete
SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
if (indexMetadata == null) {
return false;
}
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
});
safeAwait(listener);

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);

// remove old backing index and replace with downsampled index and delete old so old is not queried
assertAcked(
client().execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
List.of(
DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
)
)
).actionGet()
);
assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());

// index to the next backing index; random time between 31 and 59m in the future to because default look_ahead_time is 30m and we
// don't want to conflict with the previous backing index
Supplier<XContentBuilder> nextSourceSupplier = () -> {
String ts = randomDateForRange(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("host", randomFrom("host1", "host2", "host3"))
.field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
.field("cpu", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
bulkIndex(dataStreamName, nextSourceSupplier, 100);

// Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm
// first that the field is unsupported and has 2 original types - double and aggregate_metric_double
try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) {
var columns = resp.columns();
assertThat(columns, hasSize(4));
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("@timestamp", "date", null),
new ColumnInfoImpl("host", "keyword", null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double"))
)
)
);
}

// test _over_time commands with implicit casting of aggregate_metric_double
for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) {
for (String outerCommand : List.of("min", "max", "sum", "count")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using randomFrom instead of checking all combinations in each run.

String command = outerCommand + " (" + innerCommand + "(cpu))";
String expectedType = innerCommand.equals("count_over_time") || outerCommand.equals("count") ? "long" : "double";
try (var resp = esqlCommand("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
var columns = resp.columns();
assertThat(columns, hasSize(3));
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl(command, expectedType, null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
)
)
);
// TODO: verify the numbers are accurate
}
}
}
}

/**
 * Executes the given ES|QL query string against the cluster and blocks for up to
 * 30 seconds waiting for the response. Caller is responsible for closing the response.
 */
private EsqlQueryResponse esqlCommand(String command) throws IOException {
    final EsqlQueryRequest request = new EsqlQueryRequest().query(command);
    final var responseFuture = client().execute(EsqlQueryAction.INSTANCE, request);
    return responseFuture.actionGet(30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

import java.io.IOException;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -82,7 +83,13 @@ public abstract class DownsamplingIntegTestCase extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    // Merge artifact fix: the pre-diff single-line return duplicated the statement
    // below, leaving unreachable code. Only the post-diff list (which adds EsqlPlugin
    // so integration tests can run ES|QL queries) is kept.
    return List.of(
        DataStreamsPlugin.class,
        LocalStateCompositeXPackPlugin.class,
        Downsample.class,
        AggregateMetricMapperPlugin.class,
        EsqlPlugin.class
    );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,31 @@ public enum Cap {
*/
AGGREGATE_METRIC_DOUBLE_PARTIAL_SUBMETRICS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support for rendering aggregate_metric_double type
*/
AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support for to_aggregate_metric_double function
*/
AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support for sorting when aggregate_metric_doubles are present
*/
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support avg with aggregate metric doubles
*/
AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support for implicit casting of aggregate metric double when run in aggregations
*/
AGGREGATE_METRIC_DOUBLE_IMPLICIT_CASTING_IN_AGGS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support change point detection "CHANGE_POINT".
*/
Expand All @@ -913,11 +938,6 @@ public enum Cap {
*/
SUPPORT_PARTIAL_RESULTS,

/**
* Support for rendering aggregate_metric_double type
*/
AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support for RERANK command
*/
Expand Down Expand Up @@ -964,11 +984,6 @@ public enum Cap {
*/
NON_FULL_TEXT_FUNCTIONS_SCORING,

/**
* Support for to_aggregate_metric_double function
*/
AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* The {@code _query} API now reports the original types.
*/
Expand All @@ -995,11 +1010,6 @@ public enum Cap {
*/
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT,

/**
* Support for sorting when aggregate_metric_doubles are present
*/
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}.
*/
Expand Down Expand Up @@ -1260,11 +1270,6 @@ public enum Cap {
*/
LIKE_ON_INDEX_FIELDS,

/**
* Support avg with aggregate metric doubles
*/
AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Forbid usage of brackets in unquoted index and enrich policy names
* https://github.com/elastic/elasticsearch/issues/130378
Expand Down
Loading