Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

#include "iceberg/json_internal.h"

#include <cstdint>
#include <format>
#include <regex>
#include <unordered_set>

#include <nlohmann/json.hpp>

#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"
Expand Down Expand Up @@ -70,6 +73,55 @@ constexpr std::string_view kValueRequired = "value-required";

constexpr std::string_view kFieldId = "field-id";
constexpr std::string_view kSpecId = "spec-id";
constexpr std::string_view kSnapshotId = "snapshot-id";
constexpr std::string_view kParentSnapshotId = "parent-snapshot-id";
constexpr std::string_view kSequenceNumber = "sequence-number";
constexpr std::string_view kTimestampMs = "timestamp-ms";
constexpr std::string_view kManifestList = "manifest-list";
constexpr std::string_view kSummary = "summary";
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";

constexpr int64_t kInitialSequenceNumber = 0;

const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
SnapshotSummaryFields::kOperation,
SnapshotSummaryFields::kAddedDataFiles,
SnapshotSummaryFields::kDeletedDataFiles,
SnapshotSummaryFields::kTotalDataFiles,
SnapshotSummaryFields::kAddedDeleteFiles,
SnapshotSummaryFields::kAddedEqDeleteFiles,
SnapshotSummaryFields::kRemovedEqDeleteFiles,
SnapshotSummaryFields::kAddedPosDeleteFiles,
SnapshotSummaryFields::kRemovedPosDeleteFiles,
SnapshotSummaryFields::kAddedDVs,
SnapshotSummaryFields::kRemovedDVs,
SnapshotSummaryFields::kRemovedDeleteFiles,
SnapshotSummaryFields::kTotalDeleteFiles,
SnapshotSummaryFields::kAddedRecords,
SnapshotSummaryFields::kDeletedRecords,
SnapshotSummaryFields::kTotalRecords,
SnapshotSummaryFields::kAddedFileSize,
SnapshotSummaryFields::kRemovedFileSize,
SnapshotSummaryFields::kTotalFileSize,
SnapshotSummaryFields::kAddedPosDeletes,
SnapshotSummaryFields::kRemovedPosDeletes,
SnapshotSummaryFields::kTotalPosDeletes,
SnapshotSummaryFields::kAddedEqDeletes,
SnapshotSummaryFields::kRemovedEqDeletes,
SnapshotSummaryFields::kTotalEqDeletes,
SnapshotSummaryFields::kDeletedDuplicatedFiles,
SnapshotSummaryFields::kChangedPartitionCountProp,
SnapshotSummaryFields::kWAPId,
SnapshotSummaryFields::kPublishedWAPId,
SnapshotSummaryFields::kSourceSnapshotId,
SnapshotSummaryFields::kEngineName,
SnapshotSummaryFields::kEngineVersion};

const std::unordered_set<std::string_view> kValidDataOperation = {
DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite,
DataOperation::kDelete};

template <typename T>
Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
Expand All @@ -89,6 +141,30 @@ Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
}
}

template <typename T>
Result<std::optional<T>> GetJsonValueOptional(const nlohmann::json& json,
std::string_view key) {
if (!json.contains(key)) {
return std::nullopt;
}
try {
return json.at(key).get<T>();
} catch (const std::exception& ex) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Failed to parse key '{}' in {}", key, json.dump()),
});
}
}

template <typename T>
void SetOptionalField(nlohmann::json& json, std::string_view key,
const std::optional<T>& value) {
if (value.has_value()) {
json[key] = *value;
}
}

} // namespace

nlohmann::json ToJson(const SortField& sort_field) {
Expand Down Expand Up @@ -231,6 +307,39 @@ nlohmann::json SchemaToJson(const Schema& schema) {
return json;
}

nlohmann::json ToJson(const SnapshotRef& ref) {
nlohmann::json json;
json[kSnapshotId] = ref.snapshot_id;
json[kType] = SnapshotRefTypeToString(ref.type());
if (ref.type() == SnapshotRefType::kBranch) {
const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
SetOptionalField(json, kMinSnapshotsToKeep, branch.min_snapshots_to_keep);
SetOptionalField(json, kMaxSnapshotAgeMs, branch.max_snapshot_age_ms);
SetOptionalField(json, kMaxRefAgeMs, branch.max_ref_age_ms);
} else if (ref.type() == SnapshotRefType::kTag) {
const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
SetOptionalField(json, kMaxRefAgeMs, tag.max_ref_age_ms);
}
return json;
}

nlohmann::json ToJson(const Snapshot& snapshot) {
nlohmann::json json;
json[kSnapshotId] = snapshot.snapshot_id;
SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
if (snapshot.sequence_number > kInitialSequenceNumber) {
json[kSequenceNumber] = snapshot.sequence_number;
}
json[kTimestampMs] = snapshot.timestamp_ms;
json[kManifestList] = snapshot.manifest_list;
// If there is an operation, write the summary map
if (snapshot.operation().has_value()) {
json[kSummary] = snapshot.summary;
}
SetOptionalField(json, kSchemaId, snapshot.schema_id);
return json;
}

namespace {

Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -419,4 +528,78 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(
auto type,
GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString));
if (type == SnapshotRefType::kBranch) {
ICEBERG_ASSIGN_OR_RAISE(auto min_snapshots_to_keep,
GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep));
ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms,
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));

