Skip to content

Commit 3ec142a

Browse files
committed
feat: snapshot ser/der
Signed-off-by: Junwang Zhao <[email protected]>
1 parent 5ba0a84 commit 3ec142a

File tree

6 files changed

+396
-7
lines changed

6 files changed

+396
-7
lines changed

src/iceberg/json_internal.cc

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121

2222
#include <format>
2323
#include <regex>
24+
#include <unordered_set>
2425

2526
#include <nlohmann/json.hpp>
2627

2728
#include "iceberg/partition_spec.h"
2829
#include "iceberg/result.h"
2930
#include "iceberg/schema.h"
3031
#include "iceberg/schema_internal.h"
32+
#include "iceberg/snapshot.h"
3133
#include "iceberg/sort_order.h"
3234
#include "iceberg/transform.h"
3335
#include "iceberg/type.h"
@@ -70,6 +72,53 @@ constexpr std::string_view kValueRequired = "value-required";
7072

7173
constexpr std::string_view kFieldId = "field-id";
7274
constexpr std::string_view kSpecId = "spec-id";
75+
constexpr std::string_view kSnapshotId = "snapshot-id";
76+
constexpr std::string_view kParentSnapshotId = "parent-snapshot-id";
77+
constexpr std::string_view kSequenceNumber = "sequence-number";
78+
constexpr std::string_view kTimestampMs = "timestamp-ms";
79+
constexpr std::string_view kManifestList = "manifest-list";
80+
constexpr std::string_view kSummary = "summary";
81+
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
82+
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
83+
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";
84+
85+
const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
86+
SnapshotSummaryFields::kOperation,
87+
SnapshotSummaryFields::kAddedDataFiles,
88+
SnapshotSummaryFields::kDeletedDataFiles,
89+
SnapshotSummaryFields::kTotalDataFiles,
90+
SnapshotSummaryFields::kAddedDeleteFiles,
91+
SnapshotSummaryFields::kAddedEqDeleteFiles,
92+
SnapshotSummaryFields::kRemovedEqDeleteFiles,
93+
SnapshotSummaryFields::kAddedPosDeleteFiles,
94+
SnapshotSummaryFields::kRemovedPosDeleteFiles,
95+
SnapshotSummaryFields::kAddedDVs,
96+
SnapshotSummaryFields::kRemovedDVs,
97+
SnapshotSummaryFields::kRemovedDeleteFiles,
98+
SnapshotSummaryFields::kTotalDeleteFiles,
99+
SnapshotSummaryFields::kAddedRecords,
100+
SnapshotSummaryFields::kDeletedRecords,
101+
SnapshotSummaryFields::kTotalRecords,
102+
SnapshotSummaryFields::kAddedFileSize,
103+
SnapshotSummaryFields::kRemovedFileSize,
104+
SnapshotSummaryFields::kTotalFileSize,
105+
SnapshotSummaryFields::kAddedPosDeletes,
106+
SnapshotSummaryFields::kRemovedPosDeletes,
107+
SnapshotSummaryFields::kTotalPosDeletes,
108+
SnapshotSummaryFields::kAddedEqDeletes,
109+
SnapshotSummaryFields::kRemovedEqDeletes,
110+
SnapshotSummaryFields::kTotalEqDeletes,
111+
SnapshotSummaryFields::kDeletedDuplicatedFiles,
112+
SnapshotSummaryFields::kChangedPartitionCountProp,
113+
SnapshotSummaryFields::kWAPID,
114+
SnapshotSummaryFields::kPublishedWAPID,
115+
SnapshotSummaryFields::kSourceSnapshotID,
116+
SnapshotSummaryFields::kEngineName,
117+
SnapshotSummaryFields::kEngineVersion};
118+
119+
const std::unordered_set<std::string_view> kValidDataOperation = {
120+
DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite,
121+
DataOperation::kDelete};
73122

74123
template <typename T>
75124
Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
@@ -89,6 +138,22 @@ Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
89138
}
90139
}
91140

141+
/// \brief Read an optional value of type T stored under `key` in `json`.
///
/// \param[in] json The JSON node to read from.
/// \param[in] key The key to look up.
/// \return `std::nullopt` when the key is absent or explicitly set to JSON null,
/// the converted value when it is present, or a `kJsonParseError` when the stored
/// value cannot be converted to T.
template <typename T>
Result<std::optional<T>> GetJsonValueOptional(const nlohmann::json& json,
                                              std::string_view key) {
  if (!json.contains(key)) {
    return std::nullopt;
  }
  try {
    const nlohmann::json& node = json.at(key);
    // An explicit null carries no value; treat it like a missing key instead of
    // letting the conversion below fail.
    if (node.is_null()) {
      return std::nullopt;
    }
    return node.get<T>();
  } catch (const std::exception&) {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format("Failed to parse key '{}' in {}", key, json.dump()),
    });
  }
}
156+
92157
} // namespace
93158

