Skip to content

Commit dbc3ceb

Browse files
committed
refactor: make PartitionSpec constructor private and add factory methods with validation
1 parent 7f7f85b commit dbc3ceb

13 files changed

+510
-74
lines changed

src/iceberg/json_internal.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,10 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
542542
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
543543
partition_fields.push_back(std::move(*partition_field));
544544
}
545-
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
546-
// partition field from schema and then verify it
547-
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
545+
ICEBERG_ASSIGN_OR_RAISE(
546+
auto spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
547+
true /*allow_missing_fields*/));
548+
return spec;
548549
}
549550

550551
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
@@ -917,10 +918,10 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
917918
std::move(field->transform()));
918919
}
919920

920-
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
921-
// partition field from schema and then verify it
922-
auto spec =
923-
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
921+
// Create partition spec with schema validation
922+
ICEBERG_ASSIGN_OR_RAISE(
923+
auto spec, PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
924+
std::move(fields), true /*allow_missing_fields*/));
924925
default_spec_id = spec->spec_id();
925926
partition_specs.push_back(std::move(spec));
926927
}

src/iceberg/partition_field.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <format>
2323

2424
#include "iceberg/transform.h"
25-
#include "iceberg/type.h"
2625
#include "iceberg/util/formatter.h" // IWYU pragma: keep
2726

2827
namespace iceberg {

src/iceberg/partition_spec.cc

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@
2020
#include "iceberg/partition_spec.h"
2121

2222
#include <algorithm>
23+
#include <cstdint>
2324
#include <format>
2425
#include <memory>
2526
#include <ranges>
27+
#include <unordered_map>
2628

29+
#include "iceberg/result.h"
2730
#include "iceberg/schema.h"
2831
#include "iceberg/schema_field.h"
2932
#include "iceberg/transform.h"
3033
#include "iceberg/util/formatter.h" // IWYU pragma: keep
3134
#include "iceberg/util/macros.h"
35+
#include "iceberg/util/type_util.h"
3236

3337
namespace iceberg {
3438

@@ -47,9 +51,8 @@ PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields
4751
}
4852

4953
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
50-
static const std::shared_ptr<PartitionSpec> unpartitioned =
51-
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
52-
kLegacyPartitionDataIdStart - 1);
54+
static const std::shared_ptr<PartitionSpec> unpartitioned(new PartitionSpec(
55+
kInitialSpecId, std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1));
5356
return unpartitioned;
5457
}
5558

@@ -104,4 +107,67 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const {
104107
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
105108
}
106109

110+
Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const {
111+
std::unordered_map<int32_t, int32_t> parents = indexParents(schema);
112+
for (const auto& partition_field : fields_) {
113+
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
114+
schema.FindFieldById(partition_field.source_id()));
115+
// In the case the underlying field is dropped, we cannot check if they are compatible
116+
if (allow_missing_fields && !source_field.has_value()) {
117+
continue;
118+
}
119+
const auto& field_transform = partition_field.transform();
120+
121+
// In the case of a Version 1 partition-spec field gets deleted, it is replaced with a
122+
// void transform, see: https://iceberg.apache.org/spec/#partition-transforms. We
123+
// don't care about the source type since a VoidTransform is always compatible and
124+
// skip the checks
125+
if (field_transform->transform_type() != TransformType::kVoid) {
126+
if (!source_field.has_value()) {
127+
return InvalidArgument("Cannot find source column for partition field: {}",
128+
partition_field);
129+
}
130+
const auto& source_type = source_field.value().get().type();
131+
if (!field_transform->CanTransform(*source_type)) {
132+
return InvalidArgument("Invalid source type {} for transform {}",
133+
source_type->ToString(), field_transform->ToString());
134+
}
135+
136+
// The only valid parent types for a PartitionField are StructTypes. This must be
137+
// checked recursively.
138+
auto parent_id_iter = parents.find(partition_field.source_id());
139+
while (parent_id_iter != parents.end()) {
140+
int32_t parent_id = parent_id_iter->second;
141+
ICEBERG_ASSIGN_OR_RAISE(auto parent_field, schema.FindFieldById(parent_id));
142+
if (!parent_field.has_value()) {
143+
return InvalidArgument("Cannot find parent field with ID: {}", parent_id);
144+
}
145+
const auto& parent_type = parent_field.value().get().type();
146+
if (parent_type->type_id() != TypeId::kStruct) {
147+
return InvalidArgument("Invalid partition field parent type: {}",
148+
parent_type->ToString());
149+
}
150+
parent_id_iter = parents.find(parent_id);
151+
}
152+
}
153+
}
154+
return {};
155+
}
156+
157+
Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
158+
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
159+
bool allow_missing_fields, std::optional<int32_t> last_assigned_field_id) {
160+
auto partition_spec = std::unique_ptr<PartitionSpec>(
161+
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
162+
ICEBERG_RETURN_UNEXPECTED(partition_spec->Validate(schema, allow_missing_fields));
163+
return partition_spec;
164+
}
165+
166+
Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
167+
int32_t spec_id, std::vector<PartitionField> fields,
168+
std::optional<int32_t> last_assigned_field_id) {
169+
return std::unique_ptr<PartitionSpec>(
170+
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
171+
}
172+
107173
} // namespace iceberg

