Skip to content

Commit 9c096d6

Browse files
committed
test: add table metadata deserialization test
1 parent 06ce20f commit 9c096d6

35 files changed

+1355
-87
lines changed

.github/.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ header:
1313
- 'NOTICE'
1414
- 'src/iceberg/expected.h'
1515
- 'src/iceberg/util/murmurhash3_internal.*'
16+
- 'test/resources/**'
1617

1718
comment: on-failure

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ repos:
3333
rev: v19.1.5
3434
hooks:
3535
- id: clang-format
36+
exclude: ^test/resources/.*\.json$
3637

3738
- repo: https://github.com/cheshirekow/cmake-format-precommit
3839
rev: v0.6.10

src/iceberg/constants.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <cstdint>
23+
24+
namespace iceberg {
25+
26+
constexpr int8_t kDefaultTableFormatVersion = 2;
27+
constexpr int8_t kSupportedTableFormatVersion = 3;
28+
constexpr int8_t kMinFormatVersionRowLineage = 3;
29+
constexpr int32_t kInitialSpecId = 0;
30+
constexpr int32_t kInitialSortOrderId = 1;
31+
constexpr int32_t kInitialSchemaId = 0;
32+
constexpr int64_t kInitialRowId = 0;
33+
constexpr int64_t kInitialSequenceNumber = 0;
34+
constexpr int64_t kInvalidSequenceNumber = -1;
35+
constexpr int64_t kInvalidSnapshotId = -1;
36+
constexpr int32_t kInvalidFieldId = -1;
37+
38+
} // namespace iceberg

src/iceberg/json_internal.cc

Lines changed: 63 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
#include <algorithm>
2323
#include <cstdint>
2424
#include <format>
25+
#include <ranges>
2526
#include <regex>
2627
#include <type_traits>
2728
#include <unordered_set>
2829

2930
#include <nlohmann/json.hpp>
3031

32+
#include "iceberg/constants.h"
33+
#include "iceberg/partition_field.h"
3134
#include "iceberg/partition_spec.h"
3235
#include "iceberg/result.h"
3336
#include "iceberg/schema.h"
@@ -248,7 +251,7 @@ Result<std::vector<T>> FromJsonList(
248251
list.emplace_back(std::move(entry));
249252
}
250253
}
251-
return {};
254+
return list;
252255
}
253256

254257
/// \brief Parse a list of items from a JSON object.
@@ -471,7 +474,7 @@ nlohmann::json ToJson(const Type& type) {
471474

472475
nlohmann::json ToJson(const Schema& schema) {
473476
nlohmann::json json = ToJson(static_cast<const Type&>(schema));
474-
json[kSchemaId] = schema.schema_id();
477+
SetOptionalField(json, kSchemaId, schema.schema_id());
475478
// TODO(gangwu): add identifier-field-ids.
476479
return json;
477480
}
@@ -496,7 +499,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
496499
nlohmann::json json;
497500
json[kSnapshotId] = snapshot.snapshot_id;
498501
SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
499-
if (snapshot.sequence_number > TableMetadata::kInitialSequenceNumber) {
502+
if (snapshot.sequence_number > kInitialSequenceNumber) {
500503
json[kSequenceNumber] = snapshot.sequence_number;
501504
}
502505
json[kTimestampMs] = snapshot.timestamp_ms;
@@ -625,7 +628,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
625628
}
626629

627630
Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
628-
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, kSchemaId));
631+
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
629632
ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));
630633

631634
if (type->type_id() != TypeId::kStruct) [[unlikely]] {
@@ -658,9 +661,16 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
658661
}
659662

