Skip to content

Commit c343e94

Browse files
authored
feat: snapshot serde (#74)
Signed-off-by: Junwang Zhao <[email protected]>
1 parent 5ba0a84 commit c343e94

File tree

6 files changed

+418
-11
lines changed

6 files changed

+418
-11
lines changed

src/iceberg/json_internal.cc

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@
1919

2020
#include "iceberg/json_internal.h"
2121

22+
#include <cstdint>
2223
#include <format>
2324
#include <regex>
25+
#include <unordered_set>
2426

2527
#include <nlohmann/json.hpp>
2628

2729
#include "iceberg/partition_spec.h"
2830
#include "iceberg/result.h"
2931
#include "iceberg/schema.h"
3032
#include "iceberg/schema_internal.h"
33+
#include "iceberg/snapshot.h"
3134
#include "iceberg/sort_order.h"
3235
#include "iceberg/transform.h"
3336
#include "iceberg/type.h"
@@ -70,6 +73,55 @@ constexpr std::string_view kValueRequired = "value-required";
7073

7174
constexpr std::string_view kFieldId = "field-id";
7275
constexpr std::string_view kSpecId = "spec-id";
76+
constexpr std::string_view kSnapshotId = "snapshot-id";
77+
constexpr std::string_view kParentSnapshotId = "parent-snapshot-id";
78+
constexpr std::string_view kSequenceNumber = "sequence-number";
79+
constexpr std::string_view kTimestampMs = "timestamp-ms";
80+
constexpr std::string_view kManifestList = "manifest-list";
81+
constexpr std::string_view kSummary = "summary";
82+
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
83+
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
84+
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";
85+
86+
constexpr int64_t kInitialSequenceNumber = 0;
87+
88+
const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
89+
SnapshotSummaryFields::kOperation,
90+
SnapshotSummaryFields::kAddedDataFiles,
91+
SnapshotSummaryFields::kDeletedDataFiles,
92+
SnapshotSummaryFields::kTotalDataFiles,
93+
SnapshotSummaryFields::kAddedDeleteFiles,
94+
SnapshotSummaryFields::kAddedEqDeleteFiles,
95+
SnapshotSummaryFields::kRemovedEqDeleteFiles,
96+
SnapshotSummaryFields::kAddedPosDeleteFiles,
97+
SnapshotSummaryFields::kRemovedPosDeleteFiles,
98+
SnapshotSummaryFields::kAddedDVs,
99+
SnapshotSummaryFields::kRemovedDVs,
100+
SnapshotSummaryFields::kRemovedDeleteFiles,
101+
SnapshotSummaryFields::kTotalDeleteFiles,
102+
SnapshotSummaryFields::kAddedRecords,
103+
SnapshotSummaryFields::kDeletedRecords,
104+
SnapshotSummaryFields::kTotalRecords,
105+
SnapshotSummaryFields::kAddedFileSize,
106+
SnapshotSummaryFields::kRemovedFileSize,
107+
SnapshotSummaryFields::kTotalFileSize,
108+
SnapshotSummaryFields::kAddedPosDeletes,
109+
SnapshotSummaryFields::kRemovedPosDeletes,
110+
SnapshotSummaryFields::kTotalPosDeletes,
111+
SnapshotSummaryFields::kAddedEqDeletes,
112+
SnapshotSummaryFields::kRemovedEqDeletes,
113+
SnapshotSummaryFields::kTotalEqDeletes,
114+
SnapshotSummaryFields::kDeletedDuplicatedFiles,
115+
SnapshotSummaryFields::kChangedPartitionCountProp,
116+
SnapshotSummaryFields::kWAPId,
117+
SnapshotSummaryFields::kPublishedWAPId,
118+
SnapshotSummaryFields::kSourceSnapshotId,
119+
SnapshotSummaryFields::kEngineName,
120+
SnapshotSummaryFields::kEngineVersion};
121+
122+
const std::unordered_set<std::string_view> kValidDataOperation = {
123+
DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite,
124+
DataOperation::kDelete};
73125

74126
template <typename T>
75127
Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
@@ -89,6 +141,30 @@ Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
89141
}
90142
}
91143

