Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion src/iceberg/sort_order.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
#include "iceberg/sort_order.h"

#include <format>
#include <optional>
#include <ranges>

#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/sort_field.h"
#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"

namespace iceberg {

Expand All @@ -31,7 +37,7 @@ SortOrder::SortOrder(int32_t order_id, std::vector<SortField> fields)

const std::shared_ptr<SortOrder>& SortOrder::Unsorted() {
static const std::shared_ptr<SortOrder> unsorted =
std::make_shared<SortOrder>(/*order_id=*/0, std::vector<SortField>{});
std::make_shared<SortOrder>(kUnsortedOrderId, std::vector<SortField>{});
return unsorted;
}

Expand Down Expand Up @@ -80,4 +86,60 @@ bool SortOrder::Equals(const SortOrder& other) const {
return order_id_ == other.order_id_ && fields_ == other.fields_;
}

Status SortOrder::Validate(const Schema& schema) const {
for (const auto& field : fields_) {
auto schema_field = schema.FindFieldById(field.source_id());
if (!schema_field.has_value() || schema_field.value() == std::nullopt) {
return InvalidArgument("Cannot find schema field for sort field: {}", field);
}

const auto& source_type = schema_field.value().value().get().type();

if (!field.transform()->CanTransform(*source_type)) {
return InvalidArgument("Cannot transform schema field type {} with transform {}",
source_type->ToString(), field.transform()->ToString());
}
}
return {};
}

Result<std::unique_ptr<SortOrder>> SortOrder::Make(const Schema& schema, int32_t sort_id,
std::vector<SortField> fields) {
if (!fields.empty() && sort_id == kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("{} is reserved for unsorted sort order", kUnsortedOrderId);
}

if (fields.empty() && sort_id != kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("Sort order must have at least one sort field");
}

for (const auto& field : fields) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
if (schema_field == std::nullopt) {
return InvalidArgument("Cannot find schema field for sort field: {}", field);
}
Comment on lines 118 to 120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (schema_field == std::nullopt) {
return InvalidArgument("Cannot find schema field for sort field: {}", field);
}
if (schema_field == std::nullopt) {
continue;
}

I think we should not error out if the field is missing. The Java impl is as below:

  public SortOrder bind(Schema schema) {
    SortOrder.Builder builder = SortOrder.builderFor(schema).withOrderId(orderId);

    for (UnboundSortField field : fields) {
      Type sourceType = schema.findType(field.sourceId);
      Transform<?, ?> transform;
      if (sourceType != null) {
        transform = Transforms.fromString(sourceType, field.transform.toString());
      } else {
        transform = field.transform;
      }
      builder.addSortField(transform, field.sourceId, field.direction, field.nullOrder);
    }

    return builder.build();
  }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, UnboundSortOrder doesn't check, but checkCompatibility should do the check, see [1].

[1] https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/SortOrder.java#L300-L302

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!


const auto& source_type = schema_field.value().get().type();
if (field.transform()->CanTransform(*source_type) == false) {
return InvalidArgument("Cannot transform schema field type {} with transform {}",
source_type->ToString(), field.transform()->ToString());
}
}

return std::make_unique<SortOrder>(sort_id, std::move(fields));
}

Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t sort_id,
std::vector<SortField> fields) {
if (!fields.empty() && sort_id == kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("{} is reserved for unsorted sort order", kUnsortedOrderId);
}

if (fields.empty() && sort_id != kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("Sort order must have at least one sort field");
}

return std::make_unique<SortOrder>(sort_id, std::move(fields));
}

} // namespace iceberg
25 changes: 25 additions & 0 deletions src/iceberg/sort_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
#pragma once

#include <cstdint>
#include <memory>
#include <span>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/sort_field.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/formattable.h"

