Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/.licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ header:
- 'NOTICE'
- 'src/iceberg/expected.h'
- 'src/iceberg/util/murmurhash3_internal.*'
- 'test/resources/**'

comment: on-failure
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ repos:
rev: v19.1.5
hooks:
- id: clang-format
exclude: ^test/resources/.*\.json$
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does clang want to format json? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, without this change the pre-commit fails and complains that clang-format does not support JSON file.


- repo: https://github.com/cheshirekow/cmake-format-precommit
rev: v0.6.10
Expand Down
38 changes: 38 additions & 0 deletions src/iceberg/constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <cstdint>

namespace iceberg {

constexpr int8_t kDefaultTableFormatVersion = 2;
constexpr int8_t kSupportedTableFormatVersion = 3;
constexpr int8_t kMinFormatVersionRowLineage = 3;
constexpr int32_t kInitialSpecId = 0;
constexpr int32_t kInitialSortOrderId = 1;
constexpr int32_t kInitialSchemaId = 0;
constexpr int64_t kInitialRowId = 0;
constexpr int64_t kInitialSequenceNumber = 0;
constexpr int64_t kInvalidSequenceNumber = -1;
constexpr int64_t kInvalidSnapshotId = -1;
constexpr int32_t kInvalidFieldId = -1;

} // namespace iceberg
92 changes: 63 additions & 29 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
#include <algorithm>
#include <cstdint>
#include <format>
#include <ranges>
#include <regex>
#include <type_traits>
#include <unordered_set>

#include <nlohmann/json.hpp>

#include "iceberg/constants.h"
#include "iceberg/partition_field.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -248,7 +251,7 @@ Result<std::vector<T>> FromJsonList(
list.emplace_back(std::move(entry));
}
}
return {};
return list;
}

/// \brief Parse a list of items from a JSON object.
Expand Down Expand Up @@ -471,7 +474,7 @@ nlohmann::json ToJson(const Type& type) {

nlohmann::json ToJson(const Schema& schema) {
nlohmann::json json = ToJson(static_cast<const Type&>(schema));
json[kSchemaId] = schema.schema_id();
SetOptionalField(json, kSchemaId, schema.schema_id());
// TODO(gangwu): add identifier-field-ids.
return json;
}
Expand All @@ -496,7 +499,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
nlohmann::json json;
json[kSnapshotId] = snapshot.snapshot_id;
SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
if (snapshot.sequence_number > TableMetadata::kInitialSequenceNumber) {
if (snapshot.sequence_number > kInitialSequenceNumber) {
json[kSequenceNumber] = snapshot.sequence_number;
}
json[kTimestampMs] = snapshot.timestamp_ms;
Expand Down Expand Up @@ -625,7 +628,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
}

Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, kSchemaId));
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));

if (type->type_id() != TypeId::kStruct) [[unlikely]] {
Expand Down Expand Up @@ -658,9 +661,16 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
}

Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
const nlohmann::json& json) {
const nlohmann::json& json, bool allow_field_id_missing) {
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));
ICEBERG_ASSIGN_OR_RAISE(auto field_id, GetJsonValue<int32_t>(json, kFieldId));
int32_t field_id;
if (allow_field_id_missing) {
// Partition field id in v1 is not tracked, so we use -1 to indicate that.
ICEBERG_ASSIGN_OR_RAISE(
field_id, GetJsonValueOrDefault<int32_t>(json, kFieldId, kInvalidFieldId));
} else {
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValue<int32_t>(json, kFieldId));
}
ICEBERG_ASSIGN_OR_RAISE(
auto transform,
GetJsonValue<std::string>(json, kTransform).and_then(TransformFromString));
Expand Down Expand Up @@ -755,9 +765,8 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

return std::make_unique<Snapshot>(
snapshot_id, parent_snapshot_id,
sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms,
manifest_list, std::move(summary), schema_id);
snapshot_id, parent_snapshot_id, sequence_number.value_or(kInitialSequenceNumber),
timestamp_ms, manifest_list, std::move(summary), schema_id);
}

nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
Expand Down Expand Up @@ -905,7 +914,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
}

// write the current schema ID and schema list
json[kCurrentSchemaId] = table_metadata.current_schema_id;
SetOptionalField(json, kCurrentSchemaId, table_metadata.current_schema_id);
json[kSchemas] = ToJsonList(table_metadata.schemas);

