Skip to content

Commit 428a171

Browse files
authored
feat: add DataFile aggregate evaluation (#400)
1 parent 316b325 commit 428a171

File tree

8 files changed

+597
-18
lines changed

8 files changed

+597
-18
lines changed

src/iceberg/expression/aggregate.cc

Lines changed: 254 additions & 8 deletions
Large diffs are not rendered by default.

src/iceberg/expression/aggregate.h

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,15 @@ class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound
109109

110110
virtual Status Update(const StructLike& data) = 0;
111111

112-
virtual Status Update(const DataFile& file) {
113-
return NotImplemented("Update(DataFile) not implemented");
114-
}
112+
virtual Status Update(const DataFile& file) = 0;
113+
114+
/// \brief Whether the aggregator is still valid.
115+
virtual bool IsValid() const = 0;
115116

116117
/// \brief Get the result of the aggregation.
117118
/// \return The result of the aggregation.
118119
/// \note It is an undefined behavior to call this method if any previous Update call
119-
/// has returned an error.
120+
/// has returned an error or if IsValid() returns false.
120121
virtual Literal GetResult() const = 0;
121122
};
122123

@@ -128,6 +129,11 @@ class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound
128129

129130
Result<Literal> Evaluate(const StructLike& data) const override = 0;
130131

132+
virtual Result<Literal> Evaluate(const DataFile& file) const = 0;
133+
134+
/// \brief Whether metrics in the data file are sufficient to evaluate.
135+
virtual bool HasValue(const DataFile& file) const = 0;
136+
131137
bool is_bound_aggregate() const override { return true; }
132138

133139
/// \brief Create a new aggregator for this aggregate.
@@ -142,12 +148,15 @@ class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound
142148
/// \brief Base class for COUNT aggregates.
143149
class ICEBERG_EXPORT CountAggregate : public BoundAggregate {
144150
public:
145-
Result<Literal> Evaluate(const StructLike& data) const final;
151+
Result<Literal> Evaluate(const StructLike& data) const override;
152+
Result<Literal> Evaluate(const DataFile& file) const override;
146153

147154
std::unique_ptr<Aggregator> NewAggregator() const override;
148155

149156
/// \brief Count for a single row. Subclasses implement this.
150157
virtual Result<int64_t> CountFor(const StructLike& data) const = 0;
158+
/// \brief Count using metrics from a data file.
159+
virtual Result<int64_t> CountFor(const DataFile& file) const = 0;
151160

152161
protected:
153162
CountAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term)
@@ -161,6 +170,8 @@ class ICEBERG_EXPORT CountNonNullAggregate : public CountAggregate {
161170
std::shared_ptr<BoundTerm> term);
162171

163172
Result<int64_t> CountFor(const StructLike& data) const override;
173+
Result<int64_t> CountFor(const DataFile& file) const override;
174+
bool HasValue(const DataFile& file) const override;
164175

165176
private:
166177
explicit CountNonNullAggregate(std::shared_ptr<BoundTerm> term);
@@ -173,6 +184,8 @@ class ICEBERG_EXPORT CountNullAggregate : public CountAggregate {
173184
std::shared_ptr<BoundTerm> term);
174185

175186
Result<int64_t> CountFor(const StructLike& data) const override;
187+
Result<int64_t> CountFor(const DataFile& file) const override;
188+
bool HasValue(const DataFile& file) const override;
176189

