Skip to content

Commit 6fe80fe

Browse files
authored
refactor: make PartitionSpec ctor private and add factory methods with validation (#316)
1 parent acd62e3 commit 6fe80fe

14 files changed

+526
-80
lines changed

src/iceberg/json_internal.cc

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,8 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
533533
}
534534

535535
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
536-
const std::shared_ptr<Schema>& schema, const nlohmann::json& json) {
536+
const std::shared_ptr<Schema>& schema, const nlohmann::json& json,
537+
int32_t default_spec_id) {
537538
ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
538539
ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, kFields));
539540

@@ -542,9 +543,18 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
542543
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
543544
partition_fields.push_back(std::move(*partition_field));
544545
}
545-
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
546-
// partition field from schema and then verify it
547-
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
546+
547+
std::unique_ptr<PartitionSpec> spec;
548+
if (default_spec_id == spec_id) {
549+
ICEBERG_ASSIGN_OR_RAISE(
550+
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
551+
/*allow_missing_fields=*/false));
552+
} else {
553+
ICEBERG_ASSIGN_OR_RAISE(
554+
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
555+
/*allow_missing_fields=*/true));
556+
}
557+
return spec;
548558
}
549559

550560
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
@@ -885,8 +895,8 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
885895
ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue<int32_t>(json, kDefaultSpecId));
886896

887897
for (const auto& spec_json : spec_array) {
888-
ICEBERG_ASSIGN_OR_RAISE(auto spec,
889-
PartitionSpecFromJson(current_schema, spec_json));
898+
ICEBERG_ASSIGN_OR_RAISE(
899+
auto spec, PartitionSpecFromJson(current_schema, spec_json, default_spec_id));
890900
partition_specs.push_back(std::move(spec));
891901
}
892902
} else {
@@ -917,10 +927,11 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
917927
std::move(field->transform()));
918928
}
919929

920-
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
921-
// partition field from schema and then verify it
922-
auto spec =
923-
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
930+
// Create partition spec with schema validation
931+
ICEBERG_ASSIGN_OR_RAISE(
932+
auto spec,
933+
PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
934+
std::move(fields), /*allow_missing_fields=*/false));
924935
default_spec_id = spec->spec_id();
925936
partition_specs.push_back(std::move(spec));
926937
}

src/iceberg/json_internal.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,12 @@ ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_s
176176
///
177177
/// \param schema The current schema.
178178
/// \param json The JSON object representing a `PartitionSpec`.
179+
/// \param default_spec_id The default spec ID from the table metadata.
179180
/// \return An `expected` value containing either a `PartitionSpec` object or an error. If
180181
/// the JSON is malformed or missing expected fields, an error will be returned.
181182
ICEBERG_EXPORT Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
182-
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
183+
const std::shared_ptr<Schema>& schema, const nlohmann::json& json,
184+
int32_t default_spec_id);
183185

184186
/// \brief Serializes a `SnapshotRef` object to JSON.
185187
///

src/iceberg/partition_field.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <format>
2323

2424
#include "iceberg/transform.h"
25-
#include "iceberg/type.h"
2625
#include "iceberg/util/formatter.h" // IWYU pragma: keep
2726

2827
namespace iceberg {

src/iceberg/partition_spec.cc

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@
2020
#include "iceberg/partition_spec.h"
2121

2222
#include <algorithm>
23+
#include <cstdint>
2324
#include <format>
2425
#include <memory>
2526
#include <ranges>
27+
#include <unordered_map>
2628

29+
#include "iceberg/result.h"
2730
#include "iceberg/schema.h"
2831
#include "iceberg/schema_field.h"
2932
#include "iceberg/transform.h"
3033
#include "iceberg/util/formatter.h" // IWYU pragma: keep
3134
#include "iceberg/util/macros.h"
35+
#include "iceberg/util/type_util.h"
3236

3337
namespace iceberg {
3438

@@ -47,9 +51,8 @@ PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields
4751
}
4852

4953
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
50-
static const std::shared_ptr<PartitionSpec> unpartitioned =
51-
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
52-
kLegacyPartitionDataIdStart - 1);
54+
static const std::shared_ptr<PartitionSpec> unpartitioned(new PartitionSpec(
55+
kInitialSpecId, std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1));
5356
return unpartitioned;
5457
}
5558

@@ -104,4 +107,67 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const {
104107
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
105108
}
106109

