Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion src/iceberg/sort_order.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
#include "iceberg/sort_order.h"

#include <format>
#include <optional>
#include <ranges>

#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/sort_field.h"
#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"

namespace iceberg {

Expand All @@ -31,7 +37,7 @@ SortOrder::SortOrder(int32_t order_id, std::vector<SortField> fields)

const std::shared_ptr<SortOrder>& SortOrder::Unsorted() {
static const std::shared_ptr<SortOrder> unsorted =
std::make_shared<SortOrder>(/*order_id=*/0, std::vector<SortField>{});
std::make_shared<SortOrder>(kUnsortedOrderId, std::vector<SortField>{});
return unsorted;
}

Expand Down Expand Up @@ -80,4 +86,66 @@ bool SortOrder::Equals(const SortOrder& other) const {
return order_id_ == other.order_id_ && fields_ == other.fields_;
}

bool SortOrder::IsBoundToSchema(const Schema& schema) const {
for (const auto& field : fields_) {
auto schema_field = schema.FindFieldById(field.source_id());
if (!schema_field.has_value() || schema_field.value() == std::nullopt) {
return false;
}

const auto& source_type = schema_field.value().value().get().type();
if (!source_type->is_primitive()) {
return false;
}

auto result = field.transform()->ResultType(source_type);
if (!result) {
return false;
}
}
return true;
}

Result<std::unique_ptr<SortOrder>> SortOrder::Make(const Schema& schema, int32_t sort_id,
std::vector<SortField> fields) {
if (!fields.empty() && sort_id == kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("{} is reserved for unsorted sort order", kUnsortedOrderId);
}

if (fields.empty() && sort_id != kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("Sort order must have at least one sort field");
}

for (const auto& field : fields) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
if (schema_field == std::nullopt) {
return InvalidArgument("Cannot find source column for sort field: {}", field);
}

const auto& source_type = schema_field.value().get().type();

if (!source_type->is_primitive()) {
return InvalidArgument("Cannot sort by non-primitive source field: {}",
*source_type);
}

ICEBERG_RETURN_UNEXPECTED(field.transform()->ResultType(source_type));
}

return std::make_unique<SortOrder>(sort_id, std::move(fields));
}

Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t sort_id,
std::vector<SortField> fields) {
if (!fields.empty() && sort_id == kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("{} is reserved for unsorted sort order", kUnsortedOrderId);
}

if (fields.empty() && sort_id != kUnsortedOrderId) [[unlikely]] {
return InvalidArgument("Sort order must have at least one sort field");
}

return std::make_unique<SortOrder>(sort_id, std::move(fields));
}

} // namespace iceberg
25 changes: 25 additions & 0 deletions src/iceberg/sort_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
#pragma once

#include <cstdint>
#include <memory>
#include <span>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/sort_field.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/formattable.h"

