Skip to content

Commit d4c0068

Browse files
committed
feat: convert arrow schema to iceberg schema
1 parent faf9cc8 commit d4c0068

File tree

3 files changed

+394
-5
lines changed

3 files changed

+394
-5
lines changed

src/iceberg/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ICEBERG_EXPORT Schema : public StructType {
4848
/// evolution.
4949
[[nodiscard]] int32_t schema_id() const;
5050

51-
[[nodiscard]] std::string ToString() const;
51+
[[nodiscard]] std::string ToString() const override;
5252

5353
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
5454

src/iceberg/schema_internal.cc

Lines changed: 167 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/schema_internal.h"
2121

22+
#include <charconv>
#include <cstring>
#include <format>
#include <optional>
#include <string>
@@ -32,6 +33,8 @@ namespace {
3233

3334
// Metadata key under which an Arrow field records the name of its extension type.
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
3435
// NOTE(review): presumably the metadata key for an extension type's serialized
// parameters; no use of it is visible in this excerpt -- confirm.
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
36+
// Extension name attached to 16-byte fixed-size-binary fields that hold UUIDs.
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
37+
// Sentinel returned by GetFieldId when no field id can be read from the metadata.
constexpr int32_t kUnknownFieldId = -1;
3538

3639
// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
3740
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
@@ -141,7 +144,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
141144
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
142145
NANOARROW_RETURN_NOT_OK(
143146
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
144-
ArrowCharView("arrow.uuid")));
147+
ArrowCharView(kArrowUuidExtensionName)));
145148
} break;
146149
}
147150

@@ -183,11 +186,171 @@ expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
183186
return {};
184187
}
185188

