Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ enum class ErrorKind {
kNotImplemented,
kUnknownError,
kNotSupported,
kJsonParseError,
};

/// \brief Error with a kind and a message.
Expand Down
298 changes: 293 additions & 5 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,49 @@
#include <charconv>
#include <cstring>
#include <format>
#include <optional>
#include <regex>
#include <string>
#include <system_error>

#include <nlohmann/json.hpp>

#include "iceberg/expected.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

// Constants for Arrow schema metadata
// The first two are metadata key strings, the third is an extension name value.
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
// NOTE(review): presumably the id used when an Arrow field carries no field id;
// its uses are outside the visible code - confirm.
constexpr int32_t kUnknownFieldId = -1;

// Constants for schema json serialization
// Each constant is the name of a JSON object member (or, for kStruct/kList/kMap,
// the value stored under kType) used by the *ToJson / *FromJson functions.
// kIdentifierFieldIds, kDoc, kInitialDefault and kWriteDefault are not referenced
// by the visible (de)serialization code yet (see the TODOs in TypeToJson and
// SchemaToJson).
constexpr std::string_view kSchemaId = "schema-id";
constexpr std::string_view kIdentifierFieldIds = "identifier-field-ids";
constexpr std::string_view kType = "type";
// Values of the "type" member for nested types.
constexpr std::string_view kStruct = "struct";
constexpr std::string_view kList = "list";
constexpr std::string_view kMap = "map";
// Members holding nested content: struct fields, list element, map key/value.
constexpr std::string_view kFields = "fields";
constexpr std::string_view kElement = "element";
constexpr std::string_view kKey = "key";
constexpr std::string_view kValue = "value";
// Members of a struct field object.
constexpr std::string_view kDoc = "doc";
constexpr std::string_view kName = "name";
constexpr std::string_view kId = "id";
constexpr std::string_view kInitialDefault = "initial-default";
constexpr std::string_view kWriteDefault = "write-default";
// Field ids of the list element and of the map key/value.
constexpr std::string_view kElementId = "element-id";
constexpr std::string_view kKeyId = "key-id";
constexpr std::string_view kValueId = "value-id";
// Requiredness flags; the code stores them as the negation of
// SchemaField::optional().
constexpr std::string_view kRequired = "required";
constexpr std::string_view kElementRequired = "element-required";
constexpr std::string_view kValueRequired = "value-required";

// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
std::optional<int32_t> field_id, ArrowSchema* schema) {
Expand Down Expand Up @@ -328,6 +357,15 @@ expected<std::shared_ptr<Type>, Error> FromArrowSchema(const ArrowSchema& schema
}
}

// Build a Schema with the given id from the fields of a struct type.
//
// NOTE(review): if StructType::fields() exposes const elements, the std::move
// below degrades to a copy - confirm against the StructType declaration.
std::unique_ptr<Schema> FromStructType(StructType&& struct_type, int32_t schema_id) {
  auto&& source_fields = struct_type.fields();

  std::vector<SchemaField> schema_fields;
  schema_fields.reserve(source_fields.size());
  for (auto& source_field : source_fields) {
    schema_fields.emplace_back(std::move(source_field));
  }

  return std::make_unique<Schema>(schema_id, std::move(schema_fields));
}

} // namespace

expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
Expand All @@ -344,13 +382,263 @@ expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& sche
.message = "Arrow schema must be a struct type for Iceberg schema"}};
}

auto* struct_type = static_cast<StructType*>(type.get());
auto& struct_type = static_cast<StructType&>(*type);
return FromStructType(std::move(struct_type), schema_id);
}

// Serialize one schema field as a JSON object holding its id, name, required
// flag (the negation of SchemaField::optional()) and the JSON form of its type.
nlohmann::json FieldToJson(const SchemaField& field) {
  const bool is_required = !field.optional();

  nlohmann::json field_json;
  field_json[kId] = field.field_id();
  field_json[kName] = field.name();
  field_json[kRequired] = is_required;
  field_json[kType] = TypeToJson(*field.type());
  return field_json;
}

