Skip to content

Commit 524fa68

Browse files
committed
feat: convert arrow schema to iceberg schema
1 parent faf9cc8 commit 524fa68

File tree

3 files changed

+393
-5
lines changed

3 files changed

+393
-5
lines changed

src/iceberg/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ICEBERG_EXPORT Schema : public StructType {
4848
/// evolution.
4949
[[nodiscard]] int32_t schema_id() const;
5050

51-
[[nodiscard]] std::string ToString() const;
51+
/// \brief Return a string rendering of this schema. Declared `override`, so it
/// replaces the ToString() inherited through StructType.
[[nodiscard]] std::string ToString() const override;
5252

5353
/// \brief Equality operator for schemas; simply forwards to `lhs.Equals(rhs)`.
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
5454

src/iceberg/schema_internal.cc

Lines changed: 166 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/schema_internal.h"
2121

22+
#include <cstring>
2223
#include <format>
2324
#include <optional>
2425
#include <string>
@@ -32,6 +33,8 @@ namespace {
3233

3334
// Arrow metadata key under which an extension type's name is stored; ToArrowSchema
// writes it when emitting a UUID column.
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
3435
// Arrow metadata key for extension-type metadata.
// NOTE(review): no use of this constant is visible in this excerpt - confirm it is needed.
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
36+
// Extension name identifying a UUID column: written by ToArrowSchema and matched
// against fixed-size binary columns when converting back from Arrow.
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
37+
// Sentinel returned by GetFieldId when no field id can be read from the Arrow metadata.
constexpr int32_t kUnknownFieldId = -1;
3538

3639
// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
3740
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
@@ -141,7 +144,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
141144
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
142145
NANOARROW_RETURN_NOT_OK(
143146
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
144-
ArrowCharView("arrow.uuid")));
147+
ArrowCharView(kArrowUuidExtensionName)));
145148
} break;
146149
}
147150

@@ -183,11 +186,170 @@ expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
183186
return {};
184187
}
185188

