Skip to content

Commit 9e3dd6c

Browse files
authored
feat: backward compatibility of decimal (#48)
1 parent 03cb44c commit 9e3dd6c

File tree

10 files changed

+129
-28
lines changed

10 files changed

+129
-28
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
Cargo.lock
22
target
33
rusty-tags.vi
4+
.claude
45
.history
56
.flatbuffers/
67
.idea/

arrow-ipc/src/convert.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,14 @@ pub(crate) fn get_fb_field_type<'a>(
837837
let mut builder = crate::DecimalBuilder::new(fbb);
838838
builder.add_precision(*precision as i32);
839839
builder.add_scale(*scale as i32);
840-
builder.add_bitWidth(128);
840+
let bit_width = if *precision > 1 && *precision <= 18 {
841+
64
842+
} else if *precision <= 27 {
843+
96
844+
} else {
845+
128
846+
};
847+
builder.add_bitWidth(bit_width);
841848
FBFieldType {
842849
type_type: crate::Type::Decimal,
843850
type_: builder.finish().as_union_value(),

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ impl<T> ArrowReaderBuilder<T> {
121121
/// Enables splitting of row group I/O into multiple reads, with the goal of loading less data
122122
/// into memory at a time.
123123
pub fn with_split_row_group_reads(self, split_row_group_reads: bool) -> Self {
124-
Self { split_row_group_reads, ..self }
124+
Self {
125+
split_row_group_reads,
126+
..self
127+
}
125128
}
126129

127130
/// Only read data from the provided row group indexes
@@ -329,7 +332,7 @@ impl ArrowReaderOptions {
329332
///
330333
/// // Create the reader and read the data using the supplied schema.
331334
/// let mut reader = builder.build().unwrap();
332-
/// let _batch = reader.next().unwrap().unwrap();
335+
/// let _batch = reader.next().unwrap().unwrap();
333336
/// ```
334337
pub fn with_schema(self, schema: SchemaRef) -> Self {
335338
Self {
@@ -4069,7 +4072,7 @@ mod tests {
40694072
fn test_decimal_roundtrip<T: DecimalType>() {
40704073
// Precision <= 9 -> INT32
40714074
// Precision <= 18 -> INT64
4072-
// Precision > 18 -> FIXED_LEN_BYTE_ARRAY
4075+
// Precision > 27 -> FIXED_LEN_BYTE_ARRAY
40734076

40744077
let d = |values: Vec<usize>, p: u8| {
40754078
let iter = values.into_iter().map(T::Native::usize_as);
@@ -4081,7 +4084,7 @@ mod tests {
40814084
let d1 = d(vec![1, 2, 3, 4, 5], 9);
40824085
let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
40834086
let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4084-
let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4087+
let d4 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 28);
40854088

40864089
let batch = RecordBatch::try_from_iter([
40874090
("d1", Arc::new(d1) as ArrayRef),
@@ -4113,10 +4116,57 @@ mod tests {
41134116
assert_eq!(batch, out);
41144117
}
41154118

4119+
fn test_decimal_roundtrip_int96() {
4120+
// Decimal128 Precision > 18 && <= 27 -> INT96
4121+
// Decimal256 Precision > 18 && <= 27 -> FIXED_LEN_BYTE_ARRAY
4122+
4123+
fn d<T: DecimalType>(values: Vec<usize>, p: u8) -> PrimitiveArray<T> {
4124+
let iter = values.into_iter().map(T::Native::usize_as);
4125+
PrimitiveArray::<T>::from_iter_values(iter)
4126+
.with_precision_and_scale(p, 2)
4127+
.unwrap()
4128+
}
4129+
4130+
let d1 = d::<Decimal128Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 19);
4131+
let d2 = d::<Decimal128Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 27);
4132+
let d3 = d::<Decimal256Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 19);
4133+
let d4 = d::<Decimal256Type>(vec![1, 2, 3, 4, 10.pow(18) - 1], 27);
4134+
4135+
let batch = RecordBatch::try_from_iter([
4136+
("d1", Arc::new(d1) as ArrayRef),
4137+
("d2", Arc::new(d2) as ArrayRef),
4138+
("d3", Arc::new(d3) as ArrayRef),
4139+
("d4", Arc::new(d4) as ArrayRef),
4140+
])
4141+
.unwrap();
4142+
4143+
let mut buffer = Vec::with_capacity(1024);
4144+
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4145+
writer.write(&batch).unwrap();
4146+
writer.close().unwrap();
4147+
4148+
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4149+
let t1 = builder.parquet_schema().columns()[0].physical_type();
4150+
assert_eq!(t1, PhysicalType::INT96);
4151+
let t2 = builder.parquet_schema().columns()[1].physical_type();
4152+
assert_eq!(t2, PhysicalType::INT96);
4153+
let t3 = builder.parquet_schema().columns()[2].physical_type();
4154+
assert_eq!(t3, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4155+
let t4 = builder.parquet_schema().columns()[3].physical_type();
4156+
assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4157+
4158+
let mut reader = builder.build().unwrap();
4159+
assert_eq!(batch.schema(), reader.schema());
4160+
4161+
let out = reader.next().unwrap().unwrap();
4162+
assert_eq!(batch, out);
4163+
}
4164+
41164165
#[test]
41174166
fn test_decimal() {
41184167
test_decimal_roundtrip::<Decimal128Type>();
41194168
test_decimal_roundtrip::<Decimal256Type>();
4169+
test_decimal_roundtrip_int96();
41204170
}
41214171

41224172
#[test]

parquet/src/arrow/arrow_reader/statistics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,11 @@ where
11391139
.iter()
11401140
.map(|x| x.null_count.map(|x| x as u64))
11411141
.collect::<Vec<_>>(),
1142+
Index::INT96(native_index) => native_index
1143+
.indexes
1144+
.iter()
1145+
.map(|x| x.null_count.map(|x| x as u64))
1146+
.collect::<Vec<_>>(),
11421147
Index::FLOAT(native_index) => native_index
11431148
.indexes
11441149
.iter()

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::column::writer::encoder::ColumnValueEncoder;
3939
use crate::column::writer::{
4040
get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
4141
};
42+
use crate::data_type::Int96;
4243
use crate::data_type::{ByteArray, FixedLenByteArray};
4344
use crate::errors::{ParquetError, Result};
4445
use crate::file::encryption::{
@@ -1044,9 +1045,16 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
10441045
}
10451046
}
10461047
}
1047-
ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1048-
unreachable!("Currently unreachable because data type not supported")
1049-
}
1048+
ColumnWriter::Int96ColumnWriter(ref mut typed) => match column.data_type() {
1049+
ArrowDataType::Decimal128(_, _) => {
1050+
// Cube: Decimal96 backwards compatibility - write Decimal128 as INT96
1051+
let array = column.as_primitive::<Decimal128Type>();
1052+
let int96_values: Vec<Int96> =
1053+
array.values().iter().map(|v| i128_to_int96(*v)).collect();
1054+
typed.write_batch(&int96_values, levels.def_levels(), levels.rep_levels())
1055+
}
1056+
_ => unreachable!("INT96 column writer only supports Decimal128 for Decimal96"),
1057+
},
10501058
ColumnWriter::FloatColumnWriter(ref mut typed) => {
10511059
let array = column.as_primitive::<Float32Type>();
10521060
write_primitive(typed, array.values(), levels)
@@ -1225,6 +1233,19 @@ fn get_fsb_array_slice(
12251233
values
12261234
}
12271235

1236+
/// Cube: Convert i128 to Int96 for Decimal96 backwards compatibility
1237+
/// Int96 stores 12 bytes (96 bits), we take lower 12 bytes from i128
1238+
fn i128_to_int96(value: i128) -> Int96 {
1239+
let bytes = value.to_le_bytes();
1240+
let mut int96 = Int96::new();
1241+
int96.set_data(
1242+
u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
1243+
u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
1244+
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
1245+
);
1246+
int96
1247+
}
1248+
12281249
#[cfg(test)]
12291250
mod tests {
12301251
use super::*;

parquet/src/arrow/async_reader/mod.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -855,17 +855,21 @@ where
855855
let selection: Option<RowSelection>;
856856
let row_group_idx: usize;
857857

858-
if let Some((active_row_group_idx, remaining_selection)) = self.active_row_group_and_selection.take() {
858+
if let Some((active_row_group_idx, remaining_selection)) =
859+
self.active_row_group_and_selection.take()
860+
{
859861
if !remaining_selection.selects_any() {
860862
continue;
861863
} else {
862864
reader = self.reader.take().expect("lost reader");
863865

864-
let new_remaining_selection = remaining_selection.clone().offset(self.batch_size);
866+
let new_remaining_selection =
867+
remaining_selection.clone().offset(self.batch_size);
865868
selection = Some(remaining_selection.limit(self.batch_size));
866869
row_group_idx = active_row_group_idx;
867870

868-
self.active_row_group_and_selection = Some((active_row_group_idx, new_remaining_selection));
871+
self.active_row_group_and_selection =
872+
Some((active_row_group_idx, new_remaining_selection));
869873
}
870874
} else {
871875
row_group_idx = match self.row_groups.pop_front() {
@@ -878,18 +882,27 @@ where
878882
let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
879883

880884
if self.split_row_group_reads {
881-
let remaining_selection = self.selection.as_mut().map_or_else(|| RowSelection::from_consecutive_ranges([0..row_count].into_iter(), row_count), |s| s.split_off(row_count));
882-
883-
let new_remaining_selection = remaining_selection.clone().offset(self.batch_size);
885+
let remaining_selection = self.selection.as_mut().map_or_else(
886+
|| {
887+
RowSelection::from_consecutive_ranges(
888+
[0..row_count].into_iter(),
889+
row_count,
890+
)
891+
},
892+
|s| s.split_off(row_count),
893+
);
894+
895+
let new_remaining_selection =
896+
remaining_selection.clone().offset(self.batch_size);
884897
selection = Some(remaining_selection.limit(self.batch_size));
885898

886-
self.active_row_group_and_selection = Some((row_group_idx, new_remaining_selection));
899+
self.active_row_group_and_selection =
900+
Some((row_group_idx, new_remaining_selection));
887901
} else {
888902
selection = self.selection.as_mut().map(|s| s.split_off(row_count));
889903
}
890904
}
891905

892-
893906
let fut = reader
894907
.read_row_group(
895908
row_group_idx,

parquet/src/arrow/schema/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,9 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
629629
(PhysicalType::INT32, -1)
630630
} else if *precision <= 18 {
631631
(PhysicalType::INT64, -1)
632+
} else if *precision <= 27 && matches!(field.data_type(), DataType::Decimal128(_, _)) {
633+
// For backward compatibility with older Cube Store versions
634+
(PhysicalType::INT96, -1)
632635
} else {
633636
(
634637
PhysicalType::FIXED_LEN_BYTE_ARRAY,

parquet/src/file/metadata/reader.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::file::{
3434
decrypt_module, ParquetEncryptionConfig, ParquetEncryptionKey, ParquetEncryptionMode,
3535
RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE, PARQUET_KEY_HASH_LENGTH,
3636
},
37-
PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY, PARQUET_MAGIC_ENCRYPTED_FOOTER,
37+
PARQUET_MAGIC_ENCRYPTED_FOOTER, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY,
3838
};
3939
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
4040
use crate::format::{
@@ -755,7 +755,9 @@ impl ParquetMetaDataReader {
755755
}
756756
}
757757
encrypted_footer = false;
758-
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER || trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY {
758+
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER
759+
|| trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE_READONLY
760+
{
759761
let has_keys = encryption_config.as_ref().map_or(false, |config| {
760762
config
761763
.read_keys()

parquet/src/file/serialized_reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,8 @@ impl<R: ChunkReader> SerializedPageReader<R> {
736736
// This assumes we have either a data page or dictionary page. INDEX_PAGE is an
737737
// "unknown page type" and encryption would fail if we encountered it -- but our
738738
// encrypted files don't have it.
739-
(_aad_module_type, aad_header_module_type) = Self::module_types(aad_page_ordinal);
739+
(_aad_module_type, aad_header_module_type) =
740+
Self::module_types(aad_page_ordinal);
740741

741742
if *remaining_bytes == 0 {
742743
return Ok(None);
@@ -1441,7 +1442,7 @@ mod tests {
14411442
let row_group_metadata = file_reader.metadata.row_group(row_group);
14421443
let props = Arc::clone(&file_reader.props);
14431444
let f = Arc::clone(&file_reader.chunk_reader);
1444-
assert!(file_reader.metadata.file_encryption_info().is_none()); // We pass None to the SerializedPageReader below
1445+
assert!(file_reader.metadata.file_encryption_info().is_none()); // We pass None to the SerializedPageReader below
14451446
SerializedRowGroupReader::new(
14461447
f,
14471448
&None,

parquet/src/file/writer.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -399,14 +399,12 @@ fn write_bloom_filters<W: Write + Send>(
399399
// write bloom filter to the file
400400

401401
let row_group_ordinal: i16 = row_group.ordinal().expect("Missing row group ordinal");
402-
let row_group_idx: u16 = row_group_ordinal
403-
.try_into()
404-
.map_err(|_| {
405-
ParquetError::General(format!(
406-
"Negative row group ordinal: {})",
407-
row_group.ordinal().unwrap()
408-
))
409-
})?;
402+
let row_group_idx: u16 = row_group_ordinal.try_into().map_err(|_| {
403+
ParquetError::General(format!(
404+
"Negative row group ordinal: {})",
405+
row_group.ordinal().unwrap()
406+
))
407+
})?;
410408
let row_group_idx = row_group_idx as usize;
411409
for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
412410
if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {

0 commit comments

Comments (0)