Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,17 @@ public enum Cap {
/**
* Support for tbucket function
*/
TBUCKET;
TBUCKET,

/**
* Support for the Absent function
*/
FN_ABSENT,

/**
* Support for the Present function
*/
FN_PRESENT;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.util.Check;
import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Absent;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
Expand Down Expand Up @@ -334,7 +335,8 @@ private static FunctionDefinition[][] functions() {
def(Sum.class, uni(Sum::new), "sum"),
def(Top.class, tri(Top::new), "top"),
def(Values.class, uni(Values::new), "values"),
def(WeightedAvg.class, bi(WeightedAvg::new), "weighted_avg") },
def(WeightedAvg.class, bi(WeightedAvg::new), "weighted_avg"),
def(Absent.class, uni(Absent::new), "absent") },
// math
new FunctionDefinition[] {
def(Abs.class, Abs::new, "abs"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastDoubleByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastFloatByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastIntByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastLongByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.Example;
import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo;
import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.planner.ToAggregator;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;

/**
* ABSENT(expr) -> boolean
* Returns true if the input expression yields no (non-null) values within the current aggregation context; otherwise false.
*/
public class Absent extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Absent", Absent::new);

private final Expression timestamp;

@FunctionInfo(
type = FunctionType.TIME_SERIES_AGGREGATE,
returnType = "boolean",
description = "Returns true if the input expression has no non-null values in the current aggregation context; otherwise false.",
appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE) },
note = "Available with the [TS](/reference/query-languages/esql/commands/source-commands.md#esql-ts) command in snapshot builds",
examples = { @Example(description = "Compute absence of a field across all rows:", file = "docs", tag = "absent_bool_global") }
)
public Absent(
Source source,
@Param(
name = "field",
type = {
"aggregate_metric_double",
"boolean",
"cartesian_point",
"cartesian_shape",
"date",
"date_nanos",
"double",
"geo_point",
"geo_shape",
"integer",
"ip",
"keyword",
"long",
"text",
"unsigned_long",
"version" },
description = "Expression to check for absence (non-null values indicate absense)."
) Expression field
) {
this(source, field, new UnresolvedAttribute(source, "@timestamp"));
}

public Absent(Source source, Expression field, Expression timestamp) {
this(source, field, Literal.TRUE, timestamp);
}

// compatibility constructor used when reading from the stream
private Absent(Source source, Expression field, Expression filter, List<Expression> children) {
this(source, field, filter, children.getFirst());
}

public Absent(Source source, Expression field, Expression filter, Expression timestamp) {
super(source, field, filter, List.of(timestamp));
this.timestamp = timestamp;
}

public Absent(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(Expression.class),
in.readNamedWriteable(Expression.class),
in.readNamedWriteableCollectionAsList(Expression.class)
);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected NodeInfo<Absent> info() {
return NodeInfo.create(this, Absent::new, field(), filter(), timestamp);
}

@Override
public AggregateFunction withFilter(Expression filter) {
return new Absent(source(), field(), filter, timestamp);
}

@Override
public Absent replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 3) {
assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren;
throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren);
}
return new Absent(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
}

@Override
public DataType dataType() {
return DataType.BOOLEAN;
}

@Override
public Nullability nullable() {
return Nullability.FALSE;
}

@Override
protected TypeResolution resolveType() {
return isType(field(), dt -> true, sourceText(), DEFAULT, "any type").and(
isType(timestamp, dt -> dt == DataType.DATETIME || dt == DataType.DATE_NANOS, sourceText(), SECOND, "date_nanos or datetime")
);
}

@Override
public AggregatorFunctionSupplier supplier() {
final DataType type = field().dataType();
return switch (type) {
case LONG -> new LastLongByTimestampAggregatorFunctionSupplier();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why we use these evaluators?

case INTEGER -> new LastIntByTimestampAggregatorFunctionSupplier();
case DOUBLE -> new LastDoubleByTimestampAggregatorFunctionSupplier();
case FLOAT -> new LastFloatByTimestampAggregatorFunctionSupplier();
default -> throw EsqlIllegalArgumentException.illegalDataType(type);
};
}

@Override
public Absent perTimeSeriesAggregation() {
return this;
}

@Override
public String toString() {
return "absent(" + field() + ")";
}

Expression timestamp() {
return timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,
WeightedAvg.ENTRY
WeightedAvg.ENTRY,
Absent.ENTRY
);
}
}