Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ public String getLabel() {
}
}

/**
* Literal to represent AggregateMetricDouble and primarily used for testing and during folding.
* For all other purposes it is preferred to use the individual builders over the literal for generating blocks when possible.
*/
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable {
public AggregateMetricDoubleLiteral {
min = (min == null || min.isNaN()) ? null : min;
Expand Down Expand Up @@ -247,4 +251,28 @@ public TransportVersion getMinimalSupportedVersion() {
throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
}
}

public AggregateMetricDoubleBlockBuilder appendLiteral(AggregateMetricDoubleLiteral literal) {
if (literal.min != null) {
minBuilder.appendDouble(literal.min);
} else {
minBuilder.appendNull();
}
if (literal.max != null) {
maxBuilder.appendDouble(literal.max);
} else {
maxBuilder.appendNull();
}
if (literal.sum != null) {
sumBuilder.appendDouble(literal.sum);
} else {
sumBuilder.appendNull();
}
if (literal.count != null) {
countBuilder.appendInt(literal.count);
} else {
countBuilder.appendNull();
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public static void appendValue(Block.Builder builder, Object val, ElementType ty
case FLOAT -> ((FloatBlock.Builder) builder).appendFloat((Float) val);
case DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble((Double) val);
case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean((Boolean) val);
case AGGREGATE_METRIC_DOUBLE -> ((AggregateMetricDoubleBlockBuilder) builder).appendLiteral((AggregateMetricDoubleLiteral) val);
default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void append(String stringValue) {
return;
}
stringValue = mvStrings[0].replace(ESCAPED_COMMA_SEQUENCE, ",");
} else if (stringValue.contains(",")) {// multi-value field
} else if (stringValue.contains(",") && type != Type.AGGREGATE_METRIC_DOUBLE) {// multi-value field
builderWrapper().builder().beginPositionEntry();

String[] arrayOfValues = delimitedListToStringArray(stringValue, ",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3913,3 +3913,26 @@ from employees
10003 |0 |null
;

inlineStatsOnAggregateMetricDouble
required_capability: inline_stats
required_capability: aggregate_metric_double_literal_registered
required_capability: aggregate_metric_double_convert_to
FROM k8s-downsampled
| EVAL a = TO_AGGREGATE_METRIC_DOUBLE(1) // Temporary workaround to enable aggregate_metric_double
| INLINE STATS tx_max = MAX(network.eth0.tx) BY pod
| SORT @timestamp, cluster, pod
| KEEP @timestamp, cluster, pod, network.eth0.tx, tx_max
| LIMIT 9
;

@timestamp:datetime | cluster:keyword | pod:keyword | network.eth0.tx:aggregate_metric_double | tx_max:double
2024-05-09T23:30:00.000Z | prod | one | {"min":565.0,"max":829.0,"sum":7290.0,"value_count":10} | 1060.0
2024-05-09T23:30:00.000Z | prod | three | {"min":201.0,"max":582.0,"sum":1794.0,"value_count":6} | 824.0
2024-05-09T23:30:00.000Z | prod | two | {"min":20.0,"max":190.0,"sum":370.0,"value_count":10} | 1419.0
2024-05-09T23:30:00.000Z | qa | one | {"min":346.0,"max":356.0,"sum":1765.0,"value_count":5} | 1060.0
2024-05-09T23:30:00.000Z | qa | three | {"min":605.0,"max":605.0,"sum":605.0,"value_count":1} | 824.0
2024-05-09T23:30:00.000Z | qa | two | {"min":304.0,"max":1148.0,"sum":8590.0,"value_count":10} | 1419.0
2024-05-09T23:30:00.000Z | staging | one | {"min":263.0,"max":740.0,"sum":5390.0,"value_count":10} | 1060.0
2024-05-09T23:30:00.000Z | staging | three | {"min":341.0,"max":592.0,"sum":1956.0,"value_count":5} | 824.0
2024-05-09T23:30:00.000Z | staging | two | {"min":442.0,"max":1011.0,"sum":3850.0,"value_count":7} | 1419.0
;
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,11 @@ public enum Cap {
*/
AGGREGATE_METRIC_DOUBLE_MV_EXPAND(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Registering AggregateMetricDoubleLiteral as a NamedWritable.
*/
AGGREGATE_METRIC_DOUBLE_LITERAL_REGISTERED(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Support change point detection "CHANGE_POINT".
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.xpack.esql.expression;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.core.expression.ExpressionCoreWritables;
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
Expand Down Expand Up @@ -124,6 +126,12 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return entries;
}

public static List<SearchPlugin.GenericNamedWriteableSpec> getGenericNamedWriteables() {
List<SearchPlugin.GenericNamedWriteableSpec> entries = new ArrayList<>();
entries.addAll(literals());
return entries;
}

public static List<NamedWriteableRegistry.Entry> attributes() {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(ExpressionCoreWritables.attributes());
Expand Down Expand Up @@ -267,4 +275,13 @@ private static List<NamedWriteableRegistry.Entry> fullText() {
private static List<NamedWriteableRegistry.Entry> vector() {
return VectorWritables.getNamedWritables();
}

public static List<SearchPlugin.GenericNamedWriteableSpec> literals() {
return List.of(
new SearchPlugin.GenericNamedWriteableSpec(
"AggregateMetricDoubleLiteral",
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral::new
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.threadpool.ExecutorBuilder;
Expand Down Expand Up @@ -90,7 +91,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SearchPlugin {

public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker";

Expand Down Expand Up @@ -374,4 +375,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public void loadExtensions(ExtensionLoader loader) {
extraCheckerProviders.addAll(loader.loadExtensions(PlanCheckerProvider.class));
}

@Override
public List<SearchPlugin.GenericNamedWriteableSpec> getGenericNamedWriteables() {
return ExpressionWritables.getGenericNamedWriteables();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -813,22 +813,24 @@ public static AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral str
Double max = null;
Double sum = null;
Integer count = null;

s = s.replace("\\,", ",");
String[] values = s.substring(1, s.length() - 1).split(",");
for (String v : values) {
var pair = v.split(":");
String type = pair[0];
String number = pair[1];
switch (type) {
case "min":
case "min", "\"min\"":
min = Double.parseDouble(number);
break;
case "max":
case "max", "\"max\"":
max = Double.parseDouble(number);
break;
case "sum":
case "sum", "\"sum\"":
sum = Double.parseDouble(number);
break;
case "value_count":
case "value_count", "\"value_count\"":
count = Integer.parseInt(number);
break;
default:
Expand Down