Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 254 additions & 8 deletions src/iceberg/expression/aggregate.cc

Large diffs are not rendered by default.

39 changes: 32 additions & 7 deletions src/iceberg/expression/aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound

virtual Status Update(const StructLike& data) = 0;

virtual Status Update(const DataFile& file) {
return NotImplemented("Update(DataFile) not implemented");
}
virtual Status Update(const DataFile& file) = 0;

/// \brief Whether the aggregator is still valid.
virtual bool IsValid() const = 0;

/// \brief Get the result of the aggregation.
/// \return The result of the aggregation.
/// \note It is an undefined behavior to call this method if any previous Update call
/// has returned an error.
/// has returned an error or if IsValid() returns false.
virtual Literal GetResult() const = 0;
};

Expand All @@ -128,6 +129,11 @@ class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound

Result<Literal> Evaluate(const StructLike& data) const override = 0;

virtual Result<Literal> Evaluate(const DataFile& file) const = 0;

/// \brief Whether metrics in the data file are sufficient to evaluate.
virtual bool HasValue(const DataFile& file) const = 0;

bool is_bound_aggregate() const override { return true; }

/// \brief Create a new aggregator for this aggregate.
Expand All @@ -142,12 +148,15 @@ class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound
/// \brief Base class for COUNT aggregates.
class ICEBERG_EXPORT CountAggregate : public BoundAggregate {
public:
Result<Literal> Evaluate(const StructLike& data) const final;
Result<Literal> Evaluate(const StructLike& data) const override;
Result<Literal> Evaluate(const DataFile& file) const override;

std::unique_ptr<Aggregator> NewAggregator() const override;

/// \brief Count for a single row. Subclasses implement this.
virtual Result<int64_t> CountFor(const StructLike& data) const = 0;
/// \brief Count using metrics from a data file.
virtual Result<int64_t> CountFor(const DataFile& file) const = 0;

protected:
CountAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term)
Expand All @@ -161,6 +170,8 @@ class ICEBERG_EXPORT CountNonNullAggregate : public CountAggregate {
std::shared_ptr<BoundTerm> term);

Result<int64_t> CountFor(const StructLike& data) const override;
Result<int64_t> CountFor(const DataFile& file) const override;
bool HasValue(const DataFile& file) const override;

private:
explicit CountNonNullAggregate(std::shared_ptr<BoundTerm> term);
Expand All @@ -173,6 +184,8 @@ class ICEBERG_EXPORT CountNullAggregate : public CountAggregate {
std::shared_ptr<BoundTerm> term);

Result<int64_t> CountFor(const StructLike& data) const override;
Result<int64_t> CountFor(const DataFile& file) const override;
bool HasValue(const DataFile& file) const override;

