@@ -0,0 +1 @@
9188000,9185001
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
-initial_9.2.0,9185000
+dimension_values,9185001
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
@@ -1 +1 @@
-resolved_index_expressions,9187000
+dimension_values,9188000
@@ -49,7 +49,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Present.ENTRY,
PresentOverTime.ENTRY,
Absent.ENTRY,
-AbsentOverTime.ENTRY
+AbsentOverTime.ENTRY,
+DimensionValues.ENTRY
);
}
}
@@ -0,0 +1,92 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.VersionedExpression;
import org.elasticsearch.xpack.esql.planner.ToAggregator;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
 * A specialization of {@link Values} for collecting dimension fields in time-series queries.
 */
public class DimensionValues extends AggregateFunction implements ToAggregator, VersionedExpression {
    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
        Expression.class,
        "DimensionValues",
        DimensionValues::new
    );

    static final TransportVersion DIMENSION_VALUES_VERSION = TransportVersion.fromName("dimension_values");

    public DimensionValues(Source source, Expression field, Expression filter) {
        super(source, field, filter, emptyList());
    }

    private DimensionValues(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    public String getWriteableName() {
        return ENTRY.name;
    }

    @Override
    public NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion) {
        if (transportVersion.onOrAfter(DIMENSION_VALUES_VERSION)) {
            return this;
        } else {
            // fall back to VALUES, which older nodes can deserialize
            return new Values(source(), field(), filter());
[Author review comment: this is where we fall back from dimension_values to values.]
        }
    }

    @Override
    protected NodeInfo<DimensionValues> info() {
        return NodeInfo.create(this, DimensionValues::new, field(), filter());
    }

    @Override
    public DimensionValues replaceChildren(List<Expression> newChildren) {
        return new DimensionValues(source(), newChildren.get(0), newChildren.get(1));
    }

    @Override
    public DimensionValues withFilter(Expression filter) {
        return new DimensionValues(source(), field(), filter);
    }

    @Override
    public DataType dataType() {
        return field().dataType().noText();
    }

    @Override
    protected TypeResolution resolveType() {
        return new Values(source(), field(), filter()).resolveType();
    }

    @Override
    public AggregatorFunctionSupplier supplier() {
        // TODO: link new implementation
        return new Values(source(), field(), filter()).supplier();
    }
}
@@ -10,6 +10,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
@@ -296,4 +297,12 @@ static BytesReference fromConfigKey(String table, String column) throws IOException
return key.bytes();
}
}

+@Override
+public void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException {
+    if (namedWriteable instanceof VersionedExpression v) {
+        namedWriteable = v.getVersionedNamedWriteable(getTransportVersion());
+    }
+    super.writeNamedWriteable(namedWriteable);
+}
}
@@ -0,0 +1,23 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.io.stream;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteable;

/**
 * An interface indicating to {@link PlanStreamOutput} that this expression can have multiple versions and may need to fall back.
 * TODO: Should we move this to VersionedNamedWriteable?
 */
public interface VersionedExpression {
    /**
     * Returns the writeable expression for the specified transport version.
     * This allows introducing a new expression when all nodes are ready to handle it; otherwise, serialization falls back to the previous expression.
     */
    NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion);
}
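To illustrate the pattern this interface defines, here is a minimal hypothetical sketch. The NewAgg/OldAgg names and the "new_agg" version key are assumptions for illustration only, not part of this change, and the remaining AggregateFunction overrides are elided:

// Hypothetical example only. NewAgg's wire format is understood by nodes at or
// after the "new_agg" transport version; on older wires it degrades to the
// pre-existing OldAgg, which every older node can already deserialize.
public class NewAgg extends AggregateFunction implements VersionedExpression {
    static final TransportVersion NEW_AGG_VERSION = TransportVersion.fromName("new_agg");

    public NewAgg(Source source, Expression field, Expression filter) {
        super(source, field, filter, emptyList());
    }

    @Override
    public NamedWriteable getVersionedNamedWriteable(TransportVersion transportVersion) {
        if (transportVersion.onOrAfter(NEW_AGG_VERSION)) {
            return this; // the receiver can decode NewAgg directly
        }
        // Older receivers never registered NewAgg; write a semantically
        // equivalent expression they already know how to read.
        return new OldAgg(source(), field(), filter());
    }
}

Because PlanStreamOutput performs the substitution centrally in writeNamedWriteable (see the PlanStreamOutput change above), individual serialization call sites do not need to know about the fallback.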
@@ -20,6 +20,7 @@
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
@@ -59,16 +60,16 @@
* becomes
*
* TS k8s
-* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
-* | STATS max(rate_$1) BY host=`VALUES(host)`
+* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
+* | STATS max(rate_$1) BY host=`DIMENSION_VALUES(host)`
*
* TS k8s | STATS avg(rate(request)) BY host
*
* becomes
*
* TS k8s
-* | STATS rate_$1=rate(request), VALUES(host) BY _tsid
-* | STATS sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`
+* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid
+* | STATS sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
* | KEEP `avg(rate(request))`, host
*
@@ -78,8 +79,8 @@
*
* TS k8s
* | EVAL `bucket(@timestamp, 1minute)`=datetrunc(@timestamp, 1minute)
-* | STATS rate_$1=rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
-* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
+* | STATS rate_$1=rate(request), DIMENSION_VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
+* | STATS sum=sum(rate_$1), count(rate_$1) BY host=`DIMENSION_VALUES(host)`, `bucket(@timestamp, 1minute)`
* | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
* | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)`
* </pre>
@@ -99,8 +100,8 @@
* becomes
*
* TS k8s
-* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), VALUES(host) BY _tsid
-* | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`VALUES(host)`
+* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), DIMENSION_VALUES(host) BY _tsid
+* | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`DIMENSION_VALUES(host)`
* | EVAL `avg(memory_used)` = $sum / $count
* | KEEP `max(rate(request))`, `avg(memory_used)`, host
*
@@ -110,8 +111,8 @@
*
* TS k8s
* | EVAL `bucket(@timestamp, 5m)` = datetrunc(@timestamp, '5m')
-* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
-* | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
+* | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), DIMENSION_VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
+* | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`DIMENSION_VALUES(pod)`, `bucket(@timestamp, 5m)`
* | KEEP `min(memory_used)`, `sum(rate_$1)`, pod, `bucket(@timestamp, 5m)`
*
* {agg}_over_time time-series aggregations will be rewritten in a similar way
@@ -121,7 +122,7 @@
* becomes
*
* FROM k8s
-* | STATS max_over_time_$1 = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
+* | STATS max_over_time_$1 = max(memory_usage), host_values=DIMENSION_VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
* | STATS sum(max_over_time_$1) BY host_values, time_bucket
*
*
@@ -130,7 +131,7 @@
* becomes
*
* FROM k8s
-* | STATS avg_over_time_$1 = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
+* | STATS avg_over_time_$1 = avg(memory_usage), host_values=DIMENSION_VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
* | STATS sum(avg_over_time_$1) BY host_values, time_bucket
*
* TS k8s | STATS max(rate(post_requests) + rate(get_requests)) BY host, bucket(@timestamp, 1minute)
@@ -272,7 +273,11 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
newFinalGroup = timeBucket.toAttribute();
firstPassGroupings.add(newFinalGroup);
} else {
-newFinalGroup = new Alias(g.source(), g.name(), new Values(g.source(), g), g.id());
+if (g.isDimension()) {
+    newFinalGroup = new Alias(g.source(), g.name(), new DimensionValues(g.source(), g, Literal.TRUE), g.id());
+} else {
+    newFinalGroup = new Alias(g.source(), g.name(), new Values(g.source(), g, Literal.TRUE), g.id());
+}
firstPassAggs.add(newFinalGroup);
}
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
@@ -0,0 +1,74 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
import org.elasticsearch.xpack.esql.expression.function.ReferenceAttributeTests;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class DimensionValuesSerializationTests extends AbstractExpressionSerializationTests<DimensionValues> {
    @Override
    protected DimensionValues createTestInstance() {
        return new DimensionValues(randomSource(), randomChild(), randomChild());
    }

    @Override
    protected DimensionValues mutateInstance(DimensionValues instance) throws IOException {
        return new DimensionValues(
            instance.source(),
            randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild),
            randomValueOtherThan(instance.filter(), AbstractExpressionSerializationTests::randomChild)
        );
    }

    public void testBWC() throws Exception {
        DimensionValues dimensionValues = new DimensionValues(
            randomSource(),
            randomAttributeWithoutQualifier(),
            randomAttributeWithoutQualifier()
        );
        TransportVersion version = TransportVersionUtils.randomVersionBetween(
            random(),
            TransportVersion.minimumCompatible(),
            TransportVersionUtils.getPreviousVersion(DimensionValues.DIMENSION_VALUES_VERSION)
        );
        NamedWriteableRegistry namedWriteableRegistry = getNamedWriteableRegistry();
        try (BytesStreamOutput output = new BytesStreamOutput()) {
            output.setTransportVersion(version);
            PlanStreamOutput planOut = new PlanStreamOutput(output, configuration());
            planOut.writeNamedWriteable(dimensionValues);
            try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
                PlanStreamInput planIn = new PlanStreamInput(in, namedWriteableRegistry, configuration());
                planIn.setTransportVersion(version);
                Expression oldValues = (Expression) planIn.readNamedWriteable(categoryClass());
                Values expected = new Values(dimensionValues.source(), dimensionValues.field(), dimensionValues.filter());
                assertThat(oldValues, equalTo(expected));
            }
        }
    }

    // can't serialize qualified attribute with older versions
    private Expression randomAttributeWithoutQualifier() {
        ReferenceAttribute e = ReferenceAttributeTests.randomReferenceAttribute(ESTestCase.randomBoolean());
        return new ReferenceAttribute(e.source(), null, e.name(), e.dataType(), e.nullable(), e.id(), e.synthetic());
    }
}