110+
Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const {
111+
std::unordered_map<int32_t, int32_t> parents = indexParents(schema);
112+
for (const auto& partition_field : fields_) {
113+
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
114+
schema.FindFieldById(partition_field.source_id()));
115+
// In the case the underlying field is dropped, we cannot check if they are compatible
116+
if (allow_missing_fields && !source_field.has_value()) {
117+
continue;
118+
}
119+
const auto& field_transform = partition_field.transform();
120+
121+
// In the case of a Version 1 partition-spec field gets deleted, it is replaced with a
122+
// void transform, see: https://iceberg.apache.org/spec/#partition-transforms. We
123+
// don't care about the source type since a VoidTransform is always compatible and
124+
// skip the checks
125+
if (field_transform->transform_type() != TransformType::kVoid) {
126+
if (!source_field.has_value()) {
127+
return InvalidArgument("Cannot find source column for partition field: {}",
128+
partition_field);
129+
}
130+
const auto& source_type = source_field.value().get().type();
131+
if (!field_transform->CanTransform(*source_type)) {
132+
return InvalidArgument("Invalid source type {} for transform {}",
133+
source_type->ToString(), field_transform->ToString());
134+
}
135+
136+
// The only valid parent types for a PartitionField are StructTypes. This must be
137+
// checked recursively.
138+
auto parent_id_iter = parents.find(partition_field.source_id());
139+
while (parent_id_iter != parents.end()) {
140+
int32_t parent_id = parent_id_iter->second;
141+
ICEBERG_ASSIGN_OR_RAISE(auto parent_field, schema.FindFieldById(parent_id));
142+
if (!parent_field.has_value()) {
143+
return InvalidArgument("Cannot find parent field with ID: {}", parent_id);
144+
}
145+
const auto& parent_type = parent_field.value().get().type();
146+
if (parent_type->type_id() != TypeId::kStruct) {
147+
return InvalidArgument("Invalid partition field parent type: {}",
148+
parent_type->ToString());
149+
}
150+
parent_id_iter = parents.find(parent_id);
151+
}
152+
}
153+
}
154+
return {};
155+
}
156+
157+
Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
158+
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
159+
bool allow_missing_fields, std::optional<int32_t> last_assigned_field_id) {
160+
auto partition_spec = std::unique_ptr<PartitionSpec>(
161+
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
162+
ICEBERG_RETURN_UNEXPECTED(partition_spec->Validate(schema, allow_missing_fields));
163+
return partition_spec;
164+
}
165+
166+
Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
167+
int32_t spec_id, std::vector<PartitionField> fields,
168+
std::optional<int32_t> last_assigned_field_id) {
169+
return std::unique_ptr<PartitionSpec>(
170+
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
171+
}
172+
107173
} // namespace iceberg

src/iceberg/partition_spec.h

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
5050
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;
5151
static constexpr int32_t kInvalidPartitionFieldId = -1;
5252

53-
/// \brief Create a new partition spec.
54-
///
55-
/// \param schema The table schema.
56-
/// \param spec_id The spec ID.
57-
/// \param fields The partition fields.
58-
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
59-
/// be calculated from the fields.
60-
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
61-
std::optional<int32_t> last_assigned_field_id = std::nullopt);
62-
6353
/// \brief Get an unsorted partition spec singleton.
6454
static const std::shared_ptr<PartitionSpec>& Unpartitioned();
6555

@@ -80,7 +70,49 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
8070
return lhs.Equals(rhs);
8171
}
8272