94159
nlohmann::json ToJson(const SortField& sort_field) {
@@ -231,6 +296,53 @@ nlohmann::json SchemaToJson(const Schema& schema) {
231296
return json;
232297
}
233298

299+
/// \brief Serialize a snapshot reference to JSON.
///
/// The snapshot id and the reference type name are always written. The retention
/// settings of the active alternative (branch or tag) are written only when they
/// hold a value.
nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) {
  nlohmann::json result;
  result[kSnapshotId] = ref.snapshot_id;
  result[kType] = SnapshotRefTypeToString(ref.type());

  // Writes `field` under `key` only when the optional holds a value.
  const auto put_if_set = [&result](std::string_view key, const auto& field) {
    if (field.has_value()) {
      result[key] = *field;
    }
  };

  switch (ref.type()) {
    case SnapshotRefType::kBranch: {
      const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
      put_if_set(kMinSnapshotsToKeep, branch.min_snapshots_to_keep);
      put_if_set(kMaxSnapshotAgeMs, branch.max_snapshot_age_ms);
      put_if_set(kMaxRefAgeMs, branch.max_ref_age_ms);
      break;
    }
    case SnapshotRefType::kTag: {
      const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
      put_if_set(kMaxRefAgeMs, tag.max_ref_age_ms);
      break;
    }
  }
  return result;
}
322+
323+
/// \brief Serialize a snapshot to JSON.
///
/// The snapshot id, sequence number, timestamp, manifest list and summary are
/// always written. The parent snapshot id and schema id are written only when
/// they hold a value.
///
/// \param[in] snapshot The snapshot to convert.
/// \return The JSON representation of the snapshot.
nlohmann::json SnapshotToJson(const Snapshot& snapshot) {
  nlohmann::json json;
  json[kSnapshotId] = snapshot.snapshot_id;
  if (snapshot.parent_snapshot_id.has_value()) {
    json[kParentSnapshotId] = *snapshot.parent_snapshot_id;
  }
  json[kSequenceNumber] = snapshot.sequence_number;
  json[kTimestampMs] = snapshot.timestamp_ms;
  json[kManifestList] = snapshot.manifest_list;

  // Start from an empty object: a default-constructed nlohmann::json is null and
  // would stay null when the summary has no entries, producing `"summary": null`
  // instead of an empty map.
  nlohmann::json summary_json = nlohmann::json::object();
  for (const auto& [key, value] : snapshot.summary) {
    summary_json[key] = value;
  }
  json[kSummary] = std::move(summary_json);

  if (snapshot.schema_id.has_value()) {
    json[kSchemaId] = *snapshot.schema_id;
  }

  return json;
}
345+
234346
namespace {
235347

236348
Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
@@ -419,4 +531,75 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
419531
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
420532
}
421533

