Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,10 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
partition_fields.push_back(std::move(*partition_field));
}
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
// partition field from schema and then verify it
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
ICEBERG_ASSIGN_OR_RAISE(
auto spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
true /*allow_missing_fields*/));
return spec;
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -917,10 +918,10 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
std::move(field->transform()));
}

// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
// partition field from schema and then verify it
auto spec =
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
// Create partition spec with schema validation
ICEBERG_ASSIGN_OR_RAISE(
auto spec, PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
std::move(fields), true /*allow_missing_fields*/));
default_spec_id = spec->spec_id();
partition_specs.push_back(std::move(spec));
}
Expand Down
1 change: 0 additions & 1 deletion src/iceberg/partition_field.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <format>

#include "iceberg/transform.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {
Expand Down
72 changes: 69 additions & 3 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@
#include "iceberg/partition_spec.h"

#include <algorithm>
#include <cstdint>
#include <format>
#include <memory>
#include <ranges>
#include <unordered_map>

#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
#include "iceberg/util/type_util.h"

namespace iceberg {

Expand All @@ -47,9 +51,8 @@ PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields
}

const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
static const std::shared_ptr<PartitionSpec> unpartitioned =
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
kLegacyPartitionDataIdStart - 1);
static const std::shared_ptr<PartitionSpec> unpartitioned(new PartitionSpec(
kInitialSpecId, std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1));
return unpartitioned;
}

Expand Down Expand Up @@ -104,4 +107,67 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const {
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
}

Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const {
std::unordered_map<int32_t, int32_t> parents = indexParents(schema);
for (const auto& partition_field : fields_) {
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
schema.FindFieldById(partition_field.source_id()));
// In the case the underlying field is dropped, we cannot check if they are compatible
if (allow_missing_fields && !source_field.has_value()) {
continue;
}
const auto& field_transform = partition_field.transform();

// In the case of a Version 1 partition-spec field gets deleted, it is replaced with a
// void transform, see: https://iceberg.apache.org/spec/#partition-transforms. We
// don't care about the source type since a VoidTransform is always compatible and
// skip the checks
if (field_transform->transform_type() != TransformType::kVoid) {
if (!source_field.has_value()) {
return InvalidArgument("Cannot find source column for partition field: {}",
partition_field);
}
const auto& source_type = source_field.value().get().type();
if (!field_transform->CanTransform(*source_type)) {
return InvalidArgument("Invalid source type {} for transform {}",
source_type->ToString(), field_transform->ToString());
}

// The only valid parent types for a PartitionField are StructTypes. This must be
// checked recursively.
auto parent_id_iter = parents.find(partition_field.source_id());
while (parent_id_iter != parents.end()) {
int32_t parent_id = parent_id_iter->second;
ICEBERG_ASSIGN_OR_RAISE(auto parent_field, schema.FindFieldById(parent_id));
if (!parent_field.has_value()) {
return InvalidArgument("Cannot find parent field with ID: {}", parent_id);
}
const auto& parent_type = parent_field.value().get().type();
if (parent_type->type_id() != TypeId::kStruct) {
return InvalidArgument("Invalid partition field parent type: {}",
parent_type->ToString());
}
parent_id_iter = parents.find(parent_id);
}
}
}
return {};
}

Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
bool allow_missing_fields, std::optional<int32_t> last_assigned_field_id) {
auto partition_spec = std::unique_ptr<PartitionSpec>(
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
ICEBERG_RETURN_UNEXPECTED(partition_spec->Validate(schema, allow_missing_fields));
return partition_spec;
}

Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id) {
return std::unique_ptr<PartitionSpec>(
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
}

} // namespace iceberg
52 changes: 42 additions & 10 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;
static constexpr int32_t kInvalidPartitionFieldId = -1;

/// \brief Create a new partition spec.
///
/// \param schema The table schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Get an unsorted partition spec singleton.
static const std::shared_ptr<PartitionSpec>& Unpartitioned();

Expand All @@ -80,7 +70,49 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
return lhs.Equals(rhs);
}