namespace iceberg {
Expand All @@ -36,6 +38,7 @@ namespace iceberg {
/// applied to the data.
class ICEBERG_EXPORT SortOrder : public util::Formattable {
public:
static constexpr int32_t kUnsortedOrderId = 0;
static constexpr int32_t kInitialSortOrderId = 1;

SortOrder(int32_t order_id, std::vector<SortField> fields);
Expand Down Expand Up @@ -69,6 +72,28 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
return lhs.Equals(rhs);
}

/// \brief Checks whether the sort order is bound to the given schema.
/// \param schema The schema to check against.
/// \return true if the sort order is valid for the given schema.
bool IsBoundToSchema(const Schema& schema) const;

/// \brief Create a SortOrder.
/// \param schema The schema to bind the sort order to.
/// \param sort_id The sort order id.
/// \param fields The sort fields.
/// \return A Result containing the SortOrder or an error.
static Result<std::unique_ptr<SortOrder>> Make(const Schema& schema, int32_t sort_id,
std::vector<SortField> fields);

/// \brief Create a SortOrder without binding to a schema.
/// \param sort_id The sort order id.
/// \param fields The sort fields.
/// \return A Result containing the SortOrder or an error.
/// \note This method does not check whether the sort fields are valid for any schema.
/// Use IsBoundToSchema to check if the sort order is valid for a given schema.
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
std::vector<SortField> fields);

private:
/// \brief Compare two sort orders for equality.
bool Equals(const SortOrder& other) const;
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/schema_field_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST(SchemaFieldTest, Equality) {
iceberg::SchemaField field1(1, "foo", iceberg::int32(), false);
iceberg::SchemaField field2(2, "foo", iceberg::int32(), false);
iceberg::SchemaField field3(1, "bar", iceberg::int32(), false);
iceberg::SchemaField field4(1, "foo", std::make_shared<iceberg::LongType>(), false);
iceberg::SchemaField field4(1, "foo", iceberg::int64(), false);
iceberg::SchemaField field5(1, "foo", iceberg::int32(), true);
iceberg::SchemaField field6(1, "foo", iceberg::int32(), false);

Expand Down
1 change: 0 additions & 1 deletion src/iceberg/test/schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "gtest/gtest.h"
#include "iceberg/result.h"
#include "iceberg/schema_field.h"
#include "iceberg/test/matchers.h"
Expand Down
106 changes: 106 additions & 0 deletions src/iceberg/test/sort_order_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,40 @@

#include "iceberg/schema.h"
#include "iceberg/sort_field.h"
#include "iceberg/test/matchers.h"
#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {

class SortOrderMakeTest : public ::testing::Test {
protected:
void SetUp() override {
field1_ = std::make_unique<SchemaField>(1, "x", int32(), true);
field2_ = std::make_unique<SchemaField>(2, "y", string(), true);
field3_ = std::make_unique<SchemaField>(3, "time", timestamp(), true);

schema_ = std::make_unique<Schema>(
std::vector<SchemaField>{*field1_, *field2_, *field3_}, 1);

sort_field1_ = std::make_unique<SortField>(
1, Transform::Identity(), SortDirection::kAscending, NullOrder::kFirst);
sort_field2_ = std::make_unique<SortField>(
2, Transform::Bucket(10), SortDirection::kDescending, NullOrder::kLast);
sort_field3_ = std::make_unique<SortField>(
3, Transform::Day(), SortDirection::kAscending, NullOrder::kFirst);
}

std::unique_ptr<Schema> schema_;
std::unique_ptr<SchemaField> field1_;
std::unique_ptr<SchemaField> field2_;
std::unique_ptr<SchemaField> field3_;

std::unique_ptr<SortField> sort_field1_;
std::unique_ptr<SortField> sort_field2_;
std::unique_ptr<SortField> sort_field3_;
};

TEST(SortOrderTest, Basics) {
{
SchemaField field1(5, "ts", iceberg::timestamp(), true);
Expand Down Expand Up @@ -148,4 +177,81 @@ TEST(SortOrderTest, Satisfies) {
EXPECT_FALSE(sort_order2.Satisfies(sort_order4));
}

TEST_F(SortOrderMakeTest, MakeValidSortOrder) {
ICEBERG_UNWRAP_OR_FAIL(
auto sort_order,
SortOrder::Make(*schema_, 1, std::vector<SortField>{*sort_field1_, *sort_field2_}));
ASSERT_NE(sort_order, nullptr);

EXPECT_TRUE(sort_order->is_sorted());
ASSERT_EQ(sort_order->fields().size(), 2);
EXPECT_EQ(sort_order->fields()[0], *sort_field1_);
EXPECT_EQ(sort_order->fields()[1], *sort_field2_);
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderEmptyFields) {
auto sort_order = SortOrder::Make(*schema_, 1, std::vector<SortField>{});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order,
HasErrorMessage("Sort order must have at least one sort field"));
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderUnsortedId) {
auto sort_order = SortOrder::Make(*schema_, SortOrder::kUnsortedOrderId,
std::vector<SortField>{*sort_field1_});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order,
HasErrorMessage(std::format("{} is reserved for unsorted sort order",
SortOrder::kUnsortedOrderId)));
}

TEST_F(SortOrderMakeTest, MakeValidUnsortedSortOrder) {
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(SortOrder::kUnsortedOrderId,
std::vector<SortField>{}));
ASSERT_NE(sort_order, nullptr);

EXPECT_TRUE(sort_order->is_unsorted());
EXPECT_EQ(sort_order->fields().size(), 0);
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderNonPrimitiveField) {
auto struct_field = std::make_unique<SchemaField>(
4, "struct_field",
std::make_shared<StructType>(std::vector<SchemaField>{
SchemaField::MakeRequired(41, "inner_field", iceberg::int32()),
}),
true);

Schema schema_with_struct(
std::vector<SchemaField>{*field1_, *field2_, *field3_, *struct_field}, 1);

SortField sort_field_invalid(4, Transform::Identity(), SortDirection::kAscending,
NullOrder::kFirst);

auto sort_order = SortOrder::Make(
schema_with_struct, 1, std::vector<SortField>{*sort_field1_, sort_field_invalid});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order, HasErrorMessage("Cannot sort by non-primitive source field"));
}