144+
template <typename T>
145+
Result<std::optional<T>> GetJsonValueOptional(const nlohmann::json& json,
146+
std::string_view key) {
147+
if (!json.contains(key)) {
148+
return std::nullopt;
149+
}
150+
try {
151+
return json.at(key).get<T>();
152+
} catch (const std::exception& ex) {
153+
return unexpected<Error>({
154+
.kind = ErrorKind::kJsonParseError,
155+
.message = std::format("Failed to parse key '{}' in {}", key, json.dump()),
156+
});
157+
}
158+
}
159+
160+
template <typename T>
161+
void SetOptionalField(nlohmann::json& json, std::string_view key,
162+
const std::optional<T>& value) {
163+
if (value.has_value()) {
164+
json[key] = *value;
165+
}
166+
}
167+
92168
} // namespace
93169

94170
nlohmann::json ToJson(const SortField& sort_field) {
@@ -231,6 +307,39 @@ nlohmann::json SchemaToJson(const Schema& schema) {
231307
return json;
232308
}
233309

310+
nlohmann::json ToJson(const SnapshotRef& ref) {
311+
nlohmann::json json;
312+
json[kSnapshotId] = ref.snapshot_id;
313+
json[kType] = SnapshotRefTypeToString(ref.type());
314+
if (ref.type() == SnapshotRefType::kBranch) {
315+
const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
316+
SetOptionalField(json, kMinSnapshotsToKeep, branch.min_snapshots_to_keep);
317+
SetOptionalField(json, kMaxSnapshotAgeMs, branch.max_snapshot_age_ms);
318+
SetOptionalField(json, kMaxRefAgeMs, branch.max_ref_age_ms);
319+
} else if (ref.type() == SnapshotRefType::kTag) {
320+
const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
321+
SetOptionalField(json, kMaxRefAgeMs, tag.max_ref_age_ms);
322+
}
323+
return json;
324+
}
325+
326+
nlohmann::json ToJson(const Snapshot& snapshot) {
327+
nlohmann::json json;
328+
json[kSnapshotId] = snapshot.snapshot_id;
329+
SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
330+
if (snapshot.sequence_number > kInitialSequenceNumber) {
331+
json[kSequenceNumber] = snapshot.sequence_number;
332+
}
333+
json[kTimestampMs] = snapshot.timestamp_ms;
334+
json[kManifestList] = snapshot.manifest_list;
335+
// If there is an operation, write the summary map
336+
if (snapshot.operation().has_value()) {
337+
json[kSummary] = snapshot.summary;
338+
}
339+
SetOptionalField(json, kSchemaId, snapshot.schema_id);
340+
return json;
341+
}
342+
234343
namespace {
235344

236345
Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
@@ -419,4 +528,82 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
419528
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
420529
}
421530

531+
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
532+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
533+
ICEBERG_ASSIGN_OR_RAISE(
534+
auto type,
535+
GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString));
536+
if (type == SnapshotRefType::kBranch) {
537+
ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots_to_keep,
538+
GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep));
539+
ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms,
540+
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
541+
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
542+
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
543+
544+
return std::make_unique<SnapshotRef>(
545+
snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep,
546+
.max_snapshot_age_ms = max_snapshot_age_ms,
547+
.max_ref_age_ms = max_ref_age_ms});
548+
} else {
549+
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
550+
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
551+
552+
return std::make_unique<SnapshotRef>(
553+
snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms});
554+
}
555+
}
556+
557+
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
558+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
559+
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
560+
GetJsonValueOptional<int64_t>(json, kSequenceNumber));
561+
ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue<int64_t>(json, kTimestampMs));
562+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
563+
GetJsonValue<std::string>(json, kManifestList));
564+
565+
ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id,
566+
GetJsonValueOptional<int64_t>(json, kParentSnapshotId));
567+
568+
ICEBERG_ASSIGN_OR_RAISE(auto summary_json,
569+
GetJsonValueOptional<nlohmann::json>(json, kSummary));
570+
std::unordered_map<std::string, std::string> summary;
571+
if (summary_json.has_value()) {
572+
for (const auto& [key, value] : summary_json->items()) {
573+
if (!kValidSnapshotSummaryFields.contains(key)) {
574+
return unexpected<Error>({
575+
.kind = ErrorKind::kJsonParseError,
576+
.message = std::format("Invalid snapshot summary field: {}", key),
577+
});
578+
}
579+
if (!value.is_string()) {
580+
return unexpected<Error>({
581+
.kind = ErrorKind::kJsonParseError,
582+
.message =
583+
std::format("Invalid snapshot summary field value: {}", value.dump()),
584+
});
585+
}
586+
if (key == SnapshotSummaryFields::kOperation &&
587+
!kValidDataOperation.contains(value.get<std::string>())) {
588+
return unexpected<Error>({
589+
.kind = ErrorKind::kJsonParseError,
590+
.message = std::format("Invalid snapshot operation: {}", value.dump()),
591+
});
592+
}
593+
summary[key] = value.get<std::string>();
594+
}
595+
// If summary is available but operation is missing, set operation to overwrite.
596+
if (!summary.contains(SnapshotSummaryFields::kOperation)) {
597+
summary[SnapshotSummaryFields::kOperation] = DataOperation::kOverwrite;
598+
}
599+
}
600+
601+
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
602+
603+
return std::make_unique<Snapshot>(
604+
snapshot_id, parent_snapshot_id,
605+
sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber,
606+
timestamp_ms, manifest_list, std::move(summary), schema_id);
607+
}
608+
422609
} // namespace iceberg

