Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ enum class ErrorKind {
kNotImplemented,
kUnknownError,
kNotSupported,
kJsonParseError,
};

/// \brief Error with a kind and a message.
Expand Down
289 changes: 284 additions & 5 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,49 @@
#include <cstring>
#include <format>
#include <optional>
#include <regex>
#include <string>

#include <nlohmann/json.hpp>

#include "iceberg/expected.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/macro_internal.h"

namespace iceberg {

namespace {

// Constants for Arrow schema metadata
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
constexpr int32_t kUnknownFieldId = -1;

// Constants for schema json serialization
constexpr std::string_view kSchemaId = "schema-id";
constexpr std::string_view kIdentifierFieldIds = "identifier-field-ids";
constexpr std::string_view kType = "type";
constexpr std::string_view kStruct = "struct";
constexpr std::string_view kList = "list";
constexpr std::string_view kMap = "map";
constexpr std::string_view kFields = "fields";
constexpr std::string_view kElement = "element";
constexpr std::string_view kKey = "key";
constexpr std::string_view kValue = "value";
constexpr std::string_view kDoc = "doc";
constexpr std::string_view kName = "name";
constexpr std::string_view kId = "id";
constexpr std::string_view kInitialDefault = "initial-default";
constexpr std::string_view kWriteDefault = "write-default";
constexpr std::string_view kElementId = "element-id";
constexpr std::string_view kKeyId = "key-id";
constexpr std::string_view kValueId = "value-id";
constexpr std::string_view kRequired = "required";
constexpr std::string_view kElementRequired = "element-required";
constexpr std::string_view kValueRequired = "value-required";

// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
std::optional<int32_t> field_id, ArrowSchema* schema) {
Expand Down Expand Up @@ -328,6 +357,15 @@ expected<std::shared_ptr<Type>, Error> FromArrowSchema(const ArrowSchema& schema
}
}

std::unique_ptr<Schema> FromStructType(StructType&& struct_type, int32_t schema_id) {
std::vector<SchemaField> fields;
fields.reserve(struct_type.fields().size());
for (auto& field : struct_type.fields()) {
fields.emplace_back(std::move(field));
}
return std::make_unique<Schema>(schema_id, std::move(fields));
}

} // namespace

expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
Expand All @@ -344,13 +382,254 @@ expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& sche
.message = "Arrow schema must be a struct type for Iceberg schema"}};
}

auto* struct_type = static_cast<StructType*>(type.get());
auto& struct_type = static_cast<StructType&>(*type);
return FromStructType(std::move(struct_type), schema_id);
}

nlohmann::json FieldToJson(const SchemaField& field) {
nlohmann::json json;
json[kId] = field.field_id();
json[kName] = field.name();
json[kRequired] = !field.optional();
json[kType] = TypeToJson(*field.type());
return json;
}

nlohmann::json TypeToJson(const Type& type) {
switch (type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = static_cast<const StructType&>(type);
nlohmann::json json;
json[kType] = kStruct;
nlohmann::json fields_json = nlohmann::json::array();
for (const auto& field : struct_type.fields()) {
fields_json.push_back(FieldToJson(field));
// TODO(gangwu): add default values
}
json[kFields] = fields_json;
return json;
}
case TypeId::kList: {
const auto& list_type = static_cast<const ListType&>(type);
nlohmann::json json;
json[kType] = kList;

const auto& element_field = list_type.fields().front();
json[kElementId] = element_field.field_id();
json[kElementRequired] = !element_field.optional();
json[kElement] = TypeToJson(*element_field.type());
return json;
}
case TypeId::kMap: {
const auto& map_type = static_cast<const MapType&>(type);
nlohmann::json json;
json[std::string(kType)] = kMap;

const auto& key_field = map_type.key();
json[kKeyId] = key_field.field_id();
json[kKey] = TypeToJson(*key_field.type());

const auto& value_field = map_type.value();
json[kValueId] = value_field.field_id();
json[kValueRequired] = !value_field.optional();
json[kValue] = TypeToJson(*value_field.type());
return json;
}
case TypeId::kBoolean:
return "boolean";
case TypeId::kInt:
return "int";
case TypeId::kLong:
return "long";
case TypeId::kFloat:
return "float";
case TypeId::kDouble:
return "double";
case TypeId::kDecimal: {
const auto& decimal_type = static_cast<const DecimalType&>(type);
return std::format("decimal({},{})", decimal_type.precision(),
decimal_type.scale());
}
case TypeId::kDate:
return "date";
case TypeId::kTime:
return "time";
case TypeId::kTimestamp:
return "timestamp";
case TypeId::kTimestampTz:
return "timestamptz";
case TypeId::kString:
return "string";
case TypeId::kBinary:
return "binary";
case TypeId::kFixed: {
const auto& fixed_type = static_cast<const FixedType&>(type);
return std::format("fixed[{}]", fixed_type.length());
}
case TypeId::kUuid:
return "uuid";
}
}

