-
Notifications
You must be signed in to change notification settings - Fork 74
feat: add utils to project exprs on rows to exps on partitions #399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,244 @@ | ||||||||||||||
| /* | ||||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||
| * or more contributor license agreements. See the NOTICE file | ||||||||||||||
| * distributed with this work for additional information | ||||||||||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||||||||||
| * to you under the Apache License, Version 2.0 (the | ||||||||||||||
| * "License"); you may not use this file except in compliance | ||||||||||||||
| * with the License. You may obtain a copy of the License at | ||||||||||||||
| * | ||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||
| * | ||||||||||||||
| * Unless required by applicable law or agreed to in writing, | ||||||||||||||
| * software distributed under the License is distributed on an | ||||||||||||||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||||
| * KIND, either express or implied. See the License for the | ||||||||||||||
| * specific language governing permissions and limitations | ||||||||||||||
| * under the License. | ||||||||||||||
| */ | ||||||||||||||
|
|
||||||||||||||
| #include "iceberg/expression/projections.h" | ||||||||||||||
|
|
||||||||||||||
| #include <memory> | ||||||||||||||
| #include <vector> | ||||||||||||||
|
|
||||||||||||||
| #include "iceberg/expression/expression.h" | ||||||||||||||
| #include "iceberg/expression/expression_visitor.h" | ||||||||||||||
| #include "iceberg/expression/expressions.h" | ||||||||||||||
| #include "iceberg/expression/predicate.h" | ||||||||||||||
| #include "iceberg/expression/rewrite_not.h" | ||||||||||||||
| #include "iceberg/expression/term.h" | ||||||||||||||
| #include "iceberg/partition_field.h" | ||||||||||||||
| #include "iceberg/partition_spec.h" | ||||||||||||||
| #include "iceberg/result.h" | ||||||||||||||
| #include "iceberg/transform.h" | ||||||||||||||
| #include "iceberg/util/macros.h" | ||||||||||||||
|
|
||||||||||||||
| namespace iceberg { | ||||||||||||||
|
|
||||||||||||||
| // Implementation detail - not exported | ||||||||||||||
| class ProjectionVisitor : public ExpressionVisitor<std::shared_ptr<Expression>> { | ||||||||||||||
| public: | ||||||||||||||
| ~ProjectionVisitor() override = default; | ||||||||||||||
|
|
||||||||||||||
| ProjectionVisitor(const std::shared_ptr<PartitionSpec>& spec, | ||||||||||||||
| const std::shared_ptr<Schema>& schema, bool case_sensitive) | ||||||||||||||
| : spec_(spec), schema_(schema), case_sensitive_(case_sensitive) {} | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> AlwaysTrue() override { return True::Instance(); } | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> AlwaysFalse() override { return False::Instance(); } | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> Not( | ||||||||||||||
| const std::shared_ptr<Expression>& child_result) override { | ||||||||||||||
| return InvalidExpression("Project called on expression with a not"); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> And( | ||||||||||||||
| const std::shared_ptr<Expression>& left_result, | ||||||||||||||
| const std::shared_ptr<Expression>& right_result) override { | ||||||||||||||
| return Expressions::And(left_result, right_result); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> Or( | ||||||||||||||
| const std::shared_ptr<Expression>& left_result, | ||||||||||||||
| const std::shared_ptr<Expression>& right_result) override { | ||||||||||||||
| return Expressions::Or(left_result, right_result); | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> Predicate( | ||||||||||||||
| const std::shared_ptr<UnboundPredicate>& pred) override { | ||||||||||||||
| ICEBERG_ASSIGN_OR_RAISE(auto bound_pred, pred->Bind(*schema_, case_sensitive_)); | ||||||||||||||
| if (bound_pred->is_bound_predicate()) { | ||||||||||||||
| auto bound_predicate = std::dynamic_pointer_cast<BoundPredicate>(bound_pred); | ||||||||||||||
| ICEBERG_DCHECK( | ||||||||||||||
| bound_predicate != nullptr, | ||||||||||||||
| "Expected bound_predicate to be non-null after is_bound_predicate() check"); | ||||||||||||||
| return Predicate(bound_predicate); | ||||||||||||||
|
Comment on lines
+73
to
+77
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
| } | ||||||||||||||
| return bound_pred; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> Predicate( | ||||||||||||||
| const std::shared_ptr<BoundPredicate>& pred) override { | ||||||||||||||
| return InvalidExpression("Bound predicates are not supported in projections"); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| protected: | ||||||||||||||
| const std::shared_ptr<PartitionSpec>& spec_; | ||||||||||||||
| const std::shared_ptr<Schema>& schema_; | ||||||||||||||
| bool case_sensitive_; | ||||||||||||||
|
|
||||||||||||||
| /// \brief Get partition fields that match the predicate's term. | ||||||||||||||
| std::vector<const PartitionField*> GetFieldsByPredicate( | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's port |
||||||||||||||
| const std::shared_ptr<BoundPredicate>& pred) const { | ||||||||||||||
| int32_t source_id; | ||||||||||||||
| switch (pred->term()->kind()) { | ||||||||||||||
| case Term::Kind::kReference: { | ||||||||||||||
| const auto& ref = pred->term()->reference(); | ||||||||||||||
| source_id = ref->field().field_id(); | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
| case Term::Kind::kTransform: { | ||||||||||||||
| const auto& transform = | ||||||||||||||
| internal::checked_pointer_cast<BoundTransform>(pred->term()); | ||||||||||||||
| source_id = transform->reference()->field().field_id(); | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
| default: | ||||||||||||||
| std::unreachable(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| std::vector<const PartitionField*> result; | ||||||||||||||
| for (const auto& field : spec_->fields()) { | ||||||||||||||
| if (field.source_id() == source_id) { | ||||||||||||||
| result.push_back(&field); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| return result; | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| ProjectionEvaluator::ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor) | ||||||||||||||
| : visitor_(std::move(visitor)) {} | ||||||||||||||
|
|
||||||||||||||
| ProjectionEvaluator::~ProjectionEvaluator() = default; | ||||||||||||||
|
|
||||||||||||||
| /// \brief Inclusive projection visitor. | ||||||||||||||
| /// | ||||||||||||||
| /// Uses AND to combine projections from multiple partition fields. | ||||||||||||||
| class InclusiveProjectionVisitor : public ProjectionVisitor { | ||||||||||||||
| public: | ||||||||||||||
| ~InclusiveProjectionVisitor() override = default; | ||||||||||||||
|
|
||||||||||||||
| InclusiveProjectionVisitor(const std::shared_ptr<PartitionSpec>& spec, | ||||||||||||||
| const std::shared_ptr<Schema>& schema, bool case_sensitive) | ||||||||||||||
| : ProjectionVisitor(spec, schema, case_sensitive) {} | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> Predicate( | ||||||||||||||
| const std::shared_ptr<BoundPredicate>& pred) override { | ||||||||||||||
| ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null"); | ||||||||||||||
| // Find partition fields that match the predicate's term | ||||||||||||||
| auto partition_fields = GetFieldsByPredicate(pred); | ||||||||||||||
| if (partition_fields.empty()) { | ||||||||||||||
| // The predicate has no partition column | ||||||||||||||
| return AlwaysTrue(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Project the predicate for each partition field and combine with AND | ||||||||||||||
| // | ||||||||||||||
| // consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d) | ||||||||||||||
| // projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0 | ||||||||||||||
| // any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01' | ||||||||||||||
| // | ||||||||||||||
| // similarly, if partitioning by day(ts) and hour(ts), the more restrictive | ||||||||||||||
| // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and | ||||||||||||||
| // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02. | ||||||||||||||
| std::shared_ptr<Expression> result = True::Instance(); | ||||||||||||||
| for (const auto* part_field : partition_fields) { | ||||||||||||||
| ICEBERG_ASSIGN_OR_RAISE(auto projected, | ||||||||||||||
| part_field->transform()->Project(part_field->name(), pred)); | ||||||||||||||
| if (projected != nullptr) { | ||||||||||||||
| result = | ||||||||||||||
| Expressions::And(result, std::shared_ptr<Expression>(projected.release())); | ||||||||||||||
|
Comment on lines
+162
to
+163
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return result; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| protected: | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| /// \brief Strict projection evaluator. | ||||||||||||||
| /// | ||||||||||||||
| /// Uses OR to combine projections from multiple partition fields. | ||||||||||||||
| class StrictProjectionVisitor : public ProjectionVisitor { | ||||||||||||||
| public: | ||||||||||||||
| ~StrictProjectionVisitor() override = default; | ||||||||||||||
|
|
||||||||||||||
| StrictProjectionVisitor(const std::shared_ptr<PartitionSpec>& spec, | ||||||||||||||
| const std::shared_ptr<Schema>& schema, bool case_sensitive) | ||||||||||||||
| : ProjectionVisitor(spec, schema, case_sensitive) {} | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> Predicate( | ||||||||||||||
| const std::shared_ptr<BoundPredicate>& pred) override { | ||||||||||||||
| ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null"); | ||||||||||||||
| // Find partition fields that match the predicate's term | ||||||||||||||
| auto partition_fields = GetFieldsByPredicate(pred); | ||||||||||||||
| if (partition_fields.empty()) { | ||||||||||||||
| // The predicate has no matching partition columns | ||||||||||||||
| return AlwaysFalse(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Project the predicate for each partition field and combine with OR | ||||||||||||||
| // | ||||||||||||||
| // consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts) | ||||||||||||||
| // projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds). | ||||||||||||||
| // any timestamp where either projection predicate is true must match the original | ||||||||||||||
| // predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but | ||||||||||||||
| // not the day, but does match the original predicate. | ||||||||||||||
| std::shared_ptr<Expression> result = False::Instance(); | ||||||||||||||
| for (const auto* part_field : partition_fields) { | ||||||||||||||
| ICEBERG_ASSIGN_OR_RAISE(auto projected, part_field->transform()->ProjectStrict( | ||||||||||||||
| part_field->name(), pred)); | ||||||||||||||
| if (projected != nullptr) { | ||||||||||||||
| result = | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||||||||||||||
| Expressions::Or(result, std::shared_ptr<Expression>(projected.release())); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return result; | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| Result<std::shared_ptr<Expression>> ProjectionEvaluator::Project( | ||||||||||||||
| const std::shared_ptr<Expression>& expr) { | ||||||||||||||
| // Projections assume that there are no NOT nodes in the expression tree. To ensure that | ||||||||||||||
| // this is the case, the expression is rewritten to push all NOT nodes down to the | ||||||||||||||
| // expression leaf nodes. | ||||||||||||||
| // | ||||||||||||||
| // This is necessary to ensure that the default expression returned when a predicate | ||||||||||||||
| // can't be projected is correct. | ||||||||||||||
| ICEBERG_ASSIGN_OR_RAISE(auto rewritten, RewriteNot::Visit(expr)); | ||||||||||||||
| return Visit<std::shared_ptr<Expression>, ProjectionVisitor>(rewritten, *visitor_); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| std::unique_ptr<ProjectionEvaluator> Projections::Inclusive( | ||||||||||||||
| const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema, | ||||||||||||||
| bool case_sensitive) { | ||||||||||||||
| auto visitor = | ||||||||||||||
| std::make_unique<InclusiveProjectionVisitor>(spec, schema, case_sensitive); | ||||||||||||||
| return std::unique_ptr<ProjectionEvaluator>( | ||||||||||||||
| new ProjectionEvaluator(std::move(visitor))); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| std::unique_ptr<ProjectionEvaluator> Projections::Strict( | ||||||||||||||
| const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema, | ||||||||||||||
| bool case_sensitive) { | ||||||||||||||
| auto visitor = std::make_unique<StrictProjectionVisitor>(spec, schema, case_sensitive); | ||||||||||||||
| return std::unique_ptr<ProjectionEvaluator>( | ||||||||||||||
| new ProjectionEvaluator(std::move(visitor))); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| } // namespace iceberg | ||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,109 @@ | ||||||||
| /* | ||||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||||
| * or more contributor license agreements. See the NOTICE file | ||||||||
| * distributed with this work for additional information | ||||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||||
| * to you under the Apache License, Version 2.0 (the | ||||||||
| * "License"); you may not use this file except in compliance | ||||||||
| * with the License. You may obtain a copy of the License at | ||||||||
| * | ||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||
| * | ||||||||
| * Unless required by applicable law or agreed to in writing, | ||||||||
| * software distributed under the License is distributed on an | ||||||||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||
| * KIND, either express or implied. See the License for the | ||||||||
| * specific language governing permissions and limitations | ||||||||
| * under the License. | ||||||||
| */ | ||||||||
|
|
||||||||
| #pragma once | ||||||||
|
|
||||||||
| /// \file iceberg/expression/projections.h | ||||||||
| /// Utils to project expressions on rows to expressions on partitions. | ||||||||
|
|
||||||||
| #include <memory> | ||||||||
|
|
||||||||
| #include "iceberg/expression/expression.h" | ||||||||
| #include "iceberg/iceberg_export.h" | ||||||||
| #include "iceberg/partition_spec.h" | ||||||||
zhjwpku marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||
| #include "iceberg/result.h" | ||||||||
| #include "iceberg/type_fwd.h" | ||||||||
|
|
||||||||
| namespace iceberg { | ||||||||
|
|
||||||||
| /// \brief A class that projects expressions for a table's data rows into expressions on | ||||||||
| /// the table's partition values, for a table's partition spec. | ||||||||
| class ICEBERG_EXPORT ProjectionEvaluator { | ||||||||
| public: | ||||||||
| ~ProjectionEvaluator(); | ||||||||
|
|
||||||||
| /// \brief Project the given row expression to a partition expression. | ||||||||
| /// | ||||||||
| /// \param expr an expression on data rows | ||||||||
| /// \return an expression on partition data (depends on the projection) | ||||||||
| Result<std::shared_ptr<Expression>> Project(const std::shared_ptr<Expression>& expr); | ||||||||
|
|
||||||||
| private: | ||||||||
| friend class Projections; | ||||||||
|
|
||||||||
| /// \brief Create a ProjectionEvaluator. | ||||||||
| /// | ||||||||
| /// \param visitor The projection visitor to use | ||||||||
| explicit ProjectionEvaluator(std::unique_ptr<class ProjectionVisitor> visitor); | ||||||||
|
|
||||||||
| std::unique_ptr<class ProjectionVisitor> visitor_; | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| }; | ||||||||
|
|
||||||||
| /// \brief Utils to project expressions on rows to expressions on partitions. | ||||||||
| /// | ||||||||
| /// There are two types of projections: inclusive and strict. | ||||||||
| /// | ||||||||
| /// An inclusive projection guarantees that if an expression matches a row, the projected | ||||||||
| /// expression will match the row's partition. | ||||||||
| /// | ||||||||
| /// A strict projection guarantees that if a partition matches a projected expression, | ||||||||
| /// then all rows in that partition will match the original expression. | ||||||||
| class ICEBERG_EXPORT Projections { | ||||||||
| public: | ||||||||
|
Comment on lines
+65
to
+66
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| /// \brief Creates an inclusive ProjectionEvaluator for the partition spec. | ||||||||
| /// | ||||||||
| /// An evaluator is used to project expressions for a table's data rows into expressions | ||||||||
| /// on the table's partition values. The evaluator returned by this function is | ||||||||
| /// inclusive and will build expressions with the following guarantee: if the original | ||||||||
| /// expression matches a row, then the projected expression will match that row's | ||||||||
| /// partition. | ||||||||
| /// | ||||||||
| /// Each predicate in the expression is projected using Transform::Project. | ||||||||
| /// | ||||||||
| /// \param spec a partition spec | ||||||||
| /// \param case_sensitive whether the Projection should consider case sensitivity on | ||||||||
| /// column names or not. Defaults to true (case sensitive). | ||||||||
| /// \return an inclusive projection evaluator for the partition spec | ||||||||
| static std::unique_ptr<ProjectionEvaluator> Inclusive( | ||||||||
| const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema, | ||||||||
| bool case_sensitive = true); | ||||||||
|
|
||||||||
| /// \brief Creates a strict ProjectionEvaluator for the partition spec. | ||||||||
| /// | ||||||||
| /// An evaluator is used to project expressions for a table's data rows into expressions | ||||||||
| /// on the table's partition values. The evaluator returned by this function is strict | ||||||||
| /// and will build expressions with the following guarantee: if the projected expression | ||||||||
| /// matches a partition, then the original expression will match all rows in that | ||||||||
| /// partition. | ||||||||
| /// | ||||||||
| /// Each predicate in the expression is projected using Transform::ProjectStrict. | ||||||||
| /// | ||||||||
| /// \param spec a partition spec | ||||||||
| /// \param case_sensitive whether the Projection should consider case sensitivity on | ||||||||
| /// column names or not. Defaults to true (case sensitive). | ||||||||
| /// \return a strict projection evaluator for the partition spec | ||||||||
| static std::unique_ptr<ProjectionEvaluator> Strict( | ||||||||
| const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<Schema>& schema, | ||||||||
| bool case_sensitive = true); | ||||||||
|
|
||||||||
| private: | ||||||||
| Projections() = default; | ||||||||
|
Comment on lines
+102
to
+104
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| }; | ||||||||
|
|
||||||||
| } // namespace iceberg | ||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.