Skip to content

Commit ed25bba

Browse files
authored
Implement Array Decoding in arrow-avro (#7559)
# Which issue does this PR close? Part of #4886 Related to #6965 # Rationale for this change Avro supports arrays as a core data type, but previously arrow-avro had incomplete decoding logic to handle them. As a result, any Avro file containing array fields would fail to parse correctly within the Arrow ecosystem. This PR addresses this gap by: 1. Completing the implementation of explicit `Array` -> `List` decoding: It completes the `Decoder::Array` logic that reads array blocks in Avro format and constructs an Arrow `ListArray`. Overall, these changes expand Arrow’s Avro reader capabilities, allowing users to work with array-encoded data in a standardized Arrow format. # What changes are included in this PR? **1. arrow-avro/src/reader/record.rs:** * Completed the Array decoding path which leverages blockwise reads of Avro array data. * Implemented decoder unit tests for Array types. # Are there any user-facing changes? N/A
1 parent f37b114 commit ed25bba

File tree

1 file changed

+113
-10
lines changed

1 file changed

+113
-10
lines changed

arrow-avro/src/reader/record.rs

Lines changed: 113 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ enum Decoder {
113113
String(OffsetBufferBuilder<i32>, Vec<u8>),
114114
/// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
115115
StringView(OffsetBufferBuilder<i32>, Vec<u8>),
116-
List(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
116+
Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
117117
Record(Fields, Vec<Decoder>),
118118
Map(
119119
FieldRef,
@@ -161,7 +161,7 @@ impl Decoder {
161161
Codec::Interval => return nyi("decoding interval"),
162162
Codec::List(item) => {
163163
let decoder = Self::try_new(item)?;
164-
Self::List(
164+
Self::Array(
165165
Arc::new(item.field_with_name("item")),
166166
OffsetBufferBuilder::new(DEFAULT_CAPACITY),
167167
Box::new(decoder),
@@ -223,7 +223,7 @@ impl Decoder {
223223
Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => {
224224
offsets.push_length(0);
225225
}
226-
Self::List(_, offsets, e) => {
226+
Self::Array(_, offsets, e) => {
227227
offsets.push_length(0);
228228
e.append_null();
229229
}
@@ -256,18 +256,17 @@ impl Decoder {
256256
offsets.push_length(data.len());
257257
values.extend_from_slice(data);
258258
}
259-
Self::List(_, _, _) => {
260-
return Err(ArrowError::NotYetImplemented(
261-
"Decoding ListArray".to_string(),
262-
))
259+
Self::Array(_, off, encoding) => {
260+
let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
261+
off.push_length(total_items);
263262
}
264263
Self::Record(_, encodings) => {
265264
for encoding in encodings {
266265
encoding.decode(buf)?;
267266
}
268267
}
269268
Self::Map(_, koff, moff, kdata, valdec) => {
270-
let newly_added = read_map_blocks(buf, |cur| {
269+
let newly_added = read_blocks(buf, |cur| {
271270
let kb = cur.get_bytes()?;
272271
koff.push_length(kb.len());
273272
kdata.extend_from_slice(kb);
@@ -339,7 +338,7 @@ impl Decoder {
339338

340339
Arc::new(StringViewArray::from(values))
341340
}
342-
Self::List(field, offsets, values) => {
341+
Self::Array(field, offsets, values) => {
343342
let values = values.flush(None)?;
344343
let offsets = flush_offsets(offsets);
345344
Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
@@ -388,7 +387,7 @@ impl Decoder {
388387
}
389388
}
390389

391-
fn read_map_blocks(
390+
fn read_blocks(
392391
buf: &mut AvroCursor,
393392
decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
394393
) -> Result<usize, ArrowError> {
@@ -462,6 +461,17 @@ mod tests {
462461
IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, StructArray,
463462
};
464463

464+
fn encode_avro_int(value: i32) -> Vec<u8> {
465+
let mut buf = Vec::new();
466+
let mut v = (value << 1) ^ (value >> 31);
467+
while v & !0x7F != 0 {
468+
buf.push(((v & 0x7F) | 0x80) as u8);
469+
v >>= 7;
470+
}
471+
buf.push(v as u8);
472+
buf
473+
}
474+
465475
fn encode_avro_long(value: i64) -> Vec<u8> {
466476
let mut buf = Vec::new();
467477
let mut v = (value << 1) ^ (value >> 63);
@@ -531,4 +541,97 @@ mod tests {
531541
assert_eq!(map_arr.len(), 1);
532542
assert_eq!(map_arr.value_length(0), 0);
533543
}
544+
545+
#[test]
546+
fn test_array_decoding() {
547+
let item_dt = avro_from_codec(Codec::Int32);
548+
let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
549+
let mut decoder = Decoder::try_new(&list_dt).unwrap();
550+
let mut row1 = Vec::new();
551+
row1.extend_from_slice(&encode_avro_long(2));
552+
row1.extend_from_slice(&encode_avro_int(10));
553+
row1.extend_from_slice(&encode_avro_int(20));
554+
row1.extend_from_slice(&encode_avro_long(0));
555+
let row2 = encode_avro_long(0);
556+
let mut cursor = AvroCursor::new(&row1);
557+
decoder.decode(&mut cursor).unwrap();
558+
let mut cursor2 = AvroCursor::new(&row2);
559+
decoder.decode(&mut cursor2).unwrap();
560+
let array = decoder.flush(None).unwrap();
561+
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
562+
assert_eq!(list_arr.len(), 2);
563+
let offsets = list_arr.value_offsets();
564+
assert_eq!(offsets, &[0, 2, 2]);
565+
let values = list_arr.values();
566+
let int_arr = values.as_primitive::<Int32Type>();
567+
assert_eq!(int_arr.len(), 2);
568+
assert_eq!(int_arr.value(0), 10);
569+
assert_eq!(int_arr.value(1), 20);
570+
}
571+
572+
#[test]
573+
fn test_array_decoding_with_negative_block_count() {
574+
let item_dt = avro_from_codec(Codec::Int32);
575+
let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
576+
let mut decoder = Decoder::try_new(&list_dt).unwrap();
577+
let mut data = encode_avro_long(-3);
578+
data.extend_from_slice(&encode_avro_long(12));
579+
data.extend_from_slice(&encode_avro_int(1));
580+
data.extend_from_slice(&encode_avro_int(2));
581+
data.extend_from_slice(&encode_avro_int(3));
582+
data.extend_from_slice(&encode_avro_long(0));
583+
let mut cursor = AvroCursor::new(&data);
584+
decoder.decode(&mut cursor).unwrap();
585+
let array = decoder.flush(None).unwrap();
586+
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
587+
assert_eq!(list_arr.len(), 1);
588+
assert_eq!(list_arr.value_length(0), 3);
589+
let values = list_arr.values().as_primitive::<Int32Type>();
590+
assert_eq!(values.len(), 3);
591+
assert_eq!(values.value(0), 1);
592+
assert_eq!(values.value(1), 2);
593+
assert_eq!(values.value(2), 3);
594+
}
595+
596+
#[test]
597+
fn test_nested_array_decoding() {
598+
let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
599+
let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
600+
let mut decoder = Decoder::try_new(&nested_ty).unwrap();
601+
let mut buf = Vec::new();
602+
buf.extend(encode_avro_long(1));
603+
buf.extend(encode_avro_long(2));
604+
buf.extend(encode_avro_int(5));
605+
buf.extend(encode_avro_int(6));
606+
buf.extend(encode_avro_long(0));
607+
buf.extend(encode_avro_long(0));
608+
let mut cursor = AvroCursor::new(&buf);
609+
decoder.decode(&mut cursor).unwrap();
610+
let arr = decoder.flush(None).unwrap();
611+
let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
612+
assert_eq!(outer.len(), 1);
613+
assert_eq!(outer.value_length(0), 1);
614+
let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
615+
assert_eq!(inner.len(), 1);
616+
assert_eq!(inner.value_length(0), 2);
617+
let values = inner
618+
.values()
619+
.as_any()
620+
.downcast_ref::<Int32Array>()
621+
.unwrap();
622+
assert_eq!(values.values(), &[5, 6]);
623+
}
624+
625+
#[test]
626+
fn test_array_decoding_empty_array() {
627+
let value_type = avro_from_codec(Codec::Utf8);
628+
let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
629+
let mut decoder = Decoder::try_new(&map_type).unwrap();
630+
let data = encode_avro_long(0);
631+
decoder.decode(&mut AvroCursor::new(&data)).unwrap();
632+
let array = decoder.flush(None).unwrap();
633+
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
634+
assert_eq!(list_arr.len(), 1);
635+
assert_eq!(list_arr.value_length(0), 0);
636+
}
534637
}

0 commit comments

Comments
 (0)