nlohmann::json SchemaToJson(const Schema& schema) {
nlohmann::json json = TypeToJson(static_cast<const Type&>(schema));
json[kSchemaId] = schema.schema_id();
// TODO(gangwu): add identifier-field-ids.
return json;
}

namespace {

#define ICEBERG_CHECK_JSON_FIELD(field_name, json) \
if (!json.contains(field_name)) [[unlikely]] { \
return unexpected<Error>({ \
.kind = ErrorKind::kJsonParseError, \
.message = std::format("Missing '{}' in {}", field_name, json.dump()), \
}); \
}

expected<std::shared_ptr<Type>, Error> StructTypeFromJson(const nlohmann::json& json) {
ICEBERG_CHECK_JSON_FIELD(kFields, json);

std::vector<SchemaField> fields;
fields.reserve(struct_type->fields().size());
for (auto& field : struct_type->fields()) {
fields.emplace_back(std::move(field));
for (const auto& field_json : json[kFields]) {
ICEBERG_ASSIGN_OR_RAISE(auto field, FieldFromJson(field_json));
fields.push_back(std::move(field));
}
return std::make_unique<Schema>(schema_id, std::move(fields));

return std::make_shared<StructType>(std::move(fields));
}

expected<std::shared_ptr<Type>, Error> ListTypeFromJson(const nlohmann::json& json) {
ICEBERG_CHECK_JSON_FIELD(kElement, json);
ICEBERG_CHECK_JSON_FIELD(kElementId, json);
ICEBERG_CHECK_JSON_FIELD(kElementRequired, json);

ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement]));
int32_t element_id = json[kElementId].get<int32_t>();
bool element_required = json[kElementRequired].get<bool>();

return std::make_shared<ListType>(
SchemaField(element_id, std::string(ListType::kElementName),
std::move(element_type), !element_required));
}

expected<std::shared_ptr<Type>, Error> MapTypeFromJson(const nlohmann::json& json) {
ICEBERG_CHECK_JSON_FIELD(kKey, json);
ICEBERG_CHECK_JSON_FIELD(kValue, json);
ICEBERG_CHECK_JSON_FIELD(kKeyId, json);
ICEBERG_CHECK_JSON_FIELD(kValueId, json);
ICEBERG_CHECK_JSON_FIELD(kValueRequired, json);

ICEBERG_ASSIGN_OR_RAISE(auto key_type, TypeFromJson(json[kKey]));
ICEBERG_ASSIGN_OR_RAISE(auto value_type, TypeFromJson(json[kValue]));
int32_t key_id = json[kKeyId].get<int32_t>();
int32_t value_id = json[kValueId].get<int32_t>();
bool value_required = json[kValueRequired].get<bool>();

SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type),
/*optional=*/false);
SchemaField value_field(value_id, std::string(MapType::kValueName),
std::move(value_type), !value_required);
return std::make_shared<MapType>(std::move(key_field), std::move(value_field));
}

} // namespace