660663
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
661-
const nlohmann::json& json) {
664+
const nlohmann::json& json, bool allow_field_id_missing) {
662665
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));
663-
ICEBERG_ASSIGN_OR_RAISE(auto field_id, GetJsonValue<int32_t>(json, kFieldId));
666+
int32_t field_id;
667+
if (allow_field_id_missing) {
668+
// Partition field id in v1 is not tracked, so we use -1 to indicate that.
669+
ICEBERG_ASSIGN_OR_RAISE(
670+
field_id, GetJsonValueOrDefault<int32_t>(json, kFieldId, kInvalidFieldId));
671+
} else {
672+
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValue<int32_t>(json, kFieldId));
673+
}
664674
ICEBERG_ASSIGN_OR_RAISE(
665675
auto transform,
666676
GetJsonValue<std::string>(json, kTransform).and_then(TransformFromString));
@@ -755,9 +765,8 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
755765
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
756766

757767
return std::make_unique<Snapshot>(
758-
snapshot_id, parent_snapshot_id,
759-
sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms,
760-
manifest_list, std::move(summary), schema_id);
768+
snapshot_id, parent_snapshot_id, sequence_number.value_or(kInitialSequenceNumber),
769+
timestamp_ms, manifest_list, std::move(summary), schema_id);
761770
}
762771

763772
nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
@@ -905,7 +914,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
905914
}
906915

907916
// write the current schema ID and schema list
908-
json[kCurrentSchemaId] = table_metadata.current_schema_id;
917+
SetOptionalField(json, kCurrentSchemaId, table_metadata.current_schema_id);
909918
json[kSchemas] = ToJsonList(table_metadata.schemas);
910919

911920
// for older readers, continue writing the default spec as "partition-spec"
@@ -963,7 +972,8 @@ namespace {
963972
///
964973
/// \return The current schema or parse error.
965974
Result<std::shared_ptr<Schema>> ParseSchemas(
966-
const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
975+
const nlohmann::json& json, int8_t format_version,
976+
std::optional<int32_t>& current_schema_id,
967977
std::vector<std::shared_ptr<Schema>>& schemas) {
968978
std::shared_ptr<Schema> current_schema;
969979
if (json.contains(kSchemas)) {
@@ -986,7 +996,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
986996
}
987997
if (!current_schema) {
988998
return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
989-
current_schema_id, schema_array.dump());
999+
current_schema_id.value(), schema_array.dump());
9901000
}
9911001
} else {
9921002
if (format_version != 1) {
@@ -1031,13 +1041,30 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
10311041
return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
10321042
format_version);
10331043
}
1034-
default_spec_id = TableMetadata::kInitialSpecId;
10351044

1036-
ICEBERG_ASSIGN_OR_RAISE(auto spec, GetJsonValue<nlohmann::json>(json, kPartitionSpec)
1037-
.and_then([current_schema](const auto& json) {
1038-
return PartitionSpecFromJson(current_schema,
1039-
json);
1040-
}));
1045+
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec_json,
1046+
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
1047+
if (!partition_spec_json.is_array()) {
1048+
return JsonParseError("Cannot parse v1 partition spec from non-array: {}",
1049+
partition_spec_json.dump());
1050+
}
1051+
1052+
int32_t next_partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
1053+
std::vector<PartitionField> fields;
1054+
for (const auto& entry_json : partition_spec_json) {
1055+
ICEBERG_ASSIGN_OR_RAISE(auto field, PartitionFieldFromJson(entry_json));
1056+
int32_t field_id = field->field_id();
1057+
if (field_id == kInvalidFieldId) {
1058+
// If the field ID is not set, we need to assign a new one
1059+
field_id = next_partition_field_id++;
1060+
}
1061+
fields.emplace_back(field->source_id(), field_id, std::string(field->name()),
1062+
std::move(field->transform()));
1063+
}
1064+
1065+
auto spec = std::make_unique<PartitionSpec>(current_schema, kInitialSpecId,
1066+
std::move(fields));
1067+
default_spec_id = spec->spec_id();
10411068
partition_specs.push_back(std::move(spec));
10421069
}
10431070

