diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 4c63c0c48..6e9eb0baf 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -29,6 +29,7 @@ set(ICEBERG_SOURCES expression/literal.cc expression/manifest_evaluator.cc expression/predicate.cc + expression/projections.cc expression/residual_evaluator.cc expression/rewrite_not.cc expression/strict_metrics_evaluator.cc diff --git a/src/iceberg/expression/meson.build b/src/iceberg/expression/meson.build index fbb072671..9b143ad31 100644 --- a/src/iceberg/expression/meson.build +++ b/src/iceberg/expression/meson.build @@ -27,6 +27,7 @@ install_headers( 'literal.h', 'manifest_evaluator.h', 'predicate.h', + 'projections.h', 'residual_evaluator.h', 'rewrite_not.h', 'strict_metrics_evaluator.h', diff --git a/src/iceberg/expression/projections.cc b/src/iceberg/expression/projections.cc new file mode 100644 index 000000000..dd83ef801 --- /dev/null +++ b/src/iceberg/expression/projections.cc @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "iceberg/expression/projections.h"
+
+#include <memory>
+#include <utility>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/expression_visitor.h"
+#include "iceberg/expression/predicate.h"
+#include "iceberg/expression/rewrite_not.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// \brief Common base of the inclusive and strict projection visitors.
+///
+/// Handles the parts of the expression tree that are projected the same way in both
+/// modes (constants, AND, OR, binding of unbound predicates). Subclasses override
+/// Predicate(BoundPredicate) to decide how a single bound predicate is projected.
+///
+/// The visitor keeps references to the partition spec and the schema; both must
+/// outlive the visitor.
+class ProjectionVisitor : public ExpressionVisitor<std::shared_ptr<Expression>> {
+ public:
+  ~ProjectionVisitor() override = default;
+
+  /// \param spec partition spec whose fields the predicates are projected onto
+  /// \param schema schema used to bind unbound predicates
+  /// \param case_sensitive whether column names are matched case sensitively when binding
+  ProjectionVisitor(const PartitionSpec& spec, const Schema& schema, bool case_sensitive)
+      : spec_(spec), schema_(schema), case_sensitive_(case_sensitive) {}
+
+  Result<std::shared_ptr<Expression>> AlwaysTrue() override { return True::Instance(); }
+
+  Result<std::shared_ptr<Expression>> AlwaysFalse() override { return False::Instance(); }
+
+  /// NOT nodes must have been rewritten away before projecting; reaching one here is an
+  /// error.
+  Result<std::shared_ptr<Expression>> Not(
+      [[maybe_unused]] const std::shared_ptr<Expression>& child_result) override {
+    return InvalidExpression("Project called on expression with a not");
+  }
+
+  Result<std::shared_ptr<Expression>> And(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override {
+    return And::MakeFolded(left_result, right_result);
+  }
+
+  Result<std::shared_ptr<Expression>> Or(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override {
+    return Or::MakeFolded(left_result, right_result);
+  }
+
+  /// \brief Bind an unbound predicate against the schema, then project the result.
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<UnboundPredicate>& pred) override {
+    ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
+    ICEBERG_ASSIGN_OR_RAISE(auto bound_pred, pred->Bind(schema_, case_sensitive_));
+    if (bound_pred->is_bound_predicate()) {
+      // Dispatches virtually to the subclass' bound-predicate projection.
+      return Predicate(std::dynamic_pointer_cast<BoundPredicate>(bound_pred));
+    }
+    // Binding produced something other than a bound predicate; pass it through as is.
+    return bound_pred;
+  }
+
+  /// \brief Default for bound predicates: not supported unless a subclass overrides it.
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<BoundPredicate>& pred) override {
+    return InvalidExpression("Bound predicates are not supported in projections");
+  }
+
+ protected:
+  const PartitionSpec& spec_;
+  const Schema& schema_;
+  bool case_sensitive_;
+};
+
+ProjectionEvaluator::ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor)
+    : visitor_(std::move(visitor)) {}
+
+// Defined here rather than in the header because the header does not include the
+// ProjectionVisitor definition that destroying visitor_ requires.
+ProjectionEvaluator::~ProjectionEvaluator() = default;
+
+/// \brief Inclusive projection visitor.
+///
+/// Uses AND to combine projections from multiple partition fields.
+class InclusiveProjectionVisitor final : public ProjectionVisitor {
+ public:
+  ~InclusiveProjectionVisitor() override = default;
+
+  InclusiveProjectionVisitor(const PartitionSpec& spec, const Schema& schema,
+                             bool case_sensitive)
+      : ProjectionVisitor(spec, schema, case_sensitive) {}
+
+  // The override below would otherwise hide the base class' unbound-predicate overload.
+  using ProjectionVisitor::Predicate;
+
+  /// \brief Project a bound predicate onto every partition field of its source column.
+  ///
+  /// \param pred bound predicate on a data column; must not be null
+  /// \return `true` if no partition field is derived from the column, otherwise the AND
+  /// of the per-field projections (fields that yield no projection are skipped)
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<BoundPredicate>& pred) override {
+    ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
+    // Find partition fields that match the predicate's term
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto parts, spec_.GetFieldsBySourceId(pred->reference()->field().field_id()));
+    if (parts.empty()) {
+      // The predicate has no partition column
+      return AlwaysTrue();
+    }
+
+    // Project the predicate for each partition field and combine with AND
+    //
+    // consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
+    // projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
+    // any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
+    //
+    // similarly, if partitioning by day(ts) and hour(ts), the more restrictive
+    // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
+    // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
+    std::shared_ptr<Expression> result = True::Instance();
+    for (const auto& part : parts) {
+      ICEBERG_ASSIGN_OR_RAISE(auto projected,
+                              part.get().transform()->Project(part.get().name(), pred));
+      // A null projection adds no constraint; keep what has been accumulated so far.
+      if (projected != nullptr) {
+        ICEBERG_ASSIGN_OR_RAISE(result,
+                                And::MakeFolded(std::move(result), std::move(projected)));
+      }
+    }
+
+    return result;
+  }
+};
+
+/// \brief Strict projection visitor.
+///
+/// Uses OR to combine projections from multiple partition fields.
+class StrictProjectionVisitor final : public ProjectionVisitor {
+ public:
+  ~StrictProjectionVisitor() override = default;
+
+  StrictProjectionVisitor(const PartitionSpec& spec, const Schema& schema,
+                          bool case_sensitive)
+      : ProjectionVisitor(spec, schema, case_sensitive) {}
+
+  // The override below would otherwise hide the base class' unbound-predicate overload.
+  using ProjectionVisitor::Predicate;
+
+  /// \brief Strictly project a bound predicate onto the partition fields of its column.
+  ///
+  /// \param pred bound predicate on a data column; must not be null
+  /// \return `false` if no partition field is derived from the column, otherwise the OR
+  /// of the per-field strict projections (fields that yield no projection are skipped)
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<BoundPredicate>& pred) override {
+    ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
+    // Find partition fields that match the predicate's term
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto parts, spec_.GetFieldsBySourceId(pred->reference()->field().field_id()));
+    if (parts.empty()) {
+      // The predicate has no matching partition columns
+      return AlwaysFalse();
+    }
+
+    // Project the predicate for each partition field and combine with OR
+    //
+    // consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
+    // projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
+    // any timestamp where either projection predicate is true must match the original
+    // predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but
+    // not the day, but does match the original predicate.
+    std::shared_ptr<Expression> result = False::Instance();
+    for (const auto& part : parts) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto projected, part.get().transform()->ProjectStrict(part.get().name(), pred));
+      // A null projection contributes no alternative; keep what has been accumulated.
+      if (projected != nullptr) {
+        ICEBERG_ASSIGN_OR_RAISE(result,
+                                Or::MakeFolded(std::move(result), std::move(projected)));
+      }
+    }
+
+    return result;
+  }
+};
+
+Result<std::shared_ptr<Expression>> ProjectionEvaluator::Project(
+    const std::shared_ptr<Expression>& expr) {
+  // Projections assume that there are no NOT nodes in the expression tree. To ensure that
+  // this is the case, the expression is rewritten to push all NOT nodes down to the
+  // expression leaf nodes.
+  //
+  // This is necessary to ensure that the default expression returned when a predicate
+  // can't be projected is correct.
+  ICEBERG_ASSIGN_OR_RAISE(auto rewritten, RewriteNot::Visit(expr));
+  return Visit<std::shared_ptr<Expression>, ProjectionVisitor>(rewritten, *visitor_);
+}
+
+std::unique_ptr<ProjectionEvaluator> Projections::Inclusive(const PartitionSpec& spec,
+                                                            const Schema& schema,
+                                                            bool case_sensitive) {
+  auto visitor =
+      std::make_unique<InclusiveProjectionVisitor>(spec, schema, case_sensitive);
+  // The ProjectionEvaluator constructor is private, so std::make_unique cannot reach it.
+  return std::unique_ptr<ProjectionEvaluator>(
+      new ProjectionEvaluator(std::move(visitor)));
+}
+
+std::unique_ptr<ProjectionEvaluator> Projections::Strict(const PartitionSpec& spec,
+                                                         const Schema& schema,
+                                                         bool case_sensitive) {
+  auto visitor = std::make_unique<StrictProjectionVisitor>(spec, schema, case_sensitive);
+  // The ProjectionEvaluator constructor is private, so std::make_unique cannot reach it.
+  return std::unique_ptr<ProjectionEvaluator>(
+      new ProjectionEvaluator(std::move(visitor)));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/expression/projections.h b/src/iceberg/expression/projections.h
new file mode 100644
index 000000000..b2022c4f1
--- /dev/null
+++ b/src/iceberg/expression/projections.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/projections.h
+/// Utils to project expressions on rows to expressions on partitions.
+
+#include <memory>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief A class that projects expressions for a table's data rows into expressions on
+/// the table's partition values, for a table's partition spec.
+///
+/// Instances are created through Projections::Inclusive or Projections::Strict.
+class ICEBERG_EXPORT ProjectionEvaluator {
+ public:
+  ~ProjectionEvaluator();
+
+  /// \brief Project the given row expression to a partition expression.
+  ///
+  /// \param expr an expression on data rows
+  /// \return an expression on partition data (depends on the projection)
+  Result<std::shared_ptr<Expression>> Project(const std::shared_ptr<Expression>& expr);
+
+ private:
+  // Projections is declared as a struct; use the same class-key here.
+  friend struct Projections;
+
+  /// \brief Create a ProjectionEvaluator.
+  ///
+  /// \param visitor The projection visitor to use
+  explicit ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor);
+
+  std::unique_ptr<ProjectionVisitor> visitor_;
+};
+
+/// \brief Utils to project expressions on rows to expressions on partitions.
+///
+/// There are two types of projections: inclusive and strict.
+///
+/// An inclusive projection guarantees that if an expression matches a row, the projected
+/// expression will match the row's partition.
+///
+/// A strict projection guarantees that if a partition matches a projected expression,
+/// then all rows in that partition will match the original expression.
+struct ICEBERG_EXPORT Projections {
+  /// \brief Creates an inclusive ProjectionEvaluator for the partition spec.
+  ///
+  /// An evaluator is used to project expressions for a table's data rows into expressions
+  /// on the table's partition values. The evaluator returned by this function is
+  /// inclusive and will build expressions with the following guarantee: if the original
+  /// expression matches a row, then the projected expression will match that row's
+  /// partition.
+  ///
+  /// Each predicate in the expression is projected using Transform::Project.
+  ///
+  /// \note The returned evaluator refers to `spec` and `schema` by reference; both must
+  /// outlive it.
+  ///
+  /// \param spec a partition spec
+  /// \param schema a schema
+  /// \param case_sensitive whether the Projection should consider case sensitivity on
+  /// column names or not. Defaults to true (case sensitive).
+  /// \return an inclusive projection evaluator for the partition spec
+  static std::unique_ptr<ProjectionEvaluator> Inclusive(const PartitionSpec& spec,
+                                                        const Schema& schema,
+                                                        bool case_sensitive = true);
+
+  /// \brief Creates a strict ProjectionEvaluator for the partition spec.
+  ///
+  /// An evaluator is used to project expressions for a table's data rows into expressions
+  /// on the table's partition values. The evaluator returned by this function is strict
+  /// and will build expressions with the following guarantee: if the projected expression
+  /// matches a partition, then the original expression will match all rows in that
+  /// partition.
+  ///
+  /// Each predicate in the expression is projected using Transform::ProjectStrict.
+  ///
+  /// \note The returned evaluator refers to `spec` and `schema` by reference; both must
+  /// outlive it.
+  ///
+  /// \param spec a partition spec
+  /// \param schema a schema
+  /// \param case_sensitive whether the Projection should consider case sensitivity on
+  /// column names or not. Defaults to true (case sensitive).
+  /// \return a strict projection evaluator for the partition spec
+  static std::unique_ptr<ProjectionEvaluator> Strict(const PartitionSpec& spec,
+                                                     const Schema& schema,
+                                                     bool case_sensitive = true);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index d70eae253..d473d72e1 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -51,6 +51,7 @@ iceberg_sources = files(
     'expression/literal.cc',
     'expression/manifest_evaluator.cc',
     'expression/predicate.cc',
+    'expression/projections.cc',
     'expression/residual_evaluator.cc',
     'expression/rewrite_not.cc',
     'expression/strict_metrics_evaluator.cc',
diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h
index 7f8f67822..0d1a78f16 100644
--- a/src/iceberg/partition_spec.h
+++ b/src/iceberg/partition_spec.h
@@ -114,7 +114,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
  private:
   /// \brief Create a new partition spec.
   ///
-  /// \param schema The table schema.
   /// \param spec_id The spec ID.
   /// \param fields The partition fields.
   /// \param last_assigned_field_id The last assigned field ID. 
If not provided, it will diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 7d9434842..a48567132 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -89,6 +89,7 @@ add_iceberg_test(expression_test inclusive_metrics_evaluator_test.cc inclusive_metrics_evaluator_with_transform_test.cc predicate_test.cc + projections_test.cc residual_evaluator_test.cc strict_metrics_evaluator_test.cc) diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index f058cddad..6a2a9e9ab 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -66,6 +66,7 @@ iceberg_tests = { 'literal_test.cc', 'manifest_evaluator_test.cc', 'predicate_test.cc', + 'projections_test.cc', 'residual_evaluator_test.cc', 'strict_metrics_evaluator_test.cc', ), diff --git a/src/iceberg/test/projections_test.cc b/src/iceberg/test/projections_test.cc new file mode 100644 index 000000000..475a18da6 --- /dev/null +++ b/src/iceberg/test/projections_test.cc @@ -0,0 +1,1092 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/expression/projections.h" + +#include +#include +#include + +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/predicate.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/temporal_test_helper.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" + +namespace iceberg { + +class ProjectionsTest : public ::testing::Test { + protected: + void SetUp() override { + // Create a simple test schema with various field types + schema_ = std::make_shared( + std::vector{SchemaField::MakeOptional(16, "id", int64())}, + /*schema_id=*/0); + } + + std::shared_ptr schema_; +}; + +// Helper function to extract UnboundPredicate from expression +std::shared_ptr ExtractUnboundPredicate( + const std::shared_ptr& expr) { + if (expr->is_unbound_predicate()) { + return std::dynamic_pointer_cast(expr); + } + return nullptr; +} + +// Helper function to extract BoundPredicate from expression +std::shared_ptr ExtractBoundPredicate( + const std::shared_ptr& expr) { + if (expr->is_bound_predicate()) { + return std::dynamic_pointer_cast(expr); + } + return nullptr; +} + +// Helper function to assert projection operation +void AssertProjectionOperation(const std::shared_ptr& projection, + Expression::Operation expected_op) { + ASSERT_NE(projection, nullptr); + EXPECT_EQ(projection->op(), expected_op); +} + +// Helper function to assert projection value for True/False +void AssertProjectionValue(const std::shared_ptr& projection, + Expression::Operation expected_op) { + ASSERT_NE(projection, nullptr); + EXPECT_EQ(projection->op(), expected_op); +} + +TEST_F(ProjectionsTest, IdentityProjectionInclusive) { + auto identity_transform = Transform::Identity(); + PartitionField pt_field(16, 1000, "id", identity_transform); + 
ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + std::vector> predicates = { + Expressions::NotNull("id"), + Expressions::IsNull("id"), + Expressions::LessThan("id", Literal::Long(100)), + Expressions::LessThanOrEqual("id", Literal::Long(101)), + Expressions::GreaterThan("id", Literal::Long(102)), + Expressions::GreaterThanOrEqual("id", Literal::Long(103)), + Expressions::Equal("id", Literal::Long(104)), + Expressions::NotEqual("id", Literal::Long(105)), + }; + + for (const auto& predicate : predicates) { + // Bind the predicate first + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, predicate->Bind(*schema_, true)); + auto bound = ExtractBoundPredicate(bound_pred); + ASSERT_NE(bound, nullptr); + + // Project the bound predicate + auto evaluator = Projections::Inclusive(*spec, *schema_, true); + ICEBERG_UNWRAP_OR_FAIL(auto projected_expr, evaluator->Project(bound_pred)); + + // Check that we got a predicate back + auto projected = ExtractUnboundPredicate(projected_expr); + ASSERT_NE(projected, nullptr); + + // Check that the operation matches + EXPECT_EQ(projected->op(), bound->op()); + + // Check that the field name matches + EXPECT_EQ(projected->reference()->name(), "id"); + + if (bound->kind() == BoundPredicate::Kind::kLiteral) { + const auto& literal_predicate = + internal::checked_pointer_cast>(projected); + const auto& bound_literal_predicate = + internal::checked_pointer_cast(bound); + EXPECT_EQ(literal_predicate->literals().front(), + bound_literal_predicate->literal()); + } + } +} + +TEST_F(ProjectionsTest, IdentityProjectionStrict) { + auto identity_transform = Transform::Identity(); + PartitionField pt_field(16, 1000, "id", identity_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + std::vector> predicates = { + Expressions::NotNull("id"), + Expressions::IsNull("id"), + Expressions::LessThan("id", Literal::Long(100)), + Expressions::LessThanOrEqual("id", Literal::Long(101)), + 
Expressions::GreaterThan("id", Literal::Long(102)), + Expressions::GreaterThanOrEqual("id", Literal::Long(103)), + Expressions::Equal("id", Literal::Long(104)), + Expressions::NotEqual("id", Literal::Long(105)), + }; + + for (const auto& predicate : predicates) { + // Bind the predicate first + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, predicate->Bind(*schema_, true)); + auto bound = ExtractBoundPredicate(bound_pred); + ASSERT_NE(bound, nullptr); + + // Project the bound predicate + auto evaluator = Projections::Strict(*spec, *schema_, true); + ICEBERG_UNWRAP_OR_FAIL(auto projected_expr, evaluator->Project(bound_pred)); + + // Check that we got a predicate back + auto projected = ExtractUnboundPredicate(projected_expr); + ASSERT_NE(projected, nullptr); + + // Check that the operation matches + EXPECT_EQ(projected->op(), bound->op()); + + // Check that the field name matches + EXPECT_EQ(projected->reference()->name(), "id"); + + if (bound->kind() == BoundPredicate::Kind::kLiteral) { + const auto& literal_predicate = + internal::checked_pointer_cast>(projected); + const auto& bound_literal_predicate = + internal::checked_pointer_cast(bound); + EXPECT_EQ(literal_predicate->literals().front(), + bound_literal_predicate->literal()); + } + } +} + +TEST_F(ProjectionsTest, CaseInsensitiveIdentityProjection) { + auto identity_transform = Transform::Identity(); + PartitionField pt_field(16, 1000, "id", identity_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + std::vector> predicates = { + Expressions::NotNull("ID"), + Expressions::IsNull("ID"), + Expressions::LessThan("ID", Literal::Long(100)), + Expressions::LessThanOrEqual("ID", Literal::Long(101)), + Expressions::GreaterThan("ID", Literal::Long(102)), + Expressions::GreaterThanOrEqual("ID", Literal::Long(103)), + Expressions::Equal("ID", Literal::Long(104)), + Expressions::NotEqual("ID", Literal::Long(105)), + }; + + for (const auto& predicate : predicates) { + // Bind the predicate 
first (case insensitive) + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, predicate->Bind(*schema_, false)); + auto bound = ExtractBoundPredicate(bound_pred); + ASSERT_NE(bound, nullptr); + + // Project the bound predicate (case insensitive) + auto evaluator = Projections::Inclusive(*spec, *schema_, false); + ICEBERG_UNWRAP_OR_FAIL(auto projected_expr, evaluator->Project(bound_pred)); + + // Check that we got a predicate back + auto projected = ExtractUnboundPredicate(projected_expr); + ASSERT_NE(projected, nullptr); + + // Check that the operation matches + EXPECT_EQ(projected->op(), bound->op()); + + // Check that the field name matches + EXPECT_EQ(projected->reference()->name(), "id"); + + if (bound->kind() == BoundPredicate::Kind::kLiteral) { + const auto& literal_predicate = + internal::checked_pointer_cast>(projected); + const auto& bound_literal_predicate = + internal::checked_pointer_cast(bound); + EXPECT_EQ(literal_predicate->literals().front(), + bound_literal_predicate->literal()); + } + } +} + +TEST_F(ProjectionsTest, CaseSensitiveIdentityProjectionFailure) { + auto identity_transform = Transform::Identity(); + PartitionField pt_field(16, 1000, "id", identity_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto predicate = Expressions::NotNull("ID"); + // Binding should fail with case sensitive + auto bound_result = predicate->Bind(*schema_, true); + EXPECT_THAT(bound_result, IsError(ErrorKind::kInvalidExpression)); +} + +// Bucketing projection tests +class BucketingProjectionTest : public ::testing::Test { + protected: + void AssertProjectionStrict(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op, + const std::string& expected_literal) { + auto evaluator = Projections::Strict(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + + if (expected_op != 
Expression::Operation::kFalse) { + auto predicate = ExtractUnboundPredicate(projection); + ASSERT_NE(predicate, nullptr); + if (predicate->op() == Expression::Operation::kNotIn) { + // For NOT_IN, check literals + const auto& literal_predicate = + internal::checked_pointer_cast>( + predicate); + auto literals = literal_predicate->literals(); + std::vector values; + for (const auto& lit : literals) { + values.push_back(std::to_string(std::get(lit.value()))); + } + std::ranges::sort(values); + std::string actual = "["; + for (size_t i = 0; i < values.size(); ++i) { + if (i > 0) actual += ", "; + actual += values[i]; + } + actual += "]"; + EXPECT_EQ(actual, expected_literal); + } else { + // For other operations, check single literal + const auto& literal_predicate = + internal::checked_pointer_cast>( + predicate); + auto literal = literal_predicate->literals().front(); + std::string output = std::to_string(std::get(literal.value())); + EXPECT_EQ(output, expected_literal); + } + } + } + + void AssertProjectionStrictValue(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Strict(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionValue(projection, expected_op); + } + + void AssertProjectionInclusive(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op, + const std::string& expected_literal) { + auto evaluator = Projections::Inclusive(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + + if (expected_op != Expression::Operation::kTrue) { + auto predicate = ExtractUnboundPredicate(projection); + ASSERT_NE(predicate, nullptr); + if (predicate->op() == Expression::Operation::kIn) { + // For IN, check literals + const auto& literal_predicate = + 
internal::checked_pointer_cast>( + predicate); + auto literals = literal_predicate->literals(); + std::vector values; + for (const auto& lit : literals) { + values.push_back(std::to_string(std::get(lit.value()))); + } + std::ranges::sort(values); + std::string actual = "["; + for (size_t i = 0; i < values.size(); ++i) { + if (i > 0) actual += ", "; + actual += values[i]; + } + actual += "]"; + EXPECT_EQ(actual, expected_literal); + } else { + // For other operations, check single literal + const auto& literal_predicate = + internal::checked_pointer_cast>( + predicate); + auto literal = literal_predicate->literals().front(); + std::string output = std::to_string(std::get(literal.value())); + EXPECT_EQ(output, expected_literal); + } + } + } + + void AssertProjectionInclusiveValue(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Inclusive(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionValue(projection, expected_op); + } +}; + +TEST_F(BucketingProjectionTest, BucketIntegerStrict) { + int32_t value = 100; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int32())}, 0); + auto bucket_transform = Transform::Bucket(10); + PartitionField pt_field(1, 1000, "value_bucket", bucket_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + // Bind predicates first + auto not_equal_pred = Expressions::NotEqual("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_not_equal, not_equal_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + auto less_than_pred = Expressions::LessThan("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto 
less_equal_pred = Expressions::LessThanOrEqual("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_equal, less_equal_pred->Bind(*schema, true)); + + auto greater_than_pred = Expressions::GreaterThan("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_than, greater_than_pred->Bind(*schema, true)); + + auto greater_equal_pred = Expressions::GreaterThanOrEqual("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_equal, + greater_equal_pred->Bind(*schema, true)); + + // The bucket number of 100 with 10 buckets is 6 + AssertProjectionStrict(*spec, *schema, bound_not_equal, Expression::Operation::kNotEq, + "6"); + AssertProjectionStrictValue(*spec, *schema, bound_equal, Expression::Operation::kFalse); + AssertProjectionStrictValue(*spec, *schema, bound_less_than, + Expression::Operation::kFalse); + AssertProjectionStrictValue(*spec, *schema, bound_less_equal, + Expression::Operation::kFalse); + AssertProjectionStrictValue(*spec, *schema, bound_greater_than, + Expression::Operation::kFalse); + AssertProjectionStrictValue(*spec, *schema, bound_greater_equal, + Expression::Operation::kFalse); +} + +TEST_F(BucketingProjectionTest, BucketIntegerInclusive) { + int32_t value = 100; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int32())}, 0); + auto bucket_transform = Transform::Bucket(10); + PartitionField pt_field(1, 1000, "value_bucket", bucket_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + // Bind predicates first + auto equal_pred = Expressions::Equal("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + auto not_equal_pred = Expressions::NotEqual("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_not_equal, not_equal_pred->Bind(*schema, true)); + + auto less_than_pred = Expressions::LessThan("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto 
bound_less_than, less_than_pred->Bind(*schema, true)); + + auto less_equal_pred = Expressions::LessThanOrEqual("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_equal, less_equal_pred->Bind(*schema, true)); + + auto greater_than_pred = Expressions::GreaterThan("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_than, greater_than_pred->Bind(*schema, true)); + + auto greater_equal_pred = Expressions::GreaterThanOrEqual("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_equal, + greater_equal_pred->Bind(*schema, true)); + + // The bucket number of 100 with 10 buckets is 6 + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq, "6"); + AssertProjectionInclusiveValue(*spec, *schema, bound_not_equal, + Expression::Operation::kTrue); + AssertProjectionInclusiveValue(*spec, *schema, bound_less_than, + Expression::Operation::kTrue); + AssertProjectionInclusiveValue(*spec, *schema, bound_less_equal, + Expression::Operation::kTrue); + AssertProjectionInclusiveValue(*spec, *schema, bound_greater_than, + Expression::Operation::kTrue); + AssertProjectionInclusiveValue(*spec, *schema, bound_greater_equal, + Expression::Operation::kTrue); +} + +TEST_F(BucketingProjectionTest, BucketLongStrict) { + int64_t value = 100L; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int64())}, 0); + auto bucket_transform = Transform::Bucket(10); + PartitionField pt_field(1, 1000, "value_bucket", bucket_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto not_equal_pred = Expressions::NotEqual("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_not_equal, not_equal_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + // The bucket number of 100 with 10 buckets is 6 + 
AssertProjectionStrict(*spec, *schema, bound_not_equal, Expression::Operation::kNotEq, + "6"); + AssertProjectionStrictValue(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(BucketingProjectionTest, BucketLongInclusive) { + int64_t value = 100L; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int64())}, 0); + auto bucket_transform = Transform::Bucket(10); + PartitionField pt_field(1, 1000, "value_bucket", bucket_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto equal_pred = Expressions::Equal("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + auto not_equal_pred = Expressions::NotEqual("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_not_equal, not_equal_pred->Bind(*schema, true)); + + // The bucket number of 100 with 10 buckets is 6 + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq, "6"); + AssertProjectionInclusiveValue(*spec, *schema, bound_not_equal, + Expression::Operation::kTrue); +} + +TEST_F(BucketingProjectionTest, BucketStringStrict) { + std::string value = "abcdefg"; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", string())}, 0); + auto bucket_transform = Transform::Bucket(10); + PartitionField pt_field(1, 1000, "value_bucket", bucket_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto not_equal_pred = Expressions::NotEqual("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_not_equal, not_equal_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + // The bucket number of "abcdefg" with 10 buckets is 4 + AssertProjectionStrict(*spec, *schema, bound_not_equal, Expression::Operation::kNotEq, + "4"); + 
AssertProjectionStrictValue(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(BucketingProjectionTest, BucketStringInclusive) { + std::string value = "abcdefg"; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", string())}, 0); + auto bucket_transform = Transform::Bucket(10); + PartitionField pt_field(1, 1000, "value_bucket", bucket_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto equal_pred = Expressions::Equal("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + auto not_equal_pred = Expressions::NotEqual("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_not_equal, not_equal_pred->Bind(*schema, true)); + + // The bucket number of "abcdefg" with 10 buckets is 4 + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq, "4"); + AssertProjectionInclusiveValue(*spec, *schema, bound_not_equal, + Expression::Operation::kTrue); +} + +// Date projection tests +class DateProjectionTest : public ::testing::Test { + protected: + void AssertProjectionStrict(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Strict(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + } + + void AssertProjectionInclusive(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Inclusive(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + } +}; + +TEST_F(DateProjectionTest, DayStrict) { + int32_t date_value = + TemporalTestHelper::CreateDate({.year = 2017, .month = 1, .day = 1}); + auto schema = 
std::make_shared( + std::vector{SchemaField::MakeOptional(1, "date", date())}, 0); + auto day_transform = Transform::Day(); + PartitionField pt_field(1, 1000, "date_day", day_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto less_equal_pred = Expressions::LessThanOrEqual("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_equal, less_equal_pred->Bind(*schema, true)); + + auto greater_than_pred = Expressions::GreaterThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_than, greater_than_pred->Bind(*schema, true)); + + auto greater_equal_pred = + Expressions::GreaterThanOrEqual("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_equal, + greater_equal_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_less_equal, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_greater_than, Expression::Operation::kGt); + AssertProjectionStrict(*spec, *schema, bound_greater_equal, Expression::Operation::kGt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(DateProjectionTest, DayInclusive) { + int32_t date_value = + TemporalTestHelper::CreateDate({.year = 2017, .month = 1, .day = 1}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "date", date())}, 0); + auto day_transform = Transform::Day(); + PartitionField pt_field(1, 1000, "date_day", day_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, 
{pt_field})); + + auto less_than_pred = Expressions::LessThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto less_equal_pred = Expressions::LessThanOrEqual("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_equal, less_equal_pred->Bind(*schema, true)); + + auto greater_than_pred = Expressions::GreaterThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_than, greater_than_pred->Bind(*schema, true)); + + auto greater_equal_pred = + Expressions::GreaterThanOrEqual("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_greater_equal, + greater_equal_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_less_equal, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_greater_than, + Expression::Operation::kGtEq); + AssertProjectionInclusive(*spec, *schema, bound_greater_equal, + Expression::Operation::kGtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +TEST_F(DateProjectionTest, MonthStrict) { + int32_t date_value = + TemporalTestHelper::CreateDate({.year = 2017, .month = 1, .day = 1}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "date", date())}, 0); + auto month_transform = Transform::Month(); + PartitionField pt_field(1, 1000, "date_month", month_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = 
Expressions::Equal("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(DateProjectionTest, MonthInclusive) { + int32_t date_value = + TemporalTestHelper::CreateDate({.year = 2017, .month = 1, .day = 1}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "date", date())}, 0); + auto month_transform = Transform::Month(); + PartitionField pt_field(1, 1000, "date_month", month_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +TEST_F(DateProjectionTest, YearStrict) { + int32_t date_value = + TemporalTestHelper::CreateDate({.year = 2017, .month = 1, .day = 1}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "date", date())}, 0); + auto year_transform = Transform::Year(); + PartitionField pt_field(1, 1000, "date_year", year_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + 
AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(DateProjectionTest, YearInclusive) { + int32_t date_value = + TemporalTestHelper::CreateDate({.year = 2017, .month = 1, .day = 1}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "date", date())}, 0); + auto year_transform = Transform::Year(); + PartitionField pt_field(1, 1000, "date_year", year_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("date", Literal::Date(date_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +// Timestamp projection tests +class TimestampProjectionTest : public ::testing::Test { + protected: + void AssertProjectionStrict(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Strict(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + } + + void AssertProjectionInclusive(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Inclusive(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + } +}; + +TEST_F(TimestampProjectionTest, DayStrict) { + int64_t ts_value = 
TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 12, + .day = 1, + .hour = 0, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto day_transform = Transform::Day(); + PartitionField pt_field(1, 1000, "timestamp_day", day_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TimestampProjectionTest, DayInclusive) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 12, + .day = 1, + .hour = 0, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto day_transform = Transform::Day(); + PartitionField pt_field(1, 1000, "timestamp_day", day_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + 
+TEST_F(TimestampProjectionTest, MonthStrict) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 12, + .day = 1, + .hour = 0, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto month_transform = Transform::Month(); + PartitionField pt_field(1, 1000, "timestamp_month", month_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TimestampProjectionTest, MonthInclusive) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 12, + .day = 1, + .hour = 0, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto month_transform = Transform::Month(); + PartitionField pt_field(1, 1000, "timestamp_month", month_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + 
AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +TEST_F(TimestampProjectionTest, YearStrict) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 1, + .day = 1, + .hour = 0, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto year_transform = Transform::Year(); + PartitionField pt_field(1, 1000, "timestamp_year", year_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TimestampProjectionTest, YearInclusive) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 1, + .day = 1, + .hour = 0, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto year_transform = Transform::Year(); + PartitionField pt_field(1, 1000, "timestamp_year", year_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + 
AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +TEST_F(TimestampProjectionTest, HourStrict) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 12, + .day = 1, + .hour = 10, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto hour_transform = Transform::Hour(); + PartitionField pt_field(1, 1000, "timestamp_hour", hour_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TimestampProjectionTest, HourInclusive) { + int64_t ts_value = TemporalTestHelper::CreateTimestamp({.year = 2017, + .month = 12, + .day = 1, + .hour = 10, + .minute = 0, + .second = 0, + .microsecond = 0}); + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "timestamp", timestamp())}, + 0); + auto hour_transform = Transform::Hour(); + PartitionField pt_field(1, 1000, "timestamp_hour", hour_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("timestamp", Literal::Timestamp(ts_value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("timestamp", Literal::Timestamp(ts_value)); + 
ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +// Truncate projection tests +class TruncateProjectionTest : public ::testing::Test { + protected: + void AssertProjectionStrict(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Strict(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + } + + void AssertProjectionInclusive(const PartitionSpec& spec, const Schema& schema, + const std::shared_ptr& filter, + Expression::Operation expected_op) { + auto evaluator = Projections::Inclusive(spec, schema, true); + ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter)); + AssertProjectionOperation(projection, expected_op); + } +}; + +TEST_F(TruncateProjectionTest, IntegerStrict) { + int32_t value = 100; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int32())}, 0); + auto truncate_transform = Transform::Truncate(10); + PartitionField pt_field(1, 1000, "value_trunc", truncate_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TruncateProjectionTest, IntegerInclusive) { + int32_t value = 100; + 
auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int32())}, 0); + auto truncate_transform = Transform::Truncate(10); + PartitionField pt_field(1, 1000, "value_trunc", truncate_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::Int(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +TEST_F(TruncateProjectionTest, LongStrict) { + int64_t value = 100L; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int64())}, 0); + auto truncate_transform = Transform::Truncate(10); + PartitionField pt_field(1, 1000, "value_trunc", truncate_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TruncateProjectionTest, LongInclusive) { + int64_t value = 100L; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", int64())}, 0); + auto truncate_transform = Transform::Truncate(10); + PartitionField pt_field(1, 1000, "value_trunc", truncate_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, 
PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::Long(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionInclusive(*spec, *schema, bound_less_than, + Expression::Operation::kLtEq); + AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq); +} + +TEST_F(TruncateProjectionTest, StringStrict) { + std::string value = "abcdefg"; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", string())}, 0); + auto truncate_transform = Transform::Truncate(5); + PartitionField pt_field(1, 1000, "value_trunc", truncate_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = Expressions::Equal("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true)); + + AssertProjectionStrict(*spec, *schema, bound_less_than, Expression::Operation::kLt); + AssertProjectionStrict(*spec, *schema, bound_equal, Expression::Operation::kFalse); +} + +TEST_F(TruncateProjectionTest, StringInclusive) { + std::string value = "abcdefg"; + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(1, "value", string())}, 0); + auto truncate_transform = Transform::Truncate(5); + PartitionField pt_field(1, 1000, "value_trunc", truncate_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(0, {pt_field})); + + auto less_than_pred = Expressions::LessThan("value", Literal::String(value)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_less_than, less_than_pred->Bind(*schema, true)); + + auto equal_pred = 
Expressions::Equal("value", Literal::String(value));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_equal, equal_pred->Bind(*schema, true));
+
+  AssertProjectionInclusive(*spec, *schema, bound_less_than,
+                            Expression::Operation::kLtEq);
+  AssertProjectionInclusive(*spec, *schema, bound_equal, Expression::Operation::kEq);
+}
+
+// Complex expression tests
+
+// Inclusive projection of a nested OR/AND filter onto a spec that is
+// identity-partitioned on "dateint" only. The test checks the shape of the
+// projected tree node by node, so every structural precondition is asserted
+// fatally (ASSERT_*) before the node is downcast or dereferenced; a wrong
+// shape then reports a test failure instead of crashing the test binary.
+TEST_F(ProjectionsTest, ComplexExpressionWithOr) {
+  auto schema = std::make_shared(
+      std::vector{
+          SchemaField::MakeRequired(1, "id", int64()),
+          SchemaField::MakeOptional(2, "data", string()),
+          SchemaField::MakeRequired(3, "hour", int32()),
+          SchemaField::MakeRequired(4, "dateint", int32()),
+      },
+      0);
+
+  auto identity_transform = Transform::Identity();
+  PartitionField pt_field(4, 1000, "dateint", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(*schema, 0, {pt_field}, false));
+
+  // Create filter: dateint = 20180416 OR ((dateint = 20180415 AND hour >= 20) OR
+  // (dateint = 20180417 AND hour <= 4))
+  auto dateint_eq1 = Expressions::Equal("dateint", Literal::Int(20180416));
+  auto dateint_eq2 = Expressions::Equal("dateint", Literal::Int(20180415));
+  auto hour_ge = Expressions::GreaterThanOrEqual("hour", Literal::Int(20));
+  auto dateint_eq3 = Expressions::Equal("dateint", Literal::Int(20180417));
+  auto hour_le = Expressions::LessThanOrEqual("hour", Literal::Int(4));
+
+  auto and1 = Expressions::And(dateint_eq2, hour_ge);
+  auto and2 = Expressions::And(dateint_eq3, hour_le);
+  auto or1 = Expressions::Or(and1, and2);
+  auto filter = Expressions::Or(dateint_eq1, or1);
+
+  // Project
+  auto evaluator = Projections::Inclusive(*spec, *schema, true);
+  ICEBERG_UNWRAP_OR_FAIL(auto projection, evaluator->Project(filter));
+
+  // The projection should be an OR expression
+  // Non-partition predicates (hour) are removed, and AND expressions simplify
+  // Expected: dateint = 20180416 OR (dateint = 20180415 OR dateint = 20180417)
+  // Fatal check: the downcast below is only valid for an OR node.
+  ASSERT_EQ(projection->op(), Expression::Operation::kOr);
+
+  auto or_expr = internal::checked_pointer_cast(projection);
+
+  // Left side: dateint = 20180416
+  auto dateint1_expr =
+      std::dynamic_pointer_cast>(or_expr->left());
+  // dynamic_pointer_cast yields nullptr on a type mismatch; stop here if so.
+  ASSERT_NE(dateint1_expr, nullptr);
+  EXPECT_EQ(dateint1_expr->reference()->name(), "dateint");
+  EXPECT_EQ(dateint1_expr->op(), Expression::Operation::kEq);
+  EXPECT_EQ(dateint1_expr->literals().front(), Literal::Int(20180416));
+
+  // Right side: OR of the two dateint predicates (AND expressions simplified)
+  // Verify the operation before downcasting rather than after.
+  ASSERT_EQ(or_expr->right()->op(), Expression::Operation::kOr);
+  auto or1_expr = internal::checked_pointer_cast(or_expr->right());
+
+  // Left of inner OR: dateint = 20180415 (simplified from AND with hour >= 20)
+  auto dateint2_expr =
+      std::dynamic_pointer_cast>(or1_expr->left());
+  ASSERT_NE(dateint2_expr, nullptr);
+  EXPECT_EQ(dateint2_expr->reference()->name(), "dateint");
+  EXPECT_EQ(dateint2_expr->op(), Expression::Operation::kEq);
+  EXPECT_EQ(dateint2_expr->literals().front(), Literal::Int(20180415));
+
+  // Right of inner OR: dateint = 20180417 (simplified from AND with hour <= 4)
+  auto dateint3_expr =
+      std::dynamic_pointer_cast>(or1_expr->right());
+  ASSERT_NE(dateint3_expr, nullptr);
+  EXPECT_EQ(dateint3_expr->reference()->name(), "dateint");
+  EXPECT_EQ(dateint3_expr->op(), Expression::Operation::kEq);
+  EXPECT_EQ(dateint3_expr->literals().front(), Literal::Int(20180417));
+}
+
+}  // namespace iceberg