namespace iceberg {
Expand All @@ -36,6 +38,7 @@ namespace iceberg {
/// applied to the data.
class ICEBERG_EXPORT SortOrder : public util::Formattable {
public:
static constexpr int32_t kUnsortedOrderId = 0;
static constexpr int32_t kInitialSortOrderId = 1;

SortOrder(int32_t order_id, std::vector<SortField> fields);
Expand Down Expand Up @@ -69,6 +72,28 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
return lhs.Equals(rhs);
}

/// \brief Validates the sort order against a schema.
/// \param schema The schema to validate against.
/// \return Error status if the sort order is not bound to the schema.
Status Validate(const Schema& schema) const;

/// \brief Create a SortOrder.
/// \param schema The schema to bind the sort order to.
/// \param sort_id The sort order id.
/// \param fields The sort fields.
/// \return A Result containing the SortOrder or an error.
static Result<std::unique_ptr<SortOrder>> Make(const Schema& schema, int32_t sort_id,
std::vector<SortField> fields);

/// \brief Create a SortOrder without binding to a schema.
/// \param sort_id The sort order id.
/// \param fields The sort fields.
/// \return A Result containing the SortOrder or an error.
/// \note This method does not check whether the sort fields are valid for any schema.
/// Use IsBoundToSchema to check if the sort order is valid for a given schema.
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
std::vector<SortField> fields);

private:
/// \brief Compare two sort orders for equality.
bool Equals(const SortOrder& other) const;
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/schema_field_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST(SchemaFieldTest, Equality) {
iceberg::SchemaField field1(1, "foo", iceberg::int32(), false);
iceberg::SchemaField field2(2, "foo", iceberg::int32(), false);
iceberg::SchemaField field3(1, "bar", iceberg::int32(), false);
iceberg::SchemaField field4(1, "foo", std::make_shared<iceberg::LongType>(), false);
iceberg::SchemaField field4(1, "foo", iceberg::int64(), false);
iceberg::SchemaField field5(1, "foo", iceberg::int32(), true);
iceberg::SchemaField field6(1, "foo", iceberg::int32(), false);

Expand Down
1 change: 0 additions & 1 deletion src/iceberg/test/schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "gtest/gtest.h"
#include "iceberg/result.h"
#include "iceberg/schema_field.h"
#include "iceberg/test/matchers.h"
Expand Down
109 changes: 109 additions & 0 deletions src/iceberg/test/sort_order_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,40 @@

#include "iceberg/schema.h"
#include "iceberg/sort_field.h"
#include "iceberg/test/matchers.h"
#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {

class SortOrderMakeTest : public ::testing::Test {
protected:
void SetUp() override {
field1_ = std::make_unique<SchemaField>(1, "x", int32(), true);
field2_ = std::make_unique<SchemaField>(2, "y", string(), true);
field3_ = std::make_unique<SchemaField>(3, "time", timestamp(), true);

schema_ = std::make_unique<Schema>(
std::vector<SchemaField>{*field1_, *field2_, *field3_}, 1);

sort_field1_ = std::make_unique<SortField>(
1, Transform::Identity(), SortDirection::kAscending, NullOrder::kFirst);
sort_field2_ = std::make_unique<SortField>(
2, Transform::Bucket(10), SortDirection::kDescending, NullOrder::kLast);
sort_field3_ = std::make_unique<SortField>(
3, Transform::Day(), SortDirection::kAscending, NullOrder::kFirst);
}

std::unique_ptr<Schema> schema_;
std::unique_ptr<SchemaField> field1_;
std::unique_ptr<SchemaField> field2_;
std::unique_ptr<SchemaField> field3_;

std::unique_ptr<SortField> sort_field1_;
std::unique_ptr<SortField> sort_field2_;
std::unique_ptr<SortField> sort_field3_;
};

TEST(SortOrderTest, Basics) {
{
SchemaField field1(5, "ts", iceberg::timestamp(), true);
Expand Down Expand Up @@ -148,4 +177,84 @@ TEST(SortOrderTest, Satisfies) {
EXPECT_FALSE(sort_order2.Satisfies(sort_order4));
}

TEST_F(SortOrderMakeTest, MakeValidSortOrder) {
ICEBERG_UNWRAP_OR_FAIL(
auto sort_order,
SortOrder::Make(*schema_, 1, std::vector<SortField>{*sort_field1_, *sort_field2_}));
ASSERT_NE(sort_order, nullptr);

EXPECT_TRUE(sort_order->is_sorted());
ASSERT_EQ(sort_order->fields().size(), 2);
EXPECT_EQ(sort_order->fields()[0], *sort_field1_);
EXPECT_EQ(sort_order->fields()[1], *sort_field2_);
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderEmptyFields) {
auto sort_order = SortOrder::Make(*schema_, 1, std::vector<SortField>{});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order,
HasErrorMessage("Sort order must have at least one sort field"));
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderUnsortedId) {
auto sort_order = SortOrder::Make(*schema_, SortOrder::kUnsortedOrderId,
std::vector<SortField>{*sort_field1_});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order,
HasErrorMessage(std::format("{} is reserved for unsorted sort order",
SortOrder::kUnsortedOrderId)));
}

TEST_F(SortOrderMakeTest, MakeValidUnsortedSortOrder) {
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(SortOrder::kUnsortedOrderId,
std::vector<SortField>{}));
ASSERT_NE(sort_order, nullptr);

EXPECT_TRUE(sort_order->is_unsorted());
EXPECT_EQ(sort_order->fields().size(), 0);
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderNonPrimitiveField) {
auto struct_field = std::make_unique<SchemaField>(
4, "struct_field",
std::make_shared<StructType>(std::vector<SchemaField>{
SchemaField::MakeRequired(41, "inner_field", iceberg::int32()),
}),
true);

Schema schema_with_struct(
std::vector<SchemaField>{*field1_, *field2_, *field3_, *struct_field}, 1);

SortField sort_field_invalid(4, Transform::Identity(), SortDirection::kAscending,
NullOrder::kFirst);

auto sort_order = SortOrder::Make(
schema_with_struct, 1, std::vector<SortField>{*sort_field1_, sort_field_invalid});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order, HasErrorMessage("Cannot transform schema field type"));
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderFieldNotInSchema) {
SortField sort_field_invalid(999, Transform::Identity(), SortDirection::kAscending,
NullOrder::kFirst);

auto sort_order = SortOrder::Make(
*schema_, 1, std::vector<SortField>{*sort_field1_, sort_field_invalid});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order, HasErrorMessage("Cannot find schema field for sort field"));
}