@@ -1066,7 +1093,9 @@ Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
10661093
if (format_version > 1) {
10671094
return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
10681095
}
1069-
return NotImplementedError("Assign a default sort order");
1096+
auto sort_order = SortOrder::Unsorted();
1097+
default_sort_order_id = sort_order->order_id();
1098+
sort_orders.push_back(std::move(sort_order));
10701099
}
10711100
return {};
10721101
}
@@ -1083,7 +1112,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
10831112
ICEBERG_ASSIGN_OR_RAISE(table_metadata->format_version,
10841113
GetJsonValue<int8_t>(json, kFormatVersion));
10851114
if (table_metadata->format_version < 1 ||
1086-
table_metadata->format_version > TableMetadata::kSupportedTableFormatVersion) {
1115+
table_metadata->format_version > kSupportedTableFormatVersion) {
10871116
return JsonParseError("Cannot read unsupported version: {}",
10881117
table_metadata->format_version);
10891118
}
@@ -1097,7 +1126,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
10971126
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_sequence_number,
10981127
GetJsonValue<int64_t>(json, kLastSequenceNumber));
10991128
} else {
1100-
table_metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber;
1129+
table_metadata->last_sequence_number = kInitialSequenceNumber;
11011130
}
11021131
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_column_id,
11031132
GetJsonValue<int32_t>(json, kLastColumnId));
@@ -1119,10 +1148,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11191148
return JsonParseError("{} must exist in format v{}", kLastPartitionId,
11201149
table_metadata->format_version);
11211150
}
1122-
// TODO(gangwu): iterate all partition specs to find the largest partition
1123-
// field id or assign a default value for unpartitioned tables. However,
1124-
// PartitionSpec::lastAssignedFieldId() is not implemented yet.
1125-
return NotImplementedError("Find the largest partition field id");
1151+
1152+
if (table_metadata->partition_specs.empty()) {
1153+
table_metadata->last_partition_id =
1154+
PartitionSpec::Unpartitioned()->last_assigned_field_id();
1155+
} else {
1156+
table_metadata->last_partition_id =
1157+
std::ranges::max(table_metadata->partition_specs, {}, [](const auto& spec) {
1158+
return spec->last_assigned_field_id();
1159+
})->last_assigned_field_id();
1160+
}
11261161
}
11271162

11281163
ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(json, table_metadata->format_version,
@@ -1136,14 +1171,13 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11361171
// This field is optional, but internally we set this to -1 when not set
11371172
ICEBERG_ASSIGN_OR_RAISE(
11381173
table_metadata->current_snapshot_id,
1139-
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
1140-
TableMetadata::kInvalidSnapshotId));
1174+
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId, kInvalidSnapshotId));
11411175

11421176
if (table_metadata->format_version >= 3) {
11431177
ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
11441178
GetJsonValue<int64_t>(json, kNextRowId));
11451179
} else {
1146-
table_metadata->next_row_id = TableMetadata::kInitialRowId;
1180+
table_metadata->next_row_id = kInitialRowId;
11471181
}
11481182

