Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

#include <format>
#include <regex>
#include <unordered_set>

#include <nlohmann/json.hpp>

#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"
Expand Down Expand Up @@ -70,6 +72,53 @@ constexpr std::string_view kValueRequired = "value-required";

constexpr std::string_view kFieldId = "field-id";
constexpr std::string_view kSpecId = "spec-id";
constexpr std::string_view kSnapshotId = "snapshot-id";
constexpr std::string_view kParentSnapshotId = "parent-snapshot-id";
constexpr std::string_view kSequenceNumber = "sequence-number";
constexpr std::string_view kTimestampMs = "timestamp-ms";
constexpr std::string_view kManifestList = "manifest-list";
constexpr std::string_view kSummary = "summary";
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";

const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
SnapshotSummaryFields::kOperation,
SnapshotSummaryFields::kAddedDataFiles,
SnapshotSummaryFields::kDeletedDataFiles,
SnapshotSummaryFields::kTotalDataFiles,
SnapshotSummaryFields::kAddedDeleteFiles,
SnapshotSummaryFields::kAddedEqDeleteFiles,
SnapshotSummaryFields::kRemovedEqDeleteFiles,
SnapshotSummaryFields::kAddedPosDeleteFiles,
SnapshotSummaryFields::kRemovedPosDeleteFiles,
SnapshotSummaryFields::kAddedDVs,
SnapshotSummaryFields::kRemovedDVs,
SnapshotSummaryFields::kRemovedDeleteFiles,
SnapshotSummaryFields::kTotalDeleteFiles,
SnapshotSummaryFields::kAddedRecords,
SnapshotSummaryFields::kDeletedRecords,
SnapshotSummaryFields::kTotalRecords,
SnapshotSummaryFields::kAddedFileSize,
SnapshotSummaryFields::kRemovedFileSize,
SnapshotSummaryFields::kTotalFileSize,
SnapshotSummaryFields::kAddedPosDeletes,
SnapshotSummaryFields::kRemovedPosDeletes,
SnapshotSummaryFields::kTotalPosDeletes,
SnapshotSummaryFields::kAddedEqDeletes,
SnapshotSummaryFields::kRemovedEqDeletes,
SnapshotSummaryFields::kTotalEqDeletes,
SnapshotSummaryFields::kDeletedDuplicatedFiles,
SnapshotSummaryFields::kChangedPartitionCountProp,
SnapshotSummaryFields::kWAPId,
SnapshotSummaryFields::kPublishedWAPId,
SnapshotSummaryFields::kSourceSnapshotId,
SnapshotSummaryFields::kEngineName,
SnapshotSummaryFields::kEngineVersion};

const std::unordered_set<std::string_view> kValidDataOperation = {
DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite,
DataOperation::kDelete};

template <typename T>
Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
Expand All @@ -89,6 +138,30 @@ Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
}
}

template <typename T>
Result<std::optional<T>> GetJsonValueOptional(const nlohmann::json& json,
std::string_view key) {
if (!json.contains(key)) {
return std::nullopt;
}
try {
return json.at(key).get<T>();
} catch (const std::exception& ex) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Failed to parse key '{}' in {}", key, json.dump()),
});
}
}

template <typename T>
void SetOptionalField(nlohmann::json& json, std::string_view key,
const std::optional<T>& value) {
if (value.has_value()) {
json[key] = *value;
}
}

} // namespace

nlohmann::json ToJson(const SortField& sort_field) {
Expand Down Expand Up @@ -231,6 +304,34 @@ nlohmann::json SchemaToJson(const Schema& schema) {
return json;
}

nlohmann::json SnapshotRefToJson(const SnapshotRef& ref) {
nlohmann::json json;
json[kSnapshotId] = ref.snapshot_id;
json[kType] = SnapshotRefTypeToString(ref.type());
if (ref.type() == SnapshotRefType::kBranch) {
const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
SetOptionalField(json, kMinSnapshotsToKeep, branch.min_snapshots_to_keep);
SetOptionalField(json, kMaxSnapshotAgeMs, branch.max_snapshot_age_ms);
SetOptionalField(json, kMaxRefAgeMs, branch.max_ref_age_ms);
} else if (ref.type() == SnapshotRefType::kTag) {
const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
SetOptionalField(json, kMaxRefAgeMs, tag.max_ref_age_ms);
}
return json;
}

nlohmann::json SnapshotToJson(const Snapshot& snapshot) {
nlohmann::json json;
json[kSnapshotId] = snapshot.snapshot_id;
SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
json[kSequenceNumber] = snapshot.sequence_number;
json[kTimestampMs] = snapshot.timestamp_ms;
json[kManifestList] = snapshot.manifest_list;
json[kSummary] = snapshot.summary;
SetOptionalField(json, kSchemaId, snapshot.schema_id);
return json;
}

namespace {

Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -419,4 +520,75 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(
auto type,
GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString));
if (type == SnapshotRefType::kBranch) {
ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots_to_keep,
GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep));
ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms,
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));