73+
/// \brief Validates the partition spec against a schema.
74+
/// \param schema The schema to validate against.
75+
/// \param allowMissingFields Whether to skip validation for partition fields whose
76+
/// source columns have been dropped from the schema.
77+
/// \return Error status if the partition spec is invalid.
78+
Status Validate(const Schema& schema, bool allow_missing_fields) const;
79+
80+
/// \brief Create a PartitionSpec binding to a schema.
81+
/// \param schema The schema to bind the partition spec to.
82+
/// \param spec_id The spec ID.
83+
/// \param fields The partition fields.
84+
/// \param allowMissingFields Whether to skip validation for partition fields whose
85+
/// source columns have been dropped from the schema.
86+
/// \param last_assigned_field_id The last assigned field ID assigned to ensure new
87+
/// fields get unique IDs.
88+
/// \return A Result containing the partition spec or an error.
89+
static Result<std::unique_ptr<PartitionSpec>> Make(
90+
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
91+
bool allow_missing_fields,
92+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
93+
94+
/// \brief Create a PartitionSpec without binding to a schema.
95+
/// \param spec_id The spec ID.
96+
/// \param fields The partition fields.
97+
/// \param last_assigned_field_id The last assigned field ID assigned to ensure new
98+
/// fields get unique IDs.
99+
/// \return A Result containing the partition spec or an error.
100+
/// \note This method does not check whether the sort fields are valid for any schema.
101+
static Result<std::unique_ptr<PartitionSpec>> Make(
102+
int32_t spec_id, std::vector<PartitionField> fields,
103+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
104+
83105
private:
106+
/// \brief Create a new partition spec.
107+
///
108+
/// \param schema The table schema.
109+
/// \param spec_id The spec ID.
110+
/// \param fields The partition fields.
111+
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
112+
/// be calculated from the fields.
113+
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
114+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
115+
84116
/// \brief Compare two partition specs for equality.
85117
bool Equals(const PartitionSpec& other) const;
86118

src/iceberg/sort_order.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ bool SortOrder::Equals(const SortOrder& other) const {
9090
Status SortOrder::Validate(const Schema& schema) const {
9191
for (const auto& field : fields_) {
9292
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
93-
if (!schema_field.has_value() || schema_field == std::nullopt) {
93+
if (!schema_field.has_value()) {
9494
return InvalidArgument("Cannot find source column for sort field: {}", field);
9595
}
9696

src/iceberg/sort_order.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
8888
/// \param fields The sort fields.
8989
/// \return A Result containing the SortOrder or an error.
9090
/// \note This method does not check whether the sort fields are valid for any schema.
91-
/// Use IsBoundToSchema to check if the sort order is valid for a given schema.
9291
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
9392
std::vector<SortField> fields);
9493

src/iceberg/test/json_internal_test.cc

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,13 @@ TEST(JsonInternalTest, SortField) {
103103

104104
TEST(JsonInternalTest, SortOrder) {
105105
auto schema = std::make_shared<Schema>(
106-
std::vector<SchemaField>{SchemaField(5, "region", iceberg::string(), false),
107-
SchemaField(7, "ts", iceberg::int64(), false)},
106+
std::vector<SchemaField>{SchemaField(5, "region", string(), false),
107+
SchemaField(7, "ts", int64(), false)},
108108
/*schema_id=*/100);
109109
auto identity_transform = Transform::Identity();
110110
SortField st_ts(5, identity_transform, SortDirection::kAscending, NullOrder::kFirst);
111111
SortField st_bar(7, identity_transform, SortDirection::kDescending, NullOrder::kLast);
112-
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(100, {st_ts, st_bar}));
113-
EXPECT_TRUE(sort_order->Validate(*schema));
112+
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(*schema, 100, {st_ts, st_bar}));
114113
nlohmann::json expected_sort_order =
115114
R"({"order-id":100,"fields":[
116115
{"transform":"identity","source-id":5,"direction":"asc","null-order":"nulls-first"},
@@ -132,7 +131,7 @@ TEST(JsonInternalTest, PartitionField) {
132131
TestJsonConversion(field, expected_json);
133132
}
134133

135-
TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
134+
TEST(JsonInternalTest, PartitionFieldFromJsonMissingField) {
136135
nlohmann::json invalid_json =
137136
R"({"field-id":101,"transform":"identity","name":"region"})"_json;
138137
// missing source-id
@@ -143,15 +142,19 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
143142
EXPECT_THAT(result, HasErrorMessage("Missing 'source-id'"));
144143
}
145144

146-
TEST(JsonPartitionTest, PartitionSpec) {
145+
TEST(JsonInternalTest, PartitionSpec) {
147146
auto schema = std::make_shared<Schema>(
148147
std::vector<SchemaField>{SchemaField(3, "region", string(), false),
149148
SchemaField(5, "ts", int64(), false)},
150149
/*schema_id=*/100);
151150
auto identity_transform = Transform::Identity();
152-
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
153-
PartitionField(5, 102, "ts", identity_transform)});
154-
auto json = ToJson(spec);
151+
ICEBERG_UNWRAP_OR_FAIL(
152+
auto spec,
153+
PartitionSpec::Make(*schema, 1,
154+
{PartitionField(3, 101, "region", identity_transform),
155+
PartitionField(5, 102, "ts", identity_transform)},
156+
false));
157+
auto json = ToJson(*spec);
155158
nlohmann::json expected_json = R"({"spec-id": 1,
156159
"fields": [
157160
{"source-id": 3,
@@ -165,9 +168,9 @@ TEST(JsonPartitionTest, PartitionSpec) {
165168

166169
EXPECT_EQ(json, expected_json);
167170

168-
auto parsed_spec_result = PartitionSpecFromJson(schema, json);
171+
auto parsed_spec_result = PartitionSpecFromJson(schema, json, 1);
169172
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;
170-
EXPECT_EQ(spec, *parsed_spec_result.value());
173+
EXPECT_EQ(*spec, *parsed_spec_result.value());
171174
}
172175

173176
TEST(JsonInternalTest, SnapshotRefBranch) {

src/iceberg/test/manifest_reader_writer_test.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "iceberg/test/temp_file_test_base.h"
3535
#include "iceberg/test/test_common.h"
3636
#include "iceberg/transform.h"
37+
#include "iceberg/type.h"
3738

3839
namespace iceberg {
3940

@@ -186,7 +187,8 @@ TEST_F(ManifestV1Test, WritePartitionedTest) {
186187
auto identity_transform = Transform::Identity();
187188
std::vector<PartitionField> fields{
188189
PartitionField(1, 1000, "order_ts_hour", identity_transform)};
189-
auto partition_spec = std::make_shared<PartitionSpec>(1, fields);
190+
ICEBERG_UNWRAP_OR_FAIL(std::shared_ptr<PartitionSpec> partition_spec,
191+
PartitionSpec::Make(*table_schema, 1, fields, false));
190192

191193
auto expected_entries = PreparePartitionedTestData();
192194
auto write_manifest_path = CreateNewTempFilePath();

0 commit comments

Comments
 (0)