[ES|QL] Support implicit casting of aggregate_metric_double #129108
Merged
Commits (14, all by limotova):
- b6b041c [ES|QL] Cast aggregate_metric_double implicitly in aggregations
- e98a2db some more test cases and comments
- 3e80b90 Merge branch 'main' into implicit-casting-agg-metric
- fea20bd Merge branch 'main' into implicit-casting-agg-metric
- 583a321 fix bug and changes
- af7a60b undo AbstractConvertFunction -> EsqlScalarFunction change in TypeConv…
- 6022f17 Merge branch 'main' into implicit-casting-agg-metric
- 38682b7 Revert undo AbstractConvertFunction -> EsqlScalarFunction
- e540c03 Move capabilities and add comment about substitute surrogates
- c1ae136 Merge branch 'main' into implicit-casting-agg-metric
- c5c1b00 Merge branch 'main' into implicit-casting-agg-metric
- 47b46b8 remove call to substitute surrogates for avg/_over_time
- 202d348 Merge branch 'main' into implicit-casting-agg-metric
- 49564cf Merge branch 'main' into implicit-casting-agg-metric
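The integration test in this diff exercises the change end to end: after downsampling, the cpu gauge exists as double in one backing index and as aggregate_metric_double in another, yet time-series aggregations now work over both. A sketch of the kind of query this enables, adapted from the test loop further below (the exact query shape here is an illustration, not part of the diff):

```esql
// Works across mixed double / aggregate_metric_double backing indices
// once the aggregate_metric_double side is cast implicitly:
TS metrics-foo | STATS max(max_over_time(cpu)) BY cluster, bucket(@timestamp, 1 hour)
```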
Changes to DownsampleIT.java (package org.elasticsearch.xpack.downsample). First hunk (@@ -7,24 +7,36 @@), imports and class declaration:

```java
package org.elasticsearch.xpack.downsample;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class DownsampleIT extends DownsamplingIntegTestCase {
```
Second hunk (@@ -96,4 +108,161 @@, after testDownsamplingPassthroughDimensions), shown in parts below. Part 1: set up a TSDB data stream with two keyword dimensions and a gauge metric, then index the initial documents:

```java
        assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
    }

    public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
        String dataStreamName = "metrics-foo";
        Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
        putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """
            {
              "properties": {
                "host": {
                  "type": "keyword",
                  "time_series_dimension": true
                },
                "cluster" : {
                  "type": "keyword",
                  "time_series_dimension": true
                },
                "cpu": {
                  "type": "double",
                  "time_series_metric": "gauge"
                }
              }
            }
            """, null, null);

        // Create data stream by indexing documents
        final Instant now = Instant.now();
        Supplier<XContentBuilder> sourceSupplier = () -> {
            String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
            try {
                return XContentFactory.jsonBuilder()
                    .startObject()
                    .field("@timestamp", ts)
                    .field("host", randomFrom("host1", "host2", "host3"))
                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
                    .field("cpu", randomDouble())
                    .endObject();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        bulkIndex(dataStreamName, sourceSupplier, 100);
```
Part 2: roll over, mark the source index read-only, downsample it at a 5m interval, and wait for the downsample task to succeed:

```java
        // Rollover to ensure the index we will downsample is not the write index
        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
        String sourceIndex = backingIndices.get(0);
        String interval = "5m";
        String targetIndex = "downsample-" + interval + "-" + sourceIndex;
        // Set the source index to read-only state
        assertAcked(
            indicesAdmin().prepareUpdateSettings(sourceIndex)
                .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
        );

        DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
        assertAcked(
            client().execute(
                DownsampleAction.INSTANCE,
                new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
            )
        );

        // Wait for downsampling to complete
        SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
            final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
            if (indexMetadata == null) {
                return false;
            }
            var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
            return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
        });
        safeAwait(listener);

        assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
```
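For context on why cpu changes type (not part of this diff): downsampling stores each gauge metric in the target index as an aggregate_metric_double field holding pre-aggregated sub-metrics. A sketch of roughly what the cpu mapping in the downsample-5m-* index ends up as, based on the documented downsampling behavior; the exact generated mapping is an assumption here:

```json
{
  "cpu": {
    "type": "aggregate_metric_double",
    "metrics": [ "min", "max", "sum", "value_count" ],
    "default_metric": "max",
    "time_series_metric": "gauge"
  }
}
```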
Part 3: swap the downsampled index into the data stream in place of the original backing index, delete the original, and index a second batch into the next generation:

```java
        // Replace the old backing index with the downsampled index in the data stream,
        // then delete the old index so it is not queried
        assertAcked(
            client().execute(
                ModifyDataStreamsAction.INSTANCE,
                new ModifyDataStreamsAction.Request(
                    TEST_REQUEST_TIMEOUT,
                    TEST_REQUEST_TIMEOUT,
                    List.of(
                        DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
                        DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
                    )
                )
            ).actionGet()
        );
        assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());

        // Index into the next backing index, at a random time between 31 and 59 minutes in the
        // future, because the default look_ahead_time is 30m and we don't want to conflict with
        // the previous backing index
        Supplier<XContentBuilder> nextSourceSupplier = () -> {
            String ts = randomDateForRange(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli());
            try {
                return XContentFactory.jsonBuilder()
                    .startObject()
                    .field("@timestamp", ts)
                    .field("host", randomFrom("host1", "host2", "host3"))
                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
                    .field("cpu", randomDouble())
                    .endObject();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        bulkIndex(dataStreamName, nextSourceSupplier, 100);
```
Part 4: confirm that a plain projection of cpu reports the mixed-type conflict:

```java
        // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm
        // first that the field is unsupported and has 2 original types - double and aggregate_metric_double
        try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) {
            var columns = resp.columns();
            assertThat(columns, hasSize(4));
            assertThat(
                resp.columns(),
                equalTo(
                    List.of(
                        new ColumnInfoImpl("@timestamp", "date", null),
                        new ColumnInfoImpl("host", "keyword", null),
                        new ColumnInfoImpl("cluster", "keyword", null),
                        new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double"))
                    )
                )
            );
        }
```
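Before the per-function checks below, it is worth spelling out how the implicit cast plausibly resolves: each _over_time function can be served from a matching sub-metric of the aggregate_metric_double. This mapping is an inference from the function names and the surrogate-substitution commits above, not something the diff states:

```esql
// Assumed sub-metric resolution for a downsampled gauge (illustrative):
//   min_over_time(cpu)   -> min
//   max_over_time(cpu)   -> max
//   count_over_time(cpu) -> value_count
//   avg_over_time(cpu)   -> sum / value_count
TS metrics-foo | STATS min(min_over_time(cpu)) BY cluster, bucket(@timestamp, 1 hour)
```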
Part 5: run every inner/outer aggregation combination and check the result column types:

```java
        // test _over_time commands with implicit casting of aggregate_metric_double
        for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) {
            for (String outerCommand : List.of("min", "max", "sum", "count")) {
                String command = outerCommand + " (" + innerCommand + "(cpu))";
                String expectedType = innerCommand.equals("count_over_time") || outerCommand.equals("count") ? "long" : "double";
                try (var resp = esqlCommand("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
                    var columns = resp.columns();
                    assertThat(columns, hasSize(3));
                    assertThat(
                        resp.columns(),
                        equalTo(
                            List.of(
                                new ColumnInfoImpl(command, expectedType, null),
                                new ColumnInfoImpl("cluster", "keyword", null),
                                new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
                            )
                        )
                    );
                    // TODO: verify the numbers are accurate
                }
            }
        }
    }
```
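One concrete iteration of the loop, for illustration (derived from the loop logic above):

```esql
// innerCommand = "count_over_time", outerCommand = "sum" builds the query:
TS metrics-foo | STATS sum (count_over_time(cpu)) by cluster, bucket(@timestamp, 1 hour)
// expected columns: "sum (count_over_time(cpu))" of type long,
// "cluster" of type keyword, "bucket(@timestamp, 1 hour)" of type date
```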
Finally, the query helper used throughout the test:

```java
    private EsqlQueryResponse esqlCommand(String command) throws IOException {
        if (command.toLowerCase(Locale.ROOT).contains("limit") == false) {
            // add a (high) limit to avoid warnings on default limit
            command += " | limit 10000000";
        }
        return client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(command)).actionGet(30, TimeUnit.SECONDS);
    }
}
```
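A note on the helper: ES|QL applies a default row limit and attaches a warning when results are truncated by it, so the helper appends an explicit high limit to any query that does not already contain one. For example, esqlCommand("TS metrics-foo | KEEP cpu") would actually execute:

```esql
TS metrics-foo | KEEP cpu | limit 10000000
```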