// Convert an Iceberg type to its JSON representation.
//
// Primitive types are returned as a JSON string ("int", "decimal(P,S)",
// "fixed[L]", ...). Struct, list and map types are returned as JSON objects
// whose "type" member names the nested kind; their nested types are converted
// recursively.
//
// If type.type_id() yields a value that no case handles, a JSON null is
// returned rather than flowing off the end of the function.
nlohmann::json TypeToJson(const Type& type) {
  switch (type.type_id()) {
    case TypeId::kStruct: {
      const auto& struct_type = static_cast<const StructType&>(type);
      nlohmann::json json;
      json[kType] = kStruct;
      nlohmann::json fields_json = nlohmann::json::array();
      for (const auto& field : struct_type.fields()) {
        fields_json.push_back(FieldToJson(field));
        // TODO(gangwu): add default values
      }
      // The array is not used again, so hand it over instead of copying it.
      json[kFields] = std::move(fields_json);
      return json;
    }
    case TypeId::kList: {
      const auto& list_type = static_cast<const ListType&>(type);
      nlohmann::json json;
      json[kType] = kList;

      // The element is read as the first entry of the list type's fields.
      const auto& element_field = list_type.fields().front();
      json[kElementId] = element_field.field_id();
      json[kElementRequired] = !element_field.optional();
      json[kElement] = TypeToJson(*element_field.type());
      return json;
    }
    case TypeId::kMap: {
      const auto& map_type = static_cast<const MapType&>(type);
      nlohmann::json json;
      json[kType] = kMap;

      // No "key-required" member is written for the key.
      const auto& key_field = map_type.key();
      json[kKeyId] = key_field.field_id();
      json[kKey] = TypeToJson(*key_field.type());

      const auto& value_field = map_type.value();
      json[kValueId] = value_field.field_id();
      json[kValueRequired] = !value_field.optional();
      json[kValue] = TypeToJson(*value_field.type());
      return json;
    }
    case TypeId::kBoolean:
      return "boolean";
    case TypeId::kInt:
      return "int";
    case TypeId::kLong:
      return "long";
    case TypeId::kFloat:
      return "float";
    case TypeId::kDouble:
      return "double";
    case TypeId::kDecimal: {
      const auto& decimal_type = static_cast<const DecimalType&>(type);
      return std::format("decimal({},{})", decimal_type.precision(),
                         decimal_type.scale());
    }
    case TypeId::kDate:
      return "date";
    case TypeId::kTime:
      return "time";
    case TypeId::kTimestamp:
      return "timestamp";
    case TypeId::kTimestampTz:
      return "timestamptz";
    case TypeId::kString:
      return "string";
    case TypeId::kBinary:
      return "binary";
    case TypeId::kFixed: {
      const auto& fixed_type = static_cast<const FixedType&>(type);
      return std::format("fixed[{}]", fixed_type.length());
    }
    case TypeId::kUuid:
      return "uuid";
  }

  // Only reached when type_id() holds a value outside the cases above. Without
  // this return the function would end without returning a value, which is
  // undefined behaviour.
  return nlohmann::json();
}

// Serialize a schema: the JSON produced by TypeToJson for the schema viewed as
// a Type, plus a "schema-id" member.
nlohmann::json SchemaToJson(const Schema& schema) {
  const Type& schema_as_type = schema;

  nlohmann::json schema_json = TypeToJson(schema_as_type);
  schema_json[kSchemaId] = schema.schema_id();
  // TODO(gangwu): add identifier-field-ids.
  return schema_json;
}

namespace {

// Return a kJsonParseError from the enclosing function when `json` has no
// member named `field_name`.
//
// The enclosing function must return a type constructible from
// unexpected<Error>. The body is wrapped in do { } while (false) so the macro
// expands to exactly one statement and cannot capture a following `else`; the
// caller supplies the trailing semicolon.
#define ICEBERG_CHECK_JSON_FIELD(field_name, json) \
  do { \
    if (!(json).contains(field_name)) [[unlikely]] { \
      return unexpected<Error>({ \
          .kind = ErrorKind::kJsonParseError, \
          .message = std::format("Missing '{}' in {}", field_name, (json).dump()), \
      }); \
    } \
  } while (false)

// Build a StructType from a JSON object that has a "fields" array.
//
// Every element of "fields" is parsed with FieldFromJson; the first failure is
// propagated to the caller. A missing or non-array "fields" member yields a
// kJsonParseError.
expected<std::unique_ptr<Type>, Error> StructTypeFromJson(const nlohmann::json& json) {
  ICEBERG_CHECK_JSON_FIELD(kFields, json);

  const auto& fields_json = json[kFields];
  if (!fields_json.is_array()) [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format("'{}' must be an array in {}", kFields, json.dump()),
    });
  }

  std::vector<SchemaField> fields;
  fields.reserve(fields_json.size());
  for (const auto& field_json : fields_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto field, FieldFromJson(field_json));
    fields.emplace_back(std::move(*field));
  }

  return std::make_unique<StructType>(std::move(fields));
}