return std::make_unique<SnapshotRef>(
snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep,
.max_snapshot_age_ms = max_snapshot_age_ms,
.max_ref_age_ms = max_ref_age_ms});
} else {
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));

return std::make_unique<SnapshotRef>(
snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms});
}
}

Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to be consistent with the Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SnapshotParser.java. Specifically, we need to deal with cases where sequence number or summary is missing.

@Fokko Will it actually happen that a snapshot does not have summary (and thus operation is also missing)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the spec, summary is required for v2 and v3 but optional for v1. So I believe the spec answers my question. We have to handle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct!

  • For V1 the summary is optional.
  • For V2/V3 the summary is required, and also the operation. Some writers produced some malformed metadata in the past. Instead of throwing an exception, we would it is an overwrite operation, since that's the most generic one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like we need to set operation to overwrite when summary is available but operation is missing. @zhjwpku

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, please take a look.

ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
GetJsonValueOptional<int64_t>(json, kSequenceNumber));
ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue<int64_t>(json, kTimestampMs));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
GetJsonValue<std::string>(json, kManifestList));

ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id,
GetJsonValueOptional<int64_t>(json, kParentSnapshotId));

ICEBERG_ASSIGN_OR_RAISE(auto summary_json,
GetJsonValueOptional<nlohmann::json>(json, kSummary));
std::unordered_map<std::string, std::string> summary;
if (summary_json.has_value()) {
for (const auto& [key, value] : summary_json->items()) {
if (!kValidSnapshotSummaryFields.contains(key)) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Invalid snapshot summary field: {}", key),
});
}
if (!value.is_string()) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message =
std::format("Invalid snapshot summary field value: {}", value.dump()),
});
}
if (key == SnapshotSummaryFields::kOperation &&
!kValidDataOperation.contains(value.get<std::string>())) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Invalid snapshot operation: {}", value.dump()),
});
}
summary[key] = value.get<std::string>();
}
}

ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

return std::make_unique<Snapshot>(
snapshot_id, parent_snapshot_id,
sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber,
timestamp_ms, manifest_list, std::move(summary), schema_id);
}

} // namespace iceberg
25 changes: 25 additions & 0 deletions src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ nlohmann::json TypeToJson(const Type& type);
/// \return The JSON representation of the field.
nlohmann::json FieldToJson(const SchemaField& field);

/// \brief Serializes a `SnapshotRef` object to JSON.
///
/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized.
/// \return A JSON object representing the `SnapshotRef`.
nlohmann::json ToJson(const SnapshotRef& snapshot_ref);

/// \brief Serializes a `Snapshot` object to JSON.
///
/// \param[in] snapshot The `Snapshot` object to be serialized.
/// \return A JSON object representing the `snapshot`.
nlohmann::json ToJson(const Snapshot& snapshot);

/// \brief Convert JSON to an Iceberg Schema.
///
/// \param[in] json The JSON representation of the schema.
Expand Down Expand Up @@ -153,4 +165,17 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
/// the JSON is malformed or missing expected fields, an error will be returned.
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);

/// \brief Deserializes a JSON object into a `SnapshotRef` object.
///
/// \param[in] json The JSON object representing a `SnapshotRef`.
/// \return A `SnapshotRef` object or an error if the conversion fails.
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json);

/// \brief Deserializes a JSON object into a `Snapshot` object.
///
/// \param[in] json The JSON representation of the snapshot.
/// \return A `Snapshot` object or an error if the conversion fails.
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json);

} // namespace iceberg
28 changes: 28 additions & 0 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@

namespace iceberg {

bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
return min_snapshots_to_keep == other.min_snapshots_to_keep &&
max_snapshot_age_ms == other.max_snapshot_age_ms &&
max_ref_age_ms == other.max_ref_age_ms;
}

bool SnapshotRef::Tag::Equals(const SnapshotRef::Tag& other) const {
return max_ref_age_ms == other.max_ref_age_ms;
}

SnapshotRefType SnapshotRef::type() const noexcept {
return std::visit(
[&](const auto& retention) -> SnapshotRefType {
Expand All @@ -34,6 +44,24 @@ SnapshotRefType SnapshotRef::type() const noexcept {
retention);
}

bool SnapshotRef::Equals(const SnapshotRef& other) const {
if (this == &other) {
return true;
}
if (type() != other.type()) {
return false;
}

if (type() == SnapshotRefType::kBranch) {
return snapshot_id == other.snapshot_id &&
std::get<Branch>(retention) == std::get<Branch>(other.retention);

} else {
return snapshot_id == other.snapshot_id &&
std::get<Tag>(retention) == std::get<Tag>(other.retention);
}
}

std::optional<std::string_view> Snapshot::operation() const {
auto it = summary.find(SnapshotSummaryFields::kOperation);
if (it != summary.end()) {
Expand Down
Loading
Loading