Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/.licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ header:
- 'NOTICE'
- 'src/iceberg/expected.h'
- 'src/iceberg/util/murmurhash3_internal.*'
- 'test/resources/**'

comment: on-failure
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ repos:
rev: v19.1.5
hooks:
- id: clang-format
exclude: ^test/resources/.*\.json$
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does clang want to format json? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, without this change the pre-commit fails and complains that clang-format does not support JSON file.


- repo: https://github.com/cheshirekow/cmake-format-precommit
rev: v0.6.10
Expand Down
83 changes: 59 additions & 24 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
#include <algorithm>
#include <cstdint>
#include <format>
#include <ranges>
#include <regex>
#include <type_traits>
#include <unordered_set>

#include <iceberg/table.h>
#include <nlohmann/json.hpp>

#include "iceberg/partition_field.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -248,7 +251,7 @@ Result<std::vector<T>> FromJsonList(
list.emplace_back(std::move(entry));
}
}
return {};
return list;
}

/// \brief Parse a list of items from a JSON object.
Expand Down Expand Up @@ -471,7 +474,7 @@ nlohmann::json ToJson(const Type& type) {

/// \brief Serialize a `Schema` to JSON.
///
/// The struct portion is produced by the `Type` overload of `ToJson`; the
/// schema ID is then attached through `SetOptionalField`.
///
/// \param schema The schema to serialize.
/// \return The JSON produced for the schema.
nlohmann::json ToJson(const Schema& schema) {
  nlohmann::json json = ToJson(static_cast<const Type&>(schema));
  // The schema ID is a std::optional, so it is written exactly once, via
  // SetOptionalField, rather than also being assigned to the key directly.
  // NOTE(review): SetOptionalField presumably skips an empty optional --
  // confirm against its definition.
  SetOptionalField(json, kSchemaId, schema.schema_id());
  // TODO(gangwu): add identifier-field-ids.
  return json;
}
Expand Down Expand Up @@ -625,7 +628,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
}

Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValue<int32_t>(json, kSchemaId));
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));

if (type->type_id() != TypeId::kStruct) [[unlikely]] {
Expand Down Expand Up @@ -658,9 +661,16 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
}

Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
const nlohmann::json& json) {
const nlohmann::json& json, bool allow_field_id_missing) {
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));
ICEBERG_ASSIGN_OR_RAISE(auto field_id, GetJsonValue<int32_t>(json, kFieldId));
int32_t field_id;
if (allow_field_id_missing) {
// Partition field id in v1 is not tracked, so we use -1 to indicate that.
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValueOrDefault<int32_t>(
json, kFieldId, SchemaField::kInvalidFieldId));
} else {
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValue<int32_t>(json, kFieldId));
}
ICEBERG_ASSIGN_OR_RAISE(
auto transform,
GetJsonValue<std::string>(json, kTransform).and_then(TransformFromString));
Expand Down Expand Up @@ -905,7 +915,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
}

// write the current schema ID and schema list
json[kCurrentSchemaId] = table_metadata.current_schema_id;
SetOptionalField(json, kCurrentSchemaId, table_metadata.current_schema_id);
json[kSchemas] = ToJsonList(table_metadata.schemas);

// for older readers, continue writing the default spec as "partition-spec"
Expand Down Expand Up @@ -963,7 +973,8 @@ namespace {
///
/// \return The current schema or parse error.
Result<std::shared_ptr<Schema>> ParseSchemas(
const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
const nlohmann::json& json, int8_t format_version,
std::optional<int32_t>& current_schema_id,
std::vector<std::shared_ptr<Schema>>& schemas) {
std::shared_ptr<Schema> current_schema;
if (json.contains(kSchemas)) {
Expand All @@ -986,7 +997,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
}
if (!current_schema) {
return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
current_schema_id, schema_array.dump());
current_schema_id.value(), schema_array.dump());
}
} else {
if (format_version != 1) {
Expand Down Expand Up @@ -1031,13 +1042,30 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
format_version);
}
default_spec_id = TableMetadata::kInitialSpecId;

ICEBERG_ASSIGN_OR_RAISE(auto spec, GetJsonValue<nlohmann::json>(json, kPartitionSpec)
.and_then([current_schema](const auto& json) {
return PartitionSpecFromJson(current_schema,
json);
}));
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec_json,
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
if (!partition_spec_json.is_array()) {
return JsonParseError("Cannot parse v1 partition spec from non-array: {}",
partition_spec_json.dump());
}