// Build a ListType from a JSON object with "element", "element-id" and
// "element-required" members.
//
// Returns kJsonParseError when a member is missing, when "element-id" is not
// an integer or "element-required" is not a boolean, or when the element type
// itself cannot be parsed.
expected<std::unique_ptr<Type>, Error> ListTypeFromJson(const nlohmann::json& json) {
  ICEBERG_CHECK_JSON_FIELD(kElement, json);
  ICEBERG_CHECK_JSON_FIELD(kElementId, json);
  ICEBERG_CHECK_JSON_FIELD(kElementRequired, json);

  // Check the value kinds up front: get<T>() throws on a mismatch, which would
  // bypass the expected<> error channel this function reports through.
  const auto& element_id_json = json[kElementId];
  const auto& element_required_json = json[kElementRequired];
  if (!element_id_json.is_number_integer() || !element_required_json.is_boolean())
      [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format(
            "Invalid list type, '{}' must be an integer and '{}' must be a boolean: {}",
            kElementId, kElementRequired, json.dump()),
    });
  }

  ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement]));
  const int32_t element_id = element_id_json.get<int32_t>();
  const bool element_required = element_required_json.get<bool>();

  return std::make_unique<ListType>(
      SchemaField(element_id, std::string(ListType::kElementName),
                  std::move(element_type), !element_required));
}

// Build a MapType from a JSON object with "key", "value", "key-id", "value-id"
// and "value-required" members. The key field is always created as required.
//
// Returns kJsonParseError when a member is missing, when an id is not an
// integer or "value-required" is not a boolean, or when the key or value type
// cannot be parsed.
expected<std::unique_ptr<Type>, Error> MapTypeFromJson(const nlohmann::json& json) {
  ICEBERG_CHECK_JSON_FIELD(kKey, json);
  ICEBERG_CHECK_JSON_FIELD(kValue, json);
  ICEBERG_CHECK_JSON_FIELD(kKeyId, json);
  ICEBERG_CHECK_JSON_FIELD(kValueId, json);
  ICEBERG_CHECK_JSON_FIELD(kValueRequired, json);

  // Check the value kinds up front: get<T>() throws on a mismatch, which would
  // bypass the expected<> error channel this function reports through.
  const auto& key_id_json = json[kKeyId];
  const auto& value_id_json = json[kValueId];
  const auto& value_required_json = json[kValueRequired];
  if (!key_id_json.is_number_integer() || !value_id_json.is_number_integer() ||
      !value_required_json.is_boolean()) [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format(
            "Invalid map type, '{}' and '{}' must be integers and '{}' must be a "
            "boolean: {}",
            kKeyId, kValueId, kValueRequired, json.dump()),
    });
  }

  ICEBERG_ASSIGN_OR_RAISE(auto key_type, TypeFromJson(json[kKey]));
  ICEBERG_ASSIGN_OR_RAISE(auto value_type, TypeFromJson(json[kValue]));
  const int32_t key_id = key_id_json.get<int32_t>();
  const int32_t value_id = value_id_json.get<int32_t>();
  const bool value_required = value_required_json.get<bool>();

  SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type),
                        /*optional=*/false);
  SchemaField value_field(value_id, std::string(MapType::kValueName),
                          std::move(value_type), !value_required);
  return std::make_unique<MapType>(std::move(key_field), std::move(value_field));
}

} // namespace

namespace {

// Parse a run of decimal digits into an int32_t without throwing.
//
// Returns std::nullopt when the text is not consumed entirely or the value
// does not fit into int32_t (std::stoi would throw std::out_of_range there).
std::optional<int32_t> ParseTypeParameter(const std::string& digits) {
  int32_t value = 0;
  const char* const first = digits.data();
  const char* const last = first + digits.size();
  const auto [ptr, ec] = std::from_chars(first, last, value);
  if (ec != std::errc{} || ptr != last) {
    return std::nullopt;
  }
  return value;
}

}  // namespace

// Convert JSON to an Iceberg type.
//
// A JSON string is treated as a primitive type name, including the
// parameterized forms "fixed[L]" and "decimal(P,S)". Any other JSON value must
// be an object whose "type" member is the string "struct", "list" or "map".
// Every failure is reported as kJsonParseError.
expected<std::unique_ptr<Type>, Error> TypeFromJson(const nlohmann::json& json) {
  if (json.is_string()) {
    const std::string type_str = json.get<std::string>();
    if (type_str == "boolean") {
      return std::make_unique<BooleanType>();
    } else if (type_str == "int") {
      return std::make_unique<IntType>();
    } else if (type_str == "long") {
      return std::make_unique<LongType>();
    } else if (type_str == "float") {
      return std::make_unique<FloatType>();
    } else if (type_str == "double") {
      return std::make_unique<DoubleType>();
    } else if (type_str == "date") {
      return std::make_unique<DateType>();
    } else if (type_str == "time") {
      return std::make_unique<TimeType>();
    } else if (type_str == "timestamp") {
      return std::make_unique<TimestampType>();
    } else if (type_str == "timestamptz") {
      return std::make_unique<TimestampTzType>();
    } else if (type_str == "string") {
      return std::make_unique<StringType>();
    } else if (type_str == "binary") {
      return std::make_unique<BinaryType>();
    } else if (type_str == "uuid") {
      return std::make_unique<UuidType>();
    } else if (type_str.starts_with("fixed")) {
      // Compiled once; building a std::regex on every call is expensive.
      static const std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
      std::smatch match;
      if (std::regex_match(type_str, match, fixed_regex)) {
        // \d+ accepts arbitrarily many digits, so the length may not fit.
        if (const auto length = ParseTypeParameter(match[1].str())) {
          return std::make_unique<FixedType>(*length);
        }
      }
      return unexpected<Error>({
          .kind = ErrorKind::kJsonParseError,
          .message = std::format("Invalid fixed type: {}", type_str),
      });
    } else if (type_str.starts_with("decimal")) {
      static const std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
      std::smatch match;
      if (std::regex_match(type_str, match, decimal_regex)) {
        const auto precision = ParseTypeParameter(match[1].str());
        const auto scale = ParseTypeParameter(match[2].str());
        if (precision && scale) {
          return std::make_unique<DecimalType>(*precision, *scale);
        }
      }
      return unexpected<Error>({
          .kind = ErrorKind::kJsonParseError,
          .message = std::format("Invalid decimal type: {}", type_str),
      });
    } else {
      return unexpected<Error>({
          .kind = ErrorKind::kJsonParseError,
          .message = std::format("Unknown primitive type: {}", type_str),
      });
    }
  }

  // For complex types like struct, list, and map
  ICEBERG_CHECK_JSON_FIELD(kType, json);
  const auto& type_json = json[kType];
  // get<std::string>() throws on a non-string value; report it as a parse
  // error instead.
  if (!type_json.is_string()) [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format("'{}' must be a string in {}", kType, json.dump()),
    });
  }

  const std::string type_str = type_json.get<std::string>();
  if (type_str == kStruct) {
    return StructTypeFromJson(json);
  } else if (type_str == kList) {
    return ListTypeFromJson(json);
  } else if (type_str == kMap) {
    return MapTypeFromJson(json);
  } else {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format("Unknown complex type: {}", type_str),
    });
  }
}

