diff --git a/x-pack/plugin/downsample/build.gradle b/x-pack/plugin/downsample/build.gradle index 2de12b89b5d3d..b823f1cd2e266 100644 --- a/x-pack/plugin/downsample/build.gradle +++ b/x-pack/plugin/downsample/build.gradle @@ -18,6 +18,8 @@ dependencies { compileOnly project(path: xpackModule('mapper-aggregate-metric')) testImplementation(testArtifact(project(xpackModule('core')))) testImplementation project(xpackModule('ccr')) + testImplementation project(xpackModule('esql')) + testImplementation project(xpackModule('esql-core')) } addQaCheckDependencies(project) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 70150d4f95bc9..0b815fce21b04 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -7,24 +7,35 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.downsample.DownsampleAction; import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; +import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import java.io.IOException; import java.time.Instant; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class DownsampleIT extends DownsamplingIntegTestCase { @@ -96,4 +107,157 @@ public void testDownsamplingPassthroughDimensions() throws Exception { assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig); } + + public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { + String dataStreamName = "metrics-foo"; + Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build(); + putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """ + { + "properties": { + "host": { + "type": "keyword", + "time_series_dimension": true + }, + "cluster" : { + "type": "keyword", + "time_series_dimension": true + }, + "cpu": { + "type": "double", + "time_series_metric": "gauge" + } + } + } + """, null, null); + + // Create data stream by indexing documents + final Instant now = Instant.now(); + Supplier sourceSupplier = () -> { + String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli()); + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("@timestamp", ts) + .field("host", randomFrom("host1", "host2", "host3")) + .field("cluster", randomFrom("cluster1", "cluster2", "cluster3")) + .field("cpu", randomDouble()) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + bulkIndex(dataStreamName, sourceSupplier, 100); + + // Rollover to ensure the index we will downsample is not the write index + assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null))); + List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); + String sourceIndex = backingIndices.get(0); + String interval = "5m"; + String targetIndex = "downsample-" + interval + "-" + sourceIndex; + // Set the source index to read-only state + assertAcked( + indicesAdmin().prepareUpdateSettings(sourceIndex) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + ); + + DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval)); + assertAcked( + client().execute( + DownsampleAction.INSTANCE, + new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig) + ) + ); + + // Wait for downsampling to complete + SubscribableListener listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + final var indexMetadata = clusterState.metadata().getProject().index(targetIndex); + if (indexMetadata == null) { + return false; + } + var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); + return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS; + }); + safeAwait(listener); + + assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig); + + // remove old backing index and replace with downsampled index and delete old so old is not queried + assertAcked( + client().execute( + ModifyDataStreamsAction.INSTANCE, + new ModifyDataStreamsAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + List.of( + DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex), + DataStreamAction.addBackingIndex(dataStreamName, targetIndex) + ) + ) + ).actionGet() + ); + assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet()); + + // index to the next backing index; random time between 31 and 59m in the future to because default look_ahead_time is 30m and we + // don't want to conflict with the previous backing index + Supplier nextSourceSupplier = () -> { + String ts = randomDateForRange(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli()); + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("@timestamp", ts) + .field("host", randomFrom("host1", "host2", "host3")) + .field("cluster", randomFrom("cluster1", "cluster2", "cluster3")) + .field("cpu", randomDouble()) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + bulkIndex(dataStreamName, nextSourceSupplier, 100); + + // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm + // first that the field is unsupported and has 2 original types - double and aggregate_metric_double + try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) { + var columns = resp.columns(); + assertThat(columns, hasSize(4)); + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl("@timestamp", "date", null), + new ColumnInfoImpl("host", "keyword", null), + new ColumnInfoImpl("cluster", "keyword", null), + new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double")) + ) + ) + ); + } + + // test _over_time commands with implicit casting of aggregate_metric_double + for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) { + for (String outerCommand : List.of("min", "max", "sum", "count")) { + String command = outerCommand + " (" + innerCommand + "(cpu))"; + String expectedType = innerCommand.equals("count_over_time") || outerCommand.equals("count") ? "long" : "double"; + try (var resp = esqlCommand("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) { + var columns = resp.columns(); + assertThat(columns, hasSize(3)); + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl(command, expectedType, null), + new ColumnInfoImpl("cluster", "keyword", null), + new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null) + ) + ) + ); + // TODO: verify the numbers are accurate + } + } + } + } + + private EsqlQueryResponse esqlCommand(String command) throws IOException { + return client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(command)).actionGet(30, TimeUnit.SECONDS); + } } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java index 27de42447d3a0..4991a9025956f 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java @@ -45,6 +45,7 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import java.io.IOException; import java.time.LocalDateTime; @@ -82,7 +83,13 @@ public abstract class DownsamplingIntegTestCase extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class); + return List.of( + DataStreamsPlugin.class, + LocalStateCompositeXPackPlugin.class, + Downsample.class, + AggregateMetricMapperPlugin.class, + EsqlPlugin.class + ); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 41a92214406bb..5c97879dd6a6d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -890,6 +890,31 @@ public enum Cap { */ AGGREGATE_METRIC_DOUBLE_PARTIAL_SUBMETRICS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + /** + * Support for rendering aggregate_metric_double type + */ + AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support for to_aggregate_metric_double function + */ + AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support for sorting when aggregate_metric_doubles are present + */ + AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support avg with aggregate metric doubles + */ + AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support for implicit casting of aggregate metric double when run in aggregations + */ + AGGREGATE_METRIC_DOUBLE_IMPLICIT_CASTING_IN_AGGS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + /** * Support change point detection "CHANGE_POINT". */ @@ -913,11 +938,6 @@ public enum Cap { */ SUPPORT_PARTIAL_RESULTS, - /** - * Support for rendering aggregate_metric_double type - */ - AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * Support for RERANK command */ @@ -964,11 +984,6 @@ public enum Cap { */ NON_FULL_TEXT_FUNCTIONS_SCORING, - /** - * Support for to_aggregate_metric_double function - */ - AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * The {@code _query} API now reports the original types. */ @@ -995,11 +1010,6 @@ public enum Cap { */ MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT, - /** - * Support for sorting when aggregate_metric_doubles are present - */ - AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}. */ @@ -1260,11 +1270,6 @@ public enum Cap { */ LIKE_ON_INDEX_FIELDS, - /** - * Support avg with aggregate metric doubles - */ - AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * Forbid usage of brackets in unquoted index and enrich policy names * https://github.com/elastic/elasticsearch/issues/130378 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 0058a8691f818..44e4fd5a1bb3c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.BytesRefs; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.core.Strings; import org.elasticsearch.index.IndexMode; @@ -53,6 +54,17 @@ import org.elasticsearch.xpack.esql.expression.function.FunctionDefinition; import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction; import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute; +import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg; +import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; +import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; +import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Min; +import org.elasticsearch.xpack.esql.expression.function.aggregate.MinOverTime; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum; +import org.elasticsearch.xpack.esql.expression.function.aggregate.SumOverTime; import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case; @@ -61,6 +73,8 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ConvertFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FoldablesConvertFunction; +import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromAggregateMetricDouble; +import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDateNanos; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDouble; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToInteger; @@ -135,6 +149,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.IMPLICIT_CASTING_DATE_AND_DATE_NANOS; +import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN; import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS; @@ -182,7 +197,8 @@ public class Analyzer extends ParameterizedRuleExecutor("Finish Analysis", Limiter.ONCE, new AddImplicitLimit(), new AddImplicitForkLimit(), new UnionTypesCleanup()) ); @@ -1688,9 +1704,15 @@ private LogicalPlan doRule(LogicalPlan plan) { return plan; } - // And add generated fields to EsRelation, so these new attributes will appear in the OutputExec of the Fragment - // and thereby get used in FieldExtractExec - plan = plan.transformDown(EsRelation.class, esr -> { + return addGeneratedFieldsToEsRelations(plan, unionFieldAttributes); + } + + /** + * Add generated fields to EsRelation, so these new attributes will appear in the OutputExec of the Fragment + * and thereby get used in FieldExtractExec + */ + private static LogicalPlan addGeneratedFieldsToEsRelations(LogicalPlan plan, List unionFieldAttributes) { + return plan.transformDown(EsRelation.class, esr -> { List missing = new ArrayList<>(); for (FieldAttribute fa : unionFieldAttributes) { // Using outputSet().contains looks by NameId, resp. uses semanticEquals. @@ -1710,7 +1732,6 @@ private LogicalPlan doRule(LogicalPlan plan) { } return esr; }); - return plan; } private Expression resolveConvertFunction(ConvertFunction convert, List unionFieldAttributes) { @@ -1838,7 +1859,10 @@ private static Expression typeSpecificConvert(ConvertFunction convert, Source so originalFieldAttr.id(), true ); - Expression e = ((Expression) convert).replaceChildren(Collections.singletonList(resolvedAttr)); + Expression fn = (Expression) convert; + List children = new ArrayList<>(fn.children()); + children.set(0, resolvedAttr); + Expression e = ((Expression) convert).replaceChildren(children); /* * Resolve surrogates immediately because these type specific conversions are serialized * and SurrogateExpressions are expected to be resolved on the coordinating node. At least, @@ -1957,4 +1981,103 @@ private static void typeResolutions( var concreteConvert = ResolveUnionTypes.typeSpecificConvert(convert, fieldAttribute.source(), type, imf); typeResolutions.put(key, concreteConvert); } + + /** + * Take InvalidMappedFields in specific aggregations (min, max, sum, count, and avg) and if all original data types + * are aggregate metric double + any combination of numerics, implicitly cast them to the same type: aggregate metric + * double for count, and double for min, max, and sum. Avg gets replaced with its surrogate (Div(Sum, Count)) + */ + private static class ImplicitCastAggregateMetricDoubles extends Rule { + + @Override + public LogicalPlan apply(LogicalPlan plan) { + return plan.transformUp(Aggregate.class, p -> p.childrenResolved() == false ? p : doRule(p)); + } + + private LogicalPlan doRule(Aggregate plan) { + Map unionFields = new HashMap<>(); + Holder aborted = new Holder<>(Boolean.FALSE); + var newPlan = plan.transformExpressionsOnly(AggregateFunction.class, aggFunc -> { + if (aggFunc.field() instanceof FieldAttribute fa && fa.field() instanceof InvalidMappedField mtf) { + if (mtf.types().contains(AGGREGATE_METRIC_DOUBLE) == false + || mtf.types().stream().allMatch(f -> f == AGGREGATE_METRIC_DOUBLE || f.isNumeric()) == false) { + aborted.set(Boolean.TRUE); + return aggFunc; + } + Map typeConverters = typeConverters(aggFunc, fa, mtf); + if (typeConverters == null) { + aborted.set(Boolean.TRUE); + return aggFunc; + } + var newField = unionFields.computeIfAbsent( + Attribute.rawTemporaryName(fa.name(), aggFunc.functionName(), aggFunc.sourceText()), + newName -> new FieldAttribute( + fa.source(), + fa.parentName(), + newName, + MultiTypeEsField.resolveFrom(mtf, typeConverters), + fa.nullable(), + null, + true + ) + ); + List children = new ArrayList<>(aggFunc.children()); + children.set(0, newField); + return aggFunc.replaceChildren(children); + } + return aggFunc; + }); + if (unionFields.isEmpty() || aborted.get()) { + return plan; + } + return ResolveUnionTypes.addGeneratedFieldsToEsRelations(newPlan, unionFields.values().stream().toList()); + } + + private Map typeConverters(AggregateFunction aggFunc, FieldAttribute fa, InvalidMappedField mtf) { + var metric = getMetric(aggFunc); + if (metric == null) { + return null; + } + Map typeConverter = new HashMap<>(); + for (DataType type : mtf.types()) { + final ConvertFunction convert; + // Counting on aggregate metric double has unique behavior in that we cannot just provide the number of + // documents, instead we have to look inside the aggregate metric double's count field and sum those together. + // Grabbing the count value with FromAggregateMetricDouble the same way we do with min/max/sum would result in + // a single Int field, and incorrectly be treated as 1 document (instead of however many originally went into + // the aggregate metric double). + if (metric == AggregateMetricDoubleBlockBuilder.Metric.COUNT) { + convert = new ToAggregateMetricDouble(fa.source(), fa); + } else if (type == AGGREGATE_METRIC_DOUBLE) { + convert = FromAggregateMetricDouble.withMetric(aggFunc.source(), fa, metric); + } else if (type.isNumeric()) { + convert = new ToDouble(fa.source(), fa); + } else { + return null; + } + Expression expression = ResolveUnionTypes.typeSpecificConvert(convert, fa.source(), type, mtf); + typeConverter.put(type.typeName(), expression); + } + return typeConverter; + } + + private static AggregateMetricDoubleBlockBuilder.Metric getMetric(AggregateFunction aggFunc) { + if (aggFunc instanceof Max || aggFunc instanceof MaxOverTime) { + return AggregateMetricDoubleBlockBuilder.Metric.MAX; + } + if (aggFunc instanceof Min || aggFunc instanceof MinOverTime) { + return AggregateMetricDoubleBlockBuilder.Metric.MIN; + } + if (aggFunc instanceof Sum || aggFunc instanceof SumOverTime) { + return AggregateMetricDoubleBlockBuilder.Metric.SUM; + } + if (aggFunc instanceof Count || aggFunc instanceof CountOverTime) { + return AggregateMetricDoubleBlockBuilder.Metric.COUNT; + } + if (aggFunc instanceof Avg || aggFunc instanceof AvgOverTime) { + return AggregateMetricDoubleBlockBuilder.Metric.COUNT; + } + return null; + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java index e469f16f8d5a2..5e8c3a9bcf104 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java @@ -14,12 +14,14 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.SurrogateExpression; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo; import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.FunctionType; import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div; import java.io.IOException; import java.util.List; @@ -29,7 +31,7 @@ /** * Similar to {@link Avg}, but it is used to calculate the average value over a time series of values from the given field. */ -public class AvgOverTime extends TimeSeriesAggregateFunction { +public class AvgOverTime extends TimeSeriesAggregateFunction implements SurrogateExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Expression.class, "AvgOverTime", @@ -93,6 +95,13 @@ public AvgOverTime withFilter(Expression filter) { return new AvgOverTime(source(), field(), filter); } + @Override + public Expression surrogate() { + Source s = source(); + Expression f = field(); + return new Div(s, new SumOverTime(s, f, filter()), new CountOverTime(s, f, filter()), dataType()); + } + @Override public AggregateFunction perTimeSeriesAggregation() { return new Avg(source(), field(), filter()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java index 61129df973a55..f9f01307e025f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java @@ -32,14 +32,16 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; +import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE; import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER; import static org.elasticsearch.xpack.esql.core.type.DataType.NULL; -public class FromAggregateMetricDouble extends EsqlScalarFunction { +public class FromAggregateMetricDouble extends EsqlScalarFunction implements ConvertFunction { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Expression.class, "FromAggregateMetricDouble", @@ -169,4 +171,14 @@ public String toString() { } }; } + + @Override + public Expression field() { + return field; + } + + @Override + public Set supportedTypes() { + return Set.of(AGGREGATE_METRIC_DOUBLE); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 47deb3d78ca72..7abe84d99e5f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -65,7 +65,7 @@ import org.elasticsearch.xpack.esql.core.type.KeywordEsField; import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField; import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField; -import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction; +import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; @@ -199,7 +199,7 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy Expression conversion = unionTypes.getConversionExpressionForIndex(indexName); return conversion == null ? BlockLoader.CONSTANT_NULLS - : new TypeConvertingBlockLoader(blockLoader, (AbstractConvertFunction) conversion); + : new TypeConvertingBlockLoader(blockLoader, (EsqlScalarFunction) conversion); } return blockLoader; } @@ -506,9 +506,9 @@ private static class TypeConvertingBlockLoader implements BlockLoader { private final BlockLoader delegate; private final TypeConverter typeConverter; - protected TypeConvertingBlockLoader(BlockLoader delegate, AbstractConvertFunction convertFunction) { + protected TypeConvertingBlockLoader(BlockLoader delegate, EsqlScalarFunction convertFunction) { this.delegate = delegate; - this.typeConverter = TypeConverter.fromConvertFunction(convertFunction); + this.typeConverter = TypeConverter.fromScalarFunction(convertFunction); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TypeConverter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TypeConverter.java index 4dea8a50b5c17..31b95bcc5f860 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TypeConverter.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TypeConverter.java @@ -17,7 +17,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; -import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction; +import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; class TypeConverter { private final String evaluatorName; @@ -28,7 +28,7 @@ private TypeConverter(String evaluatorName, ExpressionEvaluator convertEvaluator this.convertEvaluator = convertEvaluator; } - public static TypeConverter fromConvertFunction(AbstractConvertFunction convertFunction) { + public static TypeConverter fromScalarFunction(EsqlScalarFunction convertFunction) { DriverContext driverContext1 = new DriverContext( BigArrays.NON_RECYCLING_INSTANCE, new org.elasticsearch.compute.data.BlockFactory( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index dbd888ed29fd2..ff15f3cc1e4ba 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -305,7 +305,7 @@ private Block getBlockForMultiType(DocBlock indexDoc, MultiTypeEsField multiType } return switch (extractBlockForSingleDoc(indexDoc, ((FieldAttribute) conversion.field()).fieldName().string(), blockCopier)) { case BlockResultMissing unused -> getNullsBlock(indexDoc); - case BlockResultSuccess success -> TypeConverter.fromConvertFunction(conversion).convert(success.block); + case BlockResultSuccess success -> TypeConverter.fromScalarFunction(conversion).convert(success.block); }; } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml index ee1a381c6e589..23e6772d26d08 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml @@ -251,3 +251,381 @@ setup: - match: {values.0.1: 800479.0} - match: {values.0.2: 4812452.0} - match: {values.0.3: 6} + +--- +"Stats from downsampled and non-downsampled index simultaneously with implicit casting": + - requires: + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [aggregate_metric_double_implicit_casting_in_aggs] + reason: "Support for casting aggregate metric double implicitly when present in aggregations" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + indices.create: + index: test-2 + body: + settings: + number_of_shards: 1 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-29T00:00:00Z + end_time: 2021-04-30T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer + ip: + type: ip + tags: + type: keyword + values: + type: integer + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + + - do: + bulk: + refresh: true + index: test-2 + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001810, "rx": 802339}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2000177, "rx": 800479}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' + - '{"index": {}}' + + - do: + esql.query: + body: + query: "FROM test-* | + WHERE k8s.pod.uid == \"947e4ced-1786-4e53-9e0c-5c447e959507\" | + STATS max(k8s.pod.network.rx), min(k8s.pod.network.rx), sum(k8s.pod.network.rx), count(k8s.pod.network.rx), avg(k8s.pod.network.rx) | + LIMIT 100" + + - length: {values: 1} + - length: {values.0: 5} + - match: {columns.0.name: "max(k8s.pod.network.rx)"} + - match: {columns.0.type: "double"} + - match: {columns.1.name: "min(k8s.pod.network.rx)"} + - match: {columns.1.type: "double"} + - match: {columns.2.name: "sum(k8s.pod.network.rx)"} + - match: {columns.2.type: "double"} + - match: {columns.3.name: "count(k8s.pod.network.rx)"} + - match: {columns.3.type: "long"} + - match: {columns.4.name: "avg(k8s.pod.network.rx)"} + - match: {columns.4.type: "double"} + - match: {values.0.0: 803685.0} + - match: {values.0.1: 800479.0} + - match: {values.0.2: 4812452.0} + - match: {values.0.3: 6} + - match: {values.0.4: 802075.3333333334} + + - do: + esql.query: + body: + query: "TS test-* | STATS max = max(k8s.pod.network.rx) | LIMIT 100" + - length: {values: 1} + - length: {values.0: 1} + - match: {columns.0.name: "max"} + - match: {columns.0.type: "double"} + - match: {values.0.0: 803685.0} + +--- +"Over time functions from downsampled and non-downsampled indices simultaneously, no grouping": + - requires: + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [aggregate_metric_double_implicit_casting_in_aggs] + reason: "Support for casting aggregate metric double implicitly when present in aggregations" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + indices.create: + index: test-2 + body: + settings: + number_of_shards: 1 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-29T00:00:00Z + end_time: 2021-04-30T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer + ip: + type: ip + tags: + type: keyword + values: + type: integer + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + + - do: + bulk: + refresh: true + index: test-2 + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.10", "network": {"tx": 2005820, "rx": 802339}, "created_at": "2021-04-29T21:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.28", "network": {"tx": 2000481, "rx": 800479}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:14.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.192", "network": {"tx": 1458377, "rx": 530184}, "created_at": "2021-04-29T21:36:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "test"], "values": [3, 3, 1]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.206", "network": {"tx": 1434104, "rx": 535020}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 3]}}}' + - '{"index": {}}' + + - do: + esql.query: + body: + query: "TS test-* | + STATS avg = sum(avg_over_time(k8s.pod.network.rx)), + count = sum(count_over_time(k8s.pod.network.rx)), + sum = sum(sum_over_time(k8s.pod.network.rx)) + BY time_bucket = bucket(@timestamp, 1 hour) | + SORT time_bucket | LIMIT 10" + + - length: {values: 4} + - length: {values.0: 4} + - match: {columns.0.name: "avg"} + - match: {columns.0.type: "double"} + - match: {columns.1.name: "count"} + - match: {columns.1.type: "long"} + - match: {columns.2.name: "sum"} + - match: {columns.2.type: "double"} + - match: {columns.3.name: "time_bucket"} + - match: {columns.3.type: "date"} + - match: {values.0.0: 1332393.5} + - match: {values.0.1: 4} + - match: {values.0.2: 2664787.0} + - match: {values.0.3: "2021-04-28T18:00:00.000Z"} + - match: {values.1.0: 530604.5} + - match: {values.1.1: 2} + - match: {values.1.2: 1061209.0} + - match: {values.1.3: "2021-04-28T19:00:00.000Z"} + - match: {values.2.0: 803011.0} + - match: {values.2.1: 2} + - match: {values.2.2: 1606022.0} + - match: {values.2.3: "2021-04-28T20:00:00.000Z"} + - match: {values.3.0: 1334011.0} + - match: {values.3.1: 4} + - match: {values.3.2: 2668022.0} + - match: {values.3.3: "2021-04-29T21:00:00.000Z"} + +--- +"Over time functions from downsampled and non-downsampled indices simultaneously, with grouping": + - requires: + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [aggregate_metric_double_implicit_casting_in_aggs] + reason: "Support for casting aggregate metric double implicitly when present in aggregations" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + indices.create: + index: test-2 + body: + settings: + number_of_shards: 1 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-29T00:00:00Z + end_time: 2021-04-30T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer + ip: + type: ip + tags: + type: keyword + values: + type: integer + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + + - do: + bulk: + refresh: true + index: test-2 + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.10", "network": {"tx": 2005820, "rx": 802339}, "created_at": "2021-04-29T21:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.28", "network": {"tx": 2000481, "rx": 800479}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:14.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.192", "network": {"tx": 1458377, "rx": 530184}, "created_at": "2021-04-29T21:36:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "test"], "values": [3, 3, 1]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.206", "network": {"tx": 1434104, "rx": 535020}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 3]}}}' + - '{"index": {}}' + + - do: + esql.query: + body: + query: "TS test-* | + STATS avg = sum(avg_over_time(k8s.pod.network.rx)), + count = sum(count_over_time(k8s.pod.network.rx)), + sum = sum(sum_over_time(k8s.pod.network.rx)) + BY k8s.pod.name, time_bucket = bucket(@timestamp, 1 hour) | + SORT time_bucket, k8s.pod.name | + LIMIT 10" + + - length: {values: 6} + - length: {values.0: 5} + - match: {columns.0.name: "avg"} + - match: {columns.0.type: "double"} + - match: {columns.1.name: "count"} + - match: {columns.1.type: "long"} + - match: {columns.2.name: "sum"} + - match: {columns.2.type: "double"} + - match: {columns.3.name: "k8s.pod.name"} + - match: {columns.3.type: "keyword"} + - match: {columns.4.name: "time_bucket"} + - match: {columns.4.type: "date"} + - match: {values.0.0: 801806.0} + - match: {values.0.1: 2} + - match: {values.0.2: 1603612.0} + - match: {values.0.3: "cat"} + - match: {values.0.4: "2021-04-28T18:00:00.000Z"} + - match: {values.1.0: 530587.5} + - match: {values.1.1: 2} + - match: {values.1.2: 1061175.0} + - match: {values.1.3: "dog"} + - match: {values.1.4: "2021-04-28T18:00:00.000Z"} + - match: {values.2.0: 530604.5} + - match: {values.2.1: 2} + - match: {values.2.2: 1061209.0} + - match: {values.2.3: "dog"} + - match: {values.2.4: "2021-04-28T19:00:00.000Z"} + - match: {values.3.0: 803011.0} + - match: {values.3.1: 2} + - match: {values.3.2: 1606022.0} + - match: {values.3.3: "cat"} + - match: {values.3.4: "2021-04-28T20:00:00.000Z"} + - match: {values.4.0: 801409.0} + - match: {values.4.1: 2} + - match: {values.4.2: 1602818.0} + - match: {values.4.3: "cat"} + - match: {values.4.4: "2021-04-29T21:00:00.000Z"} + - match: {values.5.0: 532602.0} + - match: {values.5.1: 2} + - match: {values.5.2: 1065204.0} + - match: {values.5.3: "dog"} + - match: {values.5.4: "2021-04-29T21:00:00.000Z"}