Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
}

Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
const std::shared_ptr<Schema>& schema, const nlohmann::json& json) {
const std::shared_ptr<Schema>& schema, const nlohmann::json& json,
int32_t default_spec_id) {
ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, kFields));

Expand All @@ -542,9 +543,18 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
partition_fields.push_back(std::move(*partition_field));
}
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
// partition field from schema and then verify it
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));

std::unique_ptr<PartitionSpec> spec;
if (default_spec_id == spec_id) {
ICEBERG_ASSIGN_OR_RAISE(
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
/*allow_missing_fields=*/false));
} else {
ICEBERG_ASSIGN_OR_RAISE(
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
/*allow_missing_fields=*/true));
}
return spec;
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -885,8 +895,8 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue<int32_t>(json, kDefaultSpecId));

for (const auto& spec_json : spec_array) {
ICEBERG_ASSIGN_OR_RAISE(auto spec,
PartitionSpecFromJson(current_schema, spec_json));
ICEBERG_ASSIGN_OR_RAISE(
auto spec, PartitionSpecFromJson(current_schema, spec_json, default_spec_id));
partition_specs.push_back(std::move(spec));
}
} else {
Expand Down Expand Up @@ -917,10 +927,11 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
std::move(field->transform()));
}

// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
// partition field from schema and then verify it
auto spec =
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
// Create partition spec with schema validation
ICEBERG_ASSIGN_OR_RAISE(
auto spec,
PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
std::move(fields), /*allow_missing_fields=*/false));
default_spec_id = spec->spec_id();
partition_specs.push_back(std::move(spec));
}
Expand Down
4 changes: 3 additions & 1 deletion src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,12 @@ ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_s
///
/// \param schema The current schema.
/// \param json The JSON object representing a `PartitionSpec`.
/// \param default_spec_id The default spec ID from the table metadata.
/// \return An `expected` value containing either a `PartitionSpec` object or an error. If
/// the JSON is malformed or missing expected fields, an error will be returned.
ICEBERG_EXPORT Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
const std::shared_ptr<Schema>& schema, const nlohmann::json& json,
int32_t default_spec_id);

/// \brief Serializes a `SnapshotRef` object to JSON.
///
Expand Down
1 change: 0 additions & 1 deletion src/iceberg/partition_field.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <format>

#include "iceberg/transform.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {
Expand Down
72 changes: 69 additions & 3 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@
#include "iceberg/partition_spec.h"

#include <algorithm>
#include <cstdint>
#include <format>
#include <memory>
#include <ranges>
#include <unordered_map>

#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
#include "iceberg/util/type_util.h"

namespace iceberg {

Expand All @@ -47,9 +51,8 @@ PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields
}

const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
static const std::shared_ptr<PartitionSpec> unpartitioned =
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
kLegacyPartitionDataIdStart - 1);
static const std::shared_ptr<PartitionSpec> unpartitioned(new PartitionSpec(
kInitialSpecId, std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1));
return unpartitioned;
}

Expand Down Expand Up @@ -104,4 +107,67 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const {
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
}