11491183
ICEBERG_ASSIGN_OR_RAISE(auto last_updated_ms,
@@ -1155,7 +1189,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11551189
ICEBERG_ASSIGN_OR_RAISE(
11561190
table_metadata->refs,
11571191
FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs, SnapshotRefFromJson));
1158-
} else if (table_metadata->current_snapshot_id != TableMetadata::kInvalidSnapshotId) {
1192+
} else if (table_metadata->current_snapshot_id != kInvalidSnapshotId) {
11591193
table_metadata->refs["main"] = std::make_unique<SnapshotRef>(SnapshotRef{
11601194
.snapshot_id = table_metadata->current_snapshot_id,
11611195
.retention = SnapshotRef::Branch{},

src/iceberg/json_internal.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,12 @@ nlohmann::json ToJson(const PartitionField& partition_field);
126126
/// and name.
127127
///
128128
/// \param json The JSON object representing a `PartitionField`.
129+
/// \param allow_field_id_missing Whether the field ID is allowed to be missing. This can
130+
/// happen when deserializing partition fields from V1 metadata files.
129131
/// \return An `expected` value containing either a `PartitionField` object or an error.
130132
/// If the JSON is malformed or missing expected fields, an error will be returned.
131133
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
132-
const nlohmann::json& json);
134+
const nlohmann::json& json, bool allow_field_id_missing = false);
133135

134136
/// \brief Serializes a `PartitionSpec` object to JSON.
135137
///

src/iceberg/partition_spec.cc

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,38 @@
1919

2020
#include "iceberg/partition_spec.h"
2121

22+
#include <algorithm>
2223
#include <format>
24+
#include <ranges>
2325

26+
#include "iceberg/constants.h"
2427
#include "iceberg/schema.h"
25-
#include "iceberg/type.h"
2628
#include "iceberg/util/formatter.h" // IWYU pragma: keep
2729

2830
namespace iceberg {
2931

3032
PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
31-
std::vector<PartitionField> fields)
32-
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {}
33+
std::vector<PartitionField> fields,
34+
std::optional<int32_t> last_assigned_field_id)
35+
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
36+
if (last_assigned_field_id) {
37+
last_assigned_field_id_ = last_assigned_field_id.value();
38+
} else if (fields_.empty()) {
39+
last_assigned_field_id_ = kLegacyPartitionDataIdStart - 1;
40+
} else {
41+
last_assigned_field_id_ = std::ranges::max(fields_, {}, [](const auto& field) {
42+
return field.field_id();
43+
}).field_id();
44+
}
45+
}
46+
47+
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
48+
static const std::shared_ptr<PartitionSpec> unpartitioned =
49+
std::make_shared<PartitionSpec>(
50+
/*schema=*/nullptr, kInitialSpecId, std::vector<PartitionField>{},
51+
kLegacyPartitionDataIdStart - 1);
52+
return unpartitioned;
53+
}
3354

3455
const std::shared_ptr<Schema>& PartitionSpec::schema() const { return schema_; }
3556

@@ -47,8 +68,7 @@ std::string PartitionSpec::ToString() const {
4768
}
4869

4970
bool PartitionSpec::Equals(const PartitionSpec& other) const {
50-
return *schema_ == *other.schema_ && spec_id_ == other.spec_id_ &&
51-
fields_ == other.fields_;
71+
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
5272
}
5373

5474
} // namespace iceberg

src/iceberg/partition_spec.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
/// Partition specs for Iceberg tables.
2424

2525
#include <cstdint>
26+
#include <optional>
2627
#include <span>
2728
#include <string>
2829
#include <vector>
@@ -40,8 +41,24 @@ namespace iceberg {
4041
/// evolution.
4142
class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
4243
public:
44+
/// \brief The start ID for partition field. It is only used to generate
45+
/// partition field id for v1 metadata where it is tracked.
46+
constexpr static int32_t kLegacyPartitionDataIdStart = 1000;
47+
48+
/// \brief Create a new partition spec.
49+
///
50+
/// \param schema The table schema.
51+
/// \param spec_id The spec ID.
52+
/// \param fields The partition fields.
53+
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
54+
/// be calculated from the fields.
4355
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
44-
std::vector<PartitionField> fields);
56+
std::vector<PartitionField> fields,
57+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
58+
59+
/// \brief Get an unsorted partition spec singleton.
60+
static const std::shared_ptr<PartitionSpec>& Unpartitioned();
61+
4562
/// \brief Get the table schema
4663
const std::shared_ptr<Schema>& schema() const;
4764
/// \brief Get the spec ID.
@@ -51,6 +68,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
5168

5269
std::string ToString() const override;
5370

71+
int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
72+
5473
friend bool operator==(const PartitionSpec& lhs, const PartitionSpec& rhs) {
5574
return lhs.Equals(rhs);
5675
}
@@ -66,6 +85,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
6685
std::shared_ptr<Schema> schema_;
6786
const int32_t spec_id_;
6887
std::vector<PartitionField> fields_;
88+
int32_t last_assigned_field_id_;
6989
};
7090

7191
} // namespace iceberg

0 commit comments

Comments
 (0)