Skip to content

Commit a876b59

Browse files
committed
Support arrow readers for strings with DELTA_BYTE_ARRAY encoding
1 parent 00c7ff3 commit a876b59

File tree

1 file changed

+164
-2
lines changed

1 file changed

+164
-2
lines changed

parquet/src/arrow/arrow_array_reader.rs

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use super::array_reader::ArrayReader;
1919
use crate::arrow::schema::parquet_to_arrow_field;
2020
use crate::basic::Encoding;
21+
use crate::data_type::{ByteArray, ByteArrayType};
22+
use crate::decoding::{Decoder, DeltaByteArrayDecoder};
2123
use crate::errors::{ParquetError, Result};
2224
use crate::{
2325
column::page::{Page, PageIterator},
@@ -485,7 +487,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
485487
// Encoding::RLE => Box::new(RleValueDecoder::new()),
486488
// Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
487489
// Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
488-
// Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
490+
Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayValueDecoder::new(
491+
values_buffer,
492+
num_values,
493+
)?)),
489494
e => return Err(nyi_err!("Encoding {} is not supported", e)),
490495
}
491496
}
@@ -1074,6 +1079,39 @@ impl ValueDecoder for VariableLenDictionaryDecoder {
10741079
}
10751080
}
10761081

1082+
pub(crate) struct DeltaByteArrayValueDecoder {
1083+
decoder: DeltaByteArrayDecoder<ByteArrayType>,
1084+
}
1085+
1086+
impl DeltaByteArrayValueDecoder {
1087+
pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
1088+
let mut decoder = DeltaByteArrayDecoder::new();
1089+
decoder.set_data(data, num_values)?;
1090+
Ok(Self { decoder })
1091+
}
1092+
}
1093+
1094+
impl ValueDecoder for DeltaByteArrayValueDecoder {
1095+
fn read_value_bytes(
1096+
&mut self,
1097+
mut num_values: usize,
1098+
read_bytes: &mut dyn FnMut(&[u8], usize),
1099+
) -> Result<usize> {
1100+
num_values = std::cmp::min(num_values, self.decoder.values_left());
1101+
let mut values_read = 0;
1102+
while values_read < num_values {
1103+
let mut buf = [ByteArray::new()];
1104+
let num_read = self.decoder.get(&mut buf)?;
1105+
assert_eq!(num_read, 1);
1106+
1107+
read_bytes(buf[0].data(), 1);
1108+
1109+
values_read += 1;
1110+
}
1111+
Ok(values_read)
1112+
}
1113+
}
1114+
10771115
use arrow::datatypes::ArrowPrimitiveType;
10781116