return std::make_unique<SnapshotRef>(
snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep,
.max_snapshot_age_ms = max_snapshot_age_ms,
.max_ref_age_ms = max_ref_age_ms});
} else {
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));

return std::make_unique<SnapshotRef>(
snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms});
}
}

Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to be consistent with the Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SnapshotParser.java. Specifically, we need to deal with cases where sequence number or summary is missing.

@Fokko Will it actually happen that a snapshot does not have summary (and thus operation is also missing)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the spec, summary is required for v2 and v3 but optional for v1. So I believe the spec answers my question. We have to handle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct!

  • For V1 the summary is optional.
  • For V2/V3 the summary is required, and also the operation. Some writers produced some malformed metadata in the past. Instead of throwing an exception, we would it is an overwrite operation, since that's the most generic one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like we need to set operation to overwrite when summary is available but operation is missing. @zhjwpku

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, please take a look.

ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
GetJsonValue<int64_t>(json, kSequenceNumber));
ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue<int64_t>(json, kTimestampMs));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
GetJsonValue<std::string>(json, kManifestList));

ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id,
GetJsonValueOptional<int64_t>(json, kParentSnapshotId));

ICEBERG_ASSIGN_OR_RAISE(auto summary_json,
GetJsonValue<nlohmann::json>(json, kSummary));
std::unordered_map<std::string, std::string> summary;
for (const auto& [key, value] : summary_json.items()) {
if (!kValidSnapshotSummaryFields.contains(key)) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Invalid snapshot summary field: {}", key),
});
}
if (!value.is_string()) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message =
std::format("Invalid snapshot summary field value: {}", value.dump()),
});
}
if (key == SnapshotSummaryFields::kOperation &&
!kValidDataOperation.contains(value.get<std::string>())) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Invalid snapshot operation: {}", value.dump()),
});
}
summary[key] = value.get<std::string>();
}

ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

return std::make_unique<Snapshot>(snapshot_id, parent_snapshot_id, sequence_number,
timestamp_ms, manifest_list, std::move(summary),
schema_id);
}

} // namespace iceberg
25 changes: 25 additions & 0 deletions src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ nlohmann::json TypeToJson(const Type& type);
/// \return The JSON representation of the field.
nlohmann::json FieldToJson(const SchemaField& field);

/// \brief Convert an Iceberg SnapshotRef to JSON.
///
/// \param[in] snapshot_ref The Iceberg snapshot reference to convert.
/// \return The JSON representation of the snapshot reference.
nlohmann::json SnapshotRefToJson(const SnapshotRef& snapshot_ref);

/// \brief Convert an Iceberg Snapshot to JSON.
///
/// \param[in] snapshot The Iceberg snapshot to convert.
/// \return The JSON representation of the snapshot.
nlohmann::json SnapshotToJson(const Snapshot& snapshot);

/// \brief Convert JSON to an Iceberg Schema.
///
/// \param[in] json The JSON representation of the schema.
Expand Down Expand Up @@ -153,4 +165,17 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
/// the JSON is malformed or missing expected fields, an error will be returned.
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);

/// \brief Convert JSON to an Iceberg SnapshotRef.
///
/// \param[in] json The JSON representation of the snapshot reference.
/// \return The Iceberg snapshot reference or an error if the conversion fails.
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json);

/// \brief Convert JSON to an Iceberg Snapshot.
///
/// \param[in] json The JSON representation of the snapshot.
/// \return The Iceberg snapshot or an error if the conversion fails.
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json);

} // namespace iceberg
28 changes: 28 additions & 0 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@

namespace iceberg {

bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
return min_snapshots_to_keep == other.min_snapshots_to_keep &&
max_snapshot_age_ms == other.max_snapshot_age_ms &&
max_ref_age_ms == other.max_ref_age_ms;
}

bool SnapshotRef::Tag::Equals(const SnapshotRef::Tag& other) const {
return max_ref_age_ms == other.max_ref_age_ms;
}

