Skip to content

Commit 7fb6854

Browse files
committed
feat: convert arrow schema to iceberg schema
1 parent faf9cc8 commit 7fb6854

File tree

3 files changed

+392
-5
lines changed

3 files changed

+392
-5
lines changed

src/iceberg/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ICEBERG_EXPORT Schema : public StructType {
4848
/// evolution.
4949
[[nodiscard]] int32_t schema_id() const;
5050

51-
[[nodiscard]] std::string ToString() const;
51+
[[nodiscard]] std::string ToString() const override;
5252

5353
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
5454

src/iceberg/schema_internal.cc

Lines changed: 165 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ namespace {
3232

3333
// Metadata key under which an Arrow extension type name is stored.
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
3434
// Metadata key for extension-type metadata (not referenced in the visible hunks --
// TODO confirm usage elsewhere in this file).
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
35+
// Extension name that marks a 16-byte fixed-size binary column as a UUID.
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
36+
// Sentinel returned by GetFieldId when an Arrow field carries no field id.
constexpr int32_t kUnknownFieldId = -1;
3537

3638
// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
3739
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
@@ -141,7 +143,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
141143
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
142144
NANOARROW_RETURN_NOT_OK(
143145
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
144-
ArrowCharView("arrow.uuid")));
146+
ArrowCharView(kArrowUuidExtensionName)));
145147
} break;
146148
}
147149

@@ -183,11 +185,170 @@ expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
183185
return {};
184186
}
185187

