Skip to content

Commit 55c57f6

Browse files
committed
feat: add utils to project exprs on rows to exps on partitions
1 parent 316b325 commit 55c57f6

File tree

9 files changed

+1498
-1
lines changed

9 files changed

+1498
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ set(ICEBERG_SOURCES
2828
expression/inclusive_metrics_evaluator.cc
2929
expression/literal.cc
3030
expression/predicate.cc
31+
expression/projections.cc
3132
expression/rewrite_not.cc
3233
expression/strict_metrics_evaluator.cc
3334
expression/term.cc

src/iceberg/expression/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ install_headers(
2626
'inclusive_metrics_evaluator.h',
2727
'literal.h',
2828
'predicate.h',
29+
'projections.h',
2930
'rewrite_not.h',
3031
'strict_metrics_evaluator.h',
3132
'term.h',
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/expression/projections.h"
21+
22+
#include <memory>
23+
#include <vector>
24+
25+
#include "iceberg/expression/expression.h"
26+
#include "iceberg/expression/expression_visitor.h"
27+
#include "iceberg/expression/expressions.h"
28+
#include "iceberg/expression/predicate.h"
29+
#include "iceberg/expression/rewrite_not.h"
30+
#include "iceberg/expression/term.h"
31+
#include "iceberg/partition_field.h"
32+
#include "iceberg/partition_spec.h"
33+
#include "iceberg/result.h"
34+
#include "iceberg/transform.h"
35+
#include "iceberg/util/macros.h"
36+
37+
namespace iceberg {
38+
39+
// Implementation detail - not exported
40+
class ProjectionVisitor : public ExpressionVisitor<std::shared_ptr<Expression>> {
41+
public:
42+
~ProjectionVisitor() override = default;
43+
44+
ProjectionVisitor(const std::shared_ptr<PartitionSpec>& spec,
45+
const std::shared_ptr<Schema>& schema, bool case_sensitive)
46+
: spec_(spec), schema_(schema), case_sensitive_(case_sensitive) {}
47+
48+
Result<std::shared_ptr<Expression>> AlwaysTrue() override { return True::Instance(); }
49+
50+
Result<std::shared_ptr<Expression>> AlwaysFalse() override { return False::Instance(); }
51+
52+
Result<std::shared_ptr<Expression>> Not(
53+
const std::shared_ptr<Expression>& child_result) override {
54+
return InvalidExpression("Project called on expression with a not");
55+
}
56+
57+
Result<std::shared_ptr<Expression>> And(
58+
const std::shared_ptr<Expression>& left_result,
59+
const std::shared_ptr<Expression>& right_result) override {
60+
return Expressions::And(left_result, right_result);
61+
}
62+
63+
Result<std::shared_ptr<Expression>> Or(
64+
const std::shared_ptr<Expression>& left_result,
65+
const std::shared_ptr<Expression>& right_result) override {
66+
return Expressions::Or(left_result, right_result);
67+
}
68+
69+
Result<std::shared_ptr<Expression>> Predicate(
70+
const std::shared_ptr<UnboundPredicate>& pred) override {
71+
ICEBERG_ASSIGN_OR_RAISE(auto bound_pred, pred->Bind(*schema_, case_sensitive_));
72+
if (bound_pred->is_bound_predicate()) {
73+
auto bound_predicate = std::dynamic_pointer_cast<BoundPredicate>(bound_pred);
74+
ICEBERG_DCHECK(
75+
bound_predicate != nullptr,
76+
"Expected bound_predicate to be non-null after is_bound_predicate() check");
77+
return Predicate(bound_predicate);
78+
}
79+
return bound_pred;
80+
}
81+
82+
Result<std::shared_ptr<Expression>> Predicate(
83+
const std::shared_ptr<BoundPredicate>& pred) override {
84+
return InvalidExpression("Bound predicates are not supported in projections");
85+
}
86+
87+
protected:
88+
const std::shared_ptr<PartitionSpec>& spec_;
89+
const std::shared_ptr<Schema>& schema_;
90+
bool case_sensitive_;
91+
92+
/// \brief Get partition fields that match the predicate's term.
93+
std::vector<const PartitionField*> GetFieldsByPredicate(
94+
const std::shared_ptr<BoundPredicate>& pred) const {
95+
int32_t source_id;
96+
switch (pred->term()->kind()) {
97+
case Term::Kind::kReference: {
98+
const auto& ref = pred->term()->reference();
99+
source_id = ref->field().field_id();
100+
break;
101+
}
102+
case Term::Kind::kTransform: {
103+
const auto& transform =
104+
internal::checked_pointer_cast<BoundTransform>(pred->term());
105+
source_id = transform->reference()->field().field_id();
106+
break;
107+
}
108+
default:
109+
std::unreachable();
110+
}
111+
112+
std::vector<const PartitionField*> result;
113+
for (const auto& field : spec_->fields()) {
114+
if (field.source_id() == source_id) {
115+
result.push_back(&field);
116+
}
117+
}
118+
return result;
119+
}
120+
};
121+
122+
ProjectionEvaluator::ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor)
123+
: visitor_(std::move(visitor)) {}
124+
125+
ProjectionEvaluator::~ProjectionEvaluator() = default;
126+
127+
/// \brief Inclusive projection visitor.
128+
///
129+
/// Uses AND to combine projections from multiple partition fields.
130+
class InclusiveProjectionVisitor : public ProjectionVisitor {
131+
public:
132+
~InclusiveProjectionVisitor() override = default;
133+
134+
InclusiveProjectionVisitor(const std::shared_ptr<PartitionSpec>& spec,
135+
const std::shared_ptr<Schema>& schema, bool case_sensitive)
136+
: ProjectionVisitor(spec, schema, case_sensitive) {}
137+
138+
Result<std::shared_ptr<Expression>> Predicate(
139+
const std::shared_ptr<BoundPredicate>& pred) override {
140+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
141+
// Find partition fields that match the predicate's term
142+
auto partition_fields = GetFieldsByPredicate(pred);
143+
if (partition_fields.empty()) {
144+
// The predicate has no partition column
145+
return AlwaysTrue();
146+
}
147+
148+
// Project the predicate for each partition field and combine with AND
149+
//
150+
// consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
151+
// projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
152+
// any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
153+
//
154+
// similarly, if partitioning by day(ts) and hour(ts), the more restrictive
155+
// projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
156+
// hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
157+
std::shared_ptr<Expression> result = True::Instance();
158+
for (const auto* part_field : partition_fields) {
159+
ICEBERG_ASSIGN_OR_RAISE(auto projected,
160+
part_field->transform()->Project(part_field->name(), pred));
161+
if (projected != nullptr) {
162+
result =
163+
Expressions::And(result, std::shared_ptr<Expression>(projected.release()));
164+
}
165+
}
166+
167+
return result;
168+
}
169+
170+
protected:
171+
};
172+
173+
/// \brief Strict projection evaluator.
174+
///
175+
/// Uses OR to combine projections from multiple partition fields.
176+
class StrictProjectionVisitor : public ProjectionVisitor {
177+
public:
178+
~StrictProjectionVisitor() override = default;
179+
180+
StrictProjectionVisitor(const std::shared_ptr<PartitionSpec>& spec,
181+
const std::shared_ptr<Schema>& schema, bool case_sensitive)
182+
: ProjectionVisitor(spec, schema, case_sensitive) {}
183+
184+
Result<std::shared_ptr<Expression>> Predicate(
185+
const std::shared_ptr<BoundPredicate>& pred) override {
186+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
187+
// Find partition fields that match the predicate's term
188+
auto partition_fields = GetFieldsByPredicate(pred);
189+
if (partition_fields.empty()) {
190+
// The predicate has no matching partition columns
191+
return AlwaysFalse();
192+
}
193+
194+
// Project the predicate for each partition field and combine with OR
195+
//
196+
// consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
197+
// projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
198+
// any timestamp where either projection predicate is true must match the original
199+
// predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but
200+
// not the day, but does match the original predicate.
201+
std::shared_ptr<Expression> result = False::Instance();
202+
for (const auto* part_field : partition_fields) {
203+
ICEBERG_ASSIGN_OR_RAISE(auto projected, part_field->transform()->ProjectStrict(
204+
part_field->name(), pred));
205+
if (projected != nullptr) {
206+
result =
207+
Expressions::Or(result, std::shared_ptr<Expression>(projected.release()));
208+
}
209+
}
210+
211+
return result;
212+
}
213+
};
214+
215+
Result<std::shared_ptr<Expression>> ProjectionEvaluator::Project(
    const std::shared_ptr<Expression>& expr) {
  // The visitors reject NOT nodes, so first rewrite the expression to push every NOT
  // down to the leaf predicates.
  //
  // This is also what keeps the default expression (returned when a predicate cannot
  // be projected) correct.
  ICEBERG_ASSIGN_OR_RAISE(auto not_free_expr, RewriteNot::Visit(expr));

  // Dispatch through the ProjectionVisitor base so the virtual overrides of the
  // concrete inclusive/strict visitor are used.
  return Visit<std::shared_ptr<Expression>, ProjectionVisitor>(not_free_expr,
                                                               *visitor_);
}
226+
227+
std::unique_ptr<ProjectionEvaluator> Projections::Inclusive(
228+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema,
229+
bool case_sensitive) {
230+
auto visitor =
231+
std::make_unique<InclusiveProjectionVisitor>(spec, schema, case_sensitive);
232+
return std::unique_ptr<ProjectionEvaluator>(
233+
new ProjectionEvaluator(std::move(visitor)));
234+
}
235+
236+
std::unique_ptr<ProjectionEvaluator> Projections::Strict(
237+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema,
238+
bool case_sensitive) {
239+
auto visitor = std::make_unique<StrictProjectionVisitor>(spec, schema, case_sensitive);
240+
return std::unique_ptr<ProjectionEvaluator>(
241+
new ProjectionEvaluator(std::move(visitor)));
242+
}
243+
244+
} // namespace iceberg
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/expression/projections.h
23+
/// Utils to project expressions on rows to expressions on partitions.
24+
25+
#include <memory>
26+
27+
#include "iceberg/expression/expression.h"
28+
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/partition_spec.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief A class that projects expressions for a table's data rows into expressions on
36+
/// the table's partition values, for a table's partition spec.
37+
class ICEBERG_EXPORT ProjectionEvaluator {
38+
public:
39+
~ProjectionEvaluator();
40+
41+
/// \brief Project the given row expression to a partition expression.
42+
///
43+
/// \param expr an expression on data rows
44+
/// \return an expression on partition data (depends on the projection)
45+
Result<std::shared_ptr<Expression>> Project(const std::shared_ptr<Expression>& expr);
46+
47+
private:
48+
friend class Projections;
49+
50+
/// \brief Create a ProjectionEvaluator.
51+
///
52+
/// \param visitor The projection visitor to use
53+
explicit ProjectionEvaluator(std::unique_ptr<class ProjectionVisitor> visitor);
54+
55+
std::unique_ptr<class ProjectionVisitor> visitor_;
56+
};
57+
58+
/// \brief Utils to project expressions on rows to expressions on partitions.
59+
///
60+
/// There are two types of projections: inclusive and strict.
61+
///
62+
/// An inclusive projection guarantees that if an expression matches a row, the projected
63+
/// expression will match the row's partition.
64+
///
65+
/// A strict projection guarantees that if a partition matches a projected expression,
66+
/// then all rows in that partition will match the original expression.
67+
class ICEBERG_EXPORT Projections {
68+
public:
69+
/// \brief Creates an inclusive ProjectionEvaluator for the partition spec.
70+
///
71+
/// An evaluator is used to project expressions for a table's data rows into expressions
72+
/// on the table's partition values. The evaluator returned by this function is
73+
/// inclusive and will build expressions with the following guarantee: if the original
74+
/// expression matches a row, then the projected expression will match that row's
75+
/// partition.
76+
///
77+
/// Each predicate in the expression is projected using Transform::Project.
78+
///
79+
/// \param spec a partition spec
80+
/// \param case_sensitive whether the Projection should consider case sensitivity on
81+
/// column names or not. Defaults to true (case sensitive).
82+
/// \return an inclusive projection evaluator for the partition spec
83+
static std::unique_ptr<ProjectionEvaluator> Inclusive(
84+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema,
85+
bool case_sensitive = true);
86+
87+
/// \brief Creates a strict ProjectionEvaluator for the partition spec.
88+
///
89+
/// An evaluator is used to project expressions for a table's data rows into expressions
90+
/// on the table's partition values. The evaluator returned by this function is strict
91+
/// and will build expressions with the following guarantee: if the projected expression
92+
/// matches a partition, then the original expression will match all rows in that
93+
/// partition.
94+
///
95+
/// Each predicate in the expression is projected using Transform::ProjectStrict.
96+
///
97+
/// \param spec a partition spec
98+
/// \param case_sensitive whether the Projection should consider case sensitivity on
99+
/// column names or not. Defaults to true (case sensitive).
100+
/// \return a strict projection evaluator for the partition spec
101+
static std::unique_ptr<ProjectionEvaluator> Strict(
102+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema,
103+
bool case_sensitive = true);
104+
105+
private:
106+
Projections() = default;
107+
};
108+
109+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ iceberg_sources = files(
5050
'expression/inclusive_metrics_evaluator.cc',
5151
'expression/literal.cc',
5252
'expression/predicate.cc',
53+
'expression/projections.cc',
5354
'expression/rewrite_not.cc',
5455
'expression/strict_metrics_evaluator.cc',
5556
'expression/term.cc',

src/iceberg/partition_spec.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
105105
private:
106106
/// \brief Create a new partition spec.
107107
///
108-
/// \param schema The table schema.
109108
/// \param spec_id The spec ID.
110109
/// \param fields The partition fields.
111110
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ add_iceberg_test(expression_test
8888
inclusive_metrics_evaluator_test.cc
8989
inclusive_metrics_evaluator_with_transform_test.cc
9090
predicate_test.cc
91+
projections_test.cc
9192
strict_metrics_evaluator_test.cc)
9293

9394
add_iceberg_test(json_serde_test

0 commit comments

Comments
 (0)