Skip to content

Commit 2aed97b

Browse files
authored
test: add table metadata deserialization test (#82)
- Added a set of test files from https://github.com/apache/iceberg/tree/main/core/src/test/resources - Fixed table metadata deserialization - Added singleton for unsorted sort order and unpartitioned spec - Added test case for deserializing valid v1 and v2 metadata files
1 parent 06ce20f commit 2aed97b

37 files changed

+1335
-76
lines changed

.github/.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ header:
1313
- 'NOTICE'
1414
- 'src/iceberg/expected.h'
1515
- 'src/iceberg/util/murmurhash3_internal.*'
16+
- 'test/resources/**'
1617

1718
comment: on-failure

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ repos:
3333
rev: v19.1.5
3434
hooks:
3535
- id: clang-format
36+
exclude: ^test/resources/.*\.json$
3637

3738
- repo: https://github.com/cheshirekow/cmake-format-precommit
3839
rev: v0.6.10

src/iceberg/json_internal.cc

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
#include <algorithm>
2323
#include <cstdint>
2424
#include <format>
25+
#include <ranges>
2526
#include <regex>
2627
#include <type_traits>
2728
#include <unordered_set>
2829

30+
#include <iceberg/table.h>
2931
#include <nlohmann/json.hpp>
3032

33+
#include "iceberg/partition_field.h"
3134
#include "iceberg/partition_spec.h"
3235
#include "iceberg/result.h"
3336
#include "iceberg/schema.h"
@@ -248,7 +251,7 @@ Result<std::vector<T>> FromJsonList(
248251
list.emplace_back(std::move(entry));
249252
}
250253
}
251-
return {};
254+
return list;
252255
}
253256

254257
/// \brief Parse a list of items from a JSON object.
@@ -471,7 +474,7 @@ nlohmann::json ToJson(const Type& type) {
471474

472475
nlohmann::json ToJson(const Schema& schema) {
473476
nlohmann::json json = ToJson(static_cast<const Type&>(schema));
474-
json[kSchemaId] = schema.schema_id();
477+
SetOptionalField(json, kSchemaId, schema.schema_id());
475478
// TODO(gangwu): add identifier-field-ids.
476479
return json;
477480
}
@@ -625,7 +628,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
625628
}
626629

627630
Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
628-
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, kSchemaId));
631+
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
629632
ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));
630633

631634
if (type->type_id() != TypeId::kStruct) [[unlikely]] {
@@ -658,9 +661,16 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
658661
}
659662

660663
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
661-
const nlohmann::json& json) {
664+
const nlohmann::json& json, bool allow_field_id_missing) {
662665
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));
663-
ICEBERG_ASSIGN_OR_RAISE(auto field_id, GetJsonValue<int32_t>(json, kFieldId));
666+
int32_t field_id;
667+
if (allow_field_id_missing) {
668+
// Partition field id in v1 is not tracked, so we use -1 to indicate that.
669+
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValueOrDefault<int32_t>(
670+
json, kFieldId, SchemaField::kInvalidFieldId));
671+
} else {
672+
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValue<int32_t>(json, kFieldId));
673+
}
664674
ICEBERG_ASSIGN_OR_RAISE(
665675
auto transform,
666676
GetJsonValue<std::string>(json, kTransform).and_then(TransformFromString));
@@ -905,7 +915,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
905915
}
906916

907917
// write the current schema ID and schema list
908-
json[kCurrentSchemaId] = table_metadata.current_schema_id;
918+
SetOptionalField(json, kCurrentSchemaId, table_metadata.current_schema_id);
909919
json[kSchemas] = ToJsonList(table_metadata.schemas);
910920

