Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ICEBERG_EXPORT Schema : public StructType {
/// evolution.
[[nodiscard]] int32_t schema_id() const;

[[nodiscard]] std::string ToString() const;
[[nodiscard]] std::string ToString() const override;

friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }

Expand Down
171 changes: 167 additions & 4 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/schema_internal.h"

#include <charconv>
#include <cstring>
#include <format>
#include <optional>
#include <string>
Expand All @@ -32,6 +33,8 @@ namespace {

// Metadata key under which the extension type name of an Arrow field is stored.
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
// Metadata key for extension-specific metadata (no use is visible in this excerpt).
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
// Extension name written for UUID columns and matched when reading them back.
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
// Value returned by GetFieldId when a field id cannot be read from the metadata.
constexpr int32_t kUnknownFieldId = -1;

// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
Expand Down Expand Up @@ -141,7 +144,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
NANOARROW_RETURN_NOT_OK(
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
ArrowCharView("arrow.uuid")));
ArrowCharView(kArrowUuidExtensionName)));
} break;
}

Expand Down Expand Up @@ -183,11 +186,171 @@ expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
return {};
}

namespace {

// Read the Iceberg field id stored under kFieldIdKey in an Arrow schema's metadata.
//
// Returns kUnknownFieldId when:
//   - the schema carries no metadata,
//   - the metadata cannot be read or has no kFieldIdKey entry, or
//   - the stored value is not a complete base-10 integer that fits in int32_t.
//
// This function never throws.
//
// Reviewer note carried over from the PR discussion: returning kUnknownFieldId is
// fine for now. Later on, the field IDs can be fetched from a name mapping if they
// are not in the metadata, or fresh IDs can be assigned when a new table is created
// from an Arrow schema.
int32_t GetFieldId(const ArrowSchema& schema) {
  if (schema.metadata == nullptr) {
    return kUnknownFieldId;
  }

  ArrowStringView field_id_key{.data = kFieldIdKey.data(),
                               .size_bytes = kFieldIdKey.size()};
  // ArrowMetadataGetValue reports NANOARROW_OK and leaves the output untouched when
  // the key is missing, so start from an empty view and check it afterwards.
  ArrowStringView field_id_value{.data = nullptr, .size_bytes = 0};
  if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
      NANOARROW_OK) {
    return kUnknownFieldId;
  }
  if (field_id_value.data == nullptr || field_id_value.size_bytes <= 0) {
    return kUnknownFieldId;
  }

  const char* first = field_id_value.data;
  const char* last = first + field_id_value.size_bytes;
  int32_t field_id = kUnknownFieldId;
  const auto [parsed_end, ec] = std::from_chars(first, last, field_id);
  // Reject malformed, out-of-range, or partially numeric values instead of throwing.
  if (ec != std::errc{} || parsed_end != last) {
    return kUnknownFieldId;
  }
  return field_id;
}

