From 3ec142a4209eb7019b04a89d648cf00aab44a2ba Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Fri, 11 Apr 2025 20:51:06 +0800 Subject: [PATCH 1/5] feat: snapshot ser/der Signed-off-by: Junwang Zhao --- src/iceberg/json_internal.cc | 183 +++++++++++++++++++++++++++++++++++ src/iceberg/json_internal.h | 25 +++++ src/iceberg/snapshot.cc | 28 ++++++ src/iceberg/snapshot.h | 59 +++++++++++ src/iceberg/type_fwd.h | 4 +- test/json_internal_test.cc | 104 ++++++++++++++++++-- 6 files changed, 396 insertions(+), 7 deletions(-) diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index d69b46297..c5e5c1439 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -21,6 +21,7 @@ #include #include +#include #include @@ -28,6 +29,7 @@ #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" +#include "iceberg/snapshot.h" #include "iceberg/sort_order.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -70,6 +72,53 @@ constexpr std::string_view kValueRequired = "value-required"; constexpr std::string_view kFieldId = "field-id"; constexpr std::string_view kSpecId = "spec-id"; +constexpr std::string_view kSnapshotId = "snapshot-id"; +constexpr std::string_view kParentSnapshotId = "parent-snapshot-id"; +constexpr std::string_view kSequenceNumber = "sequence-number"; +constexpr std::string_view kTimestampMs = "timestamp-ms"; +constexpr std::string_view kManifestList = "manifest-list"; +constexpr std::string_view kSummary = "summary"; +constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep"; +constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms"; +constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms"; + +const std::unordered_set kValidSnapshotSummaryFields = { + SnapshotSummaryFields::kOperation, + SnapshotSummaryFields::kAddedDataFiles, + SnapshotSummaryFields::kDeletedDataFiles, + SnapshotSummaryFields::kTotalDataFiles, + SnapshotSummaryFields::kAddedDeleteFiles, + SnapshotSummaryFields::kAddedEqDeleteFiles, + SnapshotSummaryFields::kRemovedEqDeleteFiles, + SnapshotSummaryFields::kAddedPosDeleteFiles, + SnapshotSummaryFields::kRemovedPosDeleteFiles, + SnapshotSummaryFields::kAddedDVs, + SnapshotSummaryFields::kRemovedDVs, + SnapshotSummaryFields::kRemovedDeleteFiles, + SnapshotSummaryFields::kTotalDeleteFiles, + SnapshotSummaryFields::kAddedRecords, + SnapshotSummaryFields::kDeletedRecords, + SnapshotSummaryFields::kTotalRecords, + SnapshotSummaryFields::kAddedFileSize, + SnapshotSummaryFields::kRemovedFileSize, + SnapshotSummaryFields::kTotalFileSize, + SnapshotSummaryFields::kAddedPosDeletes, + SnapshotSummaryFields::kRemovedPosDeletes, + SnapshotSummaryFields::kTotalPosDeletes, + SnapshotSummaryFields::kAddedEqDeletes, + SnapshotSummaryFields::kRemovedEqDeletes, + SnapshotSummaryFields::kTotalEqDeletes, + SnapshotSummaryFields::kDeletedDuplicatedFiles, + SnapshotSummaryFields::kChangedPartitionCountProp, + SnapshotSummaryFields::kWAPID, + SnapshotSummaryFields::kPublishedWAPID, + SnapshotSummaryFields::kSourceSnapshotID, + SnapshotSummaryFields::kEngineName, + SnapshotSummaryFields::kEngineVersion}; + +const std::unordered_set kValidDataOperation = { + DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite, + DataOperation::kDelete}; template Result GetJsonValue(const nlohmann::json& json, std::string_view key) { @@ -89,6 +138,22 @@ Result GetJsonValue(const nlohmann::json& json, std::string_view key) { } } +template +Result> GetJsonValueOptional(const nlohmann::json& json, + std::string_view key) { + if (!json.contains(key)) { + return std::nullopt; + } + try { + return json.at(key).get(); + } catch (const std::exception& ex) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Failed to parse key '{}' in {}", key, json.dump()), + }); + } +} + } // namespace nlohmann::json ToJson(const SortField& sort_field) { @@ -231,6 +296,53 @@ nlohmann::json SchemaToJson(const Schema& schema) { return json; } +nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) { + nlohmann::json json; + json[kSnapshotId] = ref.snapshot_id; + json[kType] = SnapshotRefTypeToString(ref.type()); + if (ref.type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref.retention); + if (branch.min_snapshots_to_keep.has_value()) { + json[kMinSnapshotsToKeep] = *branch.min_snapshots_to_keep; + } + if (branch.max_snapshot_age_ms.has_value()) { + json[kMaxSnapshotAgeMs] = *branch.max_snapshot_age_ms; + } + if (branch.max_ref_age_ms.has_value()) { + json[kMaxRefAgeMs] = *branch.max_ref_age_ms; + } + } else if (ref.type() == SnapshotRefType::kTag) { + const auto& tag = std::get(ref.retention); + if (tag.max_ref_age_ms.has_value()) { + json[kMaxRefAgeMs] = *tag.max_ref_age_ms; + } + } + return json; +} + +nlohmann::json SnapshotToJson(const Snapshot& snapshot) { + nlohmann::json json; + json[kSnapshotId] = snapshot.snapshot_id; + if (snapshot.parent_snapshot_id.has_value()) { + json[kParentSnapshotId] = *snapshot.parent_snapshot_id; + } + json[kSequenceNumber] = snapshot.sequence_number; + json[kTimestampMs] = snapshot.timestamp_ms; + json[kManifestList] = snapshot.manifest_list; + + nlohmann::json summary_json; + for (const auto& [key, value] : snapshot.summary) { + summary_json[key] = value; + } + json[kSummary] = summary_json; + + if (snapshot.schema_id.has_value()) { + json[kSchemaId] = *snapshot.schema_id; + } + + return json; +} + namespace { Result> StructTypeFromJson(const nlohmann::json& json) { @@ -419,4 +531,75 @@ Result> PartitionSpecFromJson( return std::make_unique(schema, spec_id, std::move(partition_fields)); } +Result> SnapshotRefFromJson(const nlohmann::json& json) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE( + auto type, + GetJsonValue(json, kType).and_then(SnapshotRefTypeFromString)); + if (type == SnapshotRefType::kBranch) { + ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots_to_keep, + GetJsonValueOptional(json, kMinSnapshotsToKeep)); + ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms, + GetJsonValueOptional(json, kMaxSnapshotAgeMs)); + ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms, + GetJsonValueOptional(json, kMaxRefAgeMs)); + + return std::make_unique( + snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep, + .max_snapshot_age_ms = max_snapshot_age_ms, + .max_ref_age_ms = max_ref_age_ms}); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms, + GetJsonValueOptional(json, kMaxRefAgeMs)); + + return std::make_unique( + snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms}); + } +} + +Result> SnapshotFromJson(const nlohmann::json& json) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(auto sequence_number, + GetJsonValue(json, kSequenceNumber)); + ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue(json, kTimestampMs)); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_list, + GetJsonValue(json, kManifestList)); + + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id, + GetJsonValueOptional(json, kParentSnapshotId)); + + ICEBERG_ASSIGN_OR_RAISE(auto summary_json, + GetJsonValue(json, kSummary)); + std::unordered_map summary; + for (const auto& [key, value] : summary_json.items()) { + if (!kValidSnapshotSummaryFields.contains(key)) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid snapshot summary field: {}", key), + }); + } + if (!value.is_string()) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = + std::format("Invalid snapshot summary field value: {}", value.dump()), + }); + } + if (key == SnapshotSummaryFields::kOperation && + !kValidDataOperation.contains(value.get())) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid snapshot operation: {}", value.dump()), + }); + } + summary[key] = value.get(); + } + + ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); + + return std::make_unique(snapshot_id, parent_snapshot_id, sequence_number, + timestamp_ms, manifest_list, std::move(summary), + schema_id); +} + } // namespace iceberg diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h index 4e754fd61..52447b267 100644 --- a/src/iceberg/json_internal.h +++ b/src/iceberg/json_internal.h @@ -88,6 +88,18 @@ nlohmann::json TypeToJson(const Type& type); /// \return The JSON representation of the field. nlohmann::json FieldToJson(const SchemaField& field); +/// \brief Convert an Iceberg SnapshotRef to JSON. +/// +/// \param[in] snapshot_ref The Iceberg snapshot reference to convert. +/// \return The JSON representation of the snapshot reference. +nlohmann::json SnapshotRefToJson(const SnapshotRef& snapshot_ref); + +/// \brief Convert an Iceberg Snapshot to JSON. +/// +/// \param[in] snapshot The Iceberg snapshot to convert. +/// \return The JSON representation of the snapshot. +nlohmann::json SnapshotToJson(const Snapshot& snapshot); + /// \brief Convert JSON to an Iceberg Schema. /// /// \param[in] json The JSON representation of the schema. @@ -153,4 +165,17 @@ Result> PartitionFieldFromJson( /// the JSON is malformed or missing expected fields, an error will be returned. Result> PartitionSpecFromJson( const std::shared_ptr& schema, const nlohmann::json& json); + +/// \brief Convert JSON to an Iceberg SnapshotRef. +/// +/// \param[in] json The JSON representation of the snapshot reference. +/// \return The Iceberg snapshot reference or an error if the conversion fails. +Result> SnapshotRefFromJson(const nlohmann::json& json); + +/// \brief Convert JSON to an Iceberg Snapshot. +/// +/// \param[in] json The JSON representation of the snapshot. +/// \return The Iceberg snapshot or an error if the conversion fails. +Result> SnapshotFromJson(const nlohmann::json& json); + } // namespace iceberg diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index a17d9e8b2..fb994f8b7 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -21,6 +21,16 @@ namespace iceberg { +bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const { + return min_snapshots_to_keep == other.min_snapshots_to_keep && + max_snapshot_age_ms == other.max_snapshot_age_ms && + max_ref_age_ms == other.max_ref_age_ms; +} + +bool SnapshotRef::Tag::Equals(const SnapshotRef::Tag& other) const { + return max_ref_age_ms == other.max_ref_age_ms; +} + SnapshotRefType SnapshotRef::type() const noexcept { return std::visit( [&](const auto& retention) -> SnapshotRefType { @@ -34,6 +44,24 @@ SnapshotRefType SnapshotRef::type() const noexcept { retention); } +bool SnapshotRef::Equals(const SnapshotRef& other) const { + if (this == &other) { + return true; + } + if (type() != other.type()) { + return false; + } + + if (type() == SnapshotRefType::kBranch) { + return snapshot_id == other.snapshot_id && + std::get(retention) == std::get(other.retention); + + } else { + return snapshot_id == other.snapshot_id && + std::get(retention) == std::get(other.retention); + } +} + std::optional Snapshot::operation() const { auto it = summary.find(SnapshotSummaryFields::kOperation); if (it != summary.end()) { diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 29577e9b1..0457f8de1 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" namespace iceberg { @@ -39,6 +40,28 @@ enum class SnapshotRefType { kTag, }; +/// \brief Get the relative snapshot reference type name +ICEBERG_EXPORT constexpr std::string_view SnapshotRefTypeToString( + SnapshotRefType type) noexcept { + switch (type) { + case SnapshotRefType::kBranch: + return "branch"; + case SnapshotRefType::kTag: + return "tag"; + default: + return "invalid"; + } +} +/// \brief Get the relative snapshot reference type from name +ICEBERG_EXPORT constexpr Result SnapshotRefTypeFromString( + std::string_view str) noexcept { + if (str == "branch") return SnapshotRefType::kBranch; + if (str == "tag") return SnapshotRefType::kTag; + return unexpected( + {.kind = ErrorKind::kInvalidArgument, + .message = "Invalid snapshot reference type: {}" + std::string(str)}); +} + /// \brief A reference to a snapshot, either a branch or a tag. struct ICEBERG_EXPORT SnapshotRef { struct ICEBERG_EXPORT Branch { @@ -54,6 +77,18 @@ struct ICEBERG_EXPORT SnapshotRef { /// of the snapshot reference to keep while expiring snapshots. Defaults to table /// property history.expire.max-ref-age-ms. The main branch never expires. std::optional max_ref_age_ms; + + /// \brief Compare two branches for equality. + friend bool operator==(const Branch& lhs, const Branch& rhs) { + return lhs.Equals(rhs); + } + + /// \brief Compare two branches for inequality. + friend bool operator!=(const Branch& lhs, const Branch& rhs) { return !(lhs == rhs); } + + private: + /// \brief Compare two branches for equality. + bool Equals(const Branch& other) const; }; struct ICEBERG_EXPORT Tag { @@ -61,6 +96,16 @@ struct ICEBERG_EXPORT SnapshotRef { /// of the snapshot reference to keep while expiring snapshots. Defaults to table /// property history.expire.max-ref-age-ms. The main branch never expires. std::optional max_ref_age_ms; + + /// \brief Compare two tags for equality. + friend bool operator==(const Tag& lhs, const Tag& rhs) { return lhs.Equals(rhs); } + + /// \brief Compare two tags for inequality. + friend bool operator!=(const Tag& lhs, const Tag& rhs) { return !(lhs == rhs); } + + private: + /// \brief Compare two tags for equality. + bool Equals(const Tag& other) const; }; /// A reference's snapshot ID. The tagged snapshot or latest snapshot of a branch. @@ -69,6 +114,20 @@ struct ICEBERG_EXPORT SnapshotRef { std::variant retention; SnapshotRefType type() const noexcept; + + /// \brief Compare two snapshot refs for equality + friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) { + return lhs.Equals(rhs); + } + + /// \brief Compare two snapshot refs for inequality. + friend bool operator!=(const SnapshotRef& lhs, const SnapshotRef& rhs) { + return !(lhs == rhs); + } + + private: + /// \brief Compare two snapshot refs for equality. + bool Equals(const SnapshotRef& other) const; }; /// \brief Optional Snapshot Summary Fields diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 519164ef9..44db1c096 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -97,10 +97,12 @@ class Transaction; class HistoryEntry; class Snapshot; +struct SnapshotRef; +enum class SnapshotRefType; class SortField; class SortOrder; class StructLike; -class TableMetadata; +struct TableMetadata; class Transform; enum class TransformType; class TransformFunction; diff --git a/test/json_internal_test.cc b/test/json_internal_test.cc index 10b25f56d..4fb920562 100644 --- a/test/json_internal_test.cc +++ b/test/json_internal_test.cc @@ -19,14 +19,15 @@ #include "iceberg/json_internal.h" -#include #include #include +#include #include #include "iceberg/partition_spec.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/sort_field.h" #include "iceberg/sort_order.h" #include "iceberg/transform.h" @@ -37,21 +38,20 @@ namespace iceberg { namespace { // Specialized FromJson helper based on type template -expected, Error> FromJsonHelper(const nlohmann::json& json); +Result> FromJsonHelper(const nlohmann::json& json); template <> -expected, Error> FromJsonHelper(const nlohmann::json& json) { +Result> FromJsonHelper(const nlohmann::json& json) { return SortFieldFromJson(json); } template <> -expected, Error> FromJsonHelper(const nlohmann::json& json) { +Result> FromJsonHelper(const nlohmann::json& json) { return SortOrderFromJson(json); } template <> -expected, Error> FromJsonHelper( - const nlohmann::json& json) { +Result> FromJsonHelper(const nlohmann::json& json) { return PartitionFieldFromJson(json); } @@ -148,4 +148,96 @@ TEST(JsonPartitionTest, PartitionSpec) { EXPECT_EQ(spec, *parsed_spec_result.value()); } +TEST(JsonInternalTest, SnapshotRefBranch) { + SnapshotRef ref(1234567890, SnapshotRef::Branch{.min_snapshots_to_keep = 10, + .max_snapshot_age_ms = 123456789, + .max_ref_age_ms = 987654321}); + + // Create a JSON object with the expected values + nlohmann::json expected_json = + R"({"snapshot-id":1234567890, + "type":"branch", + "min-snapshots-to-keep":10, + "max-snapshot-age-ms":123456789, + "max-ref-age-ms":987654321})"_json; + + auto json = SnapshotRefToJson(ref); + EXPECT_EQ(expected_json, json) << "JSON conversion mismatch."; + + auto obj_ex = SnapshotRefFromJson(expected_json); + EXPECT_TRUE(obj_ex.has_value()) << "Failed to deserialize JSON."; + EXPECT_EQ(ref, *obj_ex.value()) << "Deserialized object mismatch."; +} + +TEST(JsonInternalTest, SnapshotRefTag) { + SnapshotRef ref(9876543210, SnapshotRef::Tag{.max_ref_age_ms = 54321}); + + // Create a JSON object with the expected values + nlohmann::json expected_json = + R"({"snapshot-id":9876543210, + "type":"tag", + "max-ref-age-ms":54321})"_json; + + auto json = SnapshotRefToJson(ref); + EXPECT_EQ(expected_json, json) << "JSON conversion mismatch."; + + auto obj_ex = SnapshotRefFromJson(expected_json); + EXPECT_TRUE(obj_ex.has_value()) << "Failed to deserialize JSON."; + EXPECT_EQ(ref, *obj_ex.value()) << "Deserialized object mismatch."; +} + +TEST(JsonInternalTest, Snapshot) { + std::unordered_map summary = { + {SnapshotSummaryFields::kOperation, DataOperation::kAppend}, + {SnapshotSummaryFields::kAddedDataFiles, "50"}}; + + Snapshot snapshot{.snapshot_id = 1234567890, + .parent_snapshot_id = 9876543210, + .sequence_number = 99, + .timestamp_ms = 1234567890123, + .manifest_list = "/path/to/manifest_list", + .summary = summary, + .schema_id = 42}; + + // Create a JSON object with the expected values + nlohmann::json expected_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "operation":"append", + "added-data-files":"50" + }, + "schema-id":42})"_json; + + auto json = SnapshotToJson(snapshot); + EXPECT_EQ(expected_json, json) << "JSON conversion mismatch."; + + auto obj_ex = SnapshotFromJson(expected_json); + EXPECT_TRUE(obj_ex.has_value()) << "Failed to deserialize JSON."; + EXPECT_EQ(snapshot, *obj_ex.value()) << "Deserialized object mismatch."; +} + +TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) { + nlohmann::json invalid_json_snapshot = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "invalid-field":"value" + }, + "schema-id":42})"_json; + + auto result = SnapshotFromJson(invalid_json_snapshot); + ASSERT_FALSE(result.has_value()); + + EXPECT_EQ(result.error().kind, ErrorKind::kJsonParseError); + EXPECT_TRUE(result.error().message.find("Invalid snapshot summary field") != + std::string::npos); +} + } // namespace iceberg From ef171a4aa9e54e8bd8c2ef8377cc10d8c92e5d8c Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 15 Apr 2025 12:48:23 +0800 Subject: [PATCH 2/5] fix: review comments --- src/iceberg/json_internal.cc | 47 ++++++++++++++---------------------- src/iceberg/snapshot.h | 8 +++--- test/json_internal_test.cc | 6 ++--- 3 files changed, 24 insertions(+), 37 deletions(-) diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index c5e5c1439..0ec5dd8fe 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -110,9 +110,9 @@ const std::unordered_set kValidSnapshotSummaryFields = { SnapshotSummaryFields::kTotalEqDeletes, SnapshotSummaryFields::kDeletedDuplicatedFiles, SnapshotSummaryFields::kChangedPartitionCountProp, - SnapshotSummaryFields::kWAPID, - SnapshotSummaryFields::kPublishedWAPID, - SnapshotSummaryFields::kSourceSnapshotID, + SnapshotSummaryFields::kWAPId, + SnapshotSummaryFields::kPublishedWAPId, + SnapshotSummaryFields::kSourceSnapshotId, SnapshotSummaryFields::kEngineName, SnapshotSummaryFields::kEngineVersion}; @@ -154,6 +154,14 @@ Result> GetJsonValueOptional(const nlohmann::json& json, } } +template +void SetOptionalField(nlohmann::json& json, std::string_view key, + const std::optional& value) { + if (value.has_value()) { + json[key] = *value; + } +} + } // namespace nlohmann::json ToJson(const SortField& sort_field) { @@ -302,20 +310,12 @@ nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) { json[kType] = SnapshotRefTypeToString(ref.type()); if (ref.type() == SnapshotRefType::kBranch) { const auto& branch = std::get(ref.retention); - if (branch.min_snapshots_to_keep.has_value()) { - json[kMinSnapshotsToKeep] = *branch.min_snapshots_to_keep; - } - if (branch.max_snapshot_age_ms.has_value()) { - json[kMaxSnapshotAgeMs] = *branch.max_snapshot_age_ms; - } - if (branch.max_ref_age_ms.has_value()) { - json[kMaxRefAgeMs] = *branch.max_ref_age_ms; - } + SetOptionalField(json, kMinSnapshotsToKeep, branch.min_snapshots_to_keep); + SetOptionalField(json, kMaxSnapshotAgeMs, branch.max_snapshot_age_ms); + SetOptionalField(json, kMaxRefAgeMs, branch.max_ref_age_ms); } else if (ref.type() == SnapshotRefType::kTag) { const auto& tag = std::get(ref.retention); - if (tag.max_ref_age_ms.has_value()) { - json[kMaxRefAgeMs] = *tag.max_ref_age_ms; - } + SetOptionalField(json, kMaxRefAgeMs, tag.max_ref_age_ms); } return json; } @@ -323,23 +323,12 @@ nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) { nlohmann::json SnapshotToJson(const Snapshot& snapshot) { nlohmann::json json; json[kSnapshotId] = snapshot.snapshot_id; - if (snapshot.parent_snapshot_id.has_value()) { - json[kParentSnapshotId] = *snapshot.parent_snapshot_id; - } + SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id); json[kSequenceNumber] = snapshot.sequence_number; json[kTimestampMs] = snapshot.timestamp_ms; json[kManifestList] = snapshot.manifest_list; - - nlohmann::json summary_json; - for (const auto& [key, value] : snapshot.summary) { - summary_json[key] = value; - } - json[kSummary] = summary_json; - - if (snapshot.schema_id.has_value()) { - json[kSchemaId] = *snapshot.schema_id; - } - + json[kSummary] = snapshot.summary; + SetOptionalField(json, kSchemaId, snapshot.schema_id); return json; } diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 0457f8de1..da9dd95b0 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -48,8 +48,6 @@ ICEBERG_EXPORT constexpr std::string_view SnapshotRefTypeToString( return "branch"; case SnapshotRefType::kTag: return "tag"; - default: - return "invalid"; } } /// \brief Get the relative snapshot reference type from name @@ -198,11 +196,11 @@ struct SnapshotSummaryFields { /// Other Fields, see https://iceberg.apache.org/spec/#other-fields /// \brief The Write-Audit-Publish id of a staged snapshot - inline static const std::string kWAPID = "wap.id"; + inline static const std::string kWAPId = "wap.id"; /// \brief The Write-Audit-Publish id of a snapshot already been published - inline static const std::string kPublishedWAPID = "published-wap-id"; + inline static const std::string kPublishedWAPId = "published-wap-id"; /// \brief The original id of a cherry-picked snapshot - inline static const std::string kSourceSnapshotID = "source-snapshot-id"; + inline static const std::string kSourceSnapshotId = "source-snapshot-id"; /// \brief Name of the engine that created the snapshot inline static const std::string kEngineName = "engine-name"; /// \brief Version of the engine that created the snapshot diff --git a/test/json_internal_test.cc b/test/json_internal_test.cc index 4fb920562..382995af5 100644 --- a/test/json_internal_test.cc +++ b/test/json_internal_test.cc @@ -32,6 +32,7 @@ #include "iceberg/sort_order.h" #include "iceberg/transform.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep +#include "matchers.h" namespace iceberg { @@ -235,9 +236,8 @@ TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) { auto result = SnapshotFromJson(invalid_json_snapshot); ASSERT_FALSE(result.has_value()); - EXPECT_EQ(result.error().kind, ErrorKind::kJsonParseError); - EXPECT_TRUE(result.error().message.find("Invalid snapshot summary field") != - std::string::npos); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Invalid snapshot summary field")); } } // namespace iceberg From 8847ca8d22090d7e3b6d730c9fcc088c31a15220 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 15 Apr 2025 20:38:47 +0800 Subject: [PATCH 3/5] use ToJson for Snapshot serialization --- src/iceberg/json_internal.cc | 55 ++++++++++++++++++++---------------- src/iceberg/json_internal.h | 26 ++++++++--------- test/json_internal_test.cc | 35 +++++++++++------------ 3 files changed, 59 insertions(+), 57 deletions(-) diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 0ec5dd8fe..a8ad110cc 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -304,7 +304,7 @@ nlohmann::json SchemaToJson(const Schema& schema) { return json; } -nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) { +nlohmann::json ToJson(const SnapshotRef& ref) { nlohmann::json json; json[kSnapshotId] = ref.snapshot_id; json[kType] = SnapshotRefTypeToString(ref.type()); @@ -320,14 +320,17 @@ nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) { return json; } -nlohmann::json SnapshotToJson(const Snapshot& snapshot) { +nlohmann::json ToJson(const Snapshot& snapshot) { nlohmann::json json; json[kSnapshotId] = snapshot.snapshot_id; SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id); json[kSequenceNumber] = snapshot.sequence_number; json[kTimestampMs] = snapshot.timestamp_ms; json[kManifestList] = snapshot.manifest_list; - json[kSummary] = snapshot.summary; + // If there is an operation, write the summary map + if (snapshot.operation().has_value()) { + json[kSummary] = snapshot.summary; + } SetOptionalField(json, kSchemaId, snapshot.schema_id); return json; } @@ -558,30 +561,32 @@ Result> SnapshotFromJson(const nlohmann::json& json) { GetJsonValueOptional(json, kParentSnapshotId)); ICEBERG_ASSIGN_OR_RAISE(auto summary_json, - GetJsonValue(json, kSummary)); + GetJsonValueOptional(json, kSummary)); std::unordered_map summary; - for (const auto& [key, value] : summary_json.items()) { - if (!kValidSnapshotSummaryFields.contains(key)) { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Invalid snapshot summary field: {}", key), - }); - } - if (!value.is_string()) { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = - std::format("Invalid snapshot summary field value: {}", value.dump()), - }); - } - if (key == SnapshotSummaryFields::kOperation && - !kValidDataOperation.contains(value.get())) { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Invalid snapshot operation: {}", value.dump()), - }); + if (summary_json.has_value()) { + for (const auto& [key, value] : summary_json->items()) { + if (!kValidSnapshotSummaryFields.contains(key)) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid snapshot summary field: {}", key), + }); + } + if (!value.is_string()) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = + std::format("Invalid snapshot summary field value: {}", value.dump()), + }); + } + if (key == SnapshotSummaryFields::kOperation && + !kValidDataOperation.contains(value.get())) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid snapshot operation: {}", value.dump()), + }); + } + summary[key] = value.get(); } - summary[key] = value.get(); } ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h index 52447b267..450578ff1 100644 --- a/src/iceberg/json_internal.h +++ b/src/iceberg/json_internal.h @@ -88,17 +88,17 @@ nlohmann::json TypeToJson(const Type& type); /// \return The JSON representation of the field. nlohmann::json FieldToJson(const SchemaField& field); -/// \brief Convert an Iceberg SnapshotRef to JSON. +/// \brief Serializes a `SnapshotRef` object to JSON. /// -/// \param[in] snapshot_ref The Iceberg snapshot reference to convert. -/// \return The JSON representation of the snapshot reference. -nlohmann::json SnapshotRefToJson(const SnapshotRef& snapshot_ref); +/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized. +/// \return A JSON object representing the `SnapshotRef`. +nlohmann::json ToJson(const SnapshotRef& snapshot_ref); -/// \brief Convert an Iceberg Snapshot to JSON. +/// \brief Serializes a `Snapshot` object to JSON. /// -/// \param[in] snapshot The Iceberg snapshot to convert. -/// \return The JSON representation of the snapshot. -nlohmann::json SnapshotToJson(const Snapshot& snapshot); +/// \param[in] snapshot The `Snapshot` object to be serialized. +/// \return A JSON object representing the `snapshot`. +nlohmann::json ToJson(const Snapshot& snapshot); /// \brief Convert JSON to an Iceberg Schema. /// @@ -166,16 +166,16 @@ Result> PartitionFieldFromJson( Result> PartitionSpecFromJson( const std::shared_ptr& schema, const nlohmann::json& json); -/// \brief Convert JSON to an Iceberg SnapshotRef. +/// \brief Deserializes a JSON object into a `SnapshotRef` object. /// -/// \param[in] json The JSON representation of the snapshot reference. -/// \return The Iceberg snapshot reference or an error if the conversion fails. +/// \param[in] json The JSON object representing a `SnapshotRef`. +/// \return A `SnapshotRef` object or an error if the conversion fails. Result> SnapshotRefFromJson(const nlohmann::json& json); -/// \brief Convert JSON to an Iceberg Snapshot. +/// \brief Deserializes a JSON object into a `Snapshot` object. /// /// \param[in] json The JSON representation of the snapshot. -/// \return The Iceberg snapshot or an error if the conversion fails. +/// \return A `Snapshot` object or an error if the conversion fails. Result> SnapshotFromJson(const nlohmann::json& json); } // namespace iceberg diff --git a/test/json_internal_test.cc b/test/json_internal_test.cc index 382995af5..4c6eaf361 100644 --- a/test/json_internal_test.cc +++ b/test/json_internal_test.cc @@ -25,6 +25,7 @@ #include #include +#include "gmock/gmock.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" @@ -56,6 +57,16 @@ Result> FromJsonHelper(const nlohmann::json& jso return PartitionFieldFromJson(json); } +template <> +Result> FromJsonHelper(const nlohmann::json& json) { + return SnapshotRefFromJson(json); +} + +template <> +Result> FromJsonHelper(const nlohmann::json& json) { + return SnapshotFromJson(json); +} + // Helper function to reduce duplication in testing template void TestJsonConversion(const T& obj, const nlohmann::json& expected_json) { @@ -117,7 +128,8 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) { auto result = PartitionFieldFromJson(invalid_json); EXPECT_FALSE(result.has_value()); - EXPECT_EQ(result.error().kind, ErrorKind::kJsonParseError); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'source-id'")); } TEST(JsonPartitionTest, PartitionSpec) { @@ -162,12 +174,7 @@ TEST(JsonInternalTest, SnapshotRefBranch) { "max-snapshot-age-ms":123456789, "max-ref-age-ms":987654321})"_json; - auto json = SnapshotRefToJson(ref); - EXPECT_EQ(expected_json, json) << "JSON conversion mismatch."; - - auto obj_ex = SnapshotRefFromJson(expected_json); - EXPECT_TRUE(obj_ex.has_value()) << "Failed to deserialize JSON."; - EXPECT_EQ(ref, *obj_ex.value()) << "Deserialized object mismatch."; + TestJsonConversion(ref, expected_json); } TEST(JsonInternalTest, SnapshotRefTag) { @@ -179,12 +186,7 @@ TEST(JsonInternalTest, SnapshotRefTag) { "type":"tag", "max-ref-age-ms":54321})"_json; - auto json = SnapshotRefToJson(ref); - EXPECT_EQ(expected_json, json) << "JSON conversion mismatch."; - - auto obj_ex = SnapshotRefFromJson(expected_json); - EXPECT_TRUE(obj_ex.has_value()) << "Failed to deserialize JSON."; - EXPECT_EQ(ref, *obj_ex.value()) << "Deserialized object mismatch."; + TestJsonConversion(ref, expected_json); } TEST(JsonInternalTest, Snapshot) { @@ -213,12 +215,7 @@ TEST(JsonInternalTest, Snapshot) { }, "schema-id":42})"_json; - auto json = SnapshotToJson(snapshot); - EXPECT_EQ(expected_json, json) << "JSON conversion mismatch."; - - auto obj_ex = SnapshotFromJson(expected_json); - EXPECT_TRUE(obj_ex.has_value()) << "Failed to deserialize JSON."; - EXPECT_EQ(snapshot, *obj_ex.value()) << "Deserialized object mismatch."; + TestJsonConversion(snapshot, expected_json); } TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) { From 296d04f21226aa3883ed2cf1dc7068b019f247d0 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 15 Apr 2025 21:47:15 +0800 Subject: [PATCH 4/5] handle initial sequence number --- src/iceberg/json_internal.cc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index a8ad110cc..064fb1d34 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -19,6 +19,7 @@ #include "iceberg/json_internal.h" +#include #include #include #include @@ -82,6 +83,8 @@ constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep"; constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms"; constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms"; +constexpr int64_t kInitialSequenceNumber = 0; + const std::unordered_set kValidSnapshotSummaryFields = { SnapshotSummaryFields::kOperation, SnapshotSummaryFields::kAddedDataFiles, @@ -324,7 +327,9 @@ nlohmann::json ToJson(const Snapshot& snapshot) { nlohmann::json json; json[kSnapshotId] = snapshot.snapshot_id; SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id); - json[kSequenceNumber] = snapshot.sequence_number; + if (snapshot.sequence_number > kInitialSequenceNumber) { + json[kSequenceNumber] = snapshot.sequence_number; + } json[kTimestampMs] = snapshot.timestamp_ms; json[kManifestList] = snapshot.manifest_list; // If there is an operation, write the summary map @@ -552,7 +557,7 @@ Result> SnapshotRefFromJson(const nlohmann::json& j Result> SnapshotFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); ICEBERG_ASSIGN_OR_RAISE(auto sequence_number, - GetJsonValue(json, kSequenceNumber)); + GetJsonValueOptional(json, kSequenceNumber)); ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue(json, kTimestampMs)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_list, GetJsonValue(json, kManifestList)); @@ -591,9 +596,10 @@ Result> SnapshotFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); - return std::make_unique(snapshot_id, parent_snapshot_id, sequence_number, - timestamp_ms, manifest_list, std::move(summary), - schema_id); + return std::make_unique( + snapshot_id, parent_snapshot_id, + sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber, + timestamp_ms, manifest_list, std::move(summary), schema_id); } } // namespace iceberg From e3fd584a1cf875688ca6489e0596e45aacc07ce3 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 15 Apr 2025 23:28:16 +0800 Subject: [PATCH 5/5] fix: when summary with no operation field, set operation to overwrite --- src/iceberg/json_internal.cc | 4 ++++ test/json_internal_test.cc | 23 +++++++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 064fb1d34..ec34be1b3 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -592,6 +592,10 @@ Result> SnapshotFromJson(const nlohmann::json& json) { } summary[key] = value.get(); } + // If summary is available but operation is missing, set operation to overwrite. + if (!summary.contains(SnapshotSummaryFields::kOperation)) { + summary[SnapshotSummaryFields::kOperation] = DataOperation::kOverwrite; + } } ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); diff --git a/test/json_internal_test.cc b/test/json_internal_test.cc index 4c6eaf361..6b6a22044 100644 --- a/test/json_internal_test.cc +++ b/test/json_internal_test.cc @@ -219,7 +219,7 @@ TEST(JsonInternalTest, Snapshot) { } TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) { - nlohmann::json invalid_json_snapshot = + nlohmann::json invalid_json = R"({"snapshot-id":1234567890, "parent-snapshot-id":9876543210, "sequence-number":99, @@ -229,12 +229,31 @@ TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) { "invalid-field":"value" }, "schema-id":42})"_json; + // malformed summary field - auto result = SnapshotFromJson(invalid_json_snapshot); + auto result = SnapshotFromJson(invalid_json); ASSERT_FALSE(result.has_value()); EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); EXPECT_THAT(result, HasErrorMessage("Invalid snapshot summary field")); } +TEST(JsonInternalTest, SnapshotFromJsonSummaryWithNoOperation) { + nlohmann::json snapshot_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "added-data-files":"50" + }, + "schema-id":42})"_json; + + auto result = SnapshotFromJson(snapshot_json); + ASSERT_TRUE(result.has_value()); + + ASSERT_EQ(result.value()->operation(), DataOperation::kOverwrite); +} + } // namespace iceberg