911921
// for older readers, continue writing the default spec as "partition-spec"
@@ -963,7 +973,8 @@ namespace {
963973
///
964974
/// \return The current schema or parse error.
965975
Result<std::shared_ptr<Schema>> ParseSchemas(
966-
const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
976+
const nlohmann::json& json, int8_t format_version,
977+
std::optional<int32_t>& current_schema_id,
967978
std::vector<std::shared_ptr<Schema>>& schemas) {
968979
std::shared_ptr<Schema> current_schema;
969980
if (json.contains(kSchemas)) {
@@ -986,7 +997,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
986997
}
987998
if (!current_schema) {
988999
return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
989-
current_schema_id, schema_array.dump());
1000+
current_schema_id.value(), schema_array.dump());
9901001
}
9911002
} else {
9921003
if (format_version != 1) {
@@ -1031,13 +1042,30 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
10311042
return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
10321043
format_version);
10331044
}
1034-
default_spec_id = TableMetadata::kInitialSpecId;
10351045

1036-
ICEBERG_ASSIGN_OR_RAISE(auto spec, GetJsonValue<nlohmann::json>(json, kPartitionSpec)
1037-
.and_then([current_schema](const auto& json) {
1038-
return PartitionSpecFromJson(current_schema,
1039-
json);
1040-
}));
1046+
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec_json,
1047+
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
1048+
if (!partition_spec_json.is_array()) {
1049+
return JsonParseError("Cannot parse v1 partition spec from non-array: {}",
1050+
partition_spec_json.dump());
1051+
}
1052+
1053+
int32_t next_partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
1054+
std::vector<PartitionField> fields;
1055+
for (const auto& entry_json : partition_spec_json) {
1056+
ICEBERG_ASSIGN_OR_RAISE(auto field, PartitionFieldFromJson(entry_json));
1057+
int32_t field_id = field->field_id();
1058+
if (field_id == SchemaField::kInvalidFieldId) {
1059+
// If the field ID is not set, we need to assign a new one
1060+
field_id = next_partition_field_id++;
1061+
}
1062+
fields.emplace_back(field->source_id(), field_id, std::string(field->name()),
1063+
std::move(field->transform()));
1064+
}
1065+
1066+
auto spec = std::make_unique<PartitionSpec>(
1067+
current_schema, PartitionSpec::kInitialSpecId, std::move(fields));
1068+
default_spec_id = spec->spec_id();
10411069
partition_specs.push_back(std::move(spec));
10421070
}
10431071

@@ -1066,7 +1094,9 @@ Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
10661094
if (format_version > 1) {
10671095
return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
10681096
}
1069-
return NotImplementedError("Assign a default sort order");
1097+
auto sort_order = SortOrder::Unsorted();
1098+
default_sort_order_id = sort_order->order_id();
1099+
sort_orders.push_back(std::move(sort_order));
10701100
}
10711101
return {};
10721102
}
@@ -1119,10 +1149,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11191149
return JsonParseError("{} must exist in format v{}", kLastPartitionId,
11201150
table_metadata->format_version);
11211151
}
1122-
// TODO(gangwu): iterate all partition specs to find the largest partition
1123-
// field id or assign a default value for unpartitioned tables. However,
1124-
// PartitionSpec::lastAssignedFieldId() is not implemented yet.
1125-
return NotImplementedError("Find the largest partition field id");
1152+
1153+
if (table_metadata->partition_specs.empty()) {
1154+
table_metadata->last_partition_id =
1155+
PartitionSpec::Unpartitioned()->last_assigned_field_id();
1156+
} else {
1157+
table_metadata->last_partition_id =
1158+
std::ranges::max(table_metadata->partition_specs, {}, [](const auto& spec) {
1159+
return spec->last_assigned_field_id();
1160+
})->last_assigned_field_id();
1161+
}
11261162
}
11271163

11281164
ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(json, table_metadata->format_version,
@@ -1134,10 +1170,9 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11341170
}
11351171