TEST_F(SortOrderMakeTest, MakeUnboundSortOrder) {
SortField sort_field_invalid(999, Transform::Identity(), SortDirection::kAscending,
NullOrder::kFirst);

auto sort_order =
SortOrder::Make(1, std::vector<SortField>{*sort_field1_, sort_field_invalid});
ASSERT_THAT(sort_order, IsOk());
auto validate_status = sort_order.value()->Validate(*schema_);
EXPECT_THAT(validate_status, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(validate_status,
HasErrorMessage("Cannot find schema field for sort field"));
}

} // namespace iceberg
73 changes: 73 additions & 0 deletions src/iceberg/test/transform_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <gtest/gtest.h>

#include "iceberg/expression/literal.h"
#include "iceberg/schema_field.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/temporal_test_helper.h"
#include "iceberg/type.h"
Expand Down Expand Up @@ -841,4 +842,76 @@ TEST(TransformSatisfiesOrderOfTest, SatisfiesOrderOf) {
}
}

TEST(TransformCanTransformTest, CanTransform) {
struct Case {
std::string transform_str;
std::shared_ptr<Type> source_type;
bool expected;
};

const std::vector<Case> cases = {
// Identity can transform all primitive types
{.transform_str = "identity", .source_type = int32(), .expected = true},
{.transform_str = "identity", .source_type = string(), .expected = true},
{.transform_str = "identity", .source_type = boolean(), .expected = true},
{.transform_str = "identity",
.source_type = list(SchemaField(123, "element", int32(), false)),
.expected = false},

// Void can transform any type
{.transform_str = "void", .source_type = iceberg::int32(), .expected = true},
{.transform_str = "void",
.source_type = iceberg::map(SchemaField(123, "key", iceberg::string(), false),
SchemaField(124, "value", iceberg::int32(), true)),
.expected = true},

// Bucket can transform specific types
{.transform_str = "bucket[16]", .source_type = iceberg::int32(), .expected = true},
{.transform_str = "bucket[16]", .source_type = iceberg::string(), .expected = true},
{.transform_str = "bucket[16]",
.source_type = iceberg::float32(),
.expected = false},

// Truncate can transform specific types
{.transform_str = "truncate[32]",
.source_type = iceberg::int32(),
.expected = true},
{.transform_str = "truncate[32]",
.source_type = iceberg::string(),
.expected = true},
{.transform_str = "truncate[32]",
.source_type = iceberg::boolean(),
.expected = false},

// Year can transform date and timestamp types
{.transform_str = "year", .source_type = iceberg::date(), .expected = true},
{.transform_str = "year", .source_type = iceberg::timestamp(), .expected = true},
{.transform_str = "year", .source_type = iceberg::string(), .expected = false},

// Month can transform date and timestamp types
{.transform_str = "month", .source_type = iceberg::date(), .expected = true},
{.transform_str = "month", .source_type = iceberg::timestamp(), .expected = true},
{.transform_str = "month", .source_type = iceberg::binary(), .expected = false},

// Day can transform date and timestamp types
{.transform_str = "day", .source_type = iceberg::date(), .expected = true},
{.transform_str = "day", .source_type = iceberg::timestamp(), .expected = true},
{.transform_str = "day", .source_type = iceberg::uuid(), .expected = false},

// Hour can transform timestamp types
{.transform_str = "hour", .source_type = iceberg::timestamp(), .expected = true},
{.transform_str = "hour", .source_type = iceberg::timestamp_tz(), .expected = true},
{.transform_str = "hour", .source_type = iceberg::int32(), .expected = false},
};

for (const auto& c : cases) {
auto transform = TransformFromString(c.transform_str);
ASSERT_TRUE(transform.has_value()) << "Failed to parse: " << c.transform_str;

EXPECT_EQ(transform.value()->CanTransform(*c.source_type), c.expected)
<< "Unexpected result for transform: " << c.transform_str
<< " and source type: " << c.source_type->ToString();
}
}

} // namespace iceberg
Loading
Loading