Skip to content

Commit 7d65abd

Browse files
committed
feat: add utils to project expressions on rows to expressions on partitions
add a schema field to PartitionSpec since ProjectionVisitor's Predicate on UnboundPredicate need to Bind to schema.
1 parent 09f26b6 commit 7d65abd

File tree

13 files changed

+1459
-28
lines changed

13 files changed

+1459
-28
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ set(ICEBERG_SOURCES
2828
expression/inclusive_metrics_evaluator.cc
2929
expression/literal.cc
3030
expression/predicate.cc
31+
expression/projections.cc
3132
expression/rewrite_not.cc
3233
expression/strict_metrics_evaluator.cc
3334
expression/term.cc

src/iceberg/expression/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ install_headers(
2626
'inclusive_metrics_evaluator.h',
2727
'literal.h',
2828
'predicate.h',
29+
'projections.h',
2930
'rewrite_not.h',
3031
'strict_metrics_evaluator.h',
3132
'term.h',
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/expression/projections.h"
21+
22+
#include <memory>
23+
#include <vector>
24+
25+
#include "iceberg/expression/expression.h"
26+
#include "iceberg/expression/expression_visitor.h"
27+
#include "iceberg/expression/expressions.h"
28+
#include "iceberg/expression/predicate.h"
29+
#include "iceberg/expression/rewrite_not.h"
30+
#include "iceberg/expression/term.h"
31+
#include "iceberg/partition_field.h"
32+
#include "iceberg/partition_spec.h"
33+
#include "iceberg/result.h"
34+
#include "iceberg/transform.h"
35+
#include "iceberg/util/macros.h"
36+
37+
namespace iceberg {
38+
39+
// Implementation detail - not exported
40+
class ProjectionVisitor : public ExpressionVisitor<std::shared_ptr<Expression>> {
41+
public:
42+
~ProjectionVisitor() override = default;
43+
44+
ProjectionVisitor(const PartitionSpec& spec, bool case_sensitive)
45+
: spec_(spec), case_sensitive_(case_sensitive) {}
46+
47+
Result<std::shared_ptr<Expression>> AlwaysTrue() override { return True::Instance(); }
48+
49+
Result<std::shared_ptr<Expression>> AlwaysFalse() override { return False::Instance(); }
50+
51+
Result<std::shared_ptr<Expression>> Not(
52+
const std::shared_ptr<Expression>& child_result) override {
53+
return InvalidExpression("Project called on expression with a not");
54+
}
55+
56+
Result<std::shared_ptr<Expression>> And(
57+
const std::shared_ptr<Expression>& left_result,
58+
const std::shared_ptr<Expression>& right_result) override {
59+
return Expressions::And(left_result, right_result);
60+
}
61+
62+
Result<std::shared_ptr<Expression>> Or(
63+
const std::shared_ptr<Expression>& left_result,
64+
const std::shared_ptr<Expression>& right_result) override {
65+
return Expressions::Or(left_result, right_result);
66+
}
67+
68+
Result<std::shared_ptr<Expression>> Predicate(
69+
const std::shared_ptr<UnboundPredicate>& pred) override {
70+
ICEBERG_ASSIGN_OR_RAISE(auto bound_pred,
71+
pred->Bind(*spec_.schema(), case_sensitive_));
72+
if (bound_pred->is_bound_predicate()) {
73+
auto bound_predicate = std::dynamic_pointer_cast<BoundPredicate>(bound_pred);
74+
ICEBERG_DCHECK(
75+
bound_predicate != nullptr,
76+
"Expected bound_predicate to be non-null after is_bound_predicate() check");
77+
return Predicate(bound_predicate);
78+
}
79+
return bound_pred;
80+
}
81+
82+
Result<std::shared_ptr<Expression>> Predicate(
83+
const std::shared_ptr<BoundPredicate>& pred) override {
84+
return InvalidExpression("Bound predicates are not supported in projections");
85+
}
86+
87+
protected:
88+
const PartitionSpec& spec_;
89+
bool case_sensitive_;
90+
91+
/// \brief Get partition fields that match the predicate's term.
92+
std::vector<const PartitionField*> GetFieldsByPredicate(
93+
const std::shared_ptr<BoundPredicate>& pred) const {
94+
int32_t source_id;
95+
switch (pred->term()->kind()) {
96+
case Term::Kind::kReference: {
97+
const auto& ref = pred->term()->reference();
98+
source_id = ref->field().field_id();
99+
break;
100+
}
101+
case Term::Kind::kTransform: {
102+
const auto& transform =
103+
internal::checked_pointer_cast<BoundTransform>(pred->term());
104+
source_id = transform->reference()->field().field_id();
105+
break;
106+
}
107+
default:
108+
std::unreachable();
109+
}
110+
111+
std::vector<const PartitionField*> result;
112+
for (const auto& field : spec_.fields()) {
113+
if (field.source_id() == source_id) {
114+
result.push_back(&field);
115+
}
116+
}
117+
return result;
118+
}
119+
};
120+
121+
ProjectionEvaluator::ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor)
122+
: visitor_(std::move(visitor)) {}
123+
124+
ProjectionEvaluator::~ProjectionEvaluator() = default;
125+
126+
/// \brief Inclusive projection visitor.
127+
///
128+
/// Uses AND to combine projections from multiple partition fields.
129+
class InclusiveProjectionVisitor : public ProjectionVisitor {
130+
public:
131+
~InclusiveProjectionVisitor() override = default;
132+
133+
InclusiveProjectionVisitor(const PartitionSpec& spec, bool case_sensitive)
134+
: ProjectionVisitor(spec, case_sensitive) {}
135+
136+
Result<std::shared_ptr<Expression>> Predicate(
137+
const std::shared_ptr<BoundPredicate>& pred) override {
138+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
139+
// Find partition fields that match the predicate's term
140+
auto partition_fields = GetFieldsByPredicate(pred);
141+
if (partition_fields.empty()) {
142+
// The predicate has no partition column
143+
return AlwaysTrue();
144+
}
145+
146+
// Project the predicate for each partition field and combine with AND
147+
//
148+
// consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
149+
// projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
150+
// any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
151+
//
152+
// similarly, if partitioning by day(ts) and hour(ts), the more restrictive
153+
// projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
154+
// hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
155+
std::shared_ptr<Expression> result = True::Instance();
156+
for (const auto* part_field : partition_fields) {
157+
ICEBERG_ASSIGN_OR_RAISE(auto projected,
158+
part_field->transform()->Project(part_field->name(), pred));
159+
if (projected != nullptr) {
160+
result =
161+
Expressions::And(result, std::shared_ptr<Expression>(projected.release()));
162+
}
163+
}
164+
165+
return result;
166+
}
167+
168+
protected:
169+
};
170+
171+
/// \brief Strict projection evaluator.
172+
///
173+
/// Uses OR to combine projections from multiple partition fields.
174+
class StrictProjectionVisitor : public ProjectionVisitor {
175+
public:
176+
~StrictProjectionVisitor() override = default;
177+
178+
StrictProjectionVisitor(const PartitionSpec& spec, bool case_sensitive)
179+
: ProjectionVisitor(spec, case_sensitive) {}
180+
181+
Result<std::shared_ptr<Expression>> Predicate(
182+
const std::shared_ptr<BoundPredicate>& pred) override {
183+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
184+
// Find partition fields that match the predicate's term
185+
auto partition_fields = GetFieldsByPredicate(pred);
186+
if (partition_fields.empty()) {
187+
// The predicate has no matching partition columns
188+
return AlwaysFalse();
189+
}
190+
191+
// Project the predicate for each partition field and combine with OR
192+
//
193+
// consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
194+
// projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
195+
// any timestamp where either projection predicate is true must match the original
196+
// predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but
197+
// not the day, but does match the original predicate.
198+
std::shared_ptr<Expression> result = False::Instance();
199+
for (const auto* part_field : partition_fields) {
200+
ICEBERG_ASSIGN_OR_RAISE(auto projected, part_field->transform()->ProjectStrict(
201+
part_field->name(), pred));
202+
if (projected != nullptr) {
203+
result =
204+
Expressions::Or(result, std::shared_ptr<Expression>(projected.release()));
205+
}
206+
}
207+
208+
return result;
209+
}
210+
};
211+
212+
Result<std::shared_ptr<Expression>> ProjectionEvaluator::Project(
213+
const std::shared_ptr<Expression>& expr) {
214+
// Projections assume that there are no NOT nodes in the expression tree. To ensure that
215+
// this is the case, the expression is rewritten to push all NOT nodes down to the
216+
// expression leaf nodes.
217+
//
218+
// This is necessary to ensure that the default expression returned when a predicate
219+
// can't be projected is correct.
220+
ICEBERG_ASSIGN_OR_RAISE(auto rewritten, RewriteNot::Visit(expr));
221+
return Visit<std::shared_ptr<Expression>, ProjectionVisitor>(rewritten, *visitor_);
222+
}
223+
224+
std::unique_ptr<ProjectionEvaluator> Projections::Inclusive(const PartitionSpec& spec,
225+
bool case_sensitive) {
226+
auto visitor = std::make_unique<InclusiveProjectionVisitor>(spec, case_sensitive);
227+
return std::unique_ptr<ProjectionEvaluator>(
228+
new ProjectionEvaluator(std::move(visitor)));
229+
}
230+
231+
std::unique_ptr<ProjectionEvaluator> Projections::Strict(const PartitionSpec& spec,
232+
bool case_sensitive) {
233+
auto visitor = std::make_unique<StrictProjectionVisitor>(spec, case_sensitive);
234+
return std::unique_ptr<ProjectionEvaluator>(
235+
new ProjectionEvaluator(std::move(visitor)));
236+
}
237+
238+
} // namespace iceberg
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/expression/projections.h
23+
/// Utils to project expressions on rows to expressions on partitions.
24+
25+
#include <memory>
26+
27+
#include "iceberg/expression/expression.h"
28+
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/partition_spec.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief A class that projects expressions for a table's data rows into expressions on
36+
/// the table's partition values, for a table's partition spec.
37+
class ICEBERG_EXPORT ProjectionEvaluator {
38+
public:
39+
~ProjectionEvaluator();
40+
41+
/// \brief Project the given row expression to a partition expression.
42+
///
43+
/// \param expr an expression on data rows
44+
/// \return an expression on partition data (depends on the projection)
45+
Result<std::shared_ptr<Expression>> Project(const std::shared_ptr<Expression>& expr);
46+
47+
private:
48+
friend class Projections;
49+
50+
/// \brief Create a ProjectionEvaluator.
51+
///
52+
/// \param visitor The projection visitor to use
53+
explicit ProjectionEvaluator(std::unique_ptr<class ProjectionVisitor> visitor);
54+
55+
std::unique_ptr<class ProjectionVisitor> visitor_;
56+
};
57+
58+
/// \brief Utils to project expressions on rows to expressions on partitions.
59+
///
60+
/// There are two types of projections: inclusive and strict.
61+
///
62+
/// An inclusive projection guarantees that if an expression matches a row, the projected
63+
/// expression will match the row's partition.
64+
///
65+
/// A strict projection guarantees that if a partition matches a projected expression,
66+
/// then all rows in that partition will match the original expression.
67+
class ICEBERG_EXPORT Projections {
68+
public:
69+
/// \brief Creates an inclusive ProjectionEvaluator for the partition spec.
70+
///
71+
/// An evaluator is used to project expressions for a table's data rows into expressions
72+
/// on the table's partition values. The evaluator returned by this function is
73+
/// inclusive and will build expressions with the following guarantee: if the original
74+
/// expression matches a row, then the projected expression will match that row's
75+
/// partition.
76+
///
77+
/// Each predicate in the expression is projected using Transform::Project.
78+
///
79+
/// \param spec a partition spec
80+
/// \param case_sensitive whether the Projection should consider case sensitivity on
81+
/// column names or not. Defaults to true (case sensitive).
82+
/// \return an inclusive projection evaluator for the partition spec
83+
static std::unique_ptr<ProjectionEvaluator> Inclusive(const PartitionSpec& spec,
84+
bool case_sensitive = true);
85+
86+
/// \brief Creates a strict ProjectionEvaluator for the partition spec.
87+
///
88+
/// An evaluator is used to project expressions for a table's data rows into expressions
89+
/// on the table's partition values. The evaluator returned by this function is strict
90+
/// and will build expressions with the following guarantee: if the projected expression
91+
/// matches a partition, then the original expression will match all rows in that
92+
/// partition.
93+
///
94+
/// Each predicate in the expression is projected using Transform::ProjectStrict.
95+
///
96+
/// \param spec a partition spec
97+
/// \param case_sensitive whether the Projection should consider case sensitivity on
98+
/// column names or not. Defaults to true (case sensitive).
99+
/// \return a strict projection evaluator for the partition spec
100+
static std::unique_ptr<ProjectionEvaluator> Strict(const PartitionSpec& spec,
101+
bool case_sensitive = true);
102+
103+
private:
104+
Projections() = default;
105+
};
106+
107+
} // namespace iceberg