11361172
// This field is optional, but internally we set this to -1 when not set
1137-
ICEBERG_ASSIGN_OR_RAISE(
1138-
table_metadata->current_snapshot_id,
1139-
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
1140-
TableMetadata::kInvalidSnapshotId));
1173+
ICEBERG_ASSIGN_OR_RAISE(table_metadata->current_snapshot_id,
1174+
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
1175+
Snapshot::kInvalidSnapshotId));
11411176

11421177
if (table_metadata->format_version >= 3) {
11431178
ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
@@ -1155,7 +1190,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11551190
ICEBERG_ASSIGN_OR_RAISE(
11561191
table_metadata->refs,
11571192
FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs, SnapshotRefFromJson));
1158-
} else if (table_metadata->current_snapshot_id != TableMetadata::kInvalidSnapshotId) {
1193+
} else if (table_metadata->current_snapshot_id != Snapshot::kInvalidSnapshotId) {
11591194
table_metadata->refs["main"] = std::make_unique<SnapshotRef>(SnapshotRef{
11601195
.snapshot_id = table_metadata->current_snapshot_id,
11611196
.retention = SnapshotRef::Branch{},

src/iceberg/json_internal.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,12 @@ nlohmann::json ToJson(const PartitionField& partition_field);
126126
/// and name.
127127
///
128128
/// \param json The JSON object representing a `PartitionField`.
129+
/// \param allow_field_id_missing Whether the field ID is allowed to be missing. This can
130+
/// happen when deserializing partition fields from V1 metadata files.
129131
/// \return An `expected` value containing either a `PartitionField` object or an error.
130132
/// If the JSON is malformed or missing expected fields, an error will be returned.
131133
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
132-
const nlohmann::json& json);
134+
const nlohmann::json& json, bool allow_field_id_missing = false);
133135

134136
/// \brief Serializes a `PartitionSpec` object to JSON.
135137
///

src/iceberg/partition_spec.cc

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,37 @@
1919

2020
#include "iceberg/partition_spec.h"
2121

22+
#include <algorithm>
2223
#include <format>
24+
#include <ranges>
2325

2426
#include "iceberg/schema.h"
25-
#include "iceberg/type.h"
2627
#include "iceberg/util/formatter.h" // IWYU pragma: keep
2728

2829
namespace iceberg {
2930

3031
PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
31-
std::vector<PartitionField> fields)
32-
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {}
32+
std::vector<PartitionField> fields,
33+
std::optional<int32_t> last_assigned_field_id)
34+
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
35+
if (last_assigned_field_id) {
36+
last_assigned_field_id_ = last_assigned_field_id.value();
37+
} else if (fields_.empty()) {
38+
last_assigned_field_id_ = kLegacyPartitionDataIdStart - 1;
39+
} else {
40+
last_assigned_field_id_ = std::ranges::max(fields_, {}, [](const auto& field) {
41+
return field.field_id();
42+
}).field_id();
43+
}
44+
}
45+
46+
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
47+
static const std::shared_ptr<PartitionSpec> unpartitioned =
48+
std::make_shared<PartitionSpec>(
49+
/*schema=*/nullptr, kInitialSpecId, std::vector<PartitionField>{},
50+
kLegacyPartitionDataIdStart - 1);
51+
return unpartitioned;
52+
}
3353

3454
const std::shared_ptr<Schema>& PartitionSpec::schema() const { return schema_; }
3555

@@ -47,8 +67,7 @@ std::string PartitionSpec::ToString() const {
4767
}
4868

4969
bool PartitionSpec::Equals(const PartitionSpec& other) const {
50-
return *schema_ == *other.schema_ && spec_id_ == other.spec_id_ &&
51-
fields_ == other.fields_;
70+
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
5271
}
5372

5473
} // namespace iceberg

src/iceberg/partition_spec.h

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
/// Partition specs for Iceberg tables.
2424

