-
Notifications
You must be signed in to change notification settings - Fork 70
feat: snapshot serde #74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3ec142a
ef171a4
8847ca8
296d04f
e3fd584
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,13 +21,15 @@ | |
|
|
||
| #include <format> | ||
| #include <regex> | ||
| #include <unordered_set> | ||
|
|
||
| #include <nlohmann/json.hpp> | ||
|
|
||
| #include "iceberg/partition_spec.h" | ||
| #include "iceberg/result.h" | ||
| #include "iceberg/schema.h" | ||
| #include "iceberg/schema_internal.h" | ||
| #include "iceberg/snapshot.h" | ||
| #include "iceberg/sort_order.h" | ||
| #include "iceberg/transform.h" | ||
| #include "iceberg/type.h" | ||
|
|
@@ -70,6 +72,53 @@ constexpr std::string_view kValueRequired = "value-required"; | |
|
|
||
// Generic identifier keys.
constexpr std::string_view kFieldId = "field-id";
constexpr std::string_view kSpecId = "spec-id";

// Key shared by serialized snapshots and snapshot references.
constexpr std::string_view kSnapshotId = "snapshot-id";

// Keys that only appear in a serialized snapshot.
constexpr std::string_view kParentSnapshotId = "parent-snapshot-id";
constexpr std::string_view kSequenceNumber = "sequence-number";
constexpr std::string_view kTimestampMs = "timestamp-ms";
constexpr std::string_view kManifestList = "manifest-list";
constexpr std::string_view kSummary = "summary";