// for older readers, continue writing the default spec as "partition-spec"
Expand Down Expand Up @@ -963,7 +972,8 @@ namespace {
///
/// \return The current schema or parse error.
Result<std::shared_ptr<Schema>> ParseSchemas(
const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
const nlohmann::json& json, int8_t format_version,
std::optional<int32_t>& current_schema_id,
std::vector<std::shared_ptr<Schema>>& schemas) {
std::shared_ptr<Schema> current_schema;
if (json.contains(kSchemas)) {
Expand All @@ -986,7 +996,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
}
if (!current_schema) {
return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
current_schema_id, schema_array.dump());
current_schema_id.value(), schema_array.dump());
}
} else {
if (format_version != 1) {
Expand Down Expand Up @@ -1031,13 +1041,30 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
format_version);
}
default_spec_id = TableMetadata::kInitialSpecId;

ICEBERG_ASSIGN_OR_RAISE(auto spec, GetJsonValue<nlohmann::json>(json, kPartitionSpec)
.and_then([current_schema](const auto& json) {
return PartitionSpecFromJson(current_schema,
json);
}));
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec_json,
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
if (!partition_spec_json.is_array()) {
return JsonParseError("Cannot parse v1 partition spec from non-array: {}",
partition_spec_json.dump());
}

int32_t next_partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
std::vector<PartitionField> fields;
for (const auto& entry_json : partition_spec_json) {
ICEBERG_ASSIGN_OR_RAISE(auto field, PartitionFieldFromJson(entry_json));
int32_t field_id = field->field_id();
if (field_id == kInvalidFieldId) {
// If the field ID is not set, we need to assign a new one
field_id = next_partition_field_id++;
}
fields.emplace_back(field->source_id(), field_id, std::string(field->name()),
std::move(field->transform()));
}

auto spec = std::make_unique<PartitionSpec>(current_schema, kInitialSpecId,
std::move(fields));
default_spec_id = spec->spec_id();
partition_specs.push_back(std::move(spec));
}

Expand Down Expand Up @@ -1066,7 +1093,9 @@ Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
if (format_version > 1) {
return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
}
return NotImplementedError("Assign a default sort order");
auto sort_order = SortOrder::Unsorted();
default_sort_order_id = sort_order->order_id();
sort_orders.push_back(std::move(sort_order));
}
return {};
}
Expand All @@ -1083,7 +1112,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
ICEBERG_ASSIGN_OR_RAISE(table_metadata->format_version,
GetJsonValue<int8_t>(json, kFormatVersion));
if (table_metadata->format_version < 1 ||
table_metadata->format_version > TableMetadata::kSupportedTableFormatVersion) {
table_metadata->format_version > kSupportedTableFormatVersion) {
return JsonParseError("Cannot read unsupported version: {}",
table_metadata->format_version);
}
Expand All @@ -1097,7 +1126,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_sequence_number,
GetJsonValue<int64_t>(json, kLastSequenceNumber));
} else {
table_metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber;
table_metadata->last_sequence_number = kInitialSequenceNumber;
}
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_column_id,
GetJsonValue<int32_t>(json, kLastColumnId));
Expand All @@ -1119,10 +1148,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
return JsonParseError("{} must exist in format v{}", kLastPartitionId,
table_metadata->format_version);
}
// TODO(gangwu): iterate all partition specs to find the largest partition
// field id or assign a default value for unpartitioned tables. However,
// PartitionSpec::lastAssignedFieldId() is not implemented yet.
return NotImplementedError("Find the largest partition field id");

if (table_metadata->partition_specs.empty()) {
table_metadata->last_partition_id =
PartitionSpec::Unpartitioned()->last_assigned_field_id();
} else {
table_metadata->last_partition_id =
std::ranges::max(table_metadata->partition_specs, {}, [](const auto& spec) {
return spec->last_assigned_field_id();
})->last_assigned_field_id();
}
}

ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(json, table_metadata->format_version,
Expand All @@ -1136,14 +1171,13 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
// This field is optional, but internally we set this to -1 when not set
ICEBERG_ASSIGN_OR_RAISE(
table_metadata->current_snapshot_id,
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
TableMetadata::kInvalidSnapshotId));
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId, kInvalidSnapshotId));