534+
/// \brief Build a snapshot reference from its JSON representation.
///
/// The snapshot id and type are required. The retention settings that apply to
/// the parsed type are read as optional values. Any reference whose type is not
/// a branch is built with tag retention.
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto ref_snapshot_id,
                          GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(
      auto ref_type,
      GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString));

  // Tag: only the reference age limit applies.
  if (ref_type != SnapshotRefType::kBranch) {
    ICEBERG_ASSIGN_OR_RAISE(auto tag_ref_age,
                            GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
    return std::make_unique<SnapshotRef>(ref_snapshot_id,
                                         SnapshotRef::Tag{.max_ref_age_ms = tag_ref_age});
  }

  // Branch: all three retention settings may be present.
  ICEBERG_ASSIGN_OR_RAISE(auto keep_at_least,
                          GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep));
  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_age_limit,
                          GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
  ICEBERG_ASSIGN_OR_RAISE(auto branch_ref_age,
                          GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));

  return std::make_unique<SnapshotRef>(
      ref_snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = keep_at_least,
                                           .max_snapshot_age_ms = snapshot_age_limit,
                                           .max_ref_age_ms = branch_ref_age});
}
559+
560+
/// \brief Build a snapshot from its JSON representation.
///
/// The snapshot id, sequence number, timestamp, manifest list and summary are
/// required; the parent snapshot id and schema id are optional. Every summary
/// entry must use a key listed in kValidSnapshotSummaryFields and hold a string
/// value, and the operation entry must be one of kValidDataOperation. The first
/// violation is reported as a kJsonParseError.
///
/// NOTE(review): unknown summary keys are rejected here. Engines may attach
/// additional summary properties, so confirm that this strictness is intended.
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
  // Shared shape for every summary validation failure below.
  const auto parse_error = [](std::string message) {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::move(message),
    });
  };

  ICEBERG_ASSIGN_OR_RAISE(auto id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(auto seq_num, GetJsonValue<int64_t>(json, kSequenceNumber));
  ICEBERG_ASSIGN_OR_RAISE(auto committed_at_ms,
                          GetJsonValue<int64_t>(json, kTimestampMs));
  ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_path,
                          GetJsonValue<std::string>(json, kManifestList));
  ICEBERG_ASSIGN_OR_RAISE(auto parent_id,
                          GetJsonValueOptional<int64_t>(json, kParentSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(auto raw_summary,
                          GetJsonValue<nlohmann::json>(json, kSummary));

  std::unordered_map<std::string, std::string> summary;
  for (const auto& [field, node] : raw_summary.items()) {
    if (!kValidSnapshotSummaryFields.contains(field)) {
      return parse_error(std::format("Invalid snapshot summary field: {}", field));
    }
    if (!node.is_string()) {
      return parse_error(
          std::format("Invalid snapshot summary field value: {}", node.dump()));
    }
    // Extract the text once; it is used for validation and then stored.
    auto text = node.get<std::string>();
    const bool is_operation = field == SnapshotSummaryFields::kOperation;
    if (is_operation && !kValidDataOperation.contains(text)) {
      return parse_error(std::format("Invalid snapshot operation: {}", node.dump()));
    }
    summary[field] = std::move(text);
  }

  ICEBERG_ASSIGN_OR_RAISE(auto schema, GetJsonValueOptional<int32_t>(json, kSchemaId));

  return std::make_unique<Snapshot>(id, parent_id, seq_num, committed_at_ms,
                                    manifest_list_path, std::move(summary), schema);
}
604+
422605
} // namespace iceberg

src/iceberg/json_internal.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,18 @@ nlohmann::json TypeToJson(const Type& type);
8888
/// \return The JSON representation of the field.
8989
nlohmann::json FieldToJson(const SchemaField& field);
9090

91+
/// \brief Convert an Iceberg SnapshotRef to JSON.
92+
///
93+
/// \param[in] snapshot_ref The Iceberg snapshot reference to convert.
94+
/// \return The JSON representation of the snapshot reference.
95+
nlohmann::json SnapshotRefToJson(const SnapshotRef& snapshot_ref);
96+
97+
/// \brief Convert an Iceberg Snapshot to JSON.
98+
///
99+
/// \param[in] snapshot The Iceberg snapshot to convert.
100+
/// \return The JSON representation of the snapshot.
101+
nlohmann::json SnapshotToJson(const Snapshot& snapshot);
102+
91103
/// \brief Convert JSON to an Iceberg Schema.
92104
///
93105
/// \param[in] json The JSON representation of the schema.
@@ -153,4 +165,17 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
153165
/// the JSON is malformed or missing expected fields, an error will be returned.
154166
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
155167
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
168+
169+
/// \brief Convert JSON to an Iceberg SnapshotRef.
170+
///
171+
/// \param[in] json The JSON representation of the snapshot reference.
172+
/// \return The Iceberg snapshot reference or an error if the conversion fails.
173+
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json);
174+
175+
/// \brief Convert JSON to an Iceberg Snapshot.
176+
///
177+
/// \param[in] json The JSON representation of the snapshot.
178+
/// \return The Iceberg snapshot or an error if the conversion fails.
179+
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json);
180+
156181
} // namespace iceberg

src/iceberg/snapshot.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@
2121