// Retention keys of a snapshot reference. Branches may carry all three,
// tags only use max-ref-age-ms.
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";
|
|
||
| const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = { | ||
| SnapshotSummaryFields::kOperation, | ||
| SnapshotSummaryFields::kAddedDataFiles, | ||
| SnapshotSummaryFields::kDeletedDataFiles, | ||
| SnapshotSummaryFields::kTotalDataFiles, | ||
| SnapshotSummaryFields::kAddedDeleteFiles, | ||
| SnapshotSummaryFields::kAddedEqDeleteFiles, | ||
| SnapshotSummaryFields::kRemovedEqDeleteFiles, | ||
| SnapshotSummaryFields::kAddedPosDeleteFiles, | ||
| SnapshotSummaryFields::kRemovedPosDeleteFiles, | ||
| SnapshotSummaryFields::kAddedDVs, | ||
| SnapshotSummaryFields::kRemovedDVs, | ||
| SnapshotSummaryFields::kRemovedDeleteFiles, | ||
| SnapshotSummaryFields::kTotalDeleteFiles, | ||
| SnapshotSummaryFields::kAddedRecords, | ||
| SnapshotSummaryFields::kDeletedRecords, | ||
| SnapshotSummaryFields::kTotalRecords, | ||
| SnapshotSummaryFields::kAddedFileSize, | ||
| SnapshotSummaryFields::kRemovedFileSize, | ||
| SnapshotSummaryFields::kTotalFileSize, | ||
| SnapshotSummaryFields::kAddedPosDeletes, | ||
| SnapshotSummaryFields::kRemovedPosDeletes, | ||
| SnapshotSummaryFields::kTotalPosDeletes, | ||
| SnapshotSummaryFields::kAddedEqDeletes, | ||
| SnapshotSummaryFields::kRemovedEqDeletes, | ||
| SnapshotSummaryFields::kTotalEqDeletes, | ||
| SnapshotSummaryFields::kDeletedDuplicatedFiles, | ||
| SnapshotSummaryFields::kChangedPartitionCountProp, | ||
| SnapshotSummaryFields::kWAPID, | ||
| SnapshotSummaryFields::kPublishedWAPID, | ||
| SnapshotSummaryFields::kSourceSnapshotID, | ||
| SnapshotSummaryFields::kEngineName, | ||
| SnapshotSummaryFields::kEngineVersion}; | ||
|
|
||
| const std::unordered_set<std::string_view> kValidDataOperation = { | ||
| DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite, | ||
| DataOperation::kDelete}; | ||
|
|
||
| template <typename T> | ||
| Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) { | ||
|
|
@@ -89,6 +138,22 @@ Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) { | |
| } | ||
| } | ||
|
|
||
/// \brief Read an optional value of type T stored under `key`.
///
/// \param json The JSON document to read from.
/// \param key The member name to look up.
/// \return std::nullopt when `key` is missing or holds an explicit JSON null,
/// the converted value when it is present, or a kJsonParseError when the
/// stored value cannot be converted to T.
template <typename T>
Result<std::optional<T>> GetJsonValueOptional(const nlohmann::json& json,
                                              std::string_view key) {
  // A single lookup; find() yields end() for a missing key as well as for a
  // document that is not an object.
  const auto it = json.find(key);
  // An explicit null carries no value, so it is treated like a missing key
  // instead of being reported as a conversion failure.
  if (it == json.end() || it->is_null()) {
    return std::nullopt;
  }
  try {
    return it->get<T>();
  } catch (const std::exception&) {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format("Failed to parse key '{}' in {}", key, json.dump()),
    });
  }
}
|
|
||
| } // namespace | ||
|
|
||
| nlohmann::json ToJson(const SortField& sort_field) { | ||
|
|
@@ -231,6 +296,53 @@ nlohmann::json SchemaToJson(const Schema& schema) { | |
| return json; | ||
| } | ||
|
|
||
| nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) { | ||
| nlohmann::json json; | ||
| json[kSnapshotId] = ref.snapshot_id; | ||
| json[kType] = SnapshotRefTypeToString(ref.type()); | ||
| if (ref.type() == SnapshotRefType::kBranch) { | ||
| const auto& branch = std::get<SnapshotRef::Branch>(ref.retention); | ||
| if (branch.min_snapshots_to_keep.has_value()) { | ||
| json[kMinSnapshotsToKeep] = *branch.min_snapshots_to_keep; | ||
| } | ||
zhjwpku marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (branch.max_snapshot_age_ms.has_value()) { | ||
| json[kMaxSnapshotAgeMs] = *branch.max_snapshot_age_ms; | ||
| } | ||
| if (branch.max_ref_age_ms.has_value()) { | ||
| json[kMaxRefAgeMs] = *branch.max_ref_age_ms; | ||
| } | ||
| } else if (ref.type() == SnapshotRefType::kTag) { | ||
| const auto& tag = std::get<SnapshotRef::Tag>(ref.retention); | ||
| if (tag.max_ref_age_ms.has_value()) { | ||
| json[kMaxRefAgeMs] = *tag.max_ref_age_ms; | ||
| } | ||
| } | ||
| return json; | ||
| } | ||
|
|
||
| nlohmann::json SnapshotToJson(const Snapshot& snapshot) { | ||
| nlohmann::json json; | ||
| json[kSnapshotId] = snapshot.snapshot_id; | ||
| if (snapshot.parent_snapshot_id.has_value()) { | ||
| json[kParentSnapshotId] = *snapshot.parent_snapshot_id; | ||
| } | ||
| json[kSequenceNumber] = snapshot.sequence_number; | ||
| json[kTimestampMs] = snapshot.timestamp_ms; | ||
zhjwpku marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| json[kManifestList] = snapshot.manifest_list; | ||
|
|
||
| nlohmann::json summary_json; | ||
| for (const auto& [key, value] : snapshot.summary) { | ||
| summary_json[key] = value; | ||
| } | ||
| json[kSummary] = summary_json; | ||
zhjwpku marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if (snapshot.schema_id.has_value()) { | ||
| json[kSchemaId] = *snapshot.schema_id; | ||
| } | ||
|
|
||
| return json; | ||
| } | ||
|
|
||
| namespace { | ||
|
|
||
| Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) { | ||
|
|
@@ -419,4 +531,75 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson( | |
| return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields)); | ||
| } | ||
|
|
||
| Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId)); | ||
| ICEBERG_ASSIGN_OR_RAISE( | ||
| auto type, | ||
| GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString)); | ||
| if (type == SnapshotRefType::kBranch) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots_to_keep, | ||
| GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep)); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms, | ||
| GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs)); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms, | ||
| GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs)); | ||
|
|
||
| return std::make_unique<SnapshotRef>( | ||
| snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep, | ||
| .max_snapshot_age_ms = max_snapshot_age_ms, | ||
| .max_ref_age_ms = max_ref_age_ms}); | ||
| } else { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms, | ||
| GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs)); | ||
|
|
||
| return std::make_unique<SnapshotRef>( | ||
| snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms}); | ||
| } | ||
| } | ||
|
|
||
/// \brief Parse a snapshot from JSON.
///
/// snapshot-id, timestamp-ms and manifest-list are mandatory.
/// parent-snapshot-id and schema-id are optional. To stay consistent with the
/// Java SnapshotParser (as requested in review), sequence-number and summary
/// may be missing as well: a missing sequence number defaults to 0 and a
/// missing (or null) summary yields an empty summary map.
///
/// \return The parsed snapshot, or a kJsonParseError when a mandatory field is
/// missing or malformed, when a summary entry has an unknown key or a
/// non-string value, or when the summary's operation is not a known data
/// operation.
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
  // Sequence number assumed for snapshots whose metadata does not record one.
  constexpr int64_t kInitialSequenceNumber = 0;

  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(auto sequence_number_opt,
                          GetJsonValueOptional<int64_t>(json, kSequenceNumber));
  const int64_t sequence_number = sequence_number_opt.value_or(kInitialSequenceNumber);
  ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue<int64_t>(json, kTimestampMs));
  ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
                          GetJsonValue<std::string>(json, kManifestList));

  ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id,
                          GetJsonValueOptional<int64_t>(json, kParentSnapshotId));

  std::unordered_map<std::string, std::string> summary;
  // Iterate the summary in place instead of copying the sub-document first.
  if (const auto summary_it = json.find(kSummary);
      summary_it != json.end() && !summary_it->is_null()) {
    for (const auto& [key, value] : summary_it->items()) {
      // NOTE(review): keys outside the known set are rejected; confirm this is
      // intended for summaries that carry additional, engine-specific entries.
      if (!kValidSnapshotSummaryFields.contains(key)) {
        return unexpected<Error>({
            .kind = ErrorKind::kJsonParseError,
            .message = std::format("Invalid snapshot summary field: {}", key),
        });
      }
      if (!value.is_string()) {
        return unexpected<Error>({
            .kind = ErrorKind::kJsonParseError,
            .message =
                std::format("Invalid snapshot summary field value: {}", value.dump()),
        });
      }
      auto text = value.get<std::string>();
      if (key == SnapshotSummaryFields::kOperation &&
          !kValidDataOperation.contains(text)) {
        return unexpected<Error>({
            .kind = ErrorKind::kJsonParseError,
            .message = std::format("Invalid snapshot operation: {}", value.dump()),
        });
      }
      summary[key] = std::move(text);
    }
  }

  ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

  return std::make_unique<Snapshot>(snapshot_id, parent_snapshot_id, sequence_number,
                                    timestamp_ms, manifest_list, std::move(summary),
                                    schema_id);
}
|
|
||
| } // namespace iceberg | ||
Uh oh!
There was an error while loading. Please reload this page.