Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
Cargo.lock
target
rusty-tags.vi
.claude
.history
.flatbuffers/
.idea/
9 changes: 8 additions & 1 deletion arrow-ipc/src/convert.rs
@@ -837,7 +837,14 @@ pub(crate) fn get_fb_field_type<'a>(
let mut builder = crate::DecimalBuilder::new(fbb);
builder.add_precision(*precision as i32);
builder.add_scale(*scale as i32);
builder.add_bitWidth(128);
            let bit_width = if *precision <= 18 {
64
} else if *precision <= 27 {
96
} else {
128
};
builder.add_bitWidth(bit_width);
FBFieldType {
type_type: crate::Type::Decimal,
type_: builder.finish().as_union_value(),
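A note on the bit-width selection added above: it can be exercised in isolation. The sketch below mirrors the thresholds (64 bits through precision 18, 96 through 27, 128 beyond); decimal_bit_width is a hypothetical helper for illustration, not part of this change.

fn decimal_bit_width(precision: u8) -> i32 {
    // Mirrors the branch in get_fb_field_type: 64-bit decimals up to 18
    // digits, 96-bit (Cube's Decimal96 compatibility) up to 27, else 128.
    if precision <= 18 {
        64
    } else if precision <= 27 {
        96
    } else {
        128
    }
}

#[test]
fn decimal_bit_width_thresholds() {
    assert_eq!(decimal_bit_width(1), 64);
    assert_eq!(decimal_bit_width(18), 64);
    assert_eq!(decimal_bit_width(19), 96);
    assert_eq!(decimal_bit_width(27), 96);
    assert_eq!(decimal_bit_width(28), 128);
}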
58 changes: 54 additions & 4 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -121,7 +121,10 @@ impl<T> ArrowReaderBuilder<T> {
/// Enables splitting of row group I/O into multiple reads, with the goal of loading less data
/// into memory at a time.
pub fn with_split_row_group_reads(self, split_row_group_reads: bool) -> Self {
Self { split_row_group_reads, ..self }
Self {
split_row_group_reads,
..self
}
}

/// Only read data from the provided row group indexes
@@ -329,7 +332,7 @@ impl ArrowReaderOptions {
///
/// // Create the reader and read the data using the supplied schema.
/// let mut reader = builder.build().unwrap();
    /// let _batch = reader.next().unwrap().unwrap();
/// ```
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
@@ -4069,7 +4072,7 @@ mod tests {
fn test_decimal_roundtrip<T: DecimalType>() {
// Precision <= 9 -> INT32
// Precision <= 18 -> INT64
// Precision > 18 -> FIXED_LEN_BYTE_ARRAY
// Precision > 27 -> FIXED_LEN_BYTE_ARRAY
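        // Precision 19..=27 for Decimal128 -> INT96 (Cube compatibility; see
        // test_decimal_roundtrip_int96 below). Decimal256 never uses INT96.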

let d = |values: Vec<usize>, p: u8| {
let iter = values.into_iter().map(T::Native::usize_as);
@@ -4081,7 +4084,7 @@
let d1 = d(vec![1, 2, 3, 4, 5], 9);
let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
let d4 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 28);

let batch = RecordBatch::try_from_iter([
("d1", Arc::new(d1) as ArrayRef),
@@ -4113,10 +4116,57 @@
assert_eq!(batch, out);
}

fn test_decimal_roundtrip_int96() {
// Decimal128 Precision > 18 && <= 27 -> INT96
// Decimal256 Precision > 18 && <= 27 -> FIXED_LEN_BYTE_ARRAY

fn d<T: DecimalType>(values: Vec<usize>, p: u8) -> PrimitiveArray<T> {
let iter = values.into_iter().map(T::Native::usize_as);
PrimitiveArray::<T>::from_iter_values(iter)
.with_precision_and_scale(p, 2)
.unwrap()
}

let d1 = d::<Decimal128Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 19);
let d2 = d::<Decimal128Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 27);
let d3 = d::<Decimal256Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 19);
let d4 = d::<Decimal256Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 27);

let batch = RecordBatch::try_from_iter([
("d1", Arc::new(d1) as ArrayRef),
("d2", Arc::new(d2) as ArrayRef),
("d3", Arc::new(d3) as ArrayRef),
("d4", Arc::new(d4) as ArrayRef),
])
.unwrap();

let mut buffer = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
let t1 = builder.parquet_schema().columns()[0].physical_type();
assert_eq!(t1, PhysicalType::INT96);
let t2 = builder.parquet_schema().columns()[1].physical_type();
assert_eq!(t2, PhysicalType::INT96);
let t3 = builder.parquet_schema().columns()[2].physical_type();
assert_eq!(t3, PhysicalType::FIXED_LEN_BYTE_ARRAY);
let t4 = builder.parquet_schema().columns()[3].physical_type();
assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

let mut reader = builder.build().unwrap();
assert_eq!(batch.schema(), reader.schema());

let out = reader.next().unwrap().unwrap();
assert_eq!(batch, out);
}

#[test]
fn test_decimal() {
test_decimal_roundtrip::<Decimal128Type>();
test_decimal_roundtrip::<Decimal256Type>();
test_decimal_roundtrip_int96();
}

#[test]
5 changes: 5 additions & 0 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -1139,6 +1139,11 @@ where
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::INT96(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::FLOAT(native_index) => native_index
.indexes
.iter()
27 changes: 24 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -39,6 +39,7 @@ use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::Int96;
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::encryption::{
@@ -1044,9 +1045,16 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
}
}
}
ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
unreachable!("Currently unreachable because data type not supported")
}
ColumnWriter::Int96ColumnWriter(ref mut typed) => match column.data_type() {
ArrowDataType::Decimal128(_, _) => {
// Cube: Decimal96 backwards compatibility - write Decimal128 as INT96
let array = column.as_primitive::<Decimal128Type>();
let int96_values: Vec<Int96> =
array.values().iter().map(|v| i128_to_int96(*v)).collect();
typed.write_batch(&int96_values, levels.def_levels(), levels.rep_levels())
}
_ => unreachable!("INT96 column writer only supports Decimal128 for Decimal96"),
},
ColumnWriter::FloatColumnWriter(ref mut typed) => {
let array = column.as_primitive::<Float32Type>();
write_primitive(typed, array.values(), levels)
@@ -1225,6 +1233,19 @@ fn get_fsb_array_slice(
values
}

/// Cube: Convert i128 to Int96 for Decimal96 backwards compatibility
/// Int96 stores 12 bytes (96 bits); we take the lower 12 bytes of the i128
fn i128_to_int96(value: i128) -> Int96 {
let bytes = value.to_le_bytes();
let mut int96 = Int96::new();
int96.set_data(
u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
);
int96
}

#[cfg(test)]
mod tests {
use super::*;
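For readers checking the byte layout of i128_to_int96 above: the inverse, with sign extension from bit 95, would look like the following sketch. int96_words_to_i128 is a hypothetical helper (not part of this change), assuming the same high/middle/low word order passed to Int96::set_data. Since the write path caps precision at 27, every value fits in 96 bits and the truncation to 12 bytes is lossless.

fn int96_words_to_i128(w_high: u32, w_mid: u32, w_low: u32) -> i128 {
    // Reassemble the 96-bit value: w_low holds bits 0..32, w_mid bits 32..64,
    // and w_high bits 64..96 of the original i128.
    let unsigned = ((w_high as u128) << 64) | ((w_mid as u128) << 32) | (w_low as u128);
    if w_high & 0x8000_0000 != 0 {
        // Negative value: sign-extend bit 95 into the upper 32 bits.
        (unsigned | (0xFFFF_FFFFu128 << 96)) as i128
    } else {
        unsigned as i128
    }
}

#[test]
fn int96_words_round_trip() {
    for v in [0i128, 1, -1, 10i128.pow(27) - 1, -(10i128.pow(27) - 1)] {
        let b = v.to_le_bytes();
        let (w_high, w_mid, w_low) = (
            u32::from_le_bytes([b[8], b[9], b[10], b[11]]),
            u32::from_le_bytes([b[4], b[5], b[6], b[7]]),
            u32::from_le_bytes([b[0], b[1], b[2], b[3]]),
        );
        assert_eq!(int96_words_to_i128(w_high, w_mid, w_low), v);
    }
}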
29 changes: 21 additions & 8 deletions parquet/src/arrow/async_reader/mod.rs
@@ -855,17 +855,21 @@ where
let selection: Option<RowSelection>;
let row_group_idx: usize;

if let Some((active_row_group_idx, remaining_selection)) = self.active_row_group_and_selection.take() {
if let Some((active_row_group_idx, remaining_selection)) =
self.active_row_group_and_selection.take()
{
if !remaining_selection.selects_any() {
continue;
} else {
reader = self.reader.take().expect("lost reader");

let new_remaining_selection = remaining_selection.clone().offset(self.batch_size);
let new_remaining_selection =
remaining_selection.clone().offset(self.batch_size);
selection = Some(remaining_selection.limit(self.batch_size));
row_group_idx = active_row_group_idx;

self.active_row_group_and_selection = Some((active_row_group_idx, new_remaining_selection));
self.active_row_group_and_selection =
Some((active_row_group_idx, new_remaining_selection));
}
} else {
row_group_idx = match self.row_groups.pop_front() {
@@ -878,18 +882,27 @@
let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

if self.split_row_group_reads {
let remaining_selection = self.selection.as_mut().map_or_else(|| RowSelection::from_consecutive_ranges([0..row_count].into_iter(), row_count), |s| s.split_off(row_count));

let new_remaining_selection = remaining_selection.clone().offset(self.batch_size);
let remaining_selection = self.selection.as_mut().map_or_else(
|| {
RowSelection::from_consecutive_ranges(
[0..row_count].into_iter(),
row_count,
)
},
|s| s.split_off(row_count),
);

let new_remaining_selection =
remaining_selection.clone().offset(self.batch_size);
selection = Some(remaining_selection.limit(self.batch_size));

self.active_row_group_and_selection = Some((row_group_idx, new_remaining_selection));
self.active_row_group_and_selection =
Some((row_group_idx, new_remaining_selection));
} else {
selection = self.selection.as_mut().map(|s| s.split_off(row_count));
}
}


let fut = reader
.read_row_group(
row_group_idx,
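How the split selection advances per poll, in plain numbers: a minimal sketch assuming a 10_000-row group and batch_size = 1024, using only the RowSelection methods referenced above.

// First poll on a fresh row group with split reads enabled:
let full = RowSelection::from_consecutive_ranges([0..10_000].into_iter(), 10_000);
let this_read = full.clone().limit(1024); // rows 0..1024 are read now
let remaining = full.offset(1024); // rows 1024..10_000 are carried over in
                                   // active_row_group_and_selection
// Later polls repeat limit/offset on the carried selection until it selects
// nothing, then the stream pops the next row group.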
3 changes: 3 additions & 0 deletions parquet/src/arrow/schema/mod.rs
@@ -629,6 +629,9 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
(PhysicalType::INT32, -1)
} else if *precision <= 18 {
(PhysicalType::INT64, -1)
} else if *precision <= 27 && matches!(field.data_type(), DataType::Decimal128(_, _)) {
// For backward compatibility with older Cube Store versions
(PhysicalType::INT96, -1)
} else {
(
PhysicalType::FIXED_LEN_BYTE_ARRAY,
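Why 27 is a safe cutoff for the INT96 path: the largest 27-digit value still fits in a signed 96-bit integer, so the encoding never truncates. A quick standalone check (not part of this change):

#[test]
fn precision_27_fits_in_int96() {
    // 10^27 - 1 (largest 27-digit value) <= 2^95 - 1 (largest signed 96-bit value)
    let max_precision_27: i128 = 10i128.pow(27) - 1;
    let max_int96: i128 = (1i128 << 95) - 1;
    assert!(max_precision_27 <= max_int96); // ~1.0e27 < ~3.96e28
}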
6 changes: 4 additions & 2 deletions parquet/src/file/metadata/reader.rs
@@ -34,7 +34,7 @@ use crate::file::{
decrypt_module, ParquetEncryptionConfig, ParquetEncryptionKey, ParquetEncryptionMode,
RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE, PARQUET_KEY_HASH_LENGTH,
},
PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY, PARQUET_MAGIC_ENCRYPTED_FOOTER,
PARQUET_MAGIC_ENCRYPTED_FOOTER, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY,
};
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
use crate::format::{
@@ -755,7 +755,9 @@ impl ParquetMetaDataReader {
}
}
encrypted_footer = false;
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER || trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY {
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER
|| trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY
{
let has_keys = encryption_config.as_ref().map_or(false, |config| {
config
.read_keys()
5 changes: 3 additions & 2 deletions parquet/src/file/serialized_reader.rs
@@ -736,7 +736,8 @@ impl<R: ChunkReader> SerializedPageReader<R> {
// This assumes we have either a data page or dictionary page. INDEX_PAGE is an
// "unknown page type" and encryption would fail if we encountered it -- but our
// encrypted files don't have it.
(_aad_module_type, aad_header_module_type) = Self::module_types(aad_page_ordinal);
(_aad_module_type, aad_header_module_type) =
Self::module_types(aad_page_ordinal);

if *remaining_bytes == 0 {
return Ok(None);
@@ -1441,7 +1442,7 @@ mod tests {
let row_group_metadata = file_reader.metadata.row_group(row_group);
let props = Arc::clone(&file_reader.props);
let f = Arc::clone(&file_reader.chunk_reader);
        assert!(file_reader.metadata.file_encryption_info().is_none()); // We pass None to the SerializedPageReader below
SerializedRowGroupReader::new(
f,
&None,
14 changes: 6 additions & 8 deletions parquet/src/file/writer.rs
@@ -399,14 +399,12 @@ fn write_bloom_filters<W: Write + Send>(
// write bloom filter to the file

let row_group_ordinal: i16 = row_group.ordinal().expect("Missing row group ordinal");
let row_group_idx: u16 = row_group_ordinal
.try_into()
.map_err(|_| {
ParquetError::General(format!(
"Negative row group ordinal: {})",
row_group.ordinal().unwrap()
))
})?;
let row_group_idx: u16 = row_group_ordinal.try_into().map_err(|_| {
ParquetError::General(format!(
"Negative row group ordinal: {})",
row_group.ordinal().unwrap()
))
})?;
let row_group_idx = row_group_idx as usize;
for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {