diff --git a/src/iceberg/error.h b/src/iceberg/error.h index a4b74a97c..a296fcc0c 100644 --- a/src/iceberg/error.h +++ b/src/iceberg/error.h @@ -38,6 +38,7 @@ enum class ErrorKind { kNotImplemented, kUnknownError, kNotSupported, + kJsonParseError, }; /// \brief Error with a kind and a message. diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index 3f7bdff0a..741c4d7d7 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -22,20 +22,49 @@ #include #include #include +#include #include +#include + #include "iceberg/expected.h" #include "iceberg/schema.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" namespace iceberg { namespace { +// Constants for Arrow schema metadata constexpr const char* kArrowExtensionName = "ARROW:extension:name"; constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata"; constexpr const char* kArrowUuidExtensionName = "arrow.uuid"; constexpr int32_t kUnknownFieldId = -1; +// Constants for schema json serialization +constexpr std::string_view kSchemaId = "schema-id"; +constexpr std::string_view kIdentifierFieldIds = "identifier-field-ids"; +constexpr std::string_view kType = "type"; +constexpr std::string_view kStruct = "struct"; +constexpr std::string_view kList = "list"; +constexpr std::string_view kMap = "map"; +constexpr std::string_view kFields = "fields"; +constexpr std::string_view kElement = "element"; +constexpr std::string_view kKey = "key"; +constexpr std::string_view kValue = "value"; +constexpr std::string_view kDoc = "doc"; +constexpr std::string_view kName = "name"; +constexpr std::string_view kId = "id"; +constexpr std::string_view kInitialDefault = "initial-default"; +constexpr std::string_view kWriteDefault = "write-default"; +constexpr std::string_view kElementId = "element-id"; +constexpr std::string_view kKeyId = "key-id"; +constexpr std::string_view kValueId = "value-id"; +constexpr std::string_view kRequired = "required"; +constexpr std::string_view kElementRequired = "element-required"; +constexpr std::string_view kValueRequired = "value-required"; + // Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code. ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name, std::optional field_id, ArrowSchema* schema) { @@ -328,6 +357,15 @@ expected, Error> FromArrowSchema(const ArrowSchema& schema } } +std::unique_ptr FromStructType(StructType&& struct_type, int32_t schema_id) { + std::vector fields; + fields.reserve(struct_type.fields().size()); + for (auto& field : struct_type.fields()) { + fields.emplace_back(std::move(field)); + } + return std::make_unique(schema_id, std::move(fields)); +} + } // namespace expected, Error> FromArrowSchema(const ArrowSchema& schema, @@ -344,13 +382,263 @@ expected, Error> FromArrowSchema(const ArrowSchema& sche .message = "Arrow schema must be a struct type for Iceberg schema"}}; } - auto* struct_type = static_cast(type.get()); + auto& struct_type = static_cast(*type); + return FromStructType(std::move(struct_type), schema_id); +} + +nlohmann::json FieldToJson(const SchemaField& field) { + nlohmann::json json; + json[kId] = field.field_id(); + json[kName] = field.name(); + json[kRequired] = !field.optional(); + json[kType] = TypeToJson(*field.type()); + return json; +} + +nlohmann::json TypeToJson(const Type& type) { + switch (type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = static_cast(type); + nlohmann::json json; + json[kType] = kStruct; + nlohmann::json fields_json = nlohmann::json::array(); + for (const auto& field : struct_type.fields()) { + fields_json.push_back(FieldToJson(field)); + // TODO(gangwu): add default values + } + json[kFields] = fields_json; + return json; + } + case TypeId::kList: { + const auto& list_type = static_cast(type); + nlohmann::json json; + json[kType] = kList; + + const auto& element_field = list_type.fields().front(); + json[kElementId] = element_field.field_id(); + json[kElementRequired] = !element_field.optional(); + json[kElement] = TypeToJson(*element_field.type()); + return json; + } + case TypeId::kMap: { + const auto& map_type = static_cast(type); + nlohmann::json json; + json[std::string(kType)] = kMap; + + const auto& key_field = map_type.key(); + json[kKeyId] = key_field.field_id(); + json[kKey] = TypeToJson(*key_field.type()); + + const auto& value_field = map_type.value(); + json[kValueId] = value_field.field_id(); + json[kValueRequired] = !value_field.optional(); + json[kValue] = TypeToJson(*value_field.type()); + return json; + } + case TypeId::kBoolean: + return "boolean"; + case TypeId::kInt: + return "int"; + case TypeId::kLong: + return "long"; + case TypeId::kFloat: + return "float"; + case TypeId::kDouble: + return "double"; + case TypeId::kDecimal: { + const auto& decimal_type = static_cast(type); + return std::format("decimal({},{})", decimal_type.precision(), + decimal_type.scale()); + } + case TypeId::kDate: + return "date"; + case TypeId::kTime: + return "time"; + case TypeId::kTimestamp: + return "timestamp"; + case TypeId::kTimestampTz: + return "timestamptz"; + case TypeId::kString: + return "string"; + case TypeId::kBinary: + return "binary"; + case TypeId::kFixed: { + const auto& fixed_type = static_cast(type); + return std::format("fixed[{}]", fixed_type.length()); + } + case TypeId::kUuid: + return "uuid"; + } +} + +nlohmann::json SchemaToJson(const Schema& schema) { + nlohmann::json json = TypeToJson(static_cast(schema)); + json[kSchemaId] = schema.schema_id(); + // TODO(gangwu): add identifier-field-ids. + return json; +} + +namespace { + +#define ICEBERG_CHECK_JSON_FIELD(field_name, json) \ + if (!json.contains(field_name)) [[unlikely]] { \ + return unexpected({ \ + .kind = ErrorKind::kJsonParseError, \ + .message = std::format("Missing '{}' in {}", field_name, json.dump()), \ + }); \ + } + +expected, Error> StructTypeFromJson(const nlohmann::json& json) { + ICEBERG_CHECK_JSON_FIELD(kFields, json); + std::vector fields; - fields.reserve(struct_type->fields().size()); - for (auto& field : struct_type->fields()) { - fields.emplace_back(std::move(field)); + for (const auto& field_json : json[kFields]) { + ICEBERG_ASSIGN_OR_RAISE(auto field, FieldFromJson(field_json)); + fields.emplace_back(std::move(*field)); } - return std::make_unique(schema_id, std::move(fields)); + + return std::make_unique(std::move(fields)); +} + +expected, Error> ListTypeFromJson(const nlohmann::json& json) { + ICEBERG_CHECK_JSON_FIELD(kElement, json); + ICEBERG_CHECK_JSON_FIELD(kElementId, json); + ICEBERG_CHECK_JSON_FIELD(kElementRequired, json); + + ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement])); + int32_t element_id = json[kElementId].get(); + bool element_required = json[kElementRequired].get(); + + return std::make_unique( + SchemaField(element_id, std::string(ListType::kElementName), + std::move(element_type), !element_required)); +} + +expected, Error> MapTypeFromJson(const nlohmann::json& json) { + ICEBERG_CHECK_JSON_FIELD(kKey, json); + ICEBERG_CHECK_JSON_FIELD(kValue, json); + ICEBERG_CHECK_JSON_FIELD(kKeyId, json); + ICEBERG_CHECK_JSON_FIELD(kValueId, json); + ICEBERG_CHECK_JSON_FIELD(kValueRequired, json); + + ICEBERG_ASSIGN_OR_RAISE(auto key_type, TypeFromJson(json[kKey])); + ICEBERG_ASSIGN_OR_RAISE(auto value_type, TypeFromJson(json[kValue])); + int32_t key_id = json[kKeyId].get(); + int32_t value_id = json[kValueId].get(); + bool value_required = json[kValueRequired].get(); + + SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type), + /*optional=*/false); + SchemaField value_field(value_id, std::string(MapType::kValueName), + std::move(value_type), !value_required); + return std::make_unique(std::move(key_field), std::move(value_field)); +} + +} // namespace + +expected, Error> TypeFromJson(const nlohmann::json& json) { + if (json.is_string()) { + std::string type_str = json.get(); + if (type_str == "boolean") { + return std::make_unique(); + } else if (type_str == "int") { + return std::make_unique(); + } else if (type_str == "long") { + return std::make_unique(); + } else if (type_str == "float") { + return std::make_unique(); + } else if (type_str == "double") { + return std::make_unique(); + } else if (type_str == "date") { + return std::make_unique(); + } else if (type_str == "time") { + return std::make_unique(); + } else if (type_str == "timestamp") { + return std::make_unique(); + } else if (type_str == "timestamptz") { + return std::make_unique(); + } else if (type_str == "string") { + return std::make_unique(); + } else if (type_str == "binary") { + return std::make_unique(); + } else if (type_str == "uuid") { + return std::make_unique(); + } else if (type_str.starts_with("fixed")) { + std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])"); + std::smatch match; + if (std::regex_match(type_str, match, fixed_regex)) { + return std::make_unique(std::stoi(match[1].str())); + } + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid fixed type: {}", type_str), + }); + } else if (type_str.starts_with("decimal")) { + std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))"); + std::smatch match; + if (std::regex_match(type_str, match, decimal_regex)) { + return std::make_unique(std::stoi(match[1].str()), + std::stoi(match[2].str())); + } + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Invalid decimal type: {}", type_str), + }); + } else { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Unknown primitive type: {}", type_str), + }); + } + } + + // For complex types like struct, list, and map + ICEBERG_CHECK_JSON_FIELD(kType, json); + std::string type_str = json[kType].get(); + if (type_str == kStruct) { + return StructTypeFromJson(json); + } else if (type_str == kList) { + return ListTypeFromJson(json); + } else if (type_str == kMap) { + return MapTypeFromJson(json); + } else { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Unknown complex type: {}", type_str), + }); + } +} + +expected, Error> FieldFromJson(const nlohmann::json& json) { + ICEBERG_CHECK_JSON_FIELD(kId, json); + ICEBERG_CHECK_JSON_FIELD(kName, json); + ICEBERG_CHECK_JSON_FIELD(kType, json); + ICEBERG_CHECK_JSON_FIELD(kRequired, json); + + ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json[kType])); + int32_t field_id = json[kId].get(); + std::string name = json[kName].get(); + bool required = json[kRequired].get(); + + return std::make_unique(field_id, std::move(name), std::move(type), + !required); +} + +expected, Error> SchemaFromJson(const nlohmann::json& json) { + ICEBERG_CHECK_JSON_FIELD(kType, json); + ICEBERG_CHECK_JSON_FIELD(kSchemaId, json); + + ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json)); + if (type->type_id() != TypeId::kStruct) [[unlikely]] { + return unexpected({ + .kind = ErrorKind::kJsonParseError, + .message = std::format("Schema must be a struct type, but got {}", json.dump()), + }); + } + + int32_t schema_id = json[kSchemaId].get(); + auto& struct_type = static_cast(*type); + return FromStructType(std::move(struct_type), schema_id); } } // namespace iceberg diff --git a/src/iceberg/schema_internal.h b/src/iceberg/schema_internal.h index 164044755..6cb2daa02 100644 --- a/src/iceberg/schema_internal.h +++ b/src/iceberg/schema_internal.h @@ -22,6 +22,7 @@ #include #include +#include #include "iceberg/error.h" #include "iceberg/expected.h" @@ -49,4 +50,40 @@ expected ToArrowSchema(const Schema& schema, ArrowSchema* out); expected, Error> FromArrowSchema(const ArrowSchema& schema, int32_t schema_id); +/// \brief Convert an Iceberg Schema to JSON. +/// +/// \param[in] schema The Iceberg schema to convert. +/// \return The JSON representation of the schema. +nlohmann::json SchemaToJson(const Schema& schema); + +/// \brief Convert an Iceberg Type to JSON. +/// +/// \param[in] type The Iceberg type to convert. +/// \return The JSON representation of the type. +nlohmann::json TypeToJson(const Type& type); + +/// \brief Convert an Iceberg SchemaField to JSON. +/// +/// \param[in] field The Iceberg field to convert. +/// \return The JSON representation of the field. +nlohmann::json FieldToJson(const SchemaField& field); + +/// \brief Convert JSON to an Iceberg Schema. +/// +/// \param[in] json The JSON representation of the schema. +/// \return The Iceberg schema or an error if the conversion fails. +expected, Error> SchemaFromJson(const nlohmann::json& json); + +/// \brief Convert JSON to an Iceberg Type. +/// +/// \param[in] json The JSON representation of the type. +/// \return The Iceberg type or an error if the conversion fails. +expected, Error> TypeFromJson(const nlohmann::json& json); + +/// \brief Convert JSON to an Iceberg SchemaField. +/// +/// \param[in] json The JSON representation of the field. +/// \return The Iceberg field or an error if the conversion fails. +expected, Error> FieldFromJson(const nlohmann::json& json); + } // namespace iceberg diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h new file mode 100644 index 000000000..76f6c8e3e --- /dev/null +++ b/src/iceberg/util/macros.h @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#define ICEBERG_RETURN_UNEXPECTED(result) \ + if (!result) [[unlikely]] { \ + return unexpected(result.error()); \ + } + +#define ICEBERG_ASSIGN_OR_RAISE_IMPL(result_name, lhs, rexpr) \ + auto&& result_name = (rexpr); \ + ICEBERG_RETURN_UNEXPECTED(result_name) \ + lhs = std::move(result_name.value()); + +#define ICEBERG_CONCAT(x, y) x##y + +#define ICEBERG_ASSIGN_OR_RAISE_NAME(x, y) ICEBERG_CONCAT(x, y) + +#define ICEBERG_ASSIGN_OR_RAISE(lhs, rexpr) \ + ICEBERG_ASSIGN_OR_RAISE_IMPL(ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, \ + rexpr) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c516f99c5..36fd75a91 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -27,6 +27,7 @@ add_executable(schema_test) target_sources(schema_test PRIVATE schema_test.cc schema_field_test.cc + schema_json_test.cc type_test.cc transform_test.cc partition_field_test.cc diff --git a/test/schema_json_test.cc b/test/schema_json_test.cc new file mode 100644 index 000000000..d538ebead --- /dev/null +++ b/test/schema_json_test.cc @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include +#include +#include + +#include "gtest/gtest.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/type.h" + +namespace iceberg { + +struct SchemaJsonParam { + std::string json; + std::shared_ptr type; +}; + +class TypeJsonTest : public ::testing::TestWithParam {}; + +TEST_P(TypeJsonTest, SingleTypeRoundTrip) { + // To Json + const auto& param = GetParam(); + auto json = TypeToJson(*param.type).dump(); + ASSERT_EQ(param.json, json); + + // From Json + auto type_result = TypeFromJson(nlohmann::json::parse(param.json)); + ASSERT_TRUE(type_result.has_value()) << "Failed to deserialize " << param.json + << " with error " << type_result.error().message; + auto type = std::move(type_result.value()); + ASSERT_EQ(*param.type, *type); +} + +INSTANTIATE_TEST_SUITE_P( + JsonSerailization, TypeJsonTest, + ::testing::Values( + SchemaJsonParam{.json = "\"boolean\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"int\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"long\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"float\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"double\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"string\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"binary\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"uuid\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"fixed[8]\"", .type = std::make_shared(8)}, + SchemaJsonParam{.json = "\"decimal(10,2)\"", + .type = std::make_shared(10, 2)}, + SchemaJsonParam{.json = "\"date\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"time\"", .type = std::make_shared()}, + SchemaJsonParam{.json = "\"timestamp\"", + .type = std::make_shared()}, + SchemaJsonParam{.json = "\"timestamptz\"", + .type = std::make_shared()}, + SchemaJsonParam{ + .json = + R"({"element":"string","element-id":3,"element-required":true,"type":"list"})", + .type = std::make_shared( + SchemaField::MakeRequired(3, "element", std::make_shared()))}, + SchemaJsonParam{ + .json = + R"({"key":"string","key-id":4,"type":"map","value":"double","value-id":5,"value-required":false})", + .type = std::make_shared( + SchemaField::MakeRequired(4, "key", std::make_shared()), + SchemaField::MakeOptional(5, "value", std::make_shared()))}, + SchemaJsonParam{ + .json = + R"({"fields":[{"id":1,"name":"id","required":true,"type":"int"},{"id":2,"name":"name","required":false,"type":"string"}],"type":"struct"})", + .type = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared())})})); + +TEST(TypeJsonTest, FromJsonWithSpaces) { + auto fixed_json = R"("fixed[ 8 ]")"; + auto fixed_result = TypeFromJson(nlohmann::json::parse(fixed_json)); + ASSERT_TRUE(fixed_result.has_value()); + ASSERT_EQ(fixed_result.value()->type_id(), TypeId::kFixed); + auto fixed = dynamic_cast(fixed_result.value().get()); + ASSERT_NE(fixed, nullptr); + ASSERT_EQ(fixed->length(), 8); + + auto decimal_json = "\"decimal( 10, 2 )\""; + auto decimal_result = TypeFromJson(nlohmann::json::parse(decimal_json)); + ASSERT_TRUE(decimal_result.has_value()); + ASSERT_EQ(decimal_result.value()->type_id(), TypeId::kDecimal); + auto decimal = dynamic_cast(decimal_result.value().get()); + ASSERT_NE(decimal, nullptr); + ASSERT_EQ(decimal->precision(), 10); + ASSERT_EQ(decimal->scale(), 2); +} + +TEST(SchemaJsonTest, RoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"id","required":true,"type":"int"},{"id":2,"name":"name","required":false,"type":"string"}],"schema-id":1,"type":"struct"})"; + + auto from_json_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_TRUE(from_json_result.has_value()); + auto schema = std::move(from_json_result.value()); + ASSERT_EQ(schema->fields().size(), 2); + ASSERT_EQ(schema->schema_id(), 1); + + auto field1 = schema->fields()[0]; + ASSERT_EQ(field1.field_id(), 1); + ASSERT_EQ(field1.name(), "id"); + ASSERT_EQ(field1.type()->type_id(), TypeId::kInt); + ASSERT_FALSE(field1.optional()); + + auto field2 = schema->fields()[1]; + ASSERT_EQ(field2.field_id(), 2); + ASSERT_EQ(field2.name(), "name"); + ASSERT_EQ(field2.type()->type_id(), TypeId::kString); + ASSERT_TRUE(field2.optional()); + + auto dumped_json = SchemaToJson(*schema).dump(); + ASSERT_EQ(dumped_json, json); +} + +} // namespace iceberg