diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index c58802d2f..0da0c4031 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -48,7 +48,7 @@ class ICEBERG_EXPORT Schema : public StructType { /// evolution. [[nodiscard]] int32_t schema_id() const; - [[nodiscard]] std::string ToString() const; + [[nodiscard]] std::string ToString() const override; friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); } diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index 7ca48d15d..3f7bdff0a 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -19,6 +19,7 @@ #include "iceberg/schema_internal.h" +#include #include #include #include @@ -32,6 +33,8 @@ namespace { constexpr const char* kArrowExtensionName = "ARROW:extension:name"; constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata"; +constexpr const char* kArrowUuidExtensionName = "arrow.uuid"; +constexpr int32_t kUnknownFieldId = -1; // Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code. ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name, @@ -141,7 +144,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16)); NANOARROW_RETURN_NOT_OK( ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName), - ArrowCharView("arrow.uuid"))); + ArrowCharView(kArrowUuidExtensionName))); } break; } @@ -183,11 +186,171 @@ expected ToArrowSchema(const Schema& schema, ArrowSchema* out) { return {}; } +namespace { + +int32_t GetFieldId(const ArrowSchema& schema) { + if (schema.metadata == nullptr) { + return kUnknownFieldId; + } + + ArrowStringView field_id_key{.data = kFieldIdKey.data(), + .size_bytes = kFieldIdKey.size()}; + ArrowStringView field_id_value; + if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) != + NANOARROW_OK) { + return kUnknownFieldId; + } + + return std::stoi(std::string(field_id_value.data, field_id_value.size_bytes)); +} + +expected, Error> FromArrowSchema(const ArrowSchema& schema) { + auto to_schema_field = + [](const ArrowSchema& schema) -> expected, Error> { + auto field_type_result = FromArrowSchema(schema); + if (!field_type_result) { + return unexpected(field_type_result.error()); + } + + auto field_id = GetFieldId(schema); + bool is_optional = (schema.flags & ARROW_FLAG_NULLABLE) != 0; + return std::make_unique( + field_id, schema.name, std::move(field_type_result.value()), is_optional); + }; + + ArrowError arrow_error; + ArrowErrorInit(&arrow_error); + + ArrowSchemaView schema_view; + if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error); + error_code != NANOARROW_OK) { + return unexpected{ + {.kind = ErrorKind::kInvalidSchema, + .message = std::format("Failed to read Arrow schema, code: {}, message: {}", + error_code, arrow_error.message)}}; + } + + switch (schema_view.type) { + case NANOARROW_TYPE_STRUCT: { + std::vector fields; + fields.reserve(schema.n_children); + + for (int i = 0; i < schema.n_children; i++) { + auto field_result = to_schema_field(*schema.children[i]); + if (!field_result) { + return unexpected(field_result.error()); + } + fields.emplace_back(std::move(*field_result.value())); + } + + return std::make_shared(std::move(fields)); + } + case NANOARROW_TYPE_LIST: { + auto element_field_result = to_schema_field(*schema.children[0]); + if (!element_field_result) { + return unexpected(element_field_result.error()); + } + return std::make_shared(std::move(*element_field_result.value())); + } + case NANOARROW_TYPE_MAP: { + auto key_field_result = to_schema_field(*schema.children[0]->children[0]); + if (!key_field_result) { + return unexpected(key_field_result.error()); + } + + auto value_field_result = to_schema_field(*schema.children[0]->children[1]); + if (!value_field_result) { + return unexpected(value_field_result.error()); + } + + return std::make_shared(std::move(*key_field_result.value()), + std::move(*value_field_result.value())); + } + case NANOARROW_TYPE_BOOL: + return std::make_shared(); + case NANOARROW_TYPE_INT32: + return std::make_shared(); + case NANOARROW_TYPE_INT64: + return std::make_shared(); + case NANOARROW_TYPE_FLOAT: + return std::make_shared(); + case NANOARROW_TYPE_DOUBLE: + return std::make_shared(); + case NANOARROW_TYPE_DECIMAL128: + return std::make_shared(schema_view.decimal_precision, + schema_view.decimal_scale); + case NANOARROW_TYPE_DATE32: + return std::make_shared(); + case NANOARROW_TYPE_TIME64: + if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) { + return unexpected{ + {.kind = ErrorKind::kInvalidSchema, + .message = std::format("Unsupported time unit for Arrow time type: {}", + static_cast(schema_view.time_unit))}}; + } + return std::make_shared(); + case NANOARROW_TYPE_TIMESTAMP: { + bool with_timezone = + schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0; + if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) { + return unexpected{ + {.kind = ErrorKind::kInvalidSchema, + .message = std::format("Unsupported time unit for Arrow timestamp type: {}", + static_cast(schema_view.time_unit))}}; + } + if (with_timezone) { + return std::make_shared(); + } else { + return std::make_shared(); + } + } + case NANOARROW_TYPE_STRING: + return std::make_shared(); + case NANOARROW_TYPE_BINARY: + return std::make_shared(); + case NANOARROW_TYPE_FIXED_SIZE_BINARY: { + if (auto extension_name = std::string_view(schema_view.extension_name.data, + schema_view.extension_name.size_bytes); + extension_name == kArrowUuidExtensionName) { + if (schema_view.fixed_size != 16) { + return unexpected{{.kind = ErrorKind::kInvalidSchema, + .message = "UUID type must have a fixed size of 16"}}; + } + return std::make_shared(); + } + return std::make_shared(schema_view.fixed_size); + } + default: + return unexpected{ + {.kind = ErrorKind::kInvalidSchema, + .message = std::format("Unsupported Arrow type: {}", + ArrowTypeString(schema_view.type))}}; + } +} + +} // namespace + expected, Error> FromArrowSchema(const ArrowSchema& schema, int32_t schema_id) { - // TODO(wgtmac): Implement this - return unexpected{ - {.kind = ErrorKind::kInvalidSchema, .message = "Not implemented yet"}}; + auto type_result = FromArrowSchema(schema); + if (!type_result) { + return unexpected(type_result.error()); + } + + auto& type = type_result.value(); + if (type->type_id() != TypeId::kStruct) { + return unexpected{ + {.kind = ErrorKind::kInvalidSchema, + .message = "Arrow schema must be a struct type for Iceberg schema"}}; + } + + auto* struct_type = static_cast(type.get()); + std::vector fields; + fields.reserve(struct_type->fields().size()); + for (auto& field : struct_type->fields()) { + fields.emplace_back(std::move(field)); + } + return std::make_unique(schema_id, std::move(fields)); } } // namespace iceberg diff --git a/test/arrow_test.cc b/test/arrow_test.cc index b48df086e..85dcc91f9 100644 --- a/test/arrow_test.cc +++ b/test/arrow_test.cc @@ -18,6 +18,7 @@ */ #include +#include #include #include @@ -259,4 +260,229 @@ TEST(ToArrowSchemaTest, MapType) { /*nullable=*/true, kValueFieldId)); } +struct FromArrowSchemaParam { + std::shared_ptr arrow_type; + bool optional = true; + std::shared_ptr iceberg_type; +}; + +class FromArrowSchemaTest : public ::testing::TestWithParam {}; + +TEST_P(FromArrowSchemaTest, PrimitiveType) { + constexpr std::string_view kFieldName = "foo"; + constexpr int32_t kFieldId = 1024; + const auto& param = GetParam(); + + auto metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kFieldId)}}); + auto arrow_schema = ::arrow::schema({::arrow::field( + std::string(kFieldName), param.arrow_type, param.optional, std::move(metadata))}); + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto type_result = FromArrowSchema(exported_schema, /*schema_id=*/1); + ASSERT_THAT(type_result, IsOk()); + + const auto& schema = type_result.value(); + ASSERT_EQ(schema->schema_id(), 1); + ASSERT_EQ(schema->fields().size(), 1); + + const auto& field = schema->fields()[0]; + ASSERT_EQ(field.name(), kFieldName); + ASSERT_EQ(field.field_id(), kFieldId); + ASSERT_EQ(field.optional(), param.optional); + ASSERT_EQ(*field.type(), *param.iceberg_type); +} + +INSTANTIATE_TEST_SUITE_P( + SchemaConversion, FromArrowSchemaTest, + ::testing::Values( + FromArrowSchemaParam{.arrow_type = ::arrow::boolean(), + .optional = false, + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::int32(), + .optional = true, + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::int64(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::float32(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::float64(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::decimal128(10, 2), + .iceberg_type = std::make_shared(10, 2)}, + FromArrowSchemaParam{.arrow_type = ::arrow::date32(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{ + .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::utf8(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::binary(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::extension::uuid(), + .iceberg_type = std::make_shared()}, + FromArrowSchemaParam{.arrow_type = ::arrow::fixed_size_binary(20), + .iceberg_type = std::make_shared(20)})); + +TEST(FromArrowSchemaTest, StructType) { + constexpr int32_t kStructFieldId = 1; + constexpr int32_t kIntFieldId = 2; + constexpr int32_t kStrFieldId = 3; + + constexpr std::string_view kStructFieldName = "struct_field"; + constexpr std::string_view kIntFieldName = "int_field"; + constexpr std::string_view kStrFieldName = "str_field"; + + auto int_field = ::arrow::field( + std::string(kIntFieldName), ::arrow::int32(), /*nullable=*/false, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kIntFieldId)}})); + auto str_field = ::arrow::field( + std::string(kStrFieldName), ::arrow::utf8(), /*nullable=*/true, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kStrFieldId)}})); + auto struct_type = ::arrow::struct_({int_field, str_field}); + auto struct_field = ::arrow::field( + std::string(kStructFieldName), struct_type, /*nullable=*/false, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kStructFieldId)}})); + auto arrow_schema = ::arrow::schema({struct_field}); + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ASSERT_THAT(schema_result, IsOk()); + + const auto& iceberg_schema = schema_result.value(); + ASSERT_EQ(iceberg_schema->schema_id(), 0); + ASSERT_EQ(iceberg_schema->fields().size(), 1); + + const auto& field = iceberg_schema->fields()[0]; + ASSERT_EQ(field.name(), kStructFieldName); + ASSERT_EQ(field.field_id(), kStructFieldId); + ASSERT_FALSE(field.optional()); + ASSERT_EQ(field.type()->type_id(), TypeId::kStruct); + + auto* struct_field_type = dynamic_cast(field.type().get()); + ASSERT_NE(struct_field_type, nullptr); + ASSERT_EQ(struct_field_type->fields().size(), 2); + + const auto& int_iceberg_field = struct_field_type->fields()[0]; + ASSERT_EQ(int_iceberg_field.name(), kIntFieldName); + ASSERT_EQ(int_iceberg_field.field_id(), kIntFieldId); + ASSERT_FALSE(int_iceberg_field.optional()); + ASSERT_EQ(int_iceberg_field.type()->type_id(), TypeId::kInt); + + const auto& str_iceberg_field = struct_field_type->fields()[1]; + ASSERT_EQ(str_iceberg_field.name(), kStrFieldName); + ASSERT_EQ(str_iceberg_field.field_id(), kStrFieldId); + ASSERT_TRUE(str_iceberg_field.optional()); + ASSERT_EQ(str_iceberg_field.type()->type_id(), TypeId::kString); +} + +TEST(FromArrowSchemaTest, ListType) { + constexpr std::string_view kListFieldName = "list_field"; + constexpr std::string_view kElemFieldName = "element"; + constexpr int32_t kListFieldId = 1; + constexpr int32_t kElemFieldId = 2; + + auto element_field = ::arrow::field( + std::string(kElemFieldName), ::arrow::int64(), /*nullable=*/true, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kElemFieldId)}})); + auto list_type = ::arrow::list(element_field); + auto list_field = ::arrow::field( + std::string(kListFieldName), list_type, /*nullable=*/false, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kListFieldId)}})); + auto arrow_schema = ::arrow::schema({list_field}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ASSERT_THAT(schema_result, IsOk()); + + const auto& iceberg_schema = schema_result.value(); + ASSERT_EQ(iceberg_schema->schema_id(), 0); + ASSERT_EQ(iceberg_schema->fields().size(), 1); + + const auto& field = iceberg_schema->fields()[0]; + ASSERT_EQ(field.name(), kListFieldName); + ASSERT_EQ(field.field_id(), kListFieldId); + ASSERT_FALSE(field.optional()); + ASSERT_EQ(field.type()->type_id(), TypeId::kList); + + auto* list_field_type = dynamic_cast(field.type().get()); + ASSERT_NE(list_field_type, nullptr); + + const auto& element = list_field_type->fields()[0]; + ASSERT_EQ(element.name(), kElemFieldName); + ASSERT_EQ(element.field_id(), kElemFieldId); + ASSERT_TRUE(element.optional()); + ASSERT_EQ(element.type()->type_id(), TypeId::kLong); +} + +TEST(FromArrowSchemaTest, MapType) { + constexpr std::string_view kMapFieldName = "map_field"; + constexpr std::string_view kKeyFieldName = "key"; + constexpr std::string_view kValueFieldName = "value"; + + constexpr int32_t kFieldId = 1; + constexpr int32_t kKeyFieldId = 2; + constexpr int32_t kValueFieldId = 3; + + auto key_field = ::arrow::field( + std::string(kKeyFieldName), ::arrow::utf8(), /*nullable=*/false, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kKeyFieldId)}})); + auto value_field = ::arrow::field( + std::string(kValueFieldName), ::arrow::int32(), /*nullable=*/true, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kValueFieldId)}})); + auto map_type = std::make_shared<::arrow::MapType>(key_field, value_field); + auto map_field = ::arrow::field( + std::string(kMapFieldName), map_type, /*nullable=*/true, + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kFieldIdKey), std::to_string(kFieldId)}})); + auto arrow_schema = ::arrow::schema({map_field}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ASSERT_THAT(schema_result, IsOk()); + + const auto& iceberg_schema = schema_result.value(); + ASSERT_EQ(iceberg_schema->schema_id(), 0); + ASSERT_EQ(iceberg_schema->fields().size(), 1); + + const auto& field = iceberg_schema->fields()[0]; + ASSERT_EQ(field.name(), kMapFieldName); + ASSERT_EQ(field.field_id(), kFieldId); + ASSERT_TRUE(field.optional()); + ASSERT_EQ(field.type()->type_id(), TypeId::kMap); + + auto* map_field_type = dynamic_cast(field.type().get()); + ASSERT_NE(map_field_type, nullptr); + + const auto& key = map_field_type->key(); + ASSERT_EQ(key.name(), kKeyFieldName); + ASSERT_EQ(key.field_id(), kKeyFieldId); + ASSERT_FALSE(key.optional()); + ASSERT_EQ(key.type()->type_id(), TypeId::kString); + + const auto& value = map_field_type->value(); + ASSERT_EQ(value.name(), kValueFieldName); + ASSERT_EQ(value.field_id(), kValueFieldId); + ASSERT_TRUE(value.optional()); + ASSERT_EQ(value.type()->type_id(), TypeId::kInt); +} + } // namespace iceberg