// Checks each partition field of this spec against `schema`.
//
// A field whose source column cannot be found is skipped when
// `allow_missing_fields` is true. A field with a void transform is accepted
// without looking at its source column. Every other field must have a source
// column that exists, whose type the transform can handle, and whose ancestors
// in the schema are all struct types. Returns an InvalidArgument error for the
// first field that violates one of these rules, and an empty Status otherwise.
Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const {
  // Used below as a lookup from a field ID to the ID of its parent field.
  const std::unordered_map<int32_t, int32_t> parent_of = indexParents(schema);

  for (const auto& field : fields_) {
    const int32_t source_id = field.source_id();
    ICEBERG_ASSIGN_OR_RAISE(auto source, schema.FindFieldById(source_id));
    const bool source_missing = !source.has_value();

    // When the underlying column has been dropped there is nothing to check
    // compatibility against, so the caller may opt into ignoring such fields.
    if (source_missing && allow_missing_fields) {
      continue;
    }

    // When a Version 1 partition-spec field gets deleted it is replaced with a
    // void transform, see: https://iceberg.apache.org/spec/#partition-transforms.
    // A void transform is compatible with any source, so no further checks apply.
    const auto& transform = field.transform();
    if (transform->transform_type() == TransformType::kVoid) {
      continue;
    }

    if (source_missing) {
      return InvalidArgument("Cannot find source column for partition field: {}", field);
    }

    const auto& source_type = source.value().get().type();
    if (!transform->CanTransform(*source_type)) {
      return InvalidArgument("Invalid source type {} for transform {}",
                             source_type->ToString(), transform->ToString());
    }

    // Walk up the chain of ancestors of the source column: every one of them
    // has to be a struct for the column to be usable as a partition source.
    for (auto it = parent_of.find(source_id); it != parent_of.end();
         it = parent_of.find(it->second)) {
      int32_t ancestor_id = it->second;
      ICEBERG_ASSIGN_OR_RAISE(auto ancestor, schema.FindFieldById(ancestor_id));
      if (!ancestor.has_value()) {
        return InvalidArgument("Cannot find parent field with ID: {}", ancestor_id);
      }
      const auto& ancestor_type = ancestor.value().get().type();
      if (ancestor_type->type_id() != TypeId::kStruct) {
        return InvalidArgument("Invalid partition field parent type: {}",
                               ancestor_type->ToString());
      }
    }
  }
  return {};
}

// Builds a partition spec from `spec_id`, `fields` and `last_assigned_field_id`,
// then validates it against `schema` with the given `allow_missing_fields`
// setting. The spec is only returned after the Validate() call has been passed
// through ICEBERG_RETURN_UNEXPECTED (presumably returning the error to the
// caller on failure).
Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
    const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
    bool allow_missing_fields, std::optional<int32_t> last_assigned_field_id) {
  // The object is allocated with `new` and handed straight to the unique_ptr.
  std::unique_ptr<PartitionSpec> spec(
      new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
  ICEBERG_RETURN_UNEXPECTED(spec->Validate(schema, allow_missing_fields));
  return spec;
}

// Builds a partition spec from `spec_id`, `fields` and `last_assigned_field_id`
// without consulting any schema; no validation of the fields is performed here.
Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
    int32_t spec_id, std::vector<PartitionField> fields,
    std::optional<int32_t> last_assigned_field_id) {
  // The object is allocated with `new` and handed straight to the unique_ptr.
  std::unique_ptr<PartitionSpec> spec(
      new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
  return spec;
}

} // namespace iceberg
52 changes: 42 additions & 10 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;
static constexpr int32_t kInvalidPartitionFieldId = -1;

/// \brief Create a new partition spec.
///
/// \param schema The table schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Get an unpartitioned partition spec singleton.
static const std::shared_ptr<PartitionSpec>& Unpartitioned();

Expand All @@ -80,7 +70,49 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
return lhs.Equals(rhs);
}

/// \brief Validates the partition spec against a schema.
/// \param schema The schema to validate against.
/// \param allow_missing_fields Whether to skip validation for partition fields whose
/// source columns have been dropped from the schema.
/// \return Error status if the partition spec is invalid.
Status Validate(const Schema& schema, bool allow_missing_fields) const;