private:
explicit CountNullAggregate(std::shared_ptr<BoundTerm> term);
Expand All @@ -184,6 +197,8 @@ class ICEBERG_EXPORT CountStarAggregate : public CountAggregate {
static Result<std::unique_ptr<CountStarAggregate>> Make();

Result<int64_t> CountFor(const StructLike& data) const override;
Result<int64_t> CountFor(const DataFile& file) const override;
bool HasValue(const DataFile& file) const override;

private:
CountStarAggregate();
Expand All @@ -192,9 +207,11 @@ class ICEBERG_EXPORT CountStarAggregate : public CountAggregate {
/// \brief Bound MAX aggregate.
class ICEBERG_EXPORT MaxAggregate : public BoundAggregate {
public:
static std::shared_ptr<MaxAggregate> Make(std::shared_ptr<BoundTerm> term);
static Result<std::unique_ptr<MaxAggregate>> Make(std::shared_ptr<BoundTerm> term);

Result<Literal> Evaluate(const StructLike& data) const override;
Result<Literal> Evaluate(const DataFile& file) const override;
bool HasValue(const DataFile& file) const override;

std::unique_ptr<Aggregator> NewAggregator() const override;

Expand All @@ -205,9 +222,11 @@ class ICEBERG_EXPORT MaxAggregate : public BoundAggregate {
/// \brief Bound MIN aggregate.
class ICEBERG_EXPORT MinAggregate : public BoundAggregate {
public:
static std::shared_ptr<MinAggregate> Make(std::shared_ptr<BoundTerm> term);
static Result<std::unique_ptr<MinAggregate>> Make(std::shared_ptr<BoundTerm> term);

Result<Literal> Evaluate(const StructLike& data) const override;
Result<Literal> Evaluate(const DataFile& file) const override;
bool HasValue(const DataFile& file) const override;

std::unique_ptr<Aggregator> NewAggregator() const override;

Expand All @@ -234,11 +253,17 @@ class ICEBERG_EXPORT AggregateEvaluator {
/// \brief Update aggregates with a row.
virtual Status Update(const StructLike& data) = 0;

/// \brief Update aggregates using data file metrics.
virtual Status Update(const DataFile& file) = 0;

/// \brief Final aggregated value.
virtual Result<std::span<const Literal>> GetResults() const = 0;

/// \brief Convenience accessor when only one aggregate is evaluated.
virtual Result<Literal> GetResult() const = 0;

/// \brief Whether all aggregators are still valid (metrics present).
virtual bool AllAggregatorsValid() const = 0;
};

} // namespace iceberg
14 changes: 14 additions & 0 deletions src/iceberg/expression/expressions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ std::shared_ptr<UnboundAggregateImpl<BoundReference>> Expressions::Max(
return agg;
}

std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Expressions::Max(
std::shared_ptr<UnboundTerm<BoundTransform>> expr) {
ICEBERG_ASSIGN_OR_THROW(auto agg, UnboundAggregateImpl<BoundTransform>::Make(
Expression::Operation::kMax, std::move(expr)));
return agg;
}

std::shared_ptr<UnboundAggregateImpl<BoundReference>> Expressions::Min(std::string name) {
return Min(Ref(std::move(name)));
}
Expand All @@ -149,6 +156,13 @@ std::shared_ptr<UnboundAggregateImpl<BoundReference>> Expressions::Min(
return agg;
}

std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Expressions::Min(
std::shared_ptr<UnboundTerm<BoundTransform>> expr) {
ICEBERG_ASSIGN_OR_THROW(auto agg, UnboundAggregateImpl<BoundTransform>::Make(
Expression::Operation::kMin, std::move(expr)));
return agg;
}

// Template implementations for unary predicates

std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::IsNull(
Expand Down
6 changes: 6 additions & 0 deletions src/iceberg/expression/expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,19 @@ class ICEBERG_EXPORT Expressions {
/// \brief Create a MAX aggregate for an unbound term.
static std::shared_ptr<UnboundAggregateImpl<BoundReference>> Max(
std::shared_ptr<UnboundTerm<BoundReference>> expr);
/// \brief Create a MAX aggregate for an unbound transform term.
static std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Max(
std::shared_ptr<UnboundTerm<BoundTransform>> expr);

/// \brief Create a MIN aggregate for a field name.
static std::shared_ptr<UnboundAggregateImpl<BoundReference>> Min(std::string name);

/// \brief Create a MIN aggregate for an unbound term.
static std::shared_ptr<UnboundAggregateImpl<BoundReference>> Min(
std::shared_ptr<UnboundTerm<BoundReference>> expr);
/// \brief Create a MIN aggregate for an unbound transform term.
static std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Min(
std::shared_ptr<UnboundTerm<BoundTransform>> expr);

// Unary predicates

Expand Down
11 changes: 8 additions & 3 deletions src/iceberg/expression/term.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ namespace iceberg {
/// \brief A term is an expression node that produces a typed value when evaluated.
class ICEBERG_EXPORT Term : public util::Formattable {
public:
enum class Kind : uint8_t { kReference = 0, kTransform, kExtract };
enum class Kind : uint8_t { kReference, kTransform, kExtract };

/// \brief Returns the kind of this term.
virtual Kind kind() const = 0;

/// \brief Returns whether this term is unbound.
virtual bool is_unbound() const = 0;
};

template <typename T>
Expand All @@ -53,6 +56,8 @@ template <typename B>
class ICEBERG_EXPORT UnboundTerm : public Unbound<B>, public Term {
public:
using BoundType = B;

bool is_unbound() const override { return true; }
};

/// \brief Base class for bound terms.
Expand All @@ -66,8 +71,6 @@ class ICEBERG_EXPORT BoundTerm : public Bound, public Term {
/// \brief Returns whether this term may produce null values.
virtual bool MayProduceNull() const = 0;

// TODO(gangwu): add a comparator function to Literal and BoundTerm.

/// \brief Returns whether this term is equivalent to another.
///
/// Two terms are equivalent if they produce the same values when evaluated.
Expand All @@ -79,6 +82,8 @@ class ICEBERG_EXPORT BoundTerm : public Bound, public Term {
friend bool operator==(const BoundTerm& lhs, const BoundTerm& rhs) {
return lhs.Equals(rhs);
}

bool is_unbound() const override { return false; }
};

/// \brief A reference represents a named field in an expression.
Expand Down
40 changes: 40 additions & 0 deletions src/iceberg/row/struct_like.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

#include "iceberg/row/struct_like.h"

#include <string>
#include <utility>
#include <vector>

#include "iceberg/result.h"
#include "iceberg/util/checked_cast.h"
Expand All @@ -28,6 +30,44 @@

namespace iceberg {

Result<Scalar> LiteralToScalar(const Literal& literal) {
if (literal.IsNull()) {
return Scalar{std::monostate{}};
}

switch (literal.type()->type_id()) {
case TypeId::kBoolean:
return Scalar{std::get<bool>(literal.value())};
case TypeId::kInt:
case TypeId::kDate:
return Scalar{std::get<int32_t>(literal.value())};
case TypeId::kLong:
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
return Scalar{std::get<int64_t>(literal.value())};
case TypeId::kFloat:
return Scalar{std::get<float>(literal.value())};
case TypeId::kDouble:
return Scalar{std::get<double>(literal.value())};
case TypeId::kString: {
const auto& str = std::get<std::string>(literal.value());
return Scalar{std::string_view(str)};
}
case TypeId::kBinary:
case TypeId::kFixed: {
const auto& bytes = std::get<std::vector<uint8_t>>(literal.value());
return Scalar{
std::string_view(reinterpret_cast<const char*>(bytes.data()), bytes.size())};
}
case TypeId::kDecimal:
return Scalar{std::get<Decimal>(literal.value())};
default:
return NotSupported("Cannot convert literal of type {} to Scalar",
literal.type()->ToString());
}
}

StructLikeAccessor::StructLikeAccessor(std::shared_ptr<Type> type,
std::span<const size_t> position_path)
: type_(std::move(type)) {
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/row/struct_like.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ using Scalar = std::variant<std::monostate, // for null
std::shared_ptr<ArrayLike>, // for list
std::shared_ptr<MapLike>>; // for map

/// \brief Convert a Literal to a Scalar
Result<Scalar> LiteralToScalar(const Literal& literal);

/// \brief An immutable struct-like wrapper.
class ICEBERG_EXPORT StructLike {
public:
Expand Down
Loading
Loading