x-pack/plugin/downsample/build.gradle (2 additions, 0 deletions)
@@ -18,6 +18,8 @@ dependencies {
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(xpackModule('esql'))
testImplementation project(xpackModule('esql-core'))
}

addQaCheckDependencies(project)
DownsampleIT.java
@@ -7,24 +7,36 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class DownsampleIT extends DownsamplingIntegTestCase {

@@ -96,4 +108,161 @@ public void testDownsamplingPassthroughDimensions() throws Exception {

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
}

public void testEsqlTSAfterDownsampling() throws Exception {
String dataStreamName = "metrics-foo";
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """
{
"properties": {
"host": {
"type": "keyword",
"time_series_dimension": true
},
"cluster" : {
"type": "keyword",
"time_series_dimension": true
},
"cpu": {
"type": "double",
"time_series_metric": "gauge"
}
}
}
""", null, null);

// Create data stream by indexing documents
final Instant now = Instant.now();
Supplier<XContentBuilder> sourceSupplier = () -> {
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("host", randomFrom("host1", "host2", "host3"))
.field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
.field("cpu", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
bulkIndex(dataStreamName, sourceSupplier, 100);

// Rollover to ensure the index we will downsample is not the write index
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String sourceIndex = backingIndices.get(0);
String interval = "5m";
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
// Set the source index to read-only state
assertAcked(
indicesAdmin().prepareUpdateSettings(sourceIndex)
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
);

DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
assertAcked(
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
)
);

// Wait for downsampling to complete
SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
if (indexMetadata == null) {
return false;
}
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
});
safeAwait(listener);

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);

// Replace the old backing index with the downsampled index in the data stream, then delete the old index so it is no longer queried
assertAcked(
client().execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
List.of(
DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
)
)
).actionGet()
);
assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());

// index to the next backing index
Supplier<XContentBuilder> nextSourceSupplier = () -> {
String ts = randomDateForRange(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("host", randomFrom("host1", "host2", "host3"))
.field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
.field("cpu", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
bulkIndex(dataStreamName, nextSourceSupplier, 100);

// Since cpu is an aggregate_metric_double in the downsampled index but a plain double in the new
// backing index, first confirm that the field is unsupported with its 2 original types: double and aggregate_metric_double
try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) {
var columns = resp.columns();
assertThat(columns, hasSize(4));
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("@timestamp", "date", null),
new ColumnInfoImpl("host", "keyword", null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double"))
)
)
);

}
// Test that implicit casting within a time-series aggregation query works
try (
var resp = esqlCommand(
"TS "
+ dataStreamName
+ " | STATS min = sum(min_over_time(cpu)), max = sum(max_over_time(cpu)) by cluster, bucket(@timestamp, 1 hour)"
Contributor:
So cool to see this working... well done!

Contributor:
Consider randomizing the inner and outer agg.

Contributor Author:
For the current integration test I decided to loop through some known combinations for a single aggregation. Right now the second bug I listed would cause some combinations (avg_over_time + sum/count_over_time) to fail, so I decided not to randomize it. It might be good to add randomized testing that generates a random number of these aggregations to run in the same query once that bug gets figured out (one possible shape is sketched at the end of this file).

)
) {
var columns = resp.columns();
assertThat(columns, hasSize(4));
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("min", "double", null),
new ColumnInfoImpl("max", "double", null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
)
)
);
// TODO: verify the numbers are accurate
Contributor Author:
I've manually verified that the values produced make sense, but I'm not sure how to go about verifying them within the integration test.

Contributor:
Yeah, tricky... maybe it's easy to track a min of mins and a max of maxes, and verify just that?

Contributor Author:
Hmmm, depending on the query I'm not sure that would work: anything involving sum/_over_time would go outside that range, and throwing negative numbers into the mix would throw everything off quite a bit.

Contributor:
Right, so only (min|max)(min|max_over_time) would work. One option is to track the overall min and max values, and verify that they get returned if we have one of those combinations (see the sketch after this test method).

}
}
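Following the review thread above about verifying the numbers, a minimal sketch of the suggested check: track the overall min and max while generating documents, then assert that min(min_over_time) and max(max_over_time) return them, since those are the only combinations that preserve the global bounds through downsampling. This is illustrative only and not part of the PR; it assumes this class's existing helpers (bulkIndex, esqlCommand, randomDateForRange) and that EsqlQueryResponse.values() iterates rows.

// Illustrative sketch only: min and max pass through 5m downsample buckets unchanged,
// so the global bounds recorded at indexing time should be returned exactly.
public void testMinMaxBoundsSurviveDownsampling() throws Exception {
    String dataStreamName = "metrics-foo";
    Instant now = Instant.now();
    // Mutable holder so the lambda can update the running bounds.
    double[] bounds = new double[] { Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY };
    Supplier<XContentBuilder> trackingSupplier = () -> {
        double cpu = randomDouble();
        bounds[0] = Math.min(bounds[0], cpu); // running global min
        bounds[1] = Math.max(bounds[1], cpu); // running global max
        try {
            return XContentFactory.jsonBuilder()
                .startObject()
                .field("@timestamp", randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.toEpochMilli()))
                .field("host", randomFrom("host1", "host2", "host3"))
                .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
                .field("cpu", cpu)
                .endObject();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    };
    bulkIndex(dataStreamName, trackingSupplier, 100);
    // ... rollover and downsample as in testEsqlTSAfterDownsampling, then:
    try (var resp = esqlCommand("TS " + dataStreamName + " | STATS min = min(min_over_time(cpu)), max = max(max_over_time(cpu))")) {
        var row = resp.values().next(); // assumes values() yields one iterator per row
        assertThat((double) row.next(), equalTo(bounds[0]));
        assertThat((double) row.next(), equalTo(bounds[1]));
    }
}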

private EsqlQueryResponse esqlCommand(String command) throws IOException {
if (command.toLowerCase(Locale.ROOT).contains("limit") == false) {
// Add a high explicit limit to avoid warnings about the implicit default limit
command += " | limit 10000000";
}
return client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(command)).actionGet(30, TimeUnit.SECONDS);
}
}
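On the reviewer's suggestion to randomize the inner and outer aggregations, a hedged sketch of one shape that could take once the avg_over_time bug is fixed. The AggCombo record, the combination list, and the expected column type are illustrative assumptions, not part of the PR:

// Illustrative sketch only, inside a test method set up like testEsqlTSAfterDownsampling:
// pick a random known-good outer/inner aggregation pair, leaving out
// avg_over_time + sum/count_over_time until that bug is resolved.
record AggCombo(String outer, String inner) {}
List<AggCombo> knownGood = List.of(
    new AggCombo("sum", "min_over_time"),
    new AggCombo("sum", "max_over_time"),
    new AggCombo("min", "min_over_time"),
    new AggCombo("max", "max_over_time")
);
AggCombo combo = randomFrom(knownGood);
try (
    var resp = esqlCommand(
        "TS " + dataStreamName
            + " | STATS result = " + combo.outer() + "(" + combo.inner() + "(cpu))"
            + " by cluster, bucket(@timestamp, 1 hour)"
    )
) {
    // Implicit casting should still produce a plain double result column.
    assertThat(resp.columns().get(0), equalTo(new ColumnInfoImpl("result", "double", null)));
}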
DownsamplingIntegTestCase.java
@@ -45,6 +45,7 @@
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

import java.io.IOException;
import java.time.LocalDateTime;
@@ -82,7 +83,13 @@ public abstract class DownsamplingIntegTestCase extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
return List.of(
DataStreamsPlugin.class,
LocalStateCompositeXPackPlugin.class,
Downsample.class,
AggregateMetricMapperPlugin.class,
EsqlPlugin.class
);
}

/**
EsqlCapabilities.java
@@ -1215,7 +1215,12 @@ public enum Cap {
/**
* (Re)Added EXPLAIN command
*/
EXPLAIN(Build.current().isSnapshot());
EXPLAIN(Build.current().isSnapshot()),

/**
* Support for implicit casting of aggregate metric double when run in aggregations
*/
AGGREGATE_METRIC_DOUBLE_IMPLICIT_CASTING_IN_AGGS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);

private final boolean enabled;

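A hedged usage note on the new capability: tests that exercise the implicit casting would typically be gated on it. A minimal sketch, assuming Cap exposes the conventional isEnabled() accessor backed by the enabled field shown above:

// Illustrative sketch only: skip a test when the capability (and therefore the
// aggregate_metric_double feature flag) is not enabled on this build.
assumeTrue(
    "requires implicit casting of aggregate_metric_double in aggregations",
    EsqlCapabilities.Cap.AGGREGATE_METRIC_DOUBLE_IMPLICIT_CASTING_IN_AGGS.isEnabled()
);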