src/iceberg/json_internal.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -547,11 +547,11 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
547547
std::unique_ptr<PartitionSpec> spec;
548548
if (default_spec_id == spec_id) {
549549
ICEBERG_ASSIGN_OR_RAISE(
550-
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
550+
spec, PartitionSpec::Make(schema, spec_id, std::move(partition_fields),
551551
/*allow_missing_fields=*/false));
552552
} else {
553553
ICEBERG_ASSIGN_OR_RAISE(
554-
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
554+
spec, PartitionSpec::Make(schema, spec_id, std::move(partition_fields),
555555
/*allow_missing_fields=*/true));
556556
}
557557
return spec;
@@ -930,7 +930,7 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
930930
// Create partition spec with schema validation
931931
ICEBERG_ASSIGN_OR_RAISE(
932932
auto spec,
933-
PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
933+
PartitionSpec::Make(current_schema, PartitionSpec::kInitialSpecId,
934934
std::move(fields), /*allow_missing_fields=*/false));
935935
default_spec_id = spec->spec_id();
936936
partition_specs.push_back(std::move(spec));

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ iceberg_sources = files(
5050
'expression/inclusive_metrics_evaluator.cc',
5151
'expression/literal.cc',
5252
'expression/predicate.cc',
53+
'expression/projections.cc',
5354
'expression/rewrite_not.cc',
5455
'expression/strict_metrics_evaluator.cc',
5556
'expression/term.cc',

0 commit comments

Comments
 (0)