177190
private:
178191
explicit CountNullAggregate(std::shared_ptr<BoundTerm> term);
@@ -184,6 +197,8 @@ class ICEBERG_EXPORT CountStarAggregate : public CountAggregate {
184197
static Result<std::unique_ptr<CountStarAggregate>> Make();
185198

186199
Result<int64_t> CountFor(const StructLike& data) const override;
200+
Result<int64_t> CountFor(const DataFile& file) const override;
201+
bool HasValue(const DataFile& file) const override;
187202

188203
private:
189204
CountStarAggregate();
@@ -192,9 +207,11 @@ class ICEBERG_EXPORT CountStarAggregate : public CountAggregate {
192207
/// \brief Bound MAX aggregate.
193208
class ICEBERG_EXPORT MaxAggregate : public BoundAggregate {
194209
public:
195-
static std::shared_ptr<MaxAggregate> Make(std::shared_ptr<BoundTerm> term);
210+
static Result<std::unique_ptr<MaxAggregate>> Make(std::shared_ptr<BoundTerm> term);
196211

197212
Result<Literal> Evaluate(const StructLike& data) const override;
213+
Result<Literal> Evaluate(const DataFile& file) const override;
214+
bool HasValue(const DataFile& file) const override;
198215

199216
std::unique_ptr<Aggregator> NewAggregator() const override;
200217

@@ -205,9 +222,11 @@ class ICEBERG_EXPORT MaxAggregate : public BoundAggregate {
205222
/// \brief Bound MIN aggregate.
206223
class ICEBERG_EXPORT MinAggregate : public BoundAggregate {
207224
public:
208-
static std::shared_ptr<MinAggregate> Make(std::shared_ptr<BoundTerm> term);
225+
static Result<std::unique_ptr<MinAggregate>> Make(std::shared_ptr<BoundTerm> term);
209226

210227
Result<Literal> Evaluate(const StructLike& data) const override;
228+
Result<Literal> Evaluate(const DataFile& file) const override;
229+
bool HasValue(const DataFile& file) const override;
211230

212231
std::unique_ptr<Aggregator> NewAggregator() const override;
213232

@@ -234,11 +253,17 @@ class ICEBERG_EXPORT AggregateEvaluator {
234253
/// \brief Update aggregates with a row.
235254
virtual Status Update(const StructLike& data) = 0;
236255

256+
/// \brief Update aggregates using data file metrics.
257+
virtual Status Update(const DataFile& file) = 0;
258+
237259
/// \brief Final aggregated value.
238260
virtual Result<std::span<const Literal>> GetResults() const = 0;
239261

240262
/// \brief Convenience accessor when only one aggregate is evaluated.
241263
virtual Result<Literal> GetResult() const = 0;
264+
265+
/// \brief Whether all aggregators are still valid (metrics present).
266+
virtual bool AllAggregatorsValid() const = 0;
242267
};
243268

244269
} // namespace iceberg

src/iceberg/expression/expressions.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ std::shared_ptr<UnboundAggregateImpl<BoundReference>> Expressions::Max(
138138
return agg;
139139
}
140140

141+
std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Expressions::Max(
142+
std::shared_ptr<UnboundTerm<BoundTransform>> expr) {
143+
ICEBERG_ASSIGN_OR_THROW(auto agg, UnboundAggregateImpl<BoundTransform>::Make(
144+
Expression::Operation::kMax, std::move(expr)));
145+
return agg;
146+
}
147+
141148
std::shared_ptr<UnboundAggregateImpl<BoundReference>> Expressions::Min(std::string name) {
142149
return Min(Ref(std::move(name)));
143150
}
@@ -149,6 +156,13 @@ std::shared_ptr<UnboundAggregateImpl<BoundReference>> Expressions::Min(
149156
return agg;
150157
}
151158

159+
std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Expressions::Min(
160+
std::shared_ptr<UnboundTerm<BoundTransform>> expr) {
161+
ICEBERG_ASSIGN_OR_THROW(auto agg, UnboundAggregateImpl<BoundTransform>::Make(
162+
Expression::Operation::kMin, std::move(expr)));
163+
return agg;
164+
}
165+
152166
// Template implementations for unary predicates
153167

154168
std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::IsNull(

src/iceberg/expression/expressions.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,19 @@ class ICEBERG_EXPORT Expressions {
135135
/// \brief Create a MAX aggregate for an unbound term.
136136
static std::shared_ptr<UnboundAggregateImpl<BoundReference>> Max(
137137
std::shared_ptr<UnboundTerm<BoundReference>> expr);
138+
/// \brief Create a MAX aggregate for an unbound transform term.
139+
static std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Max(
140+
std::shared_ptr<UnboundTerm<BoundTransform>> expr);
138141

139142
/// \brief Create a MIN aggregate for a field name.
140143
static std::shared_ptr<UnboundAggregateImpl<BoundReference>> Min(std::string name);
141144

142145
/// \brief Create a MIN aggregate for an unbound term.
143146
static std::shared_ptr<UnboundAggregateImpl<BoundReference>> Min(
144147
std::shared_ptr<UnboundTerm<BoundReference>> expr);
148+
/// \brief Create a MIN aggregate for an unbound transform term.
149+
static std::shared_ptr<UnboundAggregateImpl<BoundTransform>> Min(
150+
std::shared_ptr<UnboundTerm<BoundTransform>> expr);
145151

146152
// Unary predicates
147153

src/iceberg/expression/term.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,13 @@ namespace iceberg {
3737
/// \brief A term is an expression node that produces a typed value when evaluated.
3838
class ICEBERG_EXPORT Term : public util::Formattable {
3939
public:
40-
enum class Kind : uint8_t { kReference = 0, kTransform, kExtract };
40+
enum class Kind : uint8_t { kReference, kTransform, kExtract };
4141

4242
/// \brief Returns the kind of this term.
4343
virtual Kind kind() const = 0;
44+
45+
/// \brief Returns whether this term is unbound.
46+
virtual bool is_unbound() const = 0;
4447
};
4548

4649
template <typename T>
@@ -53,6 +56,8 @@ template <typename B>
5356
class ICEBERG_EXPORT UnboundTerm : public Unbound<B>, public Term {
5457
public:
5558
using BoundType = B;
59+
60+
bool is_unbound() const override { return true; }
5661
};
5762

5863
/// \brief Base class for bound terms.
@@ -66,8 +71,6 @@ class ICEBERG_EXPORT BoundTerm : public Bound, public Term {
6671
/// \brief Returns whether this term may produce null values.
6772
virtual bool MayProduceNull() const = 0;
6873

69-
// TODO(gangwu): add a comparator function to Literal and BoundTerm.
70-
7174
/// \brief Returns whether this term is equivalent to another.
7275
///
7376
/// Two terms are equivalent if they produce the same values when evaluated.
@@ -79,6 +82,8 @@ class ICEBERG_EXPORT BoundTerm : public Bound, public Term {
7982
friend bool operator==(const BoundTerm& lhs, const BoundTerm& rhs) {
8083
return lhs.Equals(rhs);
8184
}
85+
86+
bool is_unbound() const override { return false; }
8287
};
8388

8489
/// \brief A reference represents a named field in an expression.

src/iceberg/row/struct_like.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
#include "iceberg/row/struct_like.h"
2121

22+
#include <string>
2223
#include <utility>
24+
#include <vector>
2325

2426
#include "iceberg/result.h"
2527
#include "iceberg/util/checked_cast.h"
@@ -28,6 +30,44 @@
2830

2931
namespace iceberg {
3032

33+
Result<Scalar> LiteralToScalar(const Literal& literal) {
34+
if (literal.IsNull()) {
35+
return Scalar{std::monostate{}};
36+
}
37+
38+
switch (literal.type()->type_id()) {
39+
case TypeId::kBoolean:
40+
return Scalar{std::get<bool>(literal.value())};
41+
case TypeId::kInt:
42+
case TypeId::kDate:
43+
return Scalar{std::get<int32_t>(literal.value())};
44+
case TypeId::kLong:
45+
case TypeId::kTime:
46+
case TypeId::kTimestamp:
47+
case TypeId::kTimestampTz:
48+
return Scalar{std::get<int64_t>(literal.value())};
49+
case TypeId::kFloat:
50+
return Scalar{std::get<float>(literal.value())};
51+
case TypeId::kDouble:
52+
return Scalar{std::get<double>(literal.value())};
53+
case TypeId::kString: {
54+
const auto& str = std::get<std::string>(literal.value());
55+
return Scalar{std::string_view(str)};
56+
}
57+
case TypeId::kBinary:
58+
case TypeId::kFixed: {
59+
const auto& bytes = std::get<std::vector<uint8_t>>(literal.value());
60+
return Scalar{
61+
std::string_view(reinterpret_cast<const char*>(bytes.data()), bytes.size())};
62+
}
63+
case TypeId::kDecimal:
64+
return Scalar{std::get<Decimal>(literal.value())};
65+
default:
66+
return NotSupported("Cannot convert literal of type {} to Scalar",
67+
literal.type()->ToString());
68+
}
69+
}
70+
3171
StructLikeAccessor::StructLikeAccessor(std::shared_ptr<Type> type,
3272
std::span<const size_t> position_path)
3373
: type_(std::move(type)) {

src/iceberg/row/struct_like.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ using Scalar = std::variant<std::monostate, // for null
5555
std::shared_ptr<ArrayLike>, // for list
5656
std::shared_ptr<MapLike>>; // for map
5757

58+
/// \brief Convert a Literal to a Scalar
59+
Result<Scalar> LiteralToScalar(const Literal& literal);
60+
5861
/// \brief An immutable struct-like wrapper.
5962
class ICEBERG_EXPORT StructLike {
6063
public:

0 commit comments

Comments
 (0)