// Convert an Arrow schema node (and, recursively, its children) to an Iceberg type.
//
// Struct, list and map nodes are converted by turning each child into a SchemaField
// whose id comes from GetFieldId and whose optionality comes from the Arrow
// nullable flag. Primitive nodes map one-to-one onto Iceberg primitive types.
//
// Returns an ErrorKind::kInvalidSchema error when nanoarrow cannot parse the schema,
// when a time/timestamp type is not in microseconds, when a UUID extension type is
// not 16 bytes wide, or when the Arrow type has no mapping here.
expected<std::shared_ptr<Type>, Error> FromArrowSchema(const ArrowSchema& schema) {
  // Builds a SchemaField from one child node of a nested type.
  auto to_schema_field =
      [](const ArrowSchema& child) -> expected<std::unique_ptr<SchemaField>, Error> {
    auto field_type_result = FromArrowSchema(child);
    if (!field_type_result) {
      return unexpected<Error>(field_type_result.error());
    }

    auto field_id = GetFieldId(child);
    bool is_optional = (child.flags & ARROW_FLAG_NULLABLE) != 0;
    // The Arrow C data interface allows `name` to be NULL; treat that as an empty
    // name rather than building a string from a null pointer.
    const char* field_name = child.name != nullptr ? child.name : "";
    return std::make_unique<SchemaField>(
        field_id, field_name, std::move(field_type_result.value()), is_optional);
  };

  ArrowError arrow_error;
  ArrowErrorInit(&arrow_error);

  ArrowSchemaView schema_view;
  if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error);
      error_code != NANOARROW_OK) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = std::format("Failed to read Arrow schema, code: {}, message: {}",
                                error_code, arrow_error.message)}};
  }

  switch (schema_view.type) {
    case NANOARROW_TYPE_STRUCT: {
      std::vector<SchemaField> fields;
      fields.reserve(static_cast<size_t>(schema.n_children));

      // n_children is 64-bit; use a matching index type.
      for (int64_t i = 0; i < schema.n_children; i++) {
        auto field_result = to_schema_field(*schema.children[i]);
        if (!field_result) {
          return unexpected<Error>(field_result.error());
        }
        fields.emplace_back(std::move(*field_result.value()));
      }

      return std::make_shared<StructType>(std::move(fields));
    }
    case NANOARROW_TYPE_LIST: {
      // The single child of a list node describes the element.
      auto element_field_result = to_schema_field(*schema.children[0]);
      if (!element_field_result) {
        return unexpected<Error>(element_field_result.error());
      }
      return std::make_shared<ListType>(std::move(*element_field_result.value()));
    }
    case NANOARROW_TYPE_MAP: {
      // A map node wraps one entries child whose two children are key and value.
      auto key_field_result = to_schema_field(*schema.children[0]->children[0]);
      if (!key_field_result) {
        return unexpected<Error>(key_field_result.error());
      }

      auto value_field_result = to_schema_field(*schema.children[0]->children[1]);
      if (!value_field_result) {
        return unexpected<Error>(value_field_result.error());
      }

      return std::make_shared<MapType>(std::move(*key_field_result.value()),
                                       std::move(*value_field_result.value()));
    }
    case NANOARROW_TYPE_BOOL:
      return std::make_shared<BooleanType>();
    case NANOARROW_TYPE_INT32:
      return std::make_shared<IntType>();
    case NANOARROW_TYPE_INT64:
      return std::make_shared<LongType>();
    case NANOARROW_TYPE_FLOAT:
      return std::make_shared<FloatType>();
    case NANOARROW_TYPE_DOUBLE:
      return std::make_shared<DoubleType>();
    case NANOARROW_TYPE_DECIMAL128:
      return std::make_shared<DecimalType>(schema_view.decimal_precision,
                                           schema_view.decimal_scale);
    case NANOARROW_TYPE_DATE32:
      return std::make_shared<DateType>();
    case NANOARROW_TYPE_TIME64:
      // Only microsecond precision is accepted for time values.
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow time type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      return std::make_shared<TimeType>();
    case NANOARROW_TYPE_TIMESTAMP: {
      // Only microsecond precision is accepted for timestamps.
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow timestamp type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      // A non-empty timezone string selects the timezone-aware timestamp type.
      bool with_timezone =
          schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0;
      if (with_timezone) {
        return std::make_shared<TimestampTzType>();
      }
      return std::make_shared<TimestampType>();
    }
    case NANOARROW_TYPE_STRING:
      return std::make_shared<StringType>();
    case NANOARROW_TYPE_BINARY:
      return std::make_shared<BinaryType>();
    case NANOARROW_TYPE_FIXED_SIZE_BINARY: {
      // Fixed-size binary tagged with the UUID extension name becomes UuidType;
      // anything else is a plain FixedType of the same width.
      if (auto extension_name = std::string_view(schema_view.extension_name.data,
                                                 schema_view.extension_name.size_bytes);
          extension_name == kArrowUuidExtensionName) {
        if (schema_view.fixed_size != 16) {
          return unexpected<Error>{{.kind = ErrorKind::kInvalidSchema,
                                    .message = "UUID type must have a fixed size of 16"}};
        }
        return std::make_shared<UuidType>();
      }
      return std::make_shared<FixedType>(schema_view.fixed_size);
    }
    default:
      return unexpected<Error>{
          {.kind = ErrorKind::kInvalidSchema,
           .message = std::format("Unsupported Arrow type: {}",
                                  ArrowTypeString(schema_view.type))}};
  }
}

} // namespace

// Build an Iceberg Schema with the given schema id from an Arrow schema.
//
// The Arrow schema is first converted to an Iceberg type. The conversion error is
// propagated unchanged on failure, and an ErrorKind::kInvalidSchema error is
// returned when the converted top-level type is not a struct.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
                                                         int32_t schema_id) {
  auto type_result = FromArrowSchema(schema);
  if (!type_result) {
    return unexpected<Error>(type_result.error());
  }

  auto& type = type_result.value();
  if (type->type_id() != TypeId::kStruct) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = "Arrow schema must be a struct type for Iceberg schema"}};
  }

  // The type id was checked above, so the downcast to StructType is safe.
  auto* struct_type = static_cast<StructType*>(type.get());
  std::vector<SchemaField> fields;
  fields.reserve(struct_type->fields().size());
  // NOTE(review): if fields() exposes const elements, this move degrades to a copy.
  // Confirm against StructType.
  for (auto& field : struct_type->fields()) {
    fields.emplace_back(std::move(field));
  }
  return std::make_unique<Schema>(schema_id, std::move(fields));
}

} // namespace iceberg
Loading
Loading