/// \brief Create a PartitionSpec binding to a schema.
/// \param schema The schema to bind the partition spec to.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param allow_missing_fields Whether to skip validation for partition fields whose
/// source columns have been dropped from the schema.
/// \param last_assigned_field_id The last assigned field ID, used to ensure new
/// fields get unique IDs.
/// \return A Result containing the partition spec or an error.
static Result<std::unique_ptr<PartitionSpec>> Make(
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
bool allow_missing_fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Create a PartitionSpec without binding to a schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID, used to ensure new
/// fields get unique IDs.
/// \return A Result containing the partition spec or an error.
/// \note This method does not check whether the partition fields are valid for any
/// schema.
static Result<std::unique_ptr<PartitionSpec>> Make(
int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

private:
/// \brief Create a new partition spec.
///
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Compare two partition specs for equality.
bool Equals(const PartitionSpec& other) const;

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/sort_order.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ bool SortOrder::Equals(const SortOrder& other) const {
Status SortOrder::Validate(const Schema& schema) const {
for (const auto& field : fields_) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
if (!schema_field.has_value() || schema_field == std::nullopt) {
if (!schema_field.has_value()) {
return InvalidArgument("Cannot find source column for sort field: {}", field);
}

Expand Down
1 change: 0 additions & 1 deletion src/iceberg/sort_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
/// \param fields The sort fields.
/// \return A Result containing the SortOrder or an error.
/// \note This method does not check whether the sort fields are valid for any schema.
/// Use IsBoundToSchema to check if the sort order is valid for a given schema.
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
std::vector<SortField> fields);

Expand Down
25 changes: 14 additions & 11 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,13 @@ TEST(JsonInternalTest, SortField) {

TEST(JsonInternalTest, SortOrder) {
auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(5, "region", iceberg::string(), false),
SchemaField(7, "ts", iceberg::int64(), false)},
std::vector<SchemaField>{SchemaField(5, "region", string(), false),
SchemaField(7, "ts", int64(), false)},
/*schema_id=*/100);
auto identity_transform = Transform::Identity();
SortField st_ts(5, identity_transform, SortDirection::kAscending, NullOrder::kFirst);
SortField st_bar(7, identity_transform, SortDirection::kDescending, NullOrder::kLast);
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(100, {st_ts, st_bar}));
EXPECT_TRUE(sort_order->Validate(*schema));
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(*schema, 100, {st_ts, st_bar}));
nlohmann::json expected_sort_order =
R"({"order-id":100,"fields":[
{"transform":"identity","source-id":5,"direction":"asc","null-order":"nulls-first"},
Expand All @@ -132,7 +131,7 @@ TEST(JsonInternalTest, PartitionField) {
TestJsonConversion(field, expected_json);
}

TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
TEST(JsonInternalTest, PartitionFieldFromJsonMissingField) {
nlohmann::json invalid_json =
R"({"field-id":101,"transform":"identity","name":"region"})"_json;
// missing source-id
Expand All @@ -143,15 +142,19 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
EXPECT_THAT(result, HasErrorMessage("Missing 'source-id'"));
}

TEST(JsonPartitionTest, PartitionSpec) {
TEST(JsonInternalTest, PartitionSpec) {
auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(3, "region", string(), false),
SchemaField(5, "ts", int64(), false)},
/*schema_id=*/100);
auto identity_transform = Transform::Identity();
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
PartitionField(5, 102, "ts", identity_transform)});
auto json = ToJson(spec);
ICEBERG_UNWRAP_OR_FAIL(
auto spec,
PartitionSpec::Make(*schema, 1,
{PartitionField(3, 101, "region", identity_transform),
PartitionField(5, 102, "ts", identity_transform)},
false));
auto json = ToJson(*spec);
nlohmann::json expected_json = R"({"spec-id": 1,
"fields": [
{"source-id": 3,
Expand All @@ -165,9 +168,9 @@ TEST(JsonPartitionTest, PartitionSpec) {

EXPECT_EQ(json, expected_json);

auto parsed_spec_result = PartitionSpecFromJson(schema, json);
auto parsed_spec_result = PartitionSpecFromJson(schema, json, 1);
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;
EXPECT_EQ(spec, *parsed_spec_result.value());
EXPECT_EQ(*spec, *parsed_spec_result.value());
}

TEST(JsonInternalTest, SnapshotRefBranch) {
Expand Down
4 changes: 3 additions & 1 deletion src/iceberg/test/manifest_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "iceberg/test/temp_file_test_base.h"
#include "iceberg/test/test_common.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"

namespace iceberg {

Expand Down Expand Up @@ -186,7 +187,8 @@ TEST_F(ManifestV1Test, WritePartitionedTest) {
auto identity_transform = Transform::Identity();
std::vector<PartitionField> fields{
PartitionField(1, 1000, "order_ts_hour", identity_transform)};
auto partition_spec = std::make_shared<PartitionSpec>(1, fields);
ICEBERG_UNWRAP_OR_FAIL(std::shared_ptr<PartitionSpec> partition_spec,
PartitionSpec::Make(*table_schema, 1, fields, false));

auto expected_entries = PreparePartitionedTestData();
auto write_manifest_path = CreateNewTempFilePath();
Expand Down
Loading
Loading