src/iceberg/partition_spec.h

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
5050
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;
5151
static constexpr int32_t kInvalidPartitionFieldId = -1;
5252

53-
/// \brief Create a new partition spec.
54-
///
55-
/// \param schema The table schema.
56-
/// \param spec_id The spec ID.
57-
/// \param fields The partition fields.
58-
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
59-
/// be calculated from the fields.
60-
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
61-
std::optional<int32_t> last_assigned_field_id = std::nullopt);
62-
6353
/// \brief Get an unsorted partition spec singleton.
6454
static const std::shared_ptr<PartitionSpec>& Unpartitioned();
6555

@@ -80,7 +70,49 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
8070
return lhs.Equals(rhs);
8171
}
8272

73+
/// \brief Validates the partition spec against a schema.
74+
/// \param schema The schema to validate against.
75+
/// \param allowMissingFields Whether to skip validation for partition fields whose
76+
/// source columns have been dropped from the schema.
77+
/// \return Error status if the partition spec is invalid.
78+
Status Validate(const Schema& schema, bool allow_missing_fields) const;
79+
80+
/// \brief Create a PartitionSpec binding to a schema.
81+
/// \param schema The schema to bind the partition spec to.
82+
/// \param spec_id The spec ID.
83+
/// \param fields The partition fields.
84+
/// \param allowMissingFields Whether to skip validation for partition fields whose
85+
/// source columns have been dropped from the schema.
86+
/// \param last_assigned_field_id The last assigned field ID assigned to ensure new
87+
/// fields get unique IDs.
88+
/// \return A Result containing the partition spec or an error.
89+
static Result<std::unique_ptr<PartitionSpec>> Make(
90+
const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
91+
bool allow_missing_fields,
92+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
93+
94+
/// \brief Create a PartitionSpec without binding to a schema.
95+
/// \param spec_id The spec ID.
96+
/// \param fields The partition fields.
97+
/// \param last_assigned_field_id The last assigned field ID assigned to ensure new
98+
/// fields get unique IDs.
99+
/// \return A Result containing the partition spec or an error.
100+
/// \note This method does not check whether the sort fields are valid for any schema.
101+
static Result<std::unique_ptr<PartitionSpec>> Make(
102+
int32_t spec_id, std::vector<PartitionField> fields,
103+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
104+
83105
private:
106+
/// \brief Create a new partition spec.
107+
///
108+
/// \param schema The table schema.
109+
/// \param spec_id The spec ID.
110+
/// \param fields The partition fields.
111+
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
112+
/// be calculated from the fields.
113+
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
114+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
115+
84116
/// \brief Compare two partition specs for equality.
85117
bool Equals(const PartitionSpec& other) const;
86118

src/iceberg/sort_order.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ bool SortOrder::Equals(const SortOrder& other) const {
9090
Status SortOrder::Validate(const Schema& schema) const {
9191
for (const auto& field : fields_) {
9292
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
93-
if (!schema_field.has_value() || schema_field == std::nullopt) {
93+
if (!schema_field.has_value()) {
9494
return InvalidArgument("Cannot find source column for sort field: {}", field);
9595
}
9696

src/iceberg/sort_order.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
8888
/// \param fields The sort fields.
8989
/// \return A Result containing the SortOrder or an error.
9090
/// \note This method does not check whether the sort fields are valid for any schema.
91-
/// Use IsBoundToSchema to check if the sort order is valid for a given schema.
9291
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
9392
std::vector<SortField> fields);
9493

src/iceberg/test/json_internal_test.cc

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,13 @@ TEST(JsonInternalTest, SortField) {
103103

104104
TEST(JsonInternalTest, SortOrder) {
105105
auto schema = std::make_shared<Schema>(
106-
std::vector<SchemaField>{SchemaField(5, "region", iceberg::string(), false),
107-
SchemaField(7, "ts", iceberg::int64(), false)},
106+
std::vector<SchemaField>{SchemaField(5, "region", string(), false),
107+
SchemaField(7, "ts", int64(), false)},
108108
/*schema_id=*/100);
109109
auto identity_transform = Transform::Identity();
110110
SortField st_ts(5, identity_transform, SortDirection::kAscending, NullOrder::kFirst);
111111
SortField st_bar(7, identity_transform, SortDirection::kDescending, NullOrder::kLast);
112-
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(100, {st_ts, st_bar}));
113-
EXPECT_TRUE(sort_order->Validate(*schema));
112+
ICEBERG_UNWRAP_OR_FAIL(auto sort_order, SortOrder::Make(*schema, 100, {st_ts, st_bar}));
114113
nlohmann::json expected_sort_order =
115114
R"({"order-id":100,"fields":[
116115
{"transform":"identity","source-id":5,"direction":"asc","null-order":"nulls-first"},
@@ -132,7 +131,7 @@ TEST(JsonInternalTest, PartitionField) {
132131
TestJsonConversion(field, expected_json);
133132
}
134133

135-
TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
134+
TEST(JsonInternalTest, PartitionFieldFromJsonMissingField) {
136135
nlohmann::json invalid_json =
137136
R"({"field-id":101,"transform":"identity","name":"region"})"_json;
138137
// missing source-id
@@ -143,15 +142,19 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
143142
EXPECT_THAT(result, HasErrorMessage("Missing 'source-id'"));
144143
}
145144

146-
TEST(JsonPartitionTest, PartitionSpec) {
145+
TEST(JsonInternalTest, PartitionSpec) {
147146
auto schema = std::make_shared<Schema>(
148147
std::vector<SchemaField>{SchemaField(3, "region", string(), false),
149148
SchemaField(5, "ts", int64(), false)},
150149
/*schema_id=*/100);
151150
auto identity_transform = Transform::Identity();
152-
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
153-
PartitionField(5, 102, "ts", identity_transform)});
154-
auto json = ToJson(spec);
151+
ICEBERG_UNWRAP_OR_FAIL(
152+
auto spec,
153+
PartitionSpec::Make(*schema, 1,
154+
{PartitionField(3, 101, "region", identity_transform),
155+
PartitionField(5, 102, "ts", identity_transform)},
156+
false));
157+
auto json = ToJson(*spec);
155158
nlohmann::json expected_json = R"({"spec-id": 1,
156159
"fields": [
157160
{"source-id": 3,
@@ -167,7 +170,7 @@ TEST(JsonPartitionTest, PartitionSpec) {
167170

168171
auto parsed_spec_result = PartitionSpecFromJson(schema, json);
169172
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;
170-
EXPECT_EQ(spec, *parsed_spec_result.value());
173+
EXPECT_EQ(*spec, *parsed_spec_result.value());
171174
}
172175

173176
TEST(JsonInternalTest, SnapshotRefBranch) {

src/iceberg/test/manifest_reader_writer_test.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ TEST_F(ManifestV1Test, WritePartitionedTest) {
186186
auto identity_transform = Transform::Identity();
187187
std::vector<PartitionField> fields{
188188
PartitionField(1, 1000, "order_ts_hour", identity_transform)};
189-
auto partition_spec = std::make_shared<PartitionSpec>(1, fields);
189+
auto partition_spec_result = PartitionSpec::Make(1, fields);
190+
ASSERT_TRUE(partition_spec_result.has_value());
191+
auto partition_spec =
192+
std::shared_ptr<PartitionSpec>(std::move(partition_spec_result.value()));
190193

191194
auto expected_entries = PreparePartitionedTestData();
192195
auto write_manifest_path = CreateNewTempFilePath();

src/iceberg/test/metadata_serde_test.cc

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,13 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) {
9999
SchemaField::MakeRequired(3, "z", int64())},
100100
/*schema_id=*/std::nullopt);
101101

102-
auto expected_spec = std::make_shared<PartitionSpec>(
102+
auto expected_spec_result = PartitionSpec::Make(
103103
/*spec_id=*/0,
104104
std::vector<PartitionField>{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x",
105105
Transform::Identity())});
106+
ASSERT_TRUE(expected_spec_result.has_value());
107+
auto expected_spec =
108+
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));
106109

107110
TableMetadata expected{
108111
.format_version = 1,
@@ -133,7 +136,7 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
133136
ASSERT_NO_FATAL_FAILURE(ReadTableMetadata("TableMetadataV2Valid.json", &metadata));
134137

135138
auto expected_schema_1 = std::make_shared<Schema>(
136-
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", iceberg::int64(),
139+
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
137140
/*optional=*/false)},
138141
/*schema_id=*/0);
139142

@@ -143,10 +146,13 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
143146
SchemaField::MakeRequired(3, "z", int64())},
144147
/*schema_id=*/1);
145148

146-
auto expected_spec = std::make_shared<PartitionSpec>(
149+
auto expected_spec_result = PartitionSpec::Make(
147150
/*spec_id=*/0,
148151
std::vector<PartitionField>{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x",
149152
Transform::Identity())});
153+
ASSERT_TRUE(expected_spec_result.has_value());
154+
auto expected_spec =
155+
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));
150156

151157
ICEBERG_UNWRAP_OR_FAIL(
152158
auto sort_order,
@@ -228,10 +234,13 @@ TEST(MetadataSerdeTest, DeserializeV2ValidMinimal) {
228234
SchemaField::MakeRequired(3, "z", int64())},
229235
/*schema_id=*/0);
230236

231-
auto expected_spec = std::make_shared<PartitionSpec>(
237+
auto expected_spec_result = PartitionSpec::Make(
232238
/*spec_id=*/0,
233239
std::vector<PartitionField>{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x",
234240
Transform::Identity())});
241+
ASSERT_TRUE(expected_spec_result.has_value());
242+
auto expected_spec =
243+
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));
235244