int32_t next_partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
std::vector<PartitionField> fields;
for (const auto& entry_json : partition_spec_json) {
ICEBERG_ASSIGN_OR_RAISE(auto field, PartitionFieldFromJson(entry_json));
int32_t field_id = field->field_id();
if (field_id == SchemaField::kInvalidFieldId) {
// If the field ID is not set, we need to assign a new one
field_id = next_partition_field_id++;
}
fields.emplace_back(field->source_id(), field_id, std::string(field->name()),
std::move(field->transform()));
}

auto spec = std::make_unique<PartitionSpec>(
current_schema, PartitionSpec::kInitialSpecId, std::move(fields));
default_spec_id = spec->spec_id();
partition_specs.push_back(std::move(spec));
}

Expand Down Expand Up @@ -1066,7 +1094,9 @@ Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
if (format_version > 1) {
return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
}
return NotImplementedError("Assign a default sort order");
auto sort_order = SortOrder::Unsorted();
default_sort_order_id = sort_order->order_id();
sort_orders.push_back(std::move(sort_order));
}
return {};
}
Expand Down Expand Up @@ -1119,10 +1149,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
return JsonParseError("{} must exist in format v{}", kLastPartitionId,
table_metadata->format_version);
}
// TODO(gangwu): iterate all partition specs to find the largest partition
// field id or assign a default value for unpartitioned tables. However,
// PartitionSpec::lastAssignedFieldId() is not implemented yet.
return NotImplementedError("Find the largest partition field id");

if (table_metadata->partition_specs.empty()) {
table_metadata->last_partition_id =
PartitionSpec::Unpartitioned()->last_assigned_field_id();
} else {
table_metadata->last_partition_id =
std::ranges::max(table_metadata->partition_specs, {}, [](const auto& spec) {
return spec->last_assigned_field_id();
})->last_assigned_field_id();
}
}

ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(json, table_metadata->format_version,
Expand All @@ -1134,10 +1170,9 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
}

// This field is optional, but internally we set this to -1 when not set
ICEBERG_ASSIGN_OR_RAISE(
table_metadata->current_snapshot_id,
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
TableMetadata::kInvalidSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(table_metadata->current_snapshot_id,
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
Snapshot::kInvalidSnapshotId));

if (table_metadata->format_version >= 3) {
ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
Expand All @@ -1155,7 +1190,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
ICEBERG_ASSIGN_OR_RAISE(
table_metadata->refs,
FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs, SnapshotRefFromJson));
} else if (table_metadata->current_snapshot_id != TableMetadata::kInvalidSnapshotId) {
} else if (table_metadata->current_snapshot_id != Snapshot::kInvalidSnapshotId) {
table_metadata->refs["main"] = std::make_unique<SnapshotRef>(SnapshotRef{
.snapshot_id = table_metadata->current_snapshot_id,
.retention = SnapshotRef::Branch{},
Expand Down
4 changes: 3 additions & 1 deletion src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ nlohmann::json ToJson(const PartitionField& partition_field);
/// and name.
///
/// \param json The JSON object representing a `PartitionField`.
/// \param allow_field_id_missing Whether the field ID is allowed to be missing. This can
/// happen when deserializing partition fields from V1 metadata files.
/// \return An `expected` value containing either a `PartitionField` object or an error.
/// If the JSON is malformed or missing expected fields, an error will be returned.
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
const nlohmann::json& json);
const nlohmann::json& json, bool allow_field_id_missing = false);

/// \brief Serializes a `PartitionSpec` object to JSON.
///
Expand Down
29 changes: 24 additions & 5 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,37 @@

#include "iceberg/partition_spec.h"

#include <algorithm>
#include <format>
#include <ranges>

#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {

/// \brief Construct a partition spec.
///
/// \param schema The table schema; stored as given (may be null).
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned partition field ID. When
/// empty, it is derived from `fields`: the largest field ID present, or
/// `kLegacyPartitionDataIdStart - 1` when there are no fields.
PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
                             std::vector<PartitionField> fields,
                             std::optional<int32_t> last_assigned_field_id)
    : schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
  if (last_assigned_field_id.has_value()) {
    last_assigned_field_id_ = *last_assigned_field_id;
  } else if (fields_.empty()) {
    // Nothing assigned yet: sit one below the first legacy partition field ID.
    last_assigned_field_id_ = kLegacyPartitionDataIdStart - 1;
  } else {
    // max_element yields an iterator, so no PartitionField is copied just to
    // read its ID.
    const auto largest = std::max_element(
        fields_.begin(), fields_.end(), [](const auto& lhs, const auto& rhs) {
          return lhs.field_id() < rhs.field_id();
        });
    last_assigned_field_id_ = largest->field_id();
  }
}