2222
namespace iceberg {
2323

24+
/// \brief Two branches are equal when all three retention settings match.
bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
  if (min_snapshots_to_keep != other.min_snapshots_to_keep) {
    return false;
  }
  if (max_snapshot_age_ms != other.max_snapshot_age_ms) {
    return false;
  }
  return max_ref_age_ms == other.max_ref_age_ms;
}
29+
30+
/// \brief Two tags are equal when their max reference age settings match.
bool SnapshotRef::Tag::Equals(const SnapshotRef::Tag& other) const {
  const std::optional<int64_t>& theirs = other.max_ref_age_ms;
  return max_ref_age_ms == theirs;
}
33+
2434
SnapshotRefType SnapshotRef::type() const noexcept {
2535
return std::visit(
2636
[&](const auto& retention) -> SnapshotRefType {
@@ -34,6 +44,24 @@ SnapshotRefType SnapshotRef::type() const noexcept {
3444
retention);
3545
}
3646

47+
bool SnapshotRef::Equals(const SnapshotRef& other) const {
48+
if (this == &other) {
49+
return true;
50+
}
51+
if (type() != other.type()) {
52+
return false;
53+
}
54+
55+
if (type() == SnapshotRefType::kBranch) {
56+
return snapshot_id == other.snapshot_id &&
57+
std::get<Branch>(retention) == std::get<Branch>(other.retention);
58+
59+
} else {
60+
return snapshot_id == other.snapshot_id &&
61+
std::get<Tag>(retention) == std::get<Tag>(other.retention);
62+
}
63+
}
64+
3765
std::optional<std::string_view> Snapshot::operation() const {
3866
auto it = summary.find(SnapshotSummaryFields::kOperation);
3967
if (it != summary.end()) {

src/iceberg/snapshot.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <variant>
2727

2828
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/result.h"
2930

3031
namespace iceberg {
3132

@@ -39,6 +40,28 @@ enum class SnapshotRefType {
3940
kTag,
4041
};
4142

43+
/// \brief Get the relative snapshot reference type name
44+
ICEBERG_EXPORT constexpr std::string_view SnapshotRefTypeToString(
45+
SnapshotRefType type) noexcept {
46+
switch (type) {
47+
case SnapshotRefType::kBranch:
48+
return "branch";
49+
case SnapshotRefType::kTag:
50+
return "tag";
51+
default:
52+
return "invalid";
53+
}
54+
}
55+
/// \brief Get the relative snapshot reference type from name
56+
ICEBERG_EXPORT constexpr Result<SnapshotRefType> SnapshotRefTypeFromString(
57+
std::string_view str) noexcept {
58+
if (str == "branch") return SnapshotRefType::kBranch;
59+
if (str == "tag") return SnapshotRefType::kTag;
60+
return unexpected<Error>(
61+
{.kind = ErrorKind::kInvalidArgument,
62+
.message = "Invalid snapshot reference type: {}" + std::string(str)});
63+
}
64+
4265
/// \brief A reference to a snapshot, either a branch or a tag.
4366
struct ICEBERG_EXPORT SnapshotRef {
4467
struct ICEBERG_EXPORT Branch {
@@ -54,13 +77,35 @@ struct ICEBERG_EXPORT SnapshotRef {
5477
/// of the snapshot reference to keep while expiring snapshots. Defaults to table
5578
/// property history.expire.max-ref-age-ms. The main branch never expires.
5679
std::optional<int64_t> max_ref_age_ms;
80+
81+
/// \brief Compare two branches for equality.
82+
friend bool operator==(const Branch& lhs, const Branch& rhs) {
83+
return lhs.Equals(rhs);
84+
}
85+
86+
/// \brief Compare two branches for inequality.
87+
friend bool operator!=(const Branch& lhs, const Branch& rhs) { return !(lhs == rhs); }
88+
89+
private:
90+
/// \brief Compare two branches for equality.
91+
bool Equals(const Branch& other) const;
5792
};
5893

5994
struct ICEBERG_EXPORT Tag {
6095
/// For snapshot references except the main branch, a positive number for the max age
6196
/// of the snapshot reference to keep while expiring snapshots. Defaults to table
6297
/// property history.expire.max-ref-age-ms. The main branch never expires.
6398
std::optional<int64_t> max_ref_age_ms;
99+
100+
/// \brief Compare two tags for equality.
101+
friend bool operator==(const Tag& lhs, const Tag& rhs) { return lhs.Equals(rhs); }
102+
103+
/// \brief Compare two tags for inequality.
104+
friend bool operator!=(const Tag& lhs, const Tag& rhs) { return !(lhs == rhs); }
105+
106+
private:
107+
/// \brief Compare two tags for equality.
108+
bool Equals(const Tag& other) const;
64109
};
65110

66111
/// A reference's snapshot ID. The tagged snapshot or latest snapshot of a branch.
@@ -69,6 +114,20 @@ struct ICEBERG_EXPORT SnapshotRef {
69114
std::variant<Branch, Tag> retention;
70115

71116
SnapshotRefType type() const noexcept;
117+
118+
/// \brief Compare two snapshot refs for equality
119+
friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
120+
return lhs.Equals(rhs);
121+
}
122+
123+
/// \brief Compare two snapshot refs for inequality.
124+
friend bool operator!=(const SnapshotRef& lhs, const SnapshotRef& rhs) {
125+
return !(lhs == rhs);
126+
}
127+
128+
private:
129+
/// \brief Compare two snapshot refs for equality.
130+
bool Equals(const SnapshotRef& other) const;
72131
};
73132

74133
/// \brief Optional Snapshot Summary Fields

0 commit comments

Comments
 (0)