189+
namespace {
190+
191+
int32_t GetFieldId(const ArrowSchema& schema) {
192+
if (schema.metadata == nullptr) {
193+
return kUnknownFieldId;
194+
}
195+
196+
ArrowStringView field_id_key{.data = kFieldIdKey.data(),
197+
.size_bytes = kFieldIdKey.size()};
198+
ArrowStringView field_id_value;
199+
if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
200+
NANOARROW_OK) {
201+
return kUnknownFieldId;
202+
}
203+
204+
return std::stoi(std::string(field_id_value.data, field_id_value.size_bytes));
205+
}
206+
207+
/// \brief Recursively convert one Arrow schema node into the matching Iceberg type.
///
/// Struct, list and map nodes are converted child by child; each child becomes a
/// SchemaField built from its converted type, the field id returned by
/// GetFieldId, its name, and its ARROW_FLAG_NULLABLE flag.
///
/// \param schema Arrow schema node to convert.
/// \return The Iceberg type, or an ErrorKind::kInvalidSchema error when nanoarrow
/// cannot read the schema, a time/timestamp unit is not microseconds, a UUID
/// extension column is not 16 bytes wide, or the Arrow type has no mapping here.
expected<std::shared_ptr<Type>, Error> FromArrowSchema(const ArrowSchema& schema) {
  // Converts a child schema node into a SchemaField.
  auto to_schema_field = [](const ArrowSchema& child) -> expected<SchemaField, Error> {
    auto child_type = FromArrowSchema(child);
    if (!child_type) {
      return unexpected<Error>(child_type.error());
    }

    // The Arrow C data interface allows `name` to be NULL; use an empty name
    // rather than constructing a string from a null pointer.
    const char* field_name = child.name != nullptr ? child.name : "";
    const int32_t field_id = GetFieldId(child);
    const bool is_optional = (child.flags & ARROW_FLAG_NULLABLE) != 0;
    return SchemaField(field_id, field_name, std::move(child_type.value()),
                       is_optional);
  };

  ArrowError arrow_error;
  ArrowErrorInit(&arrow_error);

  ArrowSchemaView schema_view;
  if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error);
      error_code != NANOARROW_OK) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = std::format("Failed to read Arrow schema, code: {}, message: {}",
                                error_code, arrow_error.message)}};
  }

  // NOTE(review): the list and map cases index children directly and so rely
  // on ArrowSchemaViewInit having rejected malformed child layouts - confirm.
  switch (schema_view.type) {
    case NANOARROW_TYPE_STRUCT: {
      std::vector<SchemaField> fields;
      fields.reserve(schema.n_children);

      // n_children is a 64-bit count; index with the same width.
      for (int64_t i = 0; i < schema.n_children; ++i) {
        auto field_result = to_schema_field(*schema.children[i]);
        if (!field_result) {
          return unexpected<Error>(field_result.error());
        }
        fields.emplace_back(std::move(*field_result));
      }

      return std::make_shared<StructType>(std::move(fields));
    }
    case NANOARROW_TYPE_LIST: {
      auto element_field_result = to_schema_field(*schema.children[0]);
      if (!element_field_result) {
        return unexpected<Error>(element_field_result.error());
      }
      return std::make_shared<ListType>(std::move(element_field_result.value()));
    }
    case NANOARROW_TYPE_MAP: {
      // An Arrow map has a single struct child holding the key and value fields.
      const ArrowSchema& entries = *schema.children[0];

      auto key_field_result = to_schema_field(*entries.children[0]);
      if (!key_field_result) {
        return unexpected<Error>(key_field_result.error());
      }

      auto value_field_result = to_schema_field(*entries.children[1]);
      if (!value_field_result) {
        return unexpected<Error>(value_field_result.error());
      }

      return std::make_shared<MapType>(std::move(key_field_result.value()),
                                       std::move(value_field_result.value()));
    }
    case NANOARROW_TYPE_BOOL:
      return std::make_shared<BooleanType>();
    case NANOARROW_TYPE_INT32:
      return std::make_shared<IntType>();
    case NANOARROW_TYPE_INT64:
      return std::make_shared<LongType>();
    case NANOARROW_TYPE_FLOAT:
      return std::make_shared<FloatType>();
    case NANOARROW_TYPE_DOUBLE:
      return std::make_shared<DoubleType>();
    case NANOARROW_TYPE_DECIMAL128:
      return std::make_shared<DecimalType>(schema_view.decimal_precision,
                                           schema_view.decimal_scale);
    case NANOARROW_TYPE_DATE32:
      return std::make_shared<DateType>();
    case NANOARROW_TYPE_TIME64: {
      // Only microsecond precision is accepted.
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow time type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      return std::make_shared<TimeType>();
    }
    case NANOARROW_TYPE_TIMESTAMP: {
      // Only microsecond precision is accepted.
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow timestamp type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }

      // A non-empty timezone string selects the timezone-aware timestamp type.
      const bool with_timezone =
          schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0;
      if (with_timezone) {
        return std::make_shared<TimestampTzType>();
      }
      return std::make_shared<TimestampType>();
    }
    case NANOARROW_TYPE_STRING:
      return std::make_shared<StringType>();
    case NANOARROW_TYPE_BINARY:
      return std::make_shared<BinaryType>();
    case NANOARROW_TYPE_FIXED_SIZE_BINARY: {
      // Fixed-size binary tagged with the UUID extension name becomes a UUID;
      // any other fixed-size binary becomes a FixedType of the same width.
      const std::string_view extension_name(schema_view.extension_name.data,
                                            schema_view.extension_name.size_bytes);
      if (extension_name == kArrowUuidExtensionName) {
        if (schema_view.fixed_size != 16) {
          return unexpected<Error>{{.kind = ErrorKind::kInvalidSchema,
                                    .message = "UUID type must have a fixed size of 16"}};
        }
        return std::make_shared<UuidType>();
      }
      return std::make_shared<FixedType>(schema_view.fixed_size);
    }
    default:
      return unexpected<Error>{
          {.kind = ErrorKind::kInvalidSchema,
           .message = std::format("Unsupported Arrow type: {}",
                                  ArrowTypeString(schema_view.type))}};
  }
}
329+
330+
} // namespace
331+
186332
/// \brief Convert an Arrow schema into an Iceberg Schema with the given schema id.
///
/// \param schema Arrow schema to convert; its top-level type must be a struct.
/// \param schema_id Id assigned to the resulting Iceberg schema.
/// \return The converted schema, or an ErrorKind::kInvalidSchema error when
/// converting any node fails or the top-level Arrow type is not a struct.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
                                                         int32_t schema_id) {
  auto type_result = FromArrowSchema(schema);
  if (!type_result) {
    return unexpected<Error>(type_result.error());
  }

  auto& type = type_result.value();
  if (type->type_id() != TypeId::kStruct) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = "Arrow schema must be a struct type for Iceberg schema"}};
  }

  // The type id was checked just above, so the downcast targets a struct.
  auto& struct_type = static_cast<StructType&>(*type);

  // Copy-construct the field list straight from the range instead of applying
  // std::move to each element: the resulting schema is the same, and the code
  // no longer depends on whether fields() hands out mutable elements.
  auto&& source_fields = struct_type.fields();
  std::vector<SchemaField> fields(source_fields.begin(), source_fields.end());
  return std::make_unique<Schema>(schema_id, std::move(fields));
}
192354

193355
} // namespace iceberg

0 commit comments

Comments
 (0)