expected<std::shared_ptr<Type>, Error> TypeFromJson(const nlohmann::json& json) {
if (json.is_string()) {
std::string type_str = json.get<std::string>();
if (type_str == "boolean") {
return std::make_shared<BooleanType>();
} else if (type_str == "int") {
return std::make_shared<IntType>();
} else if (type_str == "long") {
return std::make_shared<LongType>();
} else if (type_str == "float") {
return std::make_shared<FloatType>();
} else if (type_str == "double") {
return std::make_shared<DoubleType>();
} else if (type_str == "date") {
return std::make_shared<DateType>();
} else if (type_str == "time") {
return std::make_shared<TimeType>();
} else if (type_str == "timestamp") {
return std::make_shared<TimestampType>();
} else if (type_str == "timestamptz") {
return std::make_shared<TimestampTzType>();
} else if (type_str == "string") {
return std::make_shared<StringType>();
} else if (type_str == "binary") {
return std::make_shared<BinaryType>();
} else if (type_str == "uuid") {
return std::make_shared<UuidType>();
} else if (type_str.starts_with("fixed")) {
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
std::smatch match;
if (std::regex_match(type_str, match, fixed_regex)) {
return std::make_shared<FixedType>(std::stoi(match[1].str()));
}
} else if (type_str.starts_with("decimal")) {
std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
std::smatch match;
if (std::regex_match(type_str, match, decimal_regex)) {
return std::make_shared<DecimalType>(std::stoi(match[1].str()),
std::stoi(match[2].str()));
}
} else {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Unknown primitive type: {}", type_str),
});
}
}

// For complex types like struct, list, and map
ICEBERG_CHECK_JSON_FIELD(kType, json);
std::string type_str = json[kType].get<std::string>();
if (type_str == kStruct) {
return StructTypeFromJson(json);
} else if (type_str == kList) {
return ListTypeFromJson(json);
} else if (type_str == kMap) {
return MapTypeFromJson(json);
} else {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Unknown complex type: {}", type_str),
});
}
}

expected<SchemaField, Error> FieldFromJson(const nlohmann::json& json) {
ICEBERG_CHECK_JSON_FIELD(kId, json);
ICEBERG_CHECK_JSON_FIELD(kName, json);
ICEBERG_CHECK_JSON_FIELD(kType, json);
ICEBERG_CHECK_JSON_FIELD(kRequired, json);

ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json[kType]));
int32_t field_id = json[kId].get<int32_t>();
std::string name = json[kName].get<std::string>();
bool required = json[kRequired].get<bool>();

return SchemaField(field_id, std::move(name), std::move(type), !required);
}

expected<std::unique_ptr<Schema>, Error> SchemaFromJson(const nlohmann::json& json) {
ICEBERG_CHECK_JSON_FIELD(kType, json);
ICEBERG_CHECK_JSON_FIELD(kSchemaId, json);

ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));
if (type->type_id() != TypeId::kStruct) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = "Schema must be a struct type",
});
}

int32_t schema_id = json[kSchemaId].get<int32_t>();
auto& struct_type = static_cast<StructType&>(*type);
return FromStructType(std::move(struct_type), schema_id);
}

} // namespace iceberg
37 changes: 37 additions & 0 deletions src/iceberg/schema_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>

#include <nanoarrow/nanoarrow.h>
#include <nlohmann/json_fwd.hpp>

#include "iceberg/error.h"
#include "iceberg/expected.h"
Expand Down Expand Up @@ -49,4 +50,40 @@ expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out);
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
int32_t schema_id);

/// \brief Convert an Iceberg Schema to JSON.
///
/// \param[in] schema The Iceberg schema to convert.
/// \return The JSON representation of the schema.
nlohmann::json SchemaToJson(const Schema& schema);

/// \brief Convert an Iceberg Type to JSON.
///
/// \param[in] type The Iceberg type to convert.
/// \return The JSON representation of the type.
nlohmann::json TypeToJson(const Type& type);

/// \brief Convert an Iceberg SchemaField to JSON.
///
/// \param[in] field The Iceberg field to convert.
/// \return The JSON representation of the field.
nlohmann::json FieldToJson(const SchemaField& field);

/// \brief Convert JSON to an Iceberg Schema.
///
/// \param[in] json The JSON representation of the schema.
/// \return The Iceberg schema or an error if the conversion fails.
expected<std::unique_ptr<Schema>, Error> SchemaFromJson(const nlohmann::json& json);

/// \brief Convert JSON to an Iceberg Type.
///
/// \param[in] json The JSON representation of the type.
/// \return The Iceberg type or an error if the conversion fails.
expected<std::shared_ptr<Type>, Error> TypeFromJson(const nlohmann::json& json);

/// \brief Convert JSON to an Iceberg SchemaField.
///
/// \param[in] json The JSON representation of the field.
/// \return The Iceberg field or an error if the conversion fails.
expected<SchemaField, Error> FieldFromJson(const nlohmann::json& json);

} // namespace iceberg
Loading
Loading