Skip to content

Commit 9d4b195

Browse files
committed
feat(avro): extract avro datum from arrow array
1 parent 8ecee31 commit 9d4b195

File tree

2 files changed

+224
-0
lines changed

2 files changed

+224
-0
lines changed

src/iceberg/avro/avro_data_util.cc

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* under the License.
1818
*/
1919

20+
#include <ranges>
21+
2022
#include <arrow/array/builder_binary.h>
2123
#include <arrow/array/builder_decimal.h>
2224
#include <arrow/array/builder_nested.h>
@@ -451,4 +453,217 @@ Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
451453
projected_schema, array_builder);
452454
}
453455

456+
namespace {
457+
458+
// ToAvroNodeVisitor uses 0 for null branch and 1 for value branch.
459+
constexpr int64_t kNullBranch = 0;
460+
constexpr int64_t kValueBranch = 1;
461+
462+
} // namespace
463+
464+
Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
465+
::avro::GenericDatum* datum) {
466+
if (index < 0 || index >= array.length()) {
467+
return InvalidArgument("Cannot extract datum from array at index {} of length {}",
468+
index, array.length());
469+
}
470+
471+
if (array.IsNull(index)) {
472+
if (!datum->isUnion()) [[unlikely]] {
473+
return InvalidSchema("Cannot extract null to non-union type: {}",
474+
::avro::toString(datum->type()));
475+
}
476+
datum->selectBranch(kNullBranch);
477+
return {};
478+
}
479+
480+
if (datum->isUnion()) {
481+
datum->selectBranch(kValueBranch);
482+
}
483+
484+
switch (array.type()->id()) {
485+
case ::arrow::Type::BOOL: {
486+
const auto& bool_array =
487+
internal::checked_cast<const ::arrow::BooleanArray&>(array);
488+
datum->value<bool>() = bool_array.Value(index);
489+
return {};
490+
}
491+
492+
case ::arrow::Type::INT32: {
493+
const auto& int32_array = internal::checked_cast<const ::arrow::Int32Array&>(array);
494+
datum->value<int32_t>() = int32_array.Value(index);
495+
return {};
496+
}
497+
498+
case ::arrow::Type::INT64: {
499+
const auto& int64_array = internal::checked_cast<const ::arrow::Int64Array&>(array);
500+
datum->value<int64_t>() = int64_array.Value(index);
501+
return {};
502+
}
503+
504+
case ::arrow::Type::FLOAT: {
505+
const auto& float_array = internal::checked_cast<const ::arrow::FloatArray&>(array);
506+
datum->value<float>() = float_array.Value(index);
507+
return {};
508+
}
509+
510+
case ::arrow::Type::DOUBLE: {
511+
const auto& double_array =
512+
internal::checked_cast<const ::arrow::DoubleArray&>(array);
513+
datum->value<double>() = double_array.Value(index);
514+
return {};
515+
}
516+
517+
case ::arrow::Type::STRING: {
518+
const auto& string_array =
519+
internal::checked_cast<const ::arrow::StringArray&>(array);
520+
datum->value<std::string>() = string_array.GetString(index);
521+
return {};
522+
}
523+
524+
case ::arrow::Type::BINARY: {
525+
const auto& binary_array =
526+
internal::checked_cast<const ::arrow::BinaryArray&>(array);
527+
std::string_view value = binary_array.GetView(index);
528+
datum->value<std::vector<uint8_t>>().assign(
529+
reinterpret_cast<const uint8_t*>(value.data()),
530+
reinterpret_cast<const uint8_t*>(value.data()) + value.size());
531+
return {};
532+
}
533+
534+
case ::arrow::Type::FIXED_SIZE_BINARY: {
535+
const auto& fixed_array =
536+
internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);
537+
std::string_view value = fixed_array.GetView(index);
538+
auto& fixed_datum = datum->value<::avro::GenericFixed>();
539+
fixed_datum.value().assign(
540+
reinterpret_cast<const char*>(value.data()),
541+
reinterpret_cast<const char*>(value.data()) + value.size());
542+
return {};
543+
}
544+
545+
case ::arrow::Type::DECIMAL128: {
546+
const auto& decimal_array =
547+
internal::checked_cast<const ::arrow::Decimal128Array&>(array);
548+
std::string_view decimal_value = decimal_array.GetView(index);
549+
auto& fixed_datum = datum->value<::avro::GenericFixed>();
550+
auto& bytes = fixed_datum.value();
551+
bytes.assign(decimal_value.begin(), decimal_value.end());
552+
std::ranges::reverse(bytes);
553+
return {};
554+
}
555+
556+
case ::arrow::Type::DATE32: {
557+
const auto& date_array = internal::checked_cast<const ::arrow::Date32Array&>(array);
558+
datum->value<int32_t>() = date_array.Value(index);
559+
return {};
560+
}
561+
562+
case ::arrow::Type::TIME64: {
563+
const auto& time_array = internal::checked_cast<const ::arrow::Time64Array&>(array);
564+
datum->value<int64_t>() = time_array.Value(index);
565+
return {};
566+
}
567+
568+
// For both timestamp and timestamp_tz with time unit as microsecond.
569+
case ::arrow::Type::TIMESTAMP: {
570+
const auto& timestamp_array =
571+
internal::checked_cast<const ::arrow::TimestampArray&>(array);
572+
datum->value<int64_t>() = timestamp_array.Value(index);
573+
return {};
574+
}
575+
576+
// TODO(gangwu): support uuid type.
577+
578+
case ::arrow::Type::STRUCT: {
579+
const auto& struct_array =
580+
internal::checked_cast<const ::arrow::StructArray&>(array);
581+
auto& record = datum->value<::avro::GenericRecord>();
582+
for (int i = 0; i < struct_array.num_fields(); ++i) {
583+
ICEBERG_RETURN_UNEXPECTED(
584+
ExtractDatumFromArray(*struct_array.field(i), index, &record.fieldAt(i)));
585+
}
586+
return {};
587+
}
588+
589+
// TODO(gangwu): support LARGE_LIST.
590+
case ::arrow::Type::LIST: {
591+
const auto& list_array = internal::checked_cast<const ::arrow::ListArray&>(array);
592+
auto& avro_array = datum->value<::avro::GenericArray>();
593+
auto& elements = avro_array.value();
594+
595+
auto start = list_array.value_offset(index);
596+
auto end = list_array.value_offset(index + 1);
597+
auto length = end - start;
598+
599+
auto values = list_array.values();
600+
elements.resize(length, ::avro::GenericDatum(avro_array.schema()->leafAt(0)));
601+
602+
for (int64_t i = 0; i < length; ++i) {
603+
ICEBERG_RETURN_UNEXPECTED(
604+
ExtractDatumFromArray(*values, start + i, &elements[i]));
605+
}
606+
return {};
607+
}
608+
609+
case ::arrow::Type::MAP: {
610+
const auto& map_array = internal::checked_cast<const ::arrow::MapArray&>(array);
611+
auto start = map_array.value_offset(index);
612+
auto end = map_array.value_offset(index + 1);
613+
auto length = end - start;
614+
615+
auto keys = map_array.keys();
616+
auto items = map_array.items();
617+
618+
if (datum->type() == ::avro::AVRO_MAP) {
619+
// Handle regular Avro map
620+
auto& avro_map = datum->value<::avro::GenericMap>();
621+
auto value_node = avro_map.schema()->leafAt(1);
622+
623+
auto& map_entries = avro_map.value();
624+
map_entries.resize(
625+
length, std::make_pair(std::string(), ::avro::GenericDatum(value_node)));
626+
627+
const auto& key_array =
628+
internal::checked_cast<const ::arrow::StringArray&>(*keys);
629+
630+
for (int64_t i = 0; i < length; ++i) {
631+
auto& map_entry = map_entries[i];
632+
map_entry.first = key_array.GetString(start + i);
633+
ICEBERG_RETURN_UNEXPECTED(
634+
ExtractDatumFromArray(*items, start + i, &map_entry.second));
635+
}
636+
} else if (datum->type() == ::avro::AVRO_ARRAY) {
637+
// Handle array-based map (list<struct<key, value>>)
638+
auto& avro_array = datum->value<::avro::GenericArray>();
639+
auto record_node = avro_array.schema()->leafAt(0);
640+
if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) {
641+
return InvalidArgument(
642+
"Expected Avro record with 2 fields for map value, got: {}",
643+
ToString(record_node));
644+
}
645+
646+
auto& elements = avro_array.value();
647+
elements.resize(length, ::avro::GenericDatum(record_node));
648+
649+
for (int64_t i = 0; i < length; ++i) {
650+
auto& record = elements[i].value<::avro::GenericRecord>();
651+
ICEBERG_RETURN_UNEXPECTED(
652+
ExtractDatumFromArray(*keys, start + i, &record.fieldAt(0)));
653+
ICEBERG_RETURN_UNEXPECTED(
654+
ExtractDatumFromArray(*items, start + i, &record.fieldAt(1)));
655+
}
656+
} else {
657+
return InvalidArgument("Unsupported Avro type for map: {}",
658+
static_cast<int>(datum->type()));
659+
}
660+
return {};
661+
}
662+
663+
default:
664+
return InvalidArgument("Unsupported Arrow array type: {}",
665+
array.type()->ToString());
666+
}
667+
}
668+
454669
} // namespace iceberg::avro

src/iceberg/avro/avro_data_util_internal.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,13 @@ Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
4343
const Schema& projected_schema,
4444
::arrow::ArrayBuilder* array_builder);
4545

46+
/// \brief Extract an Avro datum from an Arrow array.
47+
///
48+
/// \param array The Arrow array to extract from.
49+
/// \param index The index of the element to extract.
50+
/// \param datum The Avro datum to extract to. Its Avro type should be consistent with the
51+
/// Arrow type.
52+
Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
53+
::avro::GenericDatum* datum);
54+
4655
} // namespace iceberg::avro

0 commit comments

Comments
 (0)