189+
namespace {
190+
191+
int32_t GetFieldId(const ArrowSchema& schema) {
192+
if (schema.metadata == nullptr) {
193+
return kUnknownFieldId;
194+
}
195+
196+
ArrowStringView field_id_key{.data = kFieldIdKey.data(),
197+
.size_bytes = kFieldIdKey.size()};
198+
ArrowStringView field_id_value;
199+
if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
200+
NANOARROW_OK) {
201+
return kUnknownFieldId;
202+
}
203+
204+
return std::stoi(std::string(field_id_value.data, field_id_value.size_bytes));
205+
}
206+
207+
/// \brief Recursively convert an Arrow schema node into the matching Iceberg type.
///
/// Nested types (struct, list, map) are converted child by child; each child becomes
/// a SchemaField whose id comes from GetFieldId, whose name comes from the Arrow
/// field name and whose optionality mirrors ARROW_FLAG_NULLABLE.
///
/// \param schema Arrow schema node to convert.
/// \return The Iceberg type, or an ErrorKind::kInvalidSchema error when the Arrow
/// schema cannot be read, uses a time/timestamp unit other than microseconds,
/// declares a UUID extension whose width is not 16 bytes, or has a type with no
/// mapping here.
expected<std::shared_ptr<Type>, Error> FromArrowSchema(const ArrowSchema& schema) {
  // Converts one child Arrow schema into a heap-allocated SchemaField.
  auto to_schema_field = [](const ArrowSchema& field_schema)
      -> expected<std::unique_ptr<SchemaField>, Error> {
    auto field_type_result = FromArrowSchema(field_schema);
    if (!field_type_result) {
      return unexpected<Error>(field_type_result.error());
    }

    auto field_id = GetFieldId(field_schema);
    bool is_optional = (field_schema.flags & ARROW_FLAG_NULLABLE) != 0;
    // The Arrow C data interface permits a NULL name; building a string from a null
    // pointer is undefined behaviour, so treat it as an empty name.
    const char* field_name = field_schema.name != nullptr ? field_schema.name : "";
    return std::make_unique<SchemaField>(
        field_id, field_name, std::move(field_type_result.value()), is_optional);
  };

  ArrowError arrow_error;
  ArrowErrorInit(&arrow_error);

  // NOTE(review): the child accesses below (children[0], children[0]->children[1])
  // rely on ArrowSchemaViewInit rejecting malformed list/map layouts -- confirm
  // against the nanoarrow version in use.
  ArrowSchemaView schema_view;
  if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error);
      error_code != NANOARROW_OK) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = std::format("Failed to read Arrow schema, code: {}, message: {}",
                                error_code, arrow_error.message)}};
  }

  switch (schema_view.type) {
    case NANOARROW_TYPE_STRUCT: {
      std::vector<SchemaField> fields;
      fields.reserve(schema.n_children);

      // n_children is a 64-bit count; use a matching index type.
      for (int64_t i = 0; i < schema.n_children; ++i) {
        auto field_result = to_schema_field(*schema.children[i]);
        if (!field_result) {
          return unexpected<Error>(field_result.error());
        }
        fields.emplace_back(std::move(*field_result.value()));
      }

      return std::make_shared<StructType>(std::move(fields));
    }
    case NANOARROW_TYPE_LIST: {
      auto element_field_result = to_schema_field(*schema.children[0]);
      if (!element_field_result) {
        return unexpected<Error>(element_field_result.error());
      }
      return std::make_shared<ListType>(std::move(*element_field_result.value()));
    }
    case NANOARROW_TYPE_MAP: {
      // An Arrow map has a single "entries" struct child holding key and value.
      auto key_field_result = to_schema_field(*schema.children[0]->children[0]);
      if (!key_field_result) {
        return unexpected<Error>(key_field_result.error());
      }

      auto value_field_result = to_schema_field(*schema.children[0]->children[1]);
      if (!value_field_result) {
        return unexpected<Error>(value_field_result.error());
      }

      return std::make_shared<MapType>(std::move(*key_field_result.value()),
                                       std::move(*value_field_result.value()));
    }
    case NANOARROW_TYPE_BOOL:
      return std::make_shared<BooleanType>();
    case NANOARROW_TYPE_INT32:
      return std::make_shared<IntType>();
    case NANOARROW_TYPE_INT64:
      return std::make_shared<LongType>();
    case NANOARROW_TYPE_FLOAT:
      return std::make_shared<FloatType>();
    case NANOARROW_TYPE_DOUBLE:
      return std::make_shared<DoubleType>();
    case NANOARROW_TYPE_DECIMAL128:
      return std::make_shared<DecimalType>(schema_view.decimal_precision,
                                           schema_view.decimal_scale);
    case NANOARROW_TYPE_DATE32:
      return std::make_shared<DateType>();
    case NANOARROW_TYPE_TIME64:
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow time type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      return std::make_shared<TimeType>();
    case NANOARROW_TYPE_TIMESTAMP: {
      // A non-empty timezone string selects the timezone-aware Iceberg type.
      bool with_timezone =
          schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0;
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow timestamp type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      if (with_timezone) {
        return std::make_shared<TimestampTzType>();
      } else {
        return std::make_shared<TimestampType>();
      }
    }
    case NANOARROW_TYPE_STRING:
      return std::make_shared<StringType>();
    case NANOARROW_TYPE_BINARY:
      return std::make_shared<BinaryType>();
    case NANOARROW_TYPE_FIXED_SIZE_BINARY: {
      // Fixed-size binary tagged with the UUID extension maps to UuidType; any other
      // fixed-size binary maps to FixedType of the same width.
      if (auto extension_name = std::string_view(schema_view.extension_name.data,
                                                 schema_view.extension_name.size_bytes);
          extension_name == kArrowUuidExtensionName) {
        if (schema_view.fixed_size != 16) {
          return unexpected<Error>{{.kind = ErrorKind::kInvalidSchema,
                                    .message = "UUID type must have a fixed size of 16"}};
        }
        return std::make_shared<UuidType>();
      }
      return std::make_shared<FixedType>(schema_view.fixed_size);
    }
    default:
      return unexpected<Error>{
          {.kind = ErrorKind::kInvalidSchema,
           .message = std::format("Unsupported Arrow type: {}",
                                  ArrowTypeString(schema_view.type))}};
  }
}
330+
331+
} // namespace
332+
186333
/// \brief Build an Iceberg Schema from a top-level Arrow schema.
///
/// \param schema Arrow schema to convert; it must describe a struct.
/// \param schema_id Identifier assigned to the resulting Iceberg schema.
/// \return The new Schema, or an ErrorKind::kInvalidSchema error when the Arrow
/// schema cannot be converted or its root is not a struct type.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
                                                         int32_t schema_id) {
  auto converted = FromArrowSchema(schema);
  if (!converted) {
    return unexpected<Error>(converted.error());
  }

  auto& root_type = converted.value();
  const bool root_is_struct = root_type->type_id() == TypeId::kStruct;
  if (!root_is_struct) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = "Arrow schema must be a struct type for Iceberg schema"}};
  }

  // The type id was checked above, so the downcast is safe.
  auto* root_struct = static_cast<StructType*>(root_type.get());

  std::vector<SchemaField> schema_fields;
  schema_fields.reserve(root_struct->fields().size());
  // NOTE(review): if fields() exposes const elements, this std::move silently
  // degrades to a copy -- confirm against StructType's declaration.
  for (auto& root_field : root_struct->fields()) {
    schema_fields.emplace_back(std::move(root_field));
  }
  return std::make_unique<Schema>(schema_id, std::move(schema_fields));
}
192355

193356
} // namespace iceberg

0 commit comments

Comments
 (0)