188+
namespace {
189+
190+
int32_t GetFieldId(const ArrowSchema& schema) {
191+
if (schema.metadata == nullptr) {
192+
return kUnknownFieldId;
193+
}
194+
195+
ArrowStringView field_id_key{.data = kFieldIdKey.data(),
196+
.size_bytes = kFieldIdKey.size()};
197+
ArrowStringView field_id_value;
198+
if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
199+
NANOARROW_OK) {
200+
return kUnknownFieldId;
201+
}
202+
203+
return std::stoi(std::string(field_id_value.data, field_id_value.size_bytes));
204+
}
205+
206+
/// \brief Recursively convert an Arrow schema node into the matching Iceberg type.
///
/// Struct, list and map nodes are converted by recursing into their children. Each
/// child becomes a SchemaField carrying the field id read from the child's metadata
/// (kUnknownFieldId when none is found), the child's name and its nullable flag.
///
/// \param schema Arrow schema node to convert.
/// \return The Iceberg type, or an ErrorKind::kInvalidSchema error when nanoarrow
/// cannot read the node, a time/timestamp unit is not microseconds, a UUID
/// extension is not 16 bytes wide, or the Arrow type is not handled here.
expected<std::shared_ptr<Type>, Error> FromArrowSchema(const ArrowSchema& schema) {
  // Converts one child node into a SchemaField (id + name + type + optionality).
  auto to_schema_field = [](const ArrowSchema& child) -> expected<SchemaField, Error> {
    auto field_type_result = FromArrowSchema(child);
    if (!field_type_result) {
      return unexpected<Error>(field_type_result.error());
    }

    // The Arrow C data interface allows `name` to be NULL; constructing a string
    // from a null pointer is undefined behaviour, so fall back to an empty name.
    const char* field_name = child.name != nullptr ? child.name : "";
    const bool is_optional = (child.flags & ARROW_FLAG_NULLABLE) != 0;
    return SchemaField(GetFieldId(child), field_name,
                       std::move(field_type_result.value()), is_optional);
  };

  ArrowError arrow_error;
  ArrowErrorInit(&arrow_error);

  ArrowSchemaView schema_view;
  if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error);
      error_code != NANOARROW_OK) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = std::format("Failed to read Arrow schema, code: {}, message: {}",
                                error_code, arrow_error.message)}};
  }

  // NOTE(review): the child accesses below rely on ArrowSchemaViewInit having
  // validated the child layout of list/map nodes -- confirm against nanoarrow.
  switch (schema_view.type) {
    case NANOARROW_TYPE_STRUCT: {
      std::vector<SchemaField> fields;
      fields.reserve(static_cast<size_t>(schema.n_children));

      // n_children is 64-bit; index with the same width to avoid truncation.
      for (int64_t i = 0; i < schema.n_children; ++i) {
        auto field_result = to_schema_field(*schema.children[i]);
        if (!field_result) {
          return unexpected<Error>(field_result.error());
        }
        fields.emplace_back(std::move(*field_result));
      }

      return std::make_shared<StructType>(std::move(fields));
    }
    case NANOARROW_TYPE_LIST: {
      auto element_field_result = to_schema_field(*schema.children[0]);
      if (!element_field_result) {
        return unexpected<Error>(element_field_result.error());
      }
      return std::make_shared<ListType>(std::move(element_field_result.value()));
    }
    case NANOARROW_TYPE_MAP: {
      // An Arrow map has a single entries child whose two children are key and value.
      auto key_field_result = to_schema_field(*schema.children[0]->children[0]);
      if (!key_field_result) {
        return unexpected<Error>(key_field_result.error());
      }

      auto value_field_result = to_schema_field(*schema.children[0]->children[1]);
      if (!value_field_result) {
        return unexpected<Error>(value_field_result.error());
      }

      return std::make_shared<MapType>(std::move(key_field_result.value()),
                                       std::move(value_field_result.value()));
    }
    case NANOARROW_TYPE_BOOL:
      return std::make_shared<BooleanType>();
    case NANOARROW_TYPE_INT32:
      return std::make_shared<IntType>();
    case NANOARROW_TYPE_INT64:
      return std::make_shared<LongType>();
    case NANOARROW_TYPE_FLOAT:
      return std::make_shared<FloatType>();
    case NANOARROW_TYPE_DOUBLE:
      return std::make_shared<DoubleType>();
    case NANOARROW_TYPE_DECIMAL128:
      return std::make_shared<DecimalType>(schema_view.decimal_precision,
                                           schema_view.decimal_scale);
    case NANOARROW_TYPE_DATE32:
      return std::make_shared<DateType>();
    case NANOARROW_TYPE_TIME64:
      // Only microsecond precision is accepted for time values.
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow time type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      return std::make_shared<TimeType>();
    case NANOARROW_TYPE_TIMESTAMP: {
      // A non-empty timezone string selects the timezone-aware timestamp type.
      const bool with_timezone =
          schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0;
      if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
        return unexpected<Error>{
            {.kind = ErrorKind::kInvalidSchema,
             .message = std::format("Unsupported time unit for Arrow timestamp type: {}",
                                    static_cast<int>(schema_view.time_unit))}};
      }
      if (with_timezone) {
        return std::make_shared<TimestampTzType>();
      }
      return std::make_shared<TimestampType>();
    }
    case NANOARROW_TYPE_STRING:
      return std::make_shared<StringType>();
    case NANOARROW_TYPE_BINARY:
      return std::make_shared<BinaryType>();
    case NANOARROW_TYPE_FIXED_SIZE_BINARY: {
      // Fixed-size binary tagged with the UUID extension maps to UuidType; any other
      // fixed-size binary maps to FixedType of the same width.
      if (auto extension_name = std::string_view(schema_view.extension_name.data,
                                                 schema_view.extension_name.size_bytes);
          extension_name == kArrowUuidExtensionName) {
        if (schema_view.fixed_size != 16) {
          return unexpected<Error>{{.kind = ErrorKind::kInvalidSchema,
                                    .message = "UUID type must have a fixed size of 16"}};
        }
        return std::make_shared<UuidType>();
      }
      return std::make_shared<FixedType>(schema_view.fixed_size);
    }
    default:
      return unexpected<Error>{
          {.kind = ErrorKind::kInvalidSchema,
           .message = std::format("Unsupported Arrow type: {}",
                                  ArrowTypeString(schema_view.type))}};
  }
}
328+
329+
} // namespace
330+
186331
/// \brief Build an Iceberg Schema from an Arrow schema whose root is a struct.
///
/// \param schema Arrow schema to convert.
/// \param schema_id Id assigned to the resulting Iceberg schema.
/// \return The converted schema, or an ErrorKind::kInvalidSchema error when the
/// conversion of any node fails or the root Arrow type is not a struct.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
                                                         int32_t schema_id) {
  auto converted = FromArrowSchema(schema);
  if (!converted.has_value()) {
    return unexpected<Error>(converted.error());
  }

  // Only a struct root can supply the top-level fields of an Iceberg schema.
  auto& root_type = *converted;
  const bool root_is_struct = root_type->type_id() == TypeId::kStruct;
  if (!root_is_struct) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = "Arrow schema must be a struct type for Iceberg schema"}};
  }

  // The type id was checked above, so the downcast is safe.
  auto* root_struct = static_cast<StructType*>(root_type.get());

  std::vector<SchemaField> top_level_fields;
  top_level_fields.reserve(root_struct->fields().size());
  for (auto& top_level_field : root_struct->fields()) {
    top_level_fields.emplace_back(std::move(top_level_field));
  }

  return std::make_unique<Schema>(schema_id, std::move(top_level_fields));
}
192353

193354
} // namespace iceberg

0 commit comments

Comments
 (0)