const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
static const std::shared_ptr<PartitionSpec> unpartitioned =
std::make_shared<PartitionSpec>(
/*schema=*/nullptr, kInitialSpecId, std::vector<PartitionField>{},
kLegacyPartitionDataIdStart - 1);
return unpartitioned;
}

/// \brief Return the table schema stored in this partition spec.
const std::shared_ptr<Schema>& PartitionSpec::schema() const {
  return schema_;
}

Expand All @@ -47,8 +67,7 @@ std::string PartitionSpec::ToString() const {
}

/// \brief Compare two partition specs by spec ID and partition fields.
///
/// The schema is deliberately left out of the comparison: `schema_` can be
/// null (`Unpartitioned()` constructs its spec with a null schema), so
/// dereferencing it here would be unsafe.
///
/// \param other The spec to compare against.
/// \return true when both the spec IDs and the partition fields are equal.
bool PartitionSpec::Equals(const PartitionSpec& other) const {
  return spec_id_ == other.spec_id_ && fields_ == other.fields_;
}

} // namespace iceberg
23 changes: 22 additions & 1 deletion src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
/// Partition specs for Iceberg tables.

#include <cstdint>
#include <optional>
#include <span>
#include <string>
#include <vector>
Expand All @@ -40,8 +41,25 @@ namespace iceberg {
/// evolution.
class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
public:
static constexpr int32_t kInitialSpecId = 0;
/// \brief The start ID for partition fields. It is only used to generate
/// partition field IDs for v1 metadata, where they are not tracked.
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;

/// \brief Create a new partition spec.
///
/// \param schema The table schema.
/// \param spec_id The spec ID.
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
std::vector<PartitionField> fields);
std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Get an unpartitioned partition spec singleton.
static const std::shared_ptr<PartitionSpec>& Unpartitioned();

/// \brief Get the table schema
const std::shared_ptr<Schema>& schema() const;
/// \brief Get the spec ID.
Expand All @@ -51,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {

std::string ToString() const override;

int32_t last_assigned_field_id() const { return last_assigned_field_id_; }

friend bool operator==(const PartitionSpec& lhs, const PartitionSpec& rhs) {
return lhs.Equals(rhs);
}
Expand All @@ -66,6 +86,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
std::shared_ptr<Schema> schema_;
const int32_t spec_id_;
std::vector<PartitionField> fields_;
int32_t last_assigned_field_id_;
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum class ErrorKind {
kNotSupported,
kInvalidExpression,
kJsonParseError,
kNotFound,
};

/// \brief Error with a kind and a message.
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

namespace iceberg {

/// \brief Construct a schema from its fields and an optional schema ID.
///
/// \param fields The top-level fields, forwarded to the `StructType` base.
/// \param schema_id The schema ID, or `std::nullopt` when the schema has none.
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
    : StructType(std::move(fields)), schema_id_(schema_id) {}

/// \brief Return the schema ID, or `std::nullopt` when the schema was
/// constructed without one.
std::optional<int32_t> Schema::schema_id() const { return schema_id_; }

std::string Schema::ToString() const {
std::string repr = "schema<";
Expand Down
9 changes: 6 additions & 3 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
/// and any utility functions. See iceberg/type.h and iceberg/field.h as well.

#include <cstdint>
#include <optional>
#include <string>
#include <vector>

Expand All @@ -40,13 +41,15 @@ namespace iceberg {
/// evolution.
class ICEBERG_EXPORT Schema : public StructType {
public:
Schema(int32_t schema_id, std::vector<SchemaField> fields);
static constexpr int32_t kInitialSchemaId = 0;

Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id);

/// \brief Get the schema ID.
///
/// A schema is identified by a unique ID for the purposes of schema
/// evolution.
[[nodiscard]] int32_t schema_id() const;
[[nodiscard]] std::optional<int32_t> schema_id() const;

[[nodiscard]] std::string ToString() const override;

Expand All @@ -58,7 +61,7 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Compare two schemas for equality.
[[nodiscard]] bool Equals(const Schema& other) const;

const int32_t schema_id_;
const std::optional<int32_t> schema_id_;
};

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/schema_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace iceberg {
/// \brief A type combined with a name.
class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
public:
static constexpr int32_t kInvalidFieldId = -1;

/// \brief Construct a field.
/// \param[in] field_id The field ID.
/// \param[in] name The field name.
Expand Down
Loading
Loading