10791117
pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
@@ -1163,9 +1201,16 @@ impl ArrayConverter for StringArrayConverter {
11631201
#[cfg(test)]
11641202
mod tests {
11651203
use super::*;
1204+
use crate::arrow::{ArrowReader, ParquetFileArrowReader};
1205+
use crate::basic::ConvertedType;
11661206
use crate::column::page::Page;
1207+
use crate::column::writer::ColumnWriter;
11671208
use crate::data_type::ByteArray;
11681209
use crate::data_type::ByteArrayType;
1210+
use crate::file::properties::WriterProperties;
1211+
use crate::file::reader::SerializedFileReader;
1212+
use crate::file::serialized_reader::SliceableCursor;
1213+
use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
11691214
use crate::schema::parser::parse_message_type;
11701215
use crate::schema::types::SchemaDescriptor;
11711216
use crate::util::test_common::page_util::{
@@ -1177,7 +1222,8 @@ mod tests {
11771222
use arrow::array::{PrimitiveArray, StringArray};
11781223
use arrow::datatypes::Int32Type as ArrowInt32;
11791224
use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
1180-
use std::sync::Arc;
1225+
use std::io::{Cursor, Seek, SeekFrom, Write};
1226+
use std::sync::{Arc, Mutex};
11811227

11821228
/// Iterator for testing reading empty columns
11831229
struct EmptyPageIterator {
@@ -1559,4 +1605,120 @@ mod tests {
15591605
array_reader.get_rep_levels()
15601606
);
15611607
}
1608+
1609+
/// Allows to write parquet into memory. Intended only for use in tests.
1610+
#[derive(Clone)]
1611+
struct VecWriter {
1612+
data: Arc<Mutex<Cursor<Vec<u8>>>>,
1613+
}
1614+
1615+
impl VecWriter {
1616+
pub fn new() -> VecWriter {
1617+
VecWriter {
1618+
data: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
1619+
}
1620+
}
1621+
1622+
pub fn consume(self) -> Vec<u8> {
1623+
Arc::try_unwrap(self.data)
1624+
.unwrap()
1625+
.into_inner()
1626+
.unwrap()
1627+
.into_inner()
1628+
}
1629+
}
1630+
1631+
impl TryClone for VecWriter {
1632+
fn try_clone(&self) -> std::io::Result<Self> {
1633+
Ok(self.clone())
1634+
}
1635+
}
1636+
1637+
impl Seek for VecWriter {
1638+
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
1639+
self.data.lock().unwrap().seek(pos)
1640+
}
1641+
1642+
fn stream_position(&mut self) -> std::io::Result<u64> {
1643+
self.data.lock().unwrap().stream_position()
1644+
}
1645+
}
1646+
1647+
impl Write for VecWriter {
1648+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1649+
self.data.lock().unwrap().write(buf)
1650+
}
1651+
1652+
fn flush(&mut self) -> std::io::Result<()> {
1653+
self.data.lock().unwrap().flush()
1654+
}
1655+
}
1656+
1657+
#[test]
1658+
fn test_string_delta_byte_array() {
1659+
use crate::basic;
1660+
use crate::schema::types::Type;
1661+
1662+
let data = VecWriter::new();
1663+
let schema = Arc::new(
1664+
Type::group_type_builder("string_test")
1665+
.with_fields(&mut vec![Arc::new(
1666+
Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
1667+
.with_converted_type(ConvertedType::UTF8)
1668+
.build()
1669+
.unwrap(),
1670+
)])
1671+
.build()
1672+
.unwrap(),
1673+
);
1674+
// Disable dictionary and use the fallback encoding.
1675+
let p = Arc::new(
1676+
WriterProperties::builder()
1677+
.set_dictionary_enabled(false)
1678+
.set_encoding(Encoding::DELTA_BYTE_ARRAY)
1679+
.build(),
1680+
);
1681+
// Write a few strings.
1682+
let mut w = SerializedFileWriter::new(data.clone(), schema, p).unwrap();
1683+
let mut rg = w.next_row_group().unwrap();
1684+
let mut c = rg.next_column().unwrap().unwrap();
1685+
match &mut c {
1686+
ColumnWriter::ByteArrayColumnWriter(c) => {
1687+
c.write_batch(
1688+
&[ByteArray::from("foo"), ByteArray::from("bar")],
1689+
Some(&[1, 1]),
1690+
Some(&[0, 0]),
1691+
)
1692+
.unwrap();
1693+
}
1694+
_ => panic!("unexpected column"),
1695+
};
1696+
rg.close_column(c).unwrap();
1697+
w.close_row_group(rg).unwrap();
1698+
w.close().unwrap();
1699+
std::mem::drop(w);
1700+
1701+
// Check we can read them back.
1702+
let r = SerializedFileReader::new(SliceableCursor::new(Arc::new(data.consume())))
1703+
.unwrap();
1704+
let mut r = ParquetFileArrowReader::new(Arc::new(r));
1705+
1706+
let batch = r
1707+
.get_record_reader_by_columns([0], 1024)
1708+
.unwrap()
1709+
.next()
1710+
.unwrap()
1711+
.unwrap();
1712+
assert_eq!(batch.columns().len(), 1);
1713+
1714+
let strings = batch
1715+
.column(0)
1716+
.as_any()
1717+
.downcast_ref::<StringArray>()
1718+
.unwrap();
1719+
assert_eq!(
1720+
strings.into_iter().collect::<Vec<_>>(),
1721+
vec![Some("foo"), Some("bar")]
1722+
);
1723+
}
15621724
}

0 commit comments

Comments
 (0)