236245
ICEBERG_UNWRAP_OR_FAIL(
237246
auto sort_order,
@@ -277,12 +286,15 @@ TEST(MetadataSerdeTest, DeserializeStatisticsFiles) {
277286
ReadTableMetadata("TableMetadataStatisticsFiles.json", &metadata));
278287

279288
auto expected_schema = std::make_shared<Schema>(
280-
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", iceberg::int64(),
289+
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
281290
/*optional=*/false)},
282291
/*schema_id=*/0);
283292

293+
auto expected_spec_result =
294+
PartitionSpec::Make(/*spec_id=*/0, std::vector<PartitionField>{});
295+
ASSERT_TRUE(expected_spec_result.has_value());
284296
auto expected_spec =
285-
std::make_shared<PartitionSpec>(/*spec_id=*/0, std::vector<PartitionField>{});
297+
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));
286298

287299
auto expected_snapshot = std::make_shared<Snapshot>(Snapshot{
288300
.snapshot_id = 3055729675574597004,
@@ -353,7 +365,7 @@ TEST(MetadataSerdeTest, DeserializePartitionStatisticsFiles) {
353365
.last_updated_ms = TimePointMsFromUnixMs(1602638573590).value(),
354366
.last_column_id = 3,
355367
.schemas = {std::make_shared<Schema>(
356-
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", iceberg::int64(),
368+
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
357369
/*optional=*/false)},
358370
/*schema_id=*/0)},
359371
.current_schema_id = 0,

0 commit comments

Comments
 (0)