/// \brief Validates the partition spec against a schema.
/// \param schema The schema to validate against.
/// \param allowMissingFields Whether to skip validation for partition fields whose
/// source columns have been dropped from the schema.
/// \return Error status if the partition spec is invalid.
Status Validate(const Schema& schema, bool allow_missing_fields) const;

/// \brief Create a PartitionSpec binding to a schema.
/// \param schema The schema to bind the partition spec to.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param allowMissingFields Whether to skip validation for partition fields whose
/// source columns have been dropped from the schema.
/// \param last_assigned_field_id The last assigned field ID assigned to ensure new
/// fields get unique IDs.
/// \return A Result containing the partition spec or an error.
static Result<std::unique_ptr<PartitionSpec>> Make(
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
bool allow_missing_fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Create a PartitionSpec without binding to a schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID assigned to ensure new
/// fields get unique IDs.
/// \return A Result containing the partition spec or an error.
/// \note This method does not check whether the sort fields are valid for any schema.
static Result<std::unique_ptr<PartitionSpec>> Make(
int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

private:
/// \brief Create a new partition spec.
///
/// \param schema The table schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Compare two partition specs for equality.
bool Equals(const PartitionSpec& other) const;

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/sort_order.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ bool SortOrder::Equals(const SortOrder& other) const {
Status SortOrder::Validate(const Schema& schema) const {
for (const auto& field : fields_) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
if (!schema_field.has_value() || schema_field == std::nullopt) {
if (!schema_field.has_value()) {
return InvalidArgument("Cannot find source column for sort field: {}", field);
}

Expand Down
1 change: 0 additions & 1 deletion src/iceberg/sort_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
/// \param fields The sort fields.
/// \return A Result containing the SortOrder or an error.
/// \note This method does not check whether the sort fields are valid for any schema.
/// Use IsBoundToSchema to check if the sort order is valid for a given schema.
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
std::vector<SortField> fields);

Expand Down
23 changes: 13 additions & 10 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,13 @@ TEST(JsonInternalTest, SortField) {

TEST(JsonInternalTest, SortOrder) {
auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(5, "region", iceberg::string(), false),
SchemaField(7, "ts", iceberg::int64(), false)},
std::vector<SchemaField>{SchemaField(5, "region", string(), false),
SchemaField(7, "ts", int64(), false)},
/*schema_id=*/100);
auto identity_transform = Transform::Identity();
SortField st_ts(5, identity_transform, SortDirection::kAscending, NullOrder::kFirst);
SortField st_bar(7, identity_transform, SortDirection::kDescending, NullOrder::kLast);
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(100, {st_ts, st_bar}));
EXPECT_TRUE(sort_order->Validate(*schema));
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(*schema, 100, {st_ts, st_bar}));
nlohmann::json expected_sort_order =
R"({"order-id":100,"fields":[
{"transform":"identity","source-id":5,"direction":"asc","null-order":"nulls-first"},
Expand All @@ -132,7 +131,7 @@ TEST(JsonInternalTest, PartitionField) {
TestJsonConversion(field, expected_json);
}

TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
TEST(JsonInternalTest, PartitionFieldFromJsonMissingField) {
nlohmann::json invalid_json =
R"({"field-id":101,"transform":"identity","name":"region"})"_json;
// missing source-id
Expand All @@ -143,15 +142,19 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
EXPECT_THAT(result, HasErrorMessage("Missing 'source-id'"));
}

TEST(JsonPartitionTest, PartitionSpec) {
TEST(JsonInternalTest, PartitionSpec) {
auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(3, "region", string(), false),
SchemaField(5, "ts", int64(), false)},
/*schema_id=*/100);
auto identity_transform = Transform::Identity();
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
PartitionField(5, 102, "ts", identity_transform)});
auto json = ToJson(spec);
ICEBERG_UNWRAP_OR_FAIL(
auto spec,
PartitionSpec::Make(*schema, 1,
{PartitionField(3, 101, "region", identity_transform),
PartitionField(5, 102, "ts", identity_transform)},
false));
auto json = ToJson(*spec);
nlohmann::json expected_json = R"({"spec-id": 1,
"fields": [
{"source-id": 3,
Expand All @@ -167,7 +170,7 @@ TEST(JsonPartitionTest, PartitionSpec) {

auto parsed_spec_result = PartitionSpecFromJson(schema, json);
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;
EXPECT_EQ(spec, *parsed_spec_result.value());
EXPECT_EQ(*spec, *parsed_spec_result.value());
}

TEST(JsonInternalTest, SnapshotRefBranch) {
Expand Down
5 changes: 4 additions & 1 deletion src/iceberg/test/manifest_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ TEST_F(ManifestV1Test, WritePartitionedTest) {
auto identity_transform = Transform::Identity();
std::vector<PartitionField> fields{
PartitionField(1, 1000, "order_ts_hour", identity_transform)};
auto partition_spec = std::make_shared<PartitionSpec>(1, fields);
auto partition_spec_result = PartitionSpec::Make(1, fields);
ASSERT_TRUE(partition_spec_result.has_value());
auto partition_spec =
std::shared_ptr<PartitionSpec>(std::move(partition_spec_result.value()));

auto expected_entries = PreparePartitionedTestData();
auto write_manifest_path = CreateNewTempFilePath();
Expand Down
26 changes: 19 additions & 7 deletions src/iceberg/test/metadata_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,13 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) {
SchemaField::MakeRequired(3, "z", int64())},
/*schema_id=*/std::nullopt);

auto expected_spec = std::make_shared<PartitionSpec>(
auto expected_spec_result = PartitionSpec::Make(
/*spec_id=*/0,
std::vector<PartitionField>{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x",
Transform::Identity())});
ASSERT_TRUE(expected_spec_result.has_value());
auto expected_spec =
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));