src/iceberg/json_internal.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,18 @@ nlohmann::json TypeToJson(const Type& type);
8888
/// \return The JSON representation of the field.
8989
nlohmann::json FieldToJson(const SchemaField& field);
9090

91+
/// \brief Serializes a `SnapshotRef` object to JSON.
92+
///
93+
/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized.
94+
/// \return A JSON object representing the `SnapshotRef`.
95+
nlohmann::json ToJson(const SnapshotRef& snapshot_ref);
96+
97+
/// \brief Serializes a `Snapshot` object to JSON.
98+
///
99+
/// \param[in] snapshot The `Snapshot` object to be serialized.
100+
/// \return A JSON object representing the `snapshot`.
101+
nlohmann::json ToJson(const Snapshot& snapshot);
102+
91103
/// \brief Convert JSON to an Iceberg Schema.
92104
///
93105
/// \param[in] json The JSON representation of the schema.
@@ -153,4 +165,17 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
153165
/// the JSON is malformed or missing expected fields, an error will be returned.
154166
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
155167
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
168+
169+
/// \brief Deserializes a JSON object into a `SnapshotRef` object.
170+
///
171+
/// \param[in] json The JSON object representing a `SnapshotRef`.
172+
/// \return A `SnapshotRef` object or an error if the conversion fails.
173+
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json);
174+
175+
/// \brief Deserializes a JSON object into a `Snapshot` object.
176+
///
177+
/// \param[in] json The JSON representation of the snapshot.
178+
/// \return A `Snapshot` object or an error if the conversion fails.
179+
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json);
180+
156181
} // namespace iceberg

src/iceberg/snapshot.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@
2121

2222
namespace iceberg {
2323

24+
bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
25+
return min_snapshots_to_keep == other.min_snapshots_to_keep &&
26+
max_snapshot_age_ms == other.max_snapshot_age_ms &&
27+
max_ref_age_ms == other.max_ref_age_ms;
28+
}
29+
30+
bool SnapshotRef::Tag::Equals(const SnapshotRef::Tag& other) const {
31+
return max_ref_age_ms == other.max_ref_age_ms;
32+
}
33+
2434
SnapshotRefType SnapshotRef::type() const noexcept {
2535
return std::visit(
2636
[&](const auto& retention) -> SnapshotRefType {
@@ -34,6 +44,24 @@ SnapshotRefType SnapshotRef::type() const noexcept {
3444
retention);
3545
}
3646

47+
bool SnapshotRef::Equals(const SnapshotRef& other) const {
48+
if (this == &other) {
49+
return true;
50+
}
51+
if (type() != other.type()) {
52+
return false;
53+
}
54+
55+
if (type() == SnapshotRefType::kBranch) {
56+
return snapshot_id == other.snapshot_id &&
57+
std::get<Branch>(retention) == std::get<Branch>(other.retention);
58+
59+
} else {
60+
return snapshot_id == other.snapshot_id &&
61+
std::get<Tag>(retention) == std::get<Tag>(other.retention);
62+
}
63+
}
64+
3765
std::optional<std::string_view> Snapshot::operation() const {
3866
auto it = summary.find(SnapshotSummaryFields::kOperation);
3967
if (it != summary.end()) {

0 commit comments

Comments
 (0)