diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index d69b46297..ec34be1b3 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -19,8 +19,10 @@ #include "iceberg/json_internal.h" +#include #include #include +#include #include @@ -28,6 +30,7 @@ #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" +#include "iceberg/snapshot.h" #include "iceberg/sort_order.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -70,6 +73,55 @@ constexpr std::string_view kValueRequired = "value-required"; constexpr std::string_view kFieldId = "field-id"; constexpr std::string_view kSpecId = "spec-id"; +constexpr std::string_view kSnapshotId = "snapshot-id"; +constexpr std::string_view kParentSnapshotId = "parent-snapshot-id"; +constexpr std::string_view kSequenceNumber = "sequence-number"; +constexpr std::string_view kTimestampMs = "timestamp-ms"; +constexpr std::string_view kManifestList = "manifest-list"; +constexpr std::string_view kSummary = "summary"; +constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep"; +constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms"; +constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms"; + +constexpr int64_t kInitialSequenceNumber = 0; + +const std::unordered_set kValidSnapshotSummaryFields = { + SnapshotSummaryFields::kOperation, + SnapshotSummaryFields::kAddedDataFiles, + SnapshotSummaryFields::kDeletedDataFiles, + SnapshotSummaryFields::kTotalDataFiles, + SnapshotSummaryFields::kAddedDeleteFiles, + SnapshotSummaryFields::kAddedEqDeleteFiles, + SnapshotSummaryFields::kRemovedEqDeleteFiles, + SnapshotSummaryFields::kAddedPosDeleteFiles, + SnapshotSummaryFields::kRemovedPosDeleteFiles, + SnapshotSummaryFields::kAddedDVs, + SnapshotSummaryFields::kRemovedDVs, + SnapshotSummaryFields::kRemovedDeleteFiles, + SnapshotSummaryFields::kTotalDeleteFiles, + SnapshotSummaryFields::kAddedRecords, + SnapshotSummaryFields::kDeletedRecords, + SnapshotSummaryFields::kTotalRecords, + SnapshotSummaryFields::kAddedFileSize, + SnapshotSummaryFields::kRemovedFileSize, + SnapshotSummaryFields::kTotalFileSize, + SnapshotSummaryFields::kAddedPosDeletes, + SnapshotSummaryFields::kRemovedPosDeletes, + SnapshotSummaryFields::kTotalPosDeletes, + SnapshotSummaryFields::kAddedEqDeletes, + SnapshotSummaryFields::kRemovedEqDeletes, + SnapshotSummaryFields::kTotalEqDeletes, + SnapshotSummaryFields::kDeletedDuplicatedFiles, + SnapshotSummaryFields::kChangedPartitionCountProp, + SnapshotSummaryFields::kWAPId, + SnapshotSummaryFields::kPublishedWAPId, + SnapshotSummaryFields::kSourceSnapshotId, + SnapshotSummaryFields::kEngineName, + SnapshotSummaryFields::kEngineVersion}; + +const std::unordered_set kValidDataOperation = { + DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite, + DataOperation::kDelete}; template Result GetJsonValue(const nlohmann::json& json, std::string_view key) { @@ -89,6 +141,30 @@ Result GetJsonValue(const nlohmann::json& json, std::string_view key) { } } +template +Result> GetJsonValueOptional(const nlohmann::json& json, + std::string_view key) { + if (!json.contains(key)) { + return std::nullopt; + } + try { + return json.at(key).get(); + } catch (const std::exception& ex) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Failed to parse key '{}' in {}", key, json.dump()), + }); + } +} + +template +void SetOptionalField(nlohmann::json& json, std::string_view key, + const std::optional& value) { + if (value.has_value()) { + json[key] = *value; + } +} + } // namespace nlohmann::json ToJson(const SortField& sort_field) { @@ -231,6 +307,39 @@ nlohmann::json SchemaToJson(const Schema& schema) { return json; } +nlohmann::json ToJson(const SnapshotRef& ref) { + nlohmann::json json; + json[kSnapshotId] = ref.snapshot_id; + json[kType] = SnapshotRefTypeToString(ref.type()); + if (ref.type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref.retention); + SetOptionalField(json, kMinSnapshotsToKeep, branch.min_snapshots_to_keep); + SetOptionalField(json, kMaxSnapshotAgeMs, branch.max_snapshot_age_ms); + SetOptionalField(json, kMaxRefAgeMs, branch.max_ref_age_ms); + } else if (ref.type() == SnapshotRefType::kTag) { + const auto& tag = std::get(ref.retention); + SetOptionalField(json, kMaxRefAgeMs, tag.max_ref_age_ms); + } + return json; +} + +nlohmann::json ToJson(const Snapshot& snapshot) { + nlohmann::json json; + json[kSnapshotId] = snapshot.snapshot_id; + SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id); + if (snapshot.sequence_number > kInitialSequenceNumber) { + json[kSequenceNumber] = snapshot.sequence_number; + } + json[kTimestampMs] = snapshot.timestamp_ms; + json[kManifestList] = snapshot.manifest_list; + // If there is an operation, write the summary map + if (snapshot.operation().has_value()) { + json[kSummary] = snapshot.summary; + } + SetOptionalField(json, kSchemaId, snapshot.schema_id); + return json; +} + namespace { Result> StructTypeFromJson(const nlohmann::json& json) { @@ -419,4 +528,82 @@ Result> PartitionSpecFromJson( return std::make_unique(schema, spec_id, std::move(partition_fields)); } +Result> SnapshotRefFromJson(const nlohmann::json& json) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE( + auto type, + GetJsonValue(json, kType).and_then(SnapshotRefTypeFromString)); + if (type == SnapshotRefType::kBranch) { + ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots_to_keep, + GetJsonValueOptional(json, kMinSnapshotsToKeep)); + ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms, + GetJsonValueOptional(json, kMaxSnapshotAgeMs)); + ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms, + GetJsonValueOptional(json, kMaxRefAgeMs)); + + return std::make_unique( + snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep, + .max_snapshot_age_ms = max_snapshot_age_ms, + .max_ref_age_ms = max_ref_age_ms}); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms, + GetJsonValueOptional(json, kMaxRefAgeMs)); + + return std::make_unique( + snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms}); + } +} + +Result> SnapshotFromJson(const nlohmann::json& json) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(auto sequence_number, + GetJsonValueOptional(json, kSequenceNumber)); + ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue(json, kTimestampMs)); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_list, + GetJsonValue(json, kManifestList)); + + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id, + GetJsonValueOptional(json, kParentSnapshotId)); + + ICEBERG_ASSIGN_OR_RAISE(auto summary_json, + GetJsonValueOptional(json, kSummary)); + std::unordered_map summary; + if (summary_json.has_value()) { + for (const auto& [key, value] : summary_json->items()) { + if (!kValidSnapshotSummaryFields.contains(key)) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid snapshot summary field: {}", key), + }); + } + if (!value.is_string()) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = + std::format("Invalid snapshot summary field value: {}", value.dump()), + }); + } + if (key == SnapshotSummaryFields::kOperation && + !kValidDataOperation.contains(value.get())) { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid snapshot operation: {}", value.dump()), + }); + } + summary[key] = value.get(); + } + // If summary is available but operation is missing, set operation to overwrite. + if (!summary.contains(SnapshotSummaryFields::kOperation)) { + summary[SnapshotSummaryFields::kOperation] = DataOperation::kOverwrite; + } + } + + ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); + + return std::make_unique( + snapshot_id, parent_snapshot_id, + sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber, + timestamp_ms, manifest_list, std::move(summary), schema_id); +} + } // namespace iceberg diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h index 4e754fd61..450578ff1 100644 --- a/src/iceberg/json_internal.h +++ b/src/iceberg/json_internal.h @@ -88,6 +88,18 @@ nlohmann::json TypeToJson(const Type& type); /// \return The JSON representation of the field. nlohmann::json FieldToJson(const SchemaField& field); +/// \brief Serializes a `SnapshotRef` object to JSON. +/// +/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized. +/// \return A JSON object representing the `SnapshotRef`. +nlohmann::json ToJson(const SnapshotRef& snapshot_ref); + +/// \brief Serializes a `Snapshot` object to JSON. +/// +/// \param[in] snapshot The `Snapshot` object to be serialized. +/// \return A JSON object representing the `snapshot`. +nlohmann::json ToJson(const Snapshot& snapshot); + /// \brief Convert JSON to an Iceberg Schema. /// /// \param[in] json The JSON representation of the schema. @@ -153,4 +165,17 @@ Result> PartitionFieldFromJson( /// the JSON is malformed or missing expected fields, an error will be returned. Result> PartitionSpecFromJson( const std::shared_ptr& schema, const nlohmann::json& json); + +/// \brief Deserializes a JSON object into a `SnapshotRef` object. +/// +/// \param[in] json The JSON object representing a `SnapshotRef`. +/// \return A `SnapshotRef` object or an error if the conversion fails. +Result> SnapshotRefFromJson(const nlohmann::json& json); + +/// \brief Deserializes a JSON object into a `Snapshot` object. +/// +/// \param[in] json The JSON representation of the snapshot. +/// \return A `Snapshot` object or an error if the conversion fails. +Result> SnapshotFromJson(const nlohmann::json& json); + } // namespace iceberg diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index a17d9e8b2..fb994f8b7 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -21,6 +21,16 @@ namespace iceberg { +bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const { + return min_snapshots_to_keep == other.min_snapshots_to_keep && + max_snapshot_age_ms == other.max_snapshot_age_ms && + max_ref_age_ms == other.max_ref_age_ms; +} + +bool SnapshotRef::Tag::Equals(const SnapshotRef::Tag& other) const { + return max_ref_age_ms == other.max_ref_age_ms; +} + SnapshotRefType SnapshotRef::type() const noexcept { return std::visit( [&](const auto& retention) -> SnapshotRefType { @@ -34,6 +44,24 @@ SnapshotRefType SnapshotRef::type() const noexcept { retention); } +bool SnapshotRef::Equals(const SnapshotRef& other) const { + if (this == &other) { + return true; + } + if (type() != other.type()) { + return false; + } + + if (type() == SnapshotRefType::kBranch) { + return snapshot_id == other.snapshot_id && + std::get(retention) == std::get(other.retention); + + } else { + return snapshot_id == other.snapshot_id && + std::get(retention) == std::get(other.retention); + } +} + std::optional Snapshot::operation() const { auto it = summary.find(SnapshotSummaryFields::kOperation); if (it != summary.end()) { diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 29577e9b1..da9dd95b0 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" namespace iceberg { @@ -39,6 +40,26 @@ enum class SnapshotRefType { kTag, }; +/// \brief Get the relative snapshot reference type name +ICEBERG_EXPORT constexpr std::string_view SnapshotRefTypeToString( + SnapshotRefType type) noexcept { + switch (type) { + case SnapshotRefType::kBranch: + return "branch"; + case SnapshotRefType::kTag: + return "tag"; + } +} +/// \brief Get the relative snapshot reference type from name +ICEBERG_EXPORT constexpr Result SnapshotRefTypeFromString( + std::string_view str) noexcept { + if (str == "branch") return SnapshotRefType::kBranch; + if (str == "tag") return SnapshotRefType::kTag; + return unexpected( + {.kind = ErrorKind::kInvalidArgument, + .message = "Invalid snapshot reference type: {}" + std::string(str)}); +} + /// \brief A reference to a snapshot, either a branch or a tag. struct ICEBERG_EXPORT SnapshotRef { struct ICEBERG_EXPORT Branch { @@ -54,6 +75,18 @@ struct ICEBERG_EXPORT SnapshotRef { /// of the snapshot reference to keep while expiring snapshots. Defaults to table /// property history.expire.max-ref-age-ms. The main branch never expires. std::optional max_ref_age_ms; + + /// \brief Compare two branches for equality. + friend bool operator==(const Branch& lhs, const Branch& rhs) { + return lhs.Equals(rhs); + } + + /// \brief Compare two branches for inequality. + friend bool operator!=(const Branch& lhs, const Branch& rhs) { return !(lhs == rhs); } + + private: + /// \brief Compare two branches for equality. + bool Equals(const Branch& other) const; }; struct ICEBERG_EXPORT Tag { @@ -61,6 +94,16 @@ struct ICEBERG_EXPORT SnapshotRef { /// of the snapshot reference to keep while expiring snapshots. Defaults to table /// property history.expire.max-ref-age-ms. The main branch never expires. std::optional max_ref_age_ms; + + /// \brief Compare two tags for equality. + friend bool operator==(const Tag& lhs, const Tag& rhs) { return lhs.Equals(rhs); } + + /// \brief Compare two tags for inequality. + friend bool operator!=(const Tag& lhs, const Tag& rhs) { return !(lhs == rhs); } + + private: + /// \brief Compare two tags for equality. + bool Equals(const Tag& other) const; }; /// A reference's snapshot ID. The tagged snapshot or latest snapshot of a branch. @@ -69,6 +112,20 @@ struct ICEBERG_EXPORT SnapshotRef { std::variant retention; SnapshotRefType type() const noexcept; + + /// \brief Compare two snapshot refs for equality + friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) { + return lhs.Equals(rhs); + } + + /// \brief Compare two snapshot refs for inequality. + friend bool operator!=(const SnapshotRef& lhs, const SnapshotRef& rhs) { + return !(lhs == rhs); + } + + private: + /// \brief Compare two snapshot refs for equality. + bool Equals(const SnapshotRef& other) const; }; /// \brief Optional Snapshot Summary Fields @@ -139,11 +196,11 @@ struct SnapshotSummaryFields { /// Other Fields, see https://iceberg.apache.org/spec/#other-fields /// \brief The Write-Audit-Publish id of a staged snapshot - inline static const std::string kWAPID = "wap.id"; + inline static const std::string kWAPId = "wap.id"; /// \brief The Write-Audit-Publish id of a snapshot already been published - inline static const std::string kPublishedWAPID = "published-wap-id"; + inline static const std::string kPublishedWAPId = "published-wap-id"; /// \brief The original id of a cherry-picked snapshot - inline static const std::string kSourceSnapshotID = "source-snapshot-id"; + inline static const std::string kSourceSnapshotId = "source-snapshot-id"; /// \brief Name of the engine that created the snapshot inline static const std::string kEngineName = "engine-name"; /// \brief Version of the engine that created the snapshot diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 519164ef9..44db1c096 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -97,10 +97,12 @@ class Transaction; class HistoryEntry; class Snapshot; +struct SnapshotRef; +enum class SnapshotRefType; class SortField; class SortOrder; class StructLike; -class TableMetadata; +struct TableMetadata; class Transform; enum class TransformType; class TransformFunction; diff --git a/test/json_internal_test.cc b/test/json_internal_test.cc index 10b25f56d..6b6a22044 100644 --- a/test/json_internal_test.cc +++ b/test/json_internal_test.cc @@ -19,42 +19,54 @@ #include "iceberg/json_internal.h" -#include #include #include +#include #include +#include "gmock/gmock.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/sort_field.h" #include "iceberg/sort_order.h" #include "iceberg/transform.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep +#include "matchers.h" namespace iceberg { namespace { // Specialized FromJson helper based on type template -expected, Error> FromJsonHelper(const nlohmann::json& json); +Result> FromJsonHelper(const nlohmann::json& json); template <> -expected, Error> FromJsonHelper(const nlohmann::json& json) { +Result> FromJsonHelper(const nlohmann::json& json) { return SortFieldFromJson(json); } template <> -expected, Error> FromJsonHelper(const nlohmann::json& json) { +Result> FromJsonHelper(const nlohmann::json& json) { return SortOrderFromJson(json); } template <> -expected, Error> FromJsonHelper( - const nlohmann::json& json) { +Result> FromJsonHelper(const nlohmann::json& json) { return PartitionFieldFromJson(json); } +template <> +Result> FromJsonHelper(const nlohmann::json& json) { + return SnapshotRefFromJson(json); +} + +template <> +Result> FromJsonHelper(const nlohmann::json& json) { + return SnapshotFromJson(json); +} + // Helper function to reduce duplication in testing template void TestJsonConversion(const T& obj, const nlohmann::json& expected_json) { @@ -116,7 +128,8 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) { auto result = PartitionFieldFromJson(invalid_json); EXPECT_FALSE(result.has_value()); - EXPECT_EQ(result.error().kind, ErrorKind::kJsonParseError); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'source-id'")); } TEST(JsonPartitionTest, PartitionSpec) { @@ -148,4 +161,99 @@ TEST(JsonPartitionTest, PartitionSpec) { EXPECT_EQ(spec, *parsed_spec_result.value()); } +TEST(JsonInternalTest, SnapshotRefBranch) { + SnapshotRef ref(1234567890, SnapshotRef::Branch{.min_snapshots_to_keep = 10, + .max_snapshot_age_ms = 123456789, + .max_ref_age_ms = 987654321}); + + // Create a JSON object with the expected values + nlohmann::json expected_json = + R"({"snapshot-id":1234567890, + "type":"branch", + "min-snapshots-to-keep":10, + "max-snapshot-age-ms":123456789, + "max-ref-age-ms":987654321})"_json; + + TestJsonConversion(ref, expected_json); +} + +TEST(JsonInternalTest, SnapshotRefTag) { + SnapshotRef ref(9876543210, SnapshotRef::Tag{.max_ref_age_ms = 54321}); + + // Create a JSON object with the expected values + nlohmann::json expected_json = + R"({"snapshot-id":9876543210, + "type":"tag", + "max-ref-age-ms":54321})"_json; + + TestJsonConversion(ref, expected_json); +} + +TEST(JsonInternalTest, Snapshot) { + std::unordered_map summary = { + {SnapshotSummaryFields::kOperation, DataOperation::kAppend}, + {SnapshotSummaryFields::kAddedDataFiles, "50"}}; + + Snapshot snapshot{.snapshot_id = 1234567890, + .parent_snapshot_id = 9876543210, + .sequence_number = 99, + .timestamp_ms = 1234567890123, + .manifest_list = "/path/to/manifest_list", + .summary = summary, + .schema_id = 42}; + + // Create a JSON object with the expected values + nlohmann::json expected_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "operation":"append", + "added-data-files":"50" + }, + "schema-id":42})"_json; + + TestJsonConversion(snapshot, expected_json); +} + +TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) { + nlohmann::json invalid_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "invalid-field":"value" + }, + "schema-id":42})"_json; + // malformed summary field + + auto result = SnapshotFromJson(invalid_json); + ASSERT_FALSE(result.has_value()); + + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Invalid snapshot summary field")); +} + +TEST(JsonInternalTest, SnapshotFromJsonSummaryWithNoOperation) { + nlohmann::json snapshot_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "added-data-files":"50" + }, + "schema-id":42})"_json; + + auto result = SnapshotFromJson(snapshot_json); + ASSERT_TRUE(result.has_value()); + + ASSERT_EQ(result.value()->operation(), DataOperation::kOverwrite); +} + } // namespace iceberg