// Convert a JSON object with "id", "name", "type" and "required" members into
// a SchemaField.
//
// Returns kJsonParseError when a member is missing, when "id" is not an
// integer, "name" is not a string or "required" is not a boolean, or when the
// field type cannot be parsed.
expected<std::unique_ptr<SchemaField>, Error> FieldFromJson(const nlohmann::json& json) {
  ICEBERG_CHECK_JSON_FIELD(kId, json);
  ICEBERG_CHECK_JSON_FIELD(kName, json);
  ICEBERG_CHECK_JSON_FIELD(kType, json);
  ICEBERG_CHECK_JSON_FIELD(kRequired, json);

  // Check the value kinds up front: get<T>() throws on a mismatch, which would
  // bypass the expected<> error channel this function reports through.
  const auto& id_json = json[kId];
  const auto& name_json = json[kName];
  const auto& required_json = json[kRequired];
  if (!id_json.is_number_integer() || !name_json.is_string() ||
      !required_json.is_boolean()) [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format(
            "Invalid field, '{}' must be an integer, '{}' a string and '{}' a "
            "boolean: {}",
            kId, kName, kRequired, json.dump()),
    });
  }

  ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json[kType]));
  const int32_t field_id = id_json.get<int32_t>();
  std::string name = name_json.get<std::string>();
  const bool required = required_json.get<bool>();

  return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
                                       !required);
}

// Convert a JSON object describing a struct type plus a "schema-id" member
// into a Schema.
//
// Returns kJsonParseError when "type" or "schema-id" is missing, when
// "schema-id" is not an integer, when the type cannot be parsed, or when the
// parsed type is not a struct.
expected<std::unique_ptr<Schema>, Error> SchemaFromJson(const nlohmann::json& json) {
  ICEBERG_CHECK_JSON_FIELD(kType, json);
  ICEBERG_CHECK_JSON_FIELD(kSchemaId, json);

  // get<int32_t>() throws on a non-numeric value; report it as a parse error
  // through the expected<> channel instead.
  const auto& schema_id_json = json[kSchemaId];
  if (!schema_id_json.is_number_integer()) [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message =
            std::format("'{}' must be an integer in {}", kSchemaId, json.dump()),
    });
  }

  ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));
  if (type->type_id() != TypeId::kStruct) [[unlikely]] {
    return unexpected<Error>({
        .kind = ErrorKind::kJsonParseError,
        .message = std::format("Schema must be a struct type, but got {}", json.dump()),
    });
  }

  const int32_t schema_id = schema_id_json.get<int32_t>();
  // The type_id() check above is what justifies this downcast.
  auto& struct_type = static_cast<StructType&>(*type);
  return FromStructType(std::move(struct_type), schema_id);
}

} // namespace iceberg
37 changes: 37 additions & 0 deletions src/iceberg/schema_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>

#include <nanoarrow/nanoarrow.h>
#include <nlohmann/json_fwd.hpp>

#include "iceberg/error.h"
#include "iceberg/expected.h"
Expand Down Expand Up @@ -49,4 +50,40 @@ expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out);
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
int32_t schema_id);

/// \brief Convert an Iceberg Schema to JSON.
///
/// \param[in] schema The Iceberg schema to convert.
/// \return The JSON representation of the schema.
nlohmann::json SchemaToJson(const Schema& schema);

/// \brief Convert an Iceberg Type to JSON.
///
/// \param[in] type The Iceberg type to convert.
/// \return The JSON representation of the type.
nlohmann::json TypeToJson(const Type& type);

/// \brief Convert an Iceberg SchemaField to JSON.
///
/// \param[in] field The Iceberg field to convert.
/// \return The JSON representation of the field.
nlohmann::json FieldToJson(const SchemaField& field);

/// \brief Convert JSON to an Iceberg Schema.
///
/// \param[in] json The JSON representation of the schema.
/// \return The Iceberg schema or an error if the conversion fails.
expected<std::unique_ptr<Schema>, Error> SchemaFromJson(const nlohmann::json& json);

/// \brief Convert JSON to an Iceberg Type.
///
/// \param[in] json The JSON representation of the type.
/// \return The Iceberg type or an error if the conversion fails.
expected<std::unique_ptr<Type>, Error> TypeFromJson(const nlohmann::json& json);

/// \brief Convert JSON to an Iceberg SchemaField.
///
/// \param[in] json The JSON representation of the field.
/// \return The Iceberg field or an error if the conversion fails.
expected<std::unique_ptr<SchemaField>, Error> FieldFromJson(const nlohmann::json& json);

} // namespace iceberg
Loading
Loading