if (table_metadata->format_version >= 3) {
ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
GetJsonValue<int64_t>(json, kNextRowId));
} else {
table_metadata->next_row_id = TableMetadata::kInitialRowId;
table_metadata->next_row_id = kInitialRowId;
}

ICEBERG_ASSIGN_OR_RAISE(auto last_updated_ms,
Expand All @@ -1155,7 +1189,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
ICEBERG_ASSIGN_OR_RAISE(
table_metadata->refs,
FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs, SnapshotRefFromJson));
} else if (table_metadata->current_snapshot_id != TableMetadata::kInvalidSnapshotId) {
} else if (table_metadata->current_snapshot_id != kInvalidSnapshotId) {
table_metadata->refs["main"] = std::make_unique<SnapshotRef>(SnapshotRef{
.snapshot_id = table_metadata->current_snapshot_id,
.retention = SnapshotRef::Branch{},
Expand Down
4 changes: 3 additions & 1 deletion src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ nlohmann::json ToJson(const PartitionField& partition_field);
/// and name.
///
/// \param json The JSON object representing a `PartitionField`.
/// \param allow_field_id_missing Whether the field ID is allowed to be missing. This can
/// happen when deserializing partition fields from V1 metadata files.
/// \return An `expected` value containing either a `PartitionField` object or an error.
/// If the JSON is malformed or missing expected fields, an error will be returned.
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
const nlohmann::json& json);
const nlohmann::json& json, bool allow_field_id_missing = false);

/// \brief Serializes a `PartitionSpec` object to JSON.
///
Expand Down
30 changes: 25 additions & 5 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,38 @@

#include "iceberg/partition_spec.h"

#include <algorithm>
#include <format>
#include <ranges>

#include "iceberg/constants.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {

PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
std::vector<PartitionField> fields)
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {}
std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id)
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
if (last_assigned_field_id) {
last_assigned_field_id_ = last_assigned_field_id.value();
} else if (fields_.empty()) {
last_assigned_field_id_ = kLegacyPartitionDataIdStart - 1;
} else {
last_assigned_field_id_ = std::ranges::max(fields_, {}, [](const auto& field) {
return field.field_id();
}).field_id();
}
}

const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
static const std::shared_ptr<PartitionSpec> unpartitioned =
std::make_shared<PartitionSpec>(
/*schema=*/nullptr, kInitialSpecId, std::vector<PartitionField>{},
kLegacyPartitionDataIdStart - 1);
return unpartitioned;
}

const std::shared_ptr<Schema>& PartitionSpec::schema() const { return schema_; }

Expand All @@ -47,8 +68,7 @@ std::string PartitionSpec::ToString() const {
}

bool PartitionSpec::Equals(const PartitionSpec& other) const {
return *schema_ == *other.schema_ && spec_id_ == other.spec_id_ &&
fields_ == other.fields_;
return spec_id_ == other.spec_id_ && fields_ == other.fields_;
}

} // namespace iceberg
22 changes: 21 additions & 1 deletion src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
/// Partition specs for Iceberg tables.

#include <cstdint>
#include <optional>
#include <span>
#include <string>
#include <vector>
Expand All @@ -40,8 +41,24 @@ namespace iceberg {
/// evolution.
class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
public:
/// \brief The start ID for partition field. It is only used to generate
/// partition field id for v1 metadata where it is tracked.
constexpr static int32_t kLegacyPartitionDataIdStart = 1000;

/// \brief Create a new partition spec.
///
/// \param schema The table schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
std::vector<PartitionField> fields);
std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Get an unsorted partition spec singleton.
static const std::shared_ptr<PartitionSpec>& Unpartitioned();

/// \brief Get the table schema
const std::shared_ptr<Schema>& schema() const;
/// \brief Get the spec ID.
Expand All @@ -51,6 +68,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {

std::string ToString() const override;

int32_t last_assigned_field_id() const { return last_assigned_field_id_; }

friend bool operator==(const PartitionSpec& lhs, const PartitionSpec& rhs) {
return lhs.Equals(rhs);
}
Expand All @@ -66,6 +85,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
std::shared_ptr<Schema> schema_;
const int32_t spec_id_;
std::vector<PartitionField> fields_;
int32_t last_assigned_field_id_;
};

} // namespace iceberg
Loading
Loading