TEST_F(SortOrderMakeTest, MakeInvalidSortOrderFieldNotInSchema) {
SortField sort_field_invalid(999, Transform::Identity(), SortDirection::kAscending,
NullOrder::kFirst);

auto sort_order = SortOrder::Make(
*schema_, 1, std::vector<SortField>{*sort_field1_, sort_field_invalid});
EXPECT_THAT(sort_order, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(sort_order, HasErrorMessage("Cannot find source column for sort field:"));
}

TEST_F(SortOrderMakeTest, MakeUnboundSortOrder) {
SortField sort_field_invalid(999, Transform::Identity(), SortDirection::kAscending,
NullOrder::kFirst);

auto sort_order =
SortOrder::Make(1, std::vector<SortField>{*sort_field1_, sort_field_invalid});
ASSERT_THAT(sort_order, IsOk());
ASSERT_EQ(sort_order.value()->IsBoundToSchema(*schema_), false);
}

} // namespace iceberg
79 changes: 79 additions & 0 deletions src/iceberg/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <regex>
#include <utility>

#include "iceberg/result.h"
#include "iceberg/transform_function.h"
#include "iceberg/type.h"

Expand Down Expand Up @@ -125,6 +126,84 @@ Result<std::shared_ptr<TransformFunction>> Transform::Bind(
}
}

Result<std::shared_ptr<Type>> Transform::ResultType(
const std::shared_ptr<Type>& source_type) const {
switch (transform_type_) {
case TransformType::kIdentity:
if (!source_type->is_primitive()) [[unlikely]] {
return InvalidArgument("{} is not a valid input type of identity transform",
source_type->ToString());
}
return source_type;
case TransformType::kVoid:
return source_type;
case TransformType::kUnknown:
return string();
case TransformType::kBucket:
switch (source_type->type_id()) {
case TypeId::kInt:
case TypeId::kLong:
case TypeId::kDecimal:
case TypeId::kDate:
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
case TypeId::kString:
case TypeId::kUuid:
case TypeId::kFixed:
case TypeId::kBinary:
return int32();
default:
return InvalidArgument("{} is not a valid input type of bucket transform",
source_type->ToString());
}
case TransformType::kTruncate:
switch (source_type->type_id()) {
case TypeId::kInt:
case TypeId::kLong:
case TypeId::kString:
case TypeId::kBinary:
case TypeId::kDecimal:
return source_type;
default:
return InvalidArgument("{} is not a valid input type of truncate transform",
source_type->ToString());
}
case TransformType::kYear:
case TransformType::kMonth:
switch (source_type->type_id()) {
case TypeId::kDate:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
return int32();
default:
return InvalidArgument("{} is not a valid input type of {} transform",
source_type->ToString(), this->ToString());
}
case TransformType::kDay:
switch (source_type->type_id()) {
case TypeId::kDate:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
return date();
default:
return InvalidArgument("{} is not a valid input type of day transform",
source_type->ToString());
}
case TransformType::kHour:
switch (source_type->type_id()) {
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
return int32();
default:
return InvalidArgument("{} is not a valid input type of hour transform",
source_type->ToString());
}
default:
std::unreachable();
}
}

bool Transform::PreservesOrder() const {
switch (transform_type_) {
case TransformType::kUnknown:
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ class ICEBERG_EXPORT Transform : public util::Formattable {
Result<std::shared_ptr<TransformFunction>> Bind(
const std::shared_ptr<Type>& source_type) const;

/// \brief Returns the Type produced by this transform given a source type.
Result<std::shared_ptr<Type>> ResultType(
const std::shared_ptr<Type>& source_type) const;

/// \brief Whether the transform preserves the order of values (is monotonic).
bool PreservesOrder() const;

Expand Down
Loading