2525
#include <cstdint>
26+
#include <optional>
2627
#include <span>
2728
#include <string>
2829
#include <vector>
@@ -40,8 +41,25 @@ namespace iceberg {
4041
/// evolution.
4142
class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
4243
public:
44+
static constexpr int32_t kInitialSpecId = 0;
45+
/// \brief The start ID for partition field. It is only used to generate
46+
/// partition field id for v1 metadata where it is not tracked.
47+
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;
48+
49+
/// \brief Create a new partition spec.
50+
///
51+
/// \param schema The table schema.
52+
/// \param spec_id The spec ID.
53+
/// \param fields The partition fields.
54+
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
55+
/// be calculated from the fields.
4356
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
44-
std::vector<PartitionField> fields);
57+
std::vector<PartitionField> fields,
58+
std::optional<int32_t> last_assigned_field_id = std::nullopt);
59+
60+
/// \brief Get an unsorted partition spec singleton.
61+
static const std::shared_ptr<PartitionSpec>& Unpartitioned();
62+
4563
/// \brief Get the table schema
4664
const std::shared_ptr<Schema>& schema() const;
4765
/// \brief Get the spec ID.
@@ -51,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
5169

5270
std::string ToString() const override;
5371

72+
int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
73+
5474
friend bool operator==(const PartitionSpec& lhs, const PartitionSpec& rhs) {
5575
return lhs.Equals(rhs);
5676
}
@@ -66,6 +86,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
6686
std::shared_ptr<Schema> schema_;
6787
const int32_t spec_id_;
6888
std::vector<PartitionField> fields_;
89+
int32_t last_assigned_field_id_;
6990
};
7091

7192
} // namespace iceberg

src/iceberg/result.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ enum class ErrorKind {
4242
kNotSupported,
4343
kInvalidExpression,
4444
kJsonParseError,
45+
kNotFound,
4546
};
4647

4748
/// \brief Error with a kind and a message.

src/iceberg/schema.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626

2727
namespace iceberg {
2828

29-
Schema::Schema(int32_t schema_id, std::vector<SchemaField> fields)
29+
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
3030
: StructType(std::move(fields)), schema_id_(schema_id) {}
3131

32-
int32_t Schema::schema_id() const { return schema_id_; }
32+
std::optional<int32_t> Schema::schema_id() const { return schema_id_; }
3333

3434
std::string Schema::ToString() const {
3535
std::string repr = "schema<";

src/iceberg/schema.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
/// and any utility functions. See iceberg/type.h and iceberg/field.h as well.
2525

2626
#include <cstdint>
27+
#include <optional>
2728
#include <string>
2829
#include <vector>
2930

@@ -40,13 +41,15 @@ namespace iceberg {
4041
/// evolution.
4142
class ICEBERG_EXPORT Schema : public StructType {
4243
public:
43-
Schema(int32_t schema_id, std::vector<SchemaField> fields);
44+
static constexpr int32_t kInitialSchemaId = 0;
45+
46+
Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id);
4447

4548
/// \brief Get the schema ID.
4649
///
4750
/// A schema is identified by a unique ID for the purposes of schema
4851
/// evolution.
49-
[[nodiscard]] int32_t schema_id() const;
52+
[[nodiscard]] std::optional<int32_t> schema_id() const;
5053

5154
[[nodiscard]] std::string ToString() const override;
5255

@@ -58,7 +61,7 @@ class ICEBERG_EXPORT Schema : public StructType {
5861
/// \brief Compare two schemas for equality.
5962
[[nodiscard]] bool Equals(const Schema& other) const;
6063

61-
const int32_t schema_id_;
64+
const std::optional<int32_t> schema_id_;
6265
};
6366

6467
} // namespace iceberg

src/iceberg/schema_field.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ namespace iceberg {
3737
/// \brief A type combined with a name.
3838
class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
3939
public:
40+
static constexpr int32_t kInvalidFieldId = -1;
41+
4042
/// \brief Construct a field.
4143
/// \param[in] field_id The field ID.
4244
/// \param[in] name The field name.

0 commit comments

Comments
 (0)