SnapshotRefType SnapshotRef::type() const noexcept {
return std::visit(
[&](const auto& retention) -> SnapshotRefType {
Expand All @@ -34,6 +44,24 @@ SnapshotRefType SnapshotRef::type() const noexcept {
retention);
}

bool SnapshotRef::Equals(const SnapshotRef& other) const {
if (this == &other) {
return true;
}
if (type() != other.type()) {
return false;
}

if (type() == SnapshotRefType::kBranch) {
return snapshot_id == other.snapshot_id &&
std::get<Branch>(retention) == std::get<Branch>(other.retention);

} else {
return snapshot_id == other.snapshot_id &&
std::get<Tag>(retention) == std::get<Tag>(other.retention);
}
}

std::optional<std::string_view> Snapshot::operation() const {
auto it = summary.find(SnapshotSummaryFields::kOperation);
if (it != summary.end()) {
Expand Down
63 changes: 60 additions & 3 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <variant>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"

namespace iceberg {

Expand All @@ -39,6 +40,26 @@ enum class SnapshotRefType {
kTag,
};

/// \brief Get the relative snapshot reference type name
ICEBERG_EXPORT constexpr std::string_view SnapshotRefTypeToString(
SnapshotRefType type) noexcept {
switch (type) {
case SnapshotRefType::kBranch:
return "branch";
case SnapshotRefType::kTag:
return "tag";
}
}
/// \brief Get the relative snapshot reference type from name
ICEBERG_EXPORT constexpr Result<SnapshotRefType> SnapshotRefTypeFromString(
std::string_view str) noexcept {
if (str == "branch") return SnapshotRefType::kBranch;
if (str == "tag") return SnapshotRefType::kTag;
return unexpected<Error>(
{.kind = ErrorKind::kInvalidArgument,
.message = "Invalid snapshot reference type: {}" + std::string(str)});
}

/// \brief A reference to a snapshot, either a branch or a tag.
struct ICEBERG_EXPORT SnapshotRef {
struct ICEBERG_EXPORT Branch {
Expand All @@ -54,13 +75,35 @@ struct ICEBERG_EXPORT SnapshotRef {
/// of the snapshot reference to keep while expiring snapshots. Defaults to table
/// property history.expire.max-ref-age-ms. The main branch never expires.
std::optional<int64_t> max_ref_age_ms;

/// \brief Compare two branches for equality.
friend bool operator==(const Branch& lhs, const Branch& rhs) {
return lhs.Equals(rhs);
}

/// \brief Compare two branches for inequality.
friend bool operator!=(const Branch& lhs, const Branch& rhs) { return !(lhs == rhs); }

private:
/// \brief Compare two branches for equality.
bool Equals(const Branch& other) const;
};

struct ICEBERG_EXPORT Tag {
/// For snapshot references except the main branch, a positive number for the max age
/// of the snapshot reference to keep while expiring snapshots. Defaults to table
/// property history.expire.max-ref-age-ms. The main branch never expires.
std::optional<int64_t> max_ref_age_ms;

/// \brief Compare two tags for equality.
friend bool operator==(const Tag& lhs, const Tag& rhs) { return lhs.Equals(rhs); }

/// \brief Compare two tags for inequality.
friend bool operator!=(const Tag& lhs, const Tag& rhs) { return !(lhs == rhs); }

private:
/// \brief Compare two tags for equality.
bool Equals(const Tag& other) const;
};

/// A reference's snapshot ID. The tagged snapshot or latest snapshot of a branch.
Expand All @@ -69,6 +112,20 @@ struct ICEBERG_EXPORT SnapshotRef {
std::variant<Branch, Tag> retention;

SnapshotRefType type() const noexcept;

/// \brief Compare two snapshot refs for equality
friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
return lhs.Equals(rhs);
}

/// \brief Compare two snapshot refs for inequality.
friend bool operator!=(const SnapshotRef& lhs, const SnapshotRef& rhs) {
return !(lhs == rhs);
}

private:
/// \brief Compare two snapshot refs for equality.
bool Equals(const SnapshotRef& other) const;
};

/// \brief Optional Snapshot Summary Fields
Expand Down Expand Up @@ -139,11 +196,11 @@ struct SnapshotSummaryFields {
/// Other Fields, see https://iceberg.apache.org/spec/#other-fields

/// \brief The Write-Audit-Publish id of a staged snapshot
inline static const std::string kWAPID = "wap.id";
inline static const std::string kWAPId = "wap.id";
/// \brief The Write-Audit-Publish id of a snapshot already been published
inline static const std::string kPublishedWAPID = "published-wap-id";
inline static const std::string kPublishedWAPId = "published-wap-id";
/// \brief The original id of a cherry-picked snapshot
inline static const std::string kSourceSnapshotID = "source-snapshot-id";
inline static const std::string kSourceSnapshotId = "source-snapshot-id";
/// \brief Name of the engine that created the snapshot
inline static const std::string kEngineName = "engine-name";
/// \brief Version of the engine that created the snapshot
Expand Down
Loading
Loading