TableMetadata expected{
.format_version = 1,
Expand Down Expand Up @@ -133,7 +136,7 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
ASSERT_NO_FATAL_FAILURE(ReadTableMetadata("TableMetadataV2Valid.json", &metadata));

auto expected_schema_1 = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", iceberg::int64(),
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
/*optional=*/false)},
/*schema_id=*/0);

Expand All @@ -143,10 +146,13 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
SchemaField::MakeRequired(3, "z", int64())},
/*schema_id=*/1);

auto expected_spec = std::make_shared<PartitionSpec>(
auto expected_spec_result = PartitionSpec::Make(
/*spec_id=*/0,
std::vector<PartitionField>{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x",
Transform::Identity())});
ASSERT_TRUE(expected_spec_result.has_value());
auto expected_spec =
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));

ICEBERG_UNWRAP_OR_FAIL(
auto sort_order,
Expand Down Expand Up @@ -228,10 +234,13 @@ TEST(MetadataSerdeTest, DeserializeV2ValidMinimal) {
SchemaField::MakeRequired(3, "z", int64())},
/*schema_id=*/0);

auto expected_spec = std::make_shared<PartitionSpec>(
auto expected_spec_result = PartitionSpec::Make(
/*spec_id=*/0,
std::vector<PartitionField>{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x",
Transform::Identity())});
ASSERT_TRUE(expected_spec_result.has_value());
auto expected_spec =
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));

ICEBERG_UNWRAP_OR_FAIL(
auto sort_order,
Expand Down Expand Up @@ -277,12 +286,15 @@ TEST(MetadataSerdeTest, DeserializeStatisticsFiles) {
ReadTableMetadata("TableMetadataStatisticsFiles.json", &metadata));

auto expected_schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", iceberg::int64(),
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
/*optional=*/false)},
/*schema_id=*/0);

auto expected_spec_result =
PartitionSpec::Make(/*spec_id=*/0, std::vector<PartitionField>{});
ASSERT_TRUE(expected_spec_result.has_value());
auto expected_spec =
std::make_shared<PartitionSpec>(/*spec_id=*/0, std::vector<PartitionField>{});
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));

auto expected_snapshot = std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3055729675574597004,
Expand Down Expand Up @@ -353,7 +365,7 @@ TEST(MetadataSerdeTest, DeserializePartitionStatisticsFiles) {
.last_updated_ms = TimePointMsFromUnixMs(1602638573590).value(),
.last_column_id = 3,
.schemas = {std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", iceberg::int64(),
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
/*optional=*/false)},
/*schema_id=*/0)},
.current_schema_id = 0,
Expand Down
Loading
Loading