diff --git a/Cargo.lock b/Cargo.lock index f7c7e6f8c1994..f1d408370884c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4009,6 +4009,23 @@ dependencies = [ "zstd 0.12.4", ] +[[package]] +name = "databend-common-parquet-reader-experimental" +version = "0.1.0" +dependencies = [ + "bytes", + "databend-common-column", + "databend-common-exception", + "databend-common-expression", + "databend-storages-common-table-meta", + "lz4_flex", + "parquet", + "parquet-format-safe", + "parquet2", + "streaming-decompression", + "zstd 0.12.4", +] + [[package]] name = "databend-common-pipeline-core" version = "0.1.0" @@ -4330,6 +4347,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-metrics", "databend-common-native", + "databend-common-parquet-reader-experimental", "databend-common-pipeline-core", "databend-common-pipeline-sinks", "databend-common-pipeline-sources", @@ -4360,6 +4378,7 @@ dependencies = [ "opendal", "parking_lot 0.12.3", "parquet", + "parquet2", "paste", "rand 0.8.5", "serde", @@ -11407,6 +11426,36 @@ dependencies = [ "zstd 0.13.3", ] +[[package]] +name = "parquet-format-safe" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1131c54b167dd4e4799ce762e1ab01549ebb94d5bdd13e6ec1b467491c378e1f" +dependencies = [ + "async-trait", + "futures", +] + +[[package]] +name = "parquet2" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579fe5745f02cef3d5f236bfed216fd4693e49e4e920a13475c6132233283bce" +dependencies = [ + "async-stream", + "brotli 3.5.0", + "flate2", + "futures", + "lz4", + "parquet-format-safe", + "seq-macro", + "serde", + "snap", + "streaming-decompression", + "xxhash-rust", + "zstd 0.12.4", +] + [[package]] name = "parse-display" version = "0.9.1" @@ -14567,6 +14616,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "streaming-decompression" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf6cc3b19bfb128a8ad11026086e31d3ce9ad23f8ea37354b31383a187c44cf3" +dependencies = [ + "fallible-streaming-iterator", +] + [[package]] name = "strength_reduce" version = "0.2.4" @@ -17526,6 +17584,12 @@ dependencies = [ "cbordata", ] +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "xz2" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index db3e6706dcda0..f1a3545445bbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,6 +159,8 @@ databend-common-meta-store = { path = "src/meta/store" } databend-common-meta-types = { path = "src/meta/types" } databend-common-metrics = { path = "src/common/metrics" } databend-common-native = { path = "src/common/native" } +databend-common-openai = { path = "src/common/openai" } +databend-common-parquet-reader-experimental = { path = "src/common/experimental_parquet_reader" } databend-common-pipeline-core = { path = "src/query/pipeline/core" } databend-common-pipeline-sinks = { path = "src/query/pipeline/sinks" } databend-common-pipeline-sources = { path = "src/query/pipeline/sources" } @@ -380,6 +382,7 @@ logforth = { git = "https://github.com/datafuse-extras/logforth", branch = "glob 'fastrace', ] } lz4 = "1.24.0" +lz4_flex = { version = "^0.11" } map-api = { version = "0.2.5" } maplit = "1.0.2" match-template = "0.0.1" @@ -428,6 +431,8 @@ ordq = "0.2.0" p256 = "0.13" parking_lot = "0.12.1" parquet = { version = "55", 
features = ["async"] } +parquet-format-safe = "0.2.0" +parquet2 = { version = "0.17.0", features = ["serde_types", "async", "zstd", "snappy", "lz4"] } passwords = { version = "3.1.16", features = ["common-password"] } paste = "1.0.15" percent-encoding = "2.3.1" @@ -503,6 +508,7 @@ sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] } state = "0.6.0" state-machine-api = { version = "0.1.1" } stream-more = "0.1.3" +streaming-decompression = "0.1.2" strength_reduce = "0.2.4" stringslice = "0.2.0" strum = "0.24.1" diff --git a/src/common/column/src/binview/mod.rs b/src/common/column/src/binview/mod.rs index f774ac93c2b30..d076b34ea0b34 100644 --- a/src/common/column/src/binview/mod.rs +++ b/src/common/column/src/binview/mod.rs @@ -133,9 +133,9 @@ impl Clone for BinaryViewColumnGeneric { } } -unsafe impl Send for BinaryViewColumnGeneric {} +// impl Send for BinaryViewColumnGeneric {} -unsafe impl Sync for BinaryViewColumnGeneric {} +// unsafe impl Sync for BinaryViewColumnGeneric {} impl BinaryViewColumnGeneric { pub fn new_unchecked( diff --git a/src/common/experimental_parquet_reader/Cargo.toml b/src/common/experimental_parquet_reader/Cargo.toml new file mode 100644 index 0000000000000..611340b5e25ac --- /dev/null +++ b/src/common/experimental_parquet_reader/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "databend-common-parquet-reader-experimental" +version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +edition = { workspace = true } + +[features] + +[dependencies] +databend-common-column = { workspace = true } +databend-common-exception = { workspace = true } +databend-common-expression = { workspace = true } +databend-storages-common-table-meta = { workspace = true } + +bytes = { workspace = true } +lz4_flex = { workspace = true } +parquet = { workspace = true, features = ["experimental"] } +parquet-format-safe = { workspace = true } +parquet2 = { workspace = true } +streaming-decompression = { workspace = true } +zstd = { workspace = true } + +[dev-dependencies] +# used to test async readers + +[package.metadata.cargo-machete] +ignored = ["match-template"] + +[lints] +workspace = true diff --git a/src/common/experimental_parquet_reader/src/column/common.rs b/src/common/experimental_parquet_reader/src/column/common.rs new file mode 100644 index 0000000000000..b8ba242af617e --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/common.rs @@ -0,0 +1,547 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Common utilities for Parquet column deserialization + +use databend_common_column::bitmap::Bitmap; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::NullableColumn; +use databend_common_expression::Column; +use decompressor::Decompressor; +use parquet::encodings::rle::RleDecoder; +use parquet2::schema::types::PhysicalType; +use streaming_decompression::FallibleStreamingIterator; + +use crate::reader::decompressor; + +/// Extract definition levels, repetition levels, and values from a data page +fn extract_page_data(data_page: &parquet2::page::DataPage) -> Result<(&[u8], &[u8], &[u8])> { + match parquet2::page::split_buffer(data_page) { + Ok((rep_levels, def_levels, values_buffer)) => Ok((def_levels, rep_levels, values_buffer)), + Err(e) => Err(ErrorCode::Internal(format!( + "Failed to split buffer: {}", + e + ))), + } +} + +/// Decode definition levels and create validity bitmap +pub fn decode_definition_levels( + def_levels: &[u8], + bit_width: u32, + num_values: usize, + data_page: &parquet2::page::DataPage, +) -> Result<(Option, usize)> { + let mut rle_decoder = RleDecoder::new(bit_width as u8); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(def_levels)); + + let expected_levels = num_values; + let mut levels = vec![0i32; expected_levels]; + let decoded_count = rle_decoder + .get_batch(&mut levels) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode definition levels: {}", e)))?; + + if decoded_count != expected_levels { + return Err(ErrorCode::Internal(format!( + "Definition level decoder returned wrong count: expected={}, got={}", + expected_levels, decoded_count + ))); + } + + let max_def_level = data_page.descriptor.max_def_level as i32; + let mut validity_bits = Vec::with_capacity(expected_levels); + let mut non_null_count = 0; + let mut has_nulls = false; + + for &level in &levels { + let is_valid = level == max_def_level; + validity_bits.push(is_valid); + if is_valid { + non_null_count += 1; + } else { + has_nulls = true; + } + } + + let bitmap = if has_nulls { + Some(Bitmap::from_iter(validity_bits)) + } else { + Some(Bitmap::new_constant(true, expected_levels)) + }; + Ok((bitmap, non_null_count)) +} + +/// Process plain encoded data +/// # Arguments +/// * `values_buffer` - The buffer containing the encoded values (maybe plain encoded) +/// * `page_rows` - The number of rows in the page +/// * `column_data` - The vector to which the decoded values will be appended, capacity should be reserved properly +/// * `validity_bitmap` - The validity bitmap for the column if any +fn process_plain_encoding( + values_buffer: &[u8], + page_rows: usize, + column_data: &mut Vec, + validity_bitmap: Option<&Bitmap>, +) -> Result<()> { + let type_size = std::mem::size_of::(); + let old_len = column_data.len(); + + // Calculate how many non-null values we expect to read + let non_null_count = if let Some(bitmap) = validity_bitmap { + bitmap.iter().filter(|&b| b).count() + } else { + page_rows + }; + + if let Some(bitmap) = validity_bitmap { + // Nullable column: process values based on validity bitmap + // Extend vector to final size, leaving NULL positions uninitialized + unsafe { + column_data.set_len(old_len + page_rows); + } + + let mut values_read = 0; + for (i, is_valid) in bitmap.iter().enumerate() { + if is_valid && values_read < non_null_count { + let src_offset = values_read * type_size; + let dst_offset = old_len + i; + + if src_offset + type_size <= values_buffer.len() { + // Handle endianness 
conversion for numeric types + #[cfg(target_endian = "big")] + { + // On big-endian systems, convert from Parquet's little-endian format + convert_endianness_and_copy::( + &values_buffer[src_offset..src_offset + type_size], + &mut column_data[dst_offset..dst_offset + 1], + ); + } + #[cfg(target_endian = "little")] + { + // On little-endian systems, direct copy is sufficient + unsafe { + let src_ptr = values_buffer.as_ptr().add(src_offset); + let dst_ptr = + column_data[dst_offset..dst_offset + 1].as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, type_size); + } + } + values_read += 1; + } else { + return Err(ErrorCode::Internal("Values buffer underflow".to_string())); + } + } + // Note: NULL positions (is_valid == false) are left uninitialized + // This is safe because the validity bitmap controls access + } + } else { + let values_to_copy = non_null_count.min(page_rows); + let total_bytes = values_to_copy * type_size; + + if total_bytes <= values_buffer.len() { + #[cfg(target_endian = "big")] + { + // On big-endian systems, convert each value individually + unsafe { + column_data.set_len(old_len + values_to_copy); + } + for i in 0..values_to_copy { + let src_offset = i * type_size; + let dst_offset = old_len + i; + convert_endianness_and_copy::( + &values_buffer[src_offset..src_offset + type_size], + &mut column_data[dst_offset..dst_offset + 1], + ); + } + } + #[cfg(target_endian = "little")] + { + // On little-endian systems, batch copy for performance + unsafe { + let src_ptr = values_buffer.as_ptr(); + let dst_ptr = column_data.as_mut_ptr().add(old_len) as *mut u8; + std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, total_bytes); + column_data.set_len(old_len + values_to_copy); + } + } + } else { + return Err(ErrorCode::Internal("Values buffer underflow".to_string())); + } + } + + Ok(()) +} + +/// Convert endianness and copy data for big-endian systems +/// +/// This function handles the conversion from Parquet's little-endian format +/// to the native big-endian format on big-endian systems. 
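+///
+/// A minimal, self-contained sketch of the conversion this function performs for
+/// 4-byte values (illustrative only; the real code below writes through raw pointers):
+///
+/// ```ignore
+/// let src = [0x2Au8, 0x00, 0x00, 0x00]; // 42 as stored by Parquet (little-endian)
+/// let value = i32::from_le_bytes(src);  // native representation on any target
+/// assert_eq!(value, 42);
+/// ```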
+#[cfg(target_endian = "big")] +fn convert_endianness_and_copy(src_bytes: &[u8], dst_slice: &mut [T]) { + let type_size = std::mem::size_of::(); + + match type_size { + 1 => { + // Single byte: no endianness conversion needed + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(src_bytes.as_ptr(), dst_ptr, 1); + } + } + 2 => { + // 2-byte integer (i16): convert from little-endian + let mut bytes = [0u8; 2]; + bytes.copy_from_slice(src_bytes); + let value = i16::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i16; + *dst_ptr = value; + } + } + 4 => { + // 4-byte integer (i32): convert from little-endian + let mut bytes = [0u8; 4]; + bytes.copy_from_slice(src_bytes); + let value = i32::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i32; + *dst_ptr = value; + } + } + 8 => { + // 8-byte integer (i64): convert from little-endian + let mut bytes = [0u8; 8]; + bytes.copy_from_slice(src_bytes); + let value = i64::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i64; + *dst_ptr = value; + } + } + 16 => { + // 16-byte integer (i128): convert from little-endian + let mut bytes = [0u8; 16]; + bytes.copy_from_slice(src_bytes); + let value = i128::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i128; + *dst_ptr = value; + } + } + 32 => { + // 32-byte integer (i256): convert from little-endian + // Note: i256 doesn't have from_le_bytes, so we reverse the bytes manually + let mut bytes = [0u8; 32]; + bytes.copy_from_slice(src_bytes); + bytes.reverse(); // Convert from little-endian to big-endian + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst_ptr, 32); + } + } + _ => { + // For other sizes, fall back to direct copy (may not be correct for all types) + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(src_bytes.as_ptr(), dst_ptr, type_size); + } + } + } +} + +/// Perform defensive checks for nullable vs non-nullable columns +#[cfg(debug_assertions)] +pub fn validate_column_nullability(def_levels: &[u8], is_nullable: bool) -> Result<()> { + if is_nullable { + // Nullable columns must have definition levels + if def_levels.is_empty() { + return Err(ErrorCode::Internal( + "Nullable column must have definition levels".to_string(), + )); + } + } else { + // Non-nullable columns should not have definition levels + if !def_levels.is_empty() { + return Err(ErrorCode::Internal( + "Non-nullable column should not have definition levels".to_string(), + )); + } + } + Ok(()) +} + +/// Validate physical type matches expected type +pub fn validate_physical_type(actual: PhysicalType, expected: PhysicalType) -> Result<()> { + if actual != expected { + return Err(ErrorCode::Internal(format!( + "Physical type mismatch: expected {:?}, got {:?}", + expected, actual + ))); + } + Ok(()) +} + +/// Validate values buffer alignment +#[cfg(debug_assertions)] +pub fn validate_buffer_alignment(values_buffer: &[u8]) -> Result<()> { + let type_size = std::mem::size_of::(); + if values_buffer.len() % type_size != 0 { + return Err(ErrorCode::Internal(format!( + "Values buffer length ({}) is not aligned to type size ({}). 
Buffer may be corrupted.", + values_buffer.len(), + type_size + ))); + } + Ok(()) +} + +/// Combine multiple validity bitmaps from different pages +pub fn combine_validity_bitmaps( + validity_bitmaps: Vec, + expected_total_len: usize, +) -> Result { + if validity_bitmaps.is_empty() { + Ok(Bitmap::new_constant(true, expected_total_len)) + } else if validity_bitmaps.len() == 1 { + Ok(validity_bitmaps.into_iter().next().unwrap()) + } else { + // Combine multiple validity bitmaps + let total_len: usize = validity_bitmaps.iter().map(|b| b.len()).sum(); + if total_len != expected_total_len { + return Err(ErrorCode::Internal(format!( + "Combined bitmap length ({}) does not match expected length ({})", + total_len, expected_total_len + ))); + } + let mut combined_bits = Vec::with_capacity(total_len); + for bitmap in validity_bitmaps { + combined_bits.extend(bitmap.iter()); + } + Ok(Bitmap::from_iter(combined_bits)) + } +} + +// TODO: this is not suitable for all types, should be adjusted later +/// Process a complete data page for any type T +fn process_data_page( + data_page: &parquet2::page::DataPage, + column_data: &mut Vec, + target_rows: usize, + is_nullable: bool, + expected_physical_type: &PhysicalType, +) -> Result> { + // Validate physical type + validate_physical_type( + data_page.descriptor.primitive_type.physical_type, + *expected_physical_type, + )?; + + let (def_levels, _, values_buffer) = extract_page_data(data_page)?; + let remaining = target_rows - column_data.len(); + + // Defensive checks for nullable vs non-nullable columns + #[cfg(debug_assertions)] + validate_column_nullability(def_levels, is_nullable)?; + + // Number of values(not rows), including NULLs + let num_values = data_page.num_values(); + + // Validate values_buffer alignment + #[cfg(debug_assertions)] + validate_buffer_alignment::(values_buffer)?; + + // Calculate how many rows this page will actually contribute + let page_rows = if is_nullable { + // For nullable columns, page contributes num_values rows (including NULLs) + num_values.min(remaining) + } else { + let type_size = std::mem::size_of::(); + let num_values_in_buffer = values_buffer.len() / type_size; + // For non-nullable columns, page contributes num_values_in_buffer rows + num_values_in_buffer.min(remaining) + }; + + // Process definition levels to create validity bitmap (only for nullable columns) + let validity_bitmap = if is_nullable { + let bit_width = get_bit_width(data_page.descriptor.max_def_level); + let (bitmap, _non_null_count) = + decode_definition_levels(def_levels, bit_width, num_values, data_page)?; + bitmap + } else { + // For non-nullable columns, no validity bitmap needed + None + }; + + // Process values based on encoding + match data_page.encoding() { + parquet2::encoding::Encoding::Plain => { + process_plain_encoding( + values_buffer, + page_rows, + column_data, + validity_bitmap.as_ref(), + )?; + } + encoding => { + return Err(ErrorCode::Internal(format!( + "Unsupported encoding: {:?}", + encoding + ))); + } + } + + Ok(validity_bitmap) +} + +// TODO rename this +pub trait ParquetColumnType: Copy + Send + Sync + 'static { + /// Additional metadata needed to create columns (e.g., precision/scale for decimals) + type Metadata: Clone; + + /// The Parquet physical type for this column type + const PHYSICAL_TYPE: PhysicalType; + + /// Create a column from the deserialized data + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column; +} + +// TODO rename this +pub struct ParquetColumnIterator<'a, T: ParquetColumnType> { + 
pages: Decompressor<'a>, + chunk_size: Option, + num_rows: usize, + is_nullable: bool, + metadata: T::Metadata, + _phantom: std::marker::PhantomData, +} + +impl<'a, T: ParquetColumnType> ParquetColumnIterator<'a, T> { + pub fn new( + pages: Decompressor<'a>, + num_rows: usize, + is_nullable: bool, + metadata: T::Metadata, + chunk_size: Option, + ) -> Self { + Self { + pages, + chunk_size, + num_rows, + is_nullable, + metadata, + _phantom: std::marker::PhantomData, + } + } +} + +// WIP: State of iterator should be adjusted, if we allow chunk_size be chosen freely +impl<'a, T: ParquetColumnType> Iterator for ParquetColumnIterator<'a, T> { + type Item = Result; + + fn next(&mut self) -> Option { + let target_rows = self.chunk_size.unwrap_or(self.num_rows); + let mut column_data: Vec = Vec::with_capacity(target_rows); + let mut validity_bitmaps = Vec::new(); + + while column_data.len() < target_rows { + // Get the next page + let page = match self.pages.next() { + Ok(Some(page)) => page, + Ok(None) => break, + Err(e) => { + return Some(Err(ErrorCode::Internal(format!( + "Failed to get next page: {}", + e + )))) + } + }; + + match page { + parquet2::page::Page::Data(data_page) => { + let data_len_before = column_data.len(); + match process_data_page( + data_page, + &mut column_data, + target_rows, + self.is_nullable, + &T::PHYSICAL_TYPE, + ) { + Ok(validity_bitmap) => { + if self.is_nullable { + // For nullable columns, we must have a validity bitmap for each page + if let Some(bitmap) = validity_bitmap { + let data_added = column_data.len() - data_len_before; + + // Verify bitmap length matches data added + if bitmap.len() != data_added { + return Some(Err(ErrorCode::Internal(format!( + "Bitmap length mismatch: bitmap={}, data_added={}", + bitmap.len(), + data_added + )))); + } + validity_bitmaps.push(bitmap); + } else { + // This should not happen for nullable columns + return Some(Err(ErrorCode::Internal( + "Nullable column page must produce validity bitmap" + .to_string(), + ))); + } + } + } + Err(e) => return Some(Err(e)), + } + } + parquet2::page::Page::Dict(_) => { + return Some(Err(ErrorCode::Internal( + "Dictionary page not supported yet".to_string(), + ))); + } + } + } + + if column_data.is_empty() { + return None; + } + + // Return the appropriate Column variant based on nullability + if self.is_nullable { + // For nullable columns, create NullableColumn + let column_len = column_data.len(); + let base_column = T::create_column(column_data, &self.metadata); + + // Combine validity bitmaps from multiple pages + let combined_bitmap = match combine_validity_bitmaps(validity_bitmaps, column_len) { + Ok(bitmap) => bitmap, + Err(e) => return Some(Err(e)), + }; + + let nullable_column = NullableColumn::new(base_column, combined_bitmap); + Some(Ok(Column::Nullable(Box::new(nullable_column)))) + } else { + // For non-nullable columns, return the column directly + Some(Ok(T::create_column(column_data, &self.metadata))) + } + } +} + +fn get_bit_width(max_level: i16) -> u32 { + if max_level == 1 { + 1 + } else { + 16 - max_level.leading_zeros() + } +} diff --git a/src/common/experimental_parquet_reader/src/column/date.rs b/src/common/experimental_parquet_reader/src/column/date.rs new file mode 100644 index 0000000000000..192ecf95ae19c --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/date.rs @@ -0,0 +1,36 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the 
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression::Column; +use parquet2::schema::types::PhysicalType; + +use crate::column::common::ParquetColumnIterator; +use crate::column::common::ParquetColumnType; +use crate::column::number::IntegerMetadata; + +#[derive(Copy, Clone)] +#[repr(transparent)] +pub struct Date(i32); + +impl ParquetColumnType for Date { + type Metadata = IntegerMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int32; + + fn create_column(data: Vec, _metadata: &Self::Metadata) -> Column { + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Date(raw_data.into()) + } +} + +pub type DateIter<'a> = ParquetColumnIterator<'a, Date>; diff --git a/src/common/experimental_parquet_reader/src/column/decimal.rs b/src/common/experimental_parquet_reader/src/column/decimal.rs new file mode 100644 index 0000000000000..ddc1004d0a92d --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/decimal.rs @@ -0,0 +1,152 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Decimal column deserialization for Parquet data + +use databend_common_column::buffer::Buffer; +use databend_common_expression::types::i256; +use databend_common_expression::types::DecimalColumn; +use databend_common_expression::types::DecimalSize; +use databend_common_expression::Column; +use parquet2::schema::types::PhysicalType; + +use crate::column::common::ParquetColumnIterator; +use crate::column::common::ParquetColumnType; +use crate::reader::decompressor::Decompressor; + +// ============================================================================= +// Wrapper Types for Decimal Usage +// ============================================================================= + +/// Wrapper for i64 as Decimal64 - enables zero-cost transmute via #[repr(transparent)] +#[derive(Clone, Copy)] +#[repr(transparent)] +pub struct Decimal64(pub i64); + +/// Wrapper for i128 as Decimal128 - enables zero-cost transmute via #[repr(transparent)] +#[derive(Clone, Copy)] +#[repr(transparent)] +pub struct Decimal128(pub i128); + +/// Wrapper for i256 as Decimal256 - enables zero-cost transmute via #[repr(transparent)] +#[derive(Clone, Copy)] +#[repr(transparent)] +pub struct Decimal256(pub i256); + +#[derive(Clone)] +pub struct DecimalMetadata { + pub precision: u8, + pub scale: u8, +} + +impl ParquetColumnType for Decimal64 { + type Metadata = DecimalMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int64; + + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column { + let decimal_size = DecimalSize::new_unchecked(metadata.precision, metadata.scale); + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Decimal(DecimalColumn::Decimal64( + Buffer::from(raw_data), + decimal_size, + )) + } +} + +impl ParquetColumnType for Decimal128 { + type Metadata = DecimalMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::FixedLenByteArray(16); + + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column { + let decimal_size = DecimalSize::new_unchecked(metadata.precision, metadata.scale); + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Decimal(DecimalColumn::Decimal128( + Buffer::from(raw_data), + decimal_size, + )) + } +} + +impl ParquetColumnType for Decimal256 { + type Metadata = DecimalMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::FixedLenByteArray(32); + + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column { + let decimal_size = DecimalSize::new_unchecked(metadata.precision, metadata.scale); + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Decimal(DecimalColumn::Decimal256( + Buffer::from(raw_data), + decimal_size, + )) + } +} + +// ============================================================================= +// Iterator Type Aliases +// ============================================================================= + +pub type DecimalIter<'a, T> = ParquetColumnIterator<'a, T>; + +// ============================================================================= +// Constructor Functions +// ============================================================================= + +/// Generic decimal iterator constructor +pub fn new_decimal_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter +where + T: ParquetColumnType, +{ + let metadata = DecimalMetadata { precision, scale }; + ParquetColumnIterator::new(pages, num_rows, is_nullable, metadata, chunk_size) +} + +pub fn new_decimal64_iter( + pages: Decompressor, + 
num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter { + new_decimal_iter(pages, num_rows, precision, scale, is_nullable, chunk_size) +} + +pub fn new_decimal128_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter { + new_decimal_iter(pages, num_rows, precision, scale, is_nullable, chunk_size) +} + +pub fn new_decimal256_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter { + new_decimal_iter(pages, num_rows, precision, scale, is_nullable, chunk_size) +} diff --git a/src/common/experimental_parquet_reader/src/column/mod.rs b/src/common/experimental_parquet_reader/src/column/mod.rs new file mode 100644 index 0000000000000..33f6233aedf4f --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/mod.rs @@ -0,0 +1,25 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; +mod date; +mod decimal; +mod number; +mod string; + +pub use date::*; +pub use decimal::*; +pub use number::IntegerMetadata; +pub use number::*; +pub use string::*; diff --git a/src/common/experimental_parquet_reader/src/column/number.rs b/src/common/experimental_parquet_reader/src/column/number.rs new file mode 100644 index 0000000000000..6cb07627b60e4 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/number.rs @@ -0,0 +1,65 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
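+//! Integer (Int32/Int64) column deserialization for Parquet data.
+//!
+//! A hedged usage sketch — it assumes a `Decompressor` named `pages` has already been
+//! built for the column chunk (as `data_chunk_to_col_iter` does internally) and is not
+//! a verbatim call site from this crate:
+//!
+//! ```ignore
+//! let iter = new_int64_iter(pages, num_rows, /* is_nullable */ false, None);
+//! for column in iter {
+//!     let column: Column = column?; // one `Column` per chunk; `None` chunk size reads all rows
+//! }
+//! ```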
+ +use databend_common_column::buffer::Buffer; +use databend_common_expression::types::Number; +use databend_common_expression::Column; +use parquet2::schema::types::PhysicalType; + +use crate::column::common::ParquetColumnIterator; +use crate::column::common::ParquetColumnType; +use crate::reader::decompressor::Decompressor; + +#[derive(Clone, Copy)] +pub struct IntegerMetadata; + +impl ParquetColumnType for i32 { + type Metadata = IntegerMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int32; + + fn create_column(data: Vec, _metadata: &Self::Metadata) -> Column { + Column::Number(i32::upcast_column(Buffer::from(data))) + } +} + +impl ParquetColumnType for i64 { + type Metadata = IntegerMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int64; + + fn create_column(data: Vec, _metadata: &Self::Metadata) -> Column { + Column::Number(i64::upcast_column(Buffer::from(data))) + } +} + +pub type Int32Iter<'a> = ParquetColumnIterator<'a, i32>; + +pub type Int64Iter<'a> = ParquetColumnIterator<'a, i64>; + +pub fn new_int32_iter( + pages: Decompressor, + num_rows: usize, + is_nullable: bool, + chunk_size: Option, +) -> Int32Iter { + ParquetColumnIterator::new(pages, num_rows, is_nullable, IntegerMetadata, chunk_size) +} + +pub fn new_int64_iter( + pages: Decompressor, + num_rows: usize, + is_nullable: bool, + chunk_size: Option, +) -> Int64Iter { + ParquetColumnIterator::new(pages, num_rows, is_nullable, IntegerMetadata, chunk_size) +} diff --git a/src/common/experimental_parquet_reader/src/column/string.rs b/src/common/experimental_parquet_reader/src/column/string.rs new file mode 100644 index 0000000000000..c4c07fe23f171 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/string.rs @@ -0,0 +1,582 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
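+//! String (Utf8View) column deserialization for Parquet `ByteArray` data.
+//!
+//! Views use the 16-byte layout built by `create_view_from_string` below: a
+//! little-endian length, then either the string inlined (length <= 12) or a
+//! 4-byte prefix plus a buffer index and offset (length > 12). A safe sketch of
+//! the inline case, mirroring `create_inline_view` (illustrative only):
+//!
+//! ```ignore
+//! let s = b"hello";
+//! let mut payload = [0u8; 16];
+//! payload[..4].copy_from_slice(&(s.len() as u32).to_le_bytes()); // length prefix
+//! payload[4..4 + s.len()].copy_from_slice(s);                    // inline bytes
+//! ```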
+ +use databend_common_column::binview::Utf8ViewColumn; +use databend_common_column::binview::View; +use databend_common_column::buffer::Buffer; +use databend_common_exception::ErrorCode; +use databend_common_expression::Column; +use parquet::encodings::rle::RleDecoder; +use parquet2::encoding::Encoding; +use parquet2::page::Page; +use parquet2::schema::types::PhysicalType; + +use crate::reader::decompressor::Decompressor; + +pub struct StringIter<'a> { + /// Page decompressor for reading Parquet pages + pages: Decompressor<'a>, + /// Optional chunk size for batched processing + chunk_size: Option, + /// Total number of rows to process + num_rows: usize, + /// Dictionary entries + dictionary: Option>>, + // Cached dictionary views + cached_dict_views: Option>, +} + +impl<'a> StringIter<'a> { + pub fn new(pages: Decompressor<'a>, num_rows: usize, chunk_size: Option) -> Self { + Self { + pages, + chunk_size, + num_rows, + dictionary: None, + cached_dict_views: None, + } + } + + /// Process a dictionary page and store the dictionary entries + fn process_dictionary_page( + &mut self, + dict_page: &parquet2::page::DictPage, + ) -> Result<(), ErrorCode> { + assert!(self.dictionary.is_none()); + let mut dict_values = Vec::new(); + let mut offset = 0; + let buffer = &dict_page.buffer; + + while offset < buffer.len() { + if offset + 4 > buffer.len() { + return Err(ErrorCode::Internal( + "Invalid dictionary page: incomplete length prefix".to_string(), + )); + } + + let length = u32::from_le_bytes([ + buffer[offset], + buffer[offset + 1], + buffer[offset + 2], + buffer[offset + 3], + ]) as usize; + offset += 4; + + if offset + length > buffer.len() { + return Err(ErrorCode::Internal( + "Invalid dictionary page: string length exceeds buffer".to_string(), + )); + } + + dict_values.push(buffer[offset..offset + length].to_vec()); + offset += length; + } + + self.dictionary = Some(dict_values); + // Clear cached views when dictionary changes + self.cached_dict_views = None; + Ok(()) + } + + /// Create a View from a string slice, handling both inline and buffer storage + fn create_view_from_string( + string_data: &[u8], + page_bytes: &mut Vec, + page_offset: &mut usize, + buffer_index: u32, + ) -> View { + let len = string_data.len() as u32; + if len <= 12 { + // Inline small strings directly in the View + unsafe { + let mut payload = [0u8; 16]; + payload + .as_mut_ptr() + .cast::() + .write_unaligned(len.to_le()); + std::ptr::copy_nonoverlapping( + string_data.as_ptr(), + payload.as_mut_ptr().add(4), + len as usize, + ); + std::mem::transmute::<[u8; 16], View>(payload) + } + } else { + // Store large strings in buffer and reference them + let current_offset = *page_offset; + // TODO use memcpy + page_bytes.extend_from_slice(string_data); + *page_offset += string_data.len(); + + unsafe { + let mut payload = [0u8; 16]; + // Length + payload + .as_mut_ptr() + .cast::() + .write_unaligned(len.to_le()); + // Prefix (first 4 bytes of string) + let prefix_len = std::cmp::min(4, string_data.len()); + std::ptr::copy_nonoverlapping( + string_data.as_ptr(), + payload.as_mut_ptr().add(4), + prefix_len, + ); + // Buffer index + payload + .as_mut_ptr() + .add(8) + .cast::() + .write_unaligned(buffer_index.to_le()); + // Offset in buffer + payload + .as_mut_ptr() + .add(12) + .cast::() + .write_unaligned((current_offset as u32).to_le()); + + std::mem::transmute::<[u8; 16], View>(payload) + } + } + } + + /// Process plain encoded data page + fn process_plain_encoding( + &self, + values_buffer: &[u8], + remaining: usize, 
+ views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + let mut offset = 0; + let estimated_capacity = values_buffer.len(); + let mut page_bytes = Vec::with_capacity(estimated_capacity); + let mut page_offset = 0; + let buffer_index = buffers.len() as u32; + + for _ in 0..remaining { + if offset + 4 > values_buffer.len() { + return Err(ErrorCode::Internal( + "Invalid plain encoding: incomplete length prefix".to_string(), + )); + } + + let length = u32::from_le_bytes([ + values_buffer[offset], + values_buffer[offset + 1], + values_buffer[offset + 2], + values_buffer[offset + 3], + ]) as usize; + offset += 4; + + if offset + length > values_buffer.len() { + return Err(ErrorCode::Internal( + "Invalid plain encoding: string length exceeds buffer".to_string(), + )); + } + + let string_data = &values_buffer[offset..offset + length]; + let view = Self::create_view_from_string( + string_data, + &mut page_bytes, + &mut page_offset, + buffer_index, + ); + views.push(view); + *total_bytes_len += length; + offset += length; + } + + if !page_bytes.is_empty() { + buffers.push(Buffer::from(page_bytes)); + } + + Ok(()) + } + + /// Process RLE dictionary encoded data page with optimized paths for different scenarios. + fn process_rle_dictionary_encoding( + &mut self, + values_buffer: &[u8], + remaining: usize, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + if values_buffer.is_empty() { + return Err(ErrorCode::Internal("Empty RLE dictionary data".to_string())); + } + + let bit_width = values_buffer[0]; + + // Clone dictionary to avoid borrowing issues + if let Some(dict) = self.dictionary.clone() { + // Check if we can use the optimized small string fast path + if self.can_use_small_string_fast_path(&dict) { + return self.process_small_string_fast_path( + &dict, + values_buffer, + bit_width, + remaining, + views, + total_bytes_len, + ); + } + } + + // General path for large dictionaries or mixed string sizes + self.process_general_rle_path( + values_buffer, + bit_width, + remaining, + views, + buffers, + total_bytes_len, + ) + } + + /// Check if dictionary qualifies for small string fast path optimization. + fn can_use_small_string_fast_path(&self, dict: &[Vec]) -> bool { + dict.len() <= 16 && dict.iter().all(|s| s.len() <= 12) + } + + /// Process RLE dictionary encoding using the optimized small string fast path. + fn process_small_string_fast_path( + &mut self, + dict: &[Vec], + values_buffer: &[u8], + bit_width: u8, + remaining: usize, + views: &mut Vec, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + views.reserve_exact(remaining); + + if bit_width == 0 { + // Special case: all indices are 0, repeat dictionary[0] + return self.process_bit_width_zero(dict, remaining, views, total_bytes_len); + } + + // General small string case with RLE decoding + self.process_small_string_rle( + dict, + values_buffer, + bit_width, + remaining, + views, + total_bytes_len, + ) + } + + /// Handle the special case where bit_width=0 (all values are dictionary[0]). 
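+    ///
+    /// With a single-entry dictionary the RLE/bit-packed index stream needs zero
+    /// bits per value, so every row resolves to `dict[0]`. Sketch of the effect
+    /// (illustrative only):
+    ///
+    /// ```ignore
+    /// // dict = [b"only".to_vec()], remaining = 3
+    /// // -> views: ["only", "only", "only"], total_bytes_len += 3 * 4
+    /// ```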
+ fn process_bit_width_zero( + &self, + dict: &[Vec], + remaining: usize, + views: &mut Vec, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + if dict.is_empty() { + return Err(ErrorCode::Internal( + "Empty dictionary for RLE dictionary encoding".to_string(), + )); + } + + let dict_entry = &dict[0]; + let inline_view = Self::create_inline_view(dict_entry); + + // TODO: Use slice::fill when available for better performance + for _ in 0..remaining { + views.push(inline_view); + *total_bytes_len += dict_entry.len(); + } + + Ok(()) + } + + /// Process small string RLE decoding with cached dictionary views. + fn process_small_string_rle( + &mut self, + dict: &[Vec], + values_buffer: &[u8], + bit_width: u8, + remaining: usize, + views: &mut Vec, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + // Create RLE decoder + let mut rle_decoder = RleDecoder::new(bit_width); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(&values_buffer[1..])); + + // Ensure dictionary views are cached + self.ensure_dict_views_cached(dict); + let dict_views = self.cached_dict_views.as_ref().unwrap(); + + // Decode indices and populate views in single pass + let start_len = views.len(); + let mut indices = vec![0i32; remaining]; + + let decoded_count = rle_decoder + .get_batch(&mut indices) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode RLE indices: {}", e)))?; + if decoded_count != remaining { + return Err(ErrorCode::Internal(format!( + "RleDecoder returned wrong count: expected={}, got={}", + remaining, decoded_count + ))); + } + + // Single pass: populate views and calculate total_bytes_len simultaneously + unsafe { + let views_ptr = views.as_mut_ptr().add(start_len); + for (i, &index) in indices.iter().enumerate() { + let dict_idx = index as usize; + if dict_idx >= dict_views.len() { + return Err(ErrorCode::Internal(format!( + "Dictionary index {} out of bounds (dictionary size: {})", + dict_idx, + dict_views.len() + ))); + } + + // Copy view and accumulate length in one operation + *views_ptr.add(i) = dict_views[dict_idx]; + *total_bytes_len += dict[dict_idx].len(); + } + // TODO Make sure this is panic safe + views.set_len(start_len + remaining); + } + + Ok(()) + } + + /// Ensure dictionary views are cached for the current dictionary. + fn ensure_dict_views_cached(&mut self, dict: &[Vec]) { + if self.cached_dict_views.is_none() { + self.cached_dict_views = Some( + dict.iter() + .map(|s| Self::create_inline_view(s)) + .collect::>(), + ); + } + } + + /// Process RLE dictionary encoding using the general path for large dictionaries. 
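+    ///
+    /// The RLE_DICTIONARY value stream begins with one bit-width byte followed by
+    /// the RLE/bit-packed indices, which is why this path (like the fast path)
+    /// hands `values_buffer[1..]` to the decoder. A hedged sketch of the decode
+    /// step it performs:
+    ///
+    /// ```ignore
+    /// let mut decoder = RleDecoder::new(bit_width);
+    /// decoder.set_data(bytes::Bytes::copy_from_slice(&values_buffer[1..]));
+    /// let mut indices = vec![0i32; remaining];
+    /// let decoded = decoder.get_batch(&mut indices)?; // one dictionary index per value
+    /// ```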
+ fn process_general_rle_path( + &mut self, + values_buffer: &[u8], + bit_width: u8, + remaining: usize, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + // Create new RleDecoder for general path + let mut rle_decoder = RleDecoder::new(bit_width); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(&values_buffer[1..])); + + if let Some(ref dict) = self.dictionary { + // Initialize buffer management variables for general case + let mut page_bytes = Vec::new(); + let mut page_offset = 0; + let buffer_index = buffers.len() as u32; + + // Decode indices and process each one + let mut indices = vec![0i32; remaining]; + let decoded_count = rle_decoder + .get_batch(&mut indices) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode RLE indices: {}", e)))?; + + if decoded_count != remaining { + return Err(ErrorCode::Internal(format!( + "RleDecoder returned wrong count: expected={}, got={}", + remaining, decoded_count + ))); + } + + // Process each index and create views + for &index in &indices { + let dict_idx = index as usize; + if dict_idx >= dict.len() { + return Err(ErrorCode::Internal(format!( + "Dictionary index {} out of bounds (dictionary size: {})", + dict_idx, + dict.len() + ))); + } + + let string_data = &dict[dict_idx]; + let view = Self::create_view_from_string( + string_data, + &mut page_bytes, + &mut page_offset, + buffer_index, + ); + views.push(view); + *total_bytes_len += string_data.len(); + } + + // Add buffer if any data was written + if !page_bytes.is_empty() { + buffers.push(Buffer::from(page_bytes)); + } + } else { + return Err(ErrorCode::Internal( + "No dictionary found for RLE dictionary encoding".to_string(), + )); + } + + Ok(()) + } + + /// Create an inline View for small strings (≤12 bytes) with maximum performance. + fn create_inline_view(string_data: &[u8]) -> View { + debug_assert!( + string_data.len() <= 12, + "create_inline_view called with string longer than 12 bytes" + ); + + unsafe { + let mut payload = [0u8; 16]; + let len = string_data.len() as u32; + + // Write length prefix (little-endian) + payload + .as_mut_ptr() + .cast::() + .write_unaligned(len.to_le()); + + // Copy string data directly + std::ptr::copy_nonoverlapping( + string_data.as_ptr(), + payload.as_mut_ptr().add(4), + len as usize, + ); + + // Convert to View with zero cost + std::mem::transmute::<[u8; 16], View>(payload) + } + } + + /// Process a data page based on its encoding type. 
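+    ///
+    /// For reference, PLAIN-encoded `ByteArray` values are a 4-byte little-endian
+    /// length followed by that many raw bytes, repeated per value — the layout
+    /// `process_plain_encoding` walks. Sketch of decoding one value (illustrative):
+    ///
+    /// ```ignore
+    /// let encoded = [2u8, 0, 0, 0, b'h', b'i'];
+    /// let len = u32::from_le_bytes(encoded[..4].try_into().unwrap()) as usize;
+    /// let value = &encoded[4..4 + len]; // b"hi"
+    /// ```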
+ fn process_data_page( + &mut self, + data_page: &parquet2::page::DataPage, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + let (_, _, values_buffer) = parquet2::page::split_buffer(data_page) + .map_err(|e| ErrorCode::StorageOther(format!("Failed to split buffer: {}", e)))?; + let remaining = data_page.num_values(); + + match data_page.encoding() { + Encoding::Plain => self.process_plain_encoding( + values_buffer, + remaining, + views, + buffers, + total_bytes_len, + ), + Encoding::RleDictionary | Encoding::PlainDictionary => self + .process_rle_dictionary_encoding( + values_buffer, + remaining, + views, + buffers, + total_bytes_len, + ), + _ => Err(ErrorCode::Internal(format!( + "Unsupported encoding for string column: {:?}", + data_page.encoding() + ))), + } + } +} + +impl<'a> Iterator for StringIter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.num_rows == 0 { + return None; + } + + // let chunk_size = self.chunk_size.unwrap_or(self.num_rows); + // let limit = std::cmp::min(chunk_size, self.num_rows); + let limit = self.chunk_size.unwrap_or(self.num_rows); + + let mut views = Vec::with_capacity(limit); + let mut buffers = Vec::new(); + let mut total_bytes_len = 0; + let mut processed_rows = 0; + + while processed_rows < limit { + let page = match self.pages.next_owned() { + Ok(Some(page)) => page, + Ok(None) => break, + Err(e) => return Some(Err(ErrorCode::StorageOther(e.to_string()))), + }; + + match page { + Page::Data(data_page) => { + if data_page.descriptor.primitive_type.physical_type != PhysicalType::ByteArray + { + return Some(Err(ErrorCode::Internal( + "Expected ByteArray type for string column".to_string(), + ))); + } + + let remaining_in_chunk = limit - processed_rows; + let page_rows = std::cmp::min(data_page.num_values(), remaining_in_chunk); + + if let Err(e) = self.process_data_page( + &data_page, + &mut views, + &mut buffers, + &mut total_bytes_len, + ) { + return Some(Err(e)); + } + + processed_rows += page_rows; + } + Page::Dict(dict_page) => { + if let Err(e) = self.process_dictionary_page(&dict_page) { + return Some(Err(e)); + } + } + } + } + + if processed_rows == 0 { + return None; + } + + self.num_rows -= processed_rows; + + // Calculate total buffer length for new_unchecked + let total_buffer_len = buffers.iter().map(|b| b.len()).sum(); + + let column = Utf8ViewColumn::new_unchecked( + views.into(), + buffers.into(), + total_bytes_len, + total_buffer_len, + ); + + Some(Ok(Column::String(column))) + } +} diff --git a/src/common/experimental_parquet_reader/src/lib.rs b/src/common/experimental_parquet_reader/src/lib.rs new file mode 100644 index 0000000000000..6809c8fe78d4a --- /dev/null +++ b/src/common/experimental_parquet_reader/src/lib.rs @@ -0,0 +1,23 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Direct deserialization from parquet to DataBlock + +mod column; +mod util; + +mod reader; + +pub use reader::column_reader::*; +pub use util::*; diff --git a/src/common/experimental_parquet_reader/src/reader/column_reader.rs b/src/common/experimental_parquet_reader/src/reader/column_reader.rs new file mode 100644 index 0000000000000..e8326114df302 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/column_reader.rs @@ -0,0 +1,164 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DecimalDataType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::Column; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_storages_common_table_meta::meta::ColumnMeta; +use databend_storages_common_table_meta::meta::Compression; +use parquet2::compression::Compression as ParquetCompression; +use parquet2::metadata::Descriptor; +use parquet2::read::PageMetaData; +use parquet2::schema::types::PhysicalType; +use parquet2::schema::types::PrimitiveType; + +use crate::column::new_decimal128_iter; +use crate::column::new_decimal256_iter; +use crate::column::new_decimal64_iter; +use crate::column::new_int32_iter; +use crate::column::new_int64_iter; +use crate::column::DateIter; +use crate::column::IntegerMetadata; +use crate::column::StringIter; +use crate::reader::decompressor::Decompressor; +use crate::reader::page_reader::PageReader; + +pub type ColumnIter<'a> = Box> + Send + Sync + 'a>; + +pub fn data_chunk_to_col_iter<'a>( + meta: &ColumnMeta, + chunk: &'a [u8], + rows: usize, + column_descriptor: &Descriptor, + field: TableField, + compression: &Compression, +) -> Result> { + let pages = { + let meta = meta.as_parquet().unwrap(); + let page_meta_data = PageMetaData { + column_start: meta.offset, + num_values: meta.num_values as i64, + compression: to_parquet_compression(compression)?, + descriptor: (*column_descriptor).clone(), + }; + let pages = PageReader::new_with_page_meta(chunk, page_meta_data, usize::MAX); + Decompressor::new(pages, vec![]) + }; + + let typ = &column_descriptor.primitive_type; + + pages_to_column_iter(pages, typ, field, rows, None) +} + +fn pages_to_column_iter<'a>( + column: Decompressor<'a>, + types: &PrimitiveType, + field: TableField, + num_rows: usize, + chunk_size: Option, +) -> Result> { + let pages = column; + let parquet_physical_type = &types.physical_type; + + let (inner_data_type, is_nullable) = match &field.data_type { + TableDataType::Nullable(inner) => (inner.as_ref(), true), + other => (other, false), + }; + + match (parquet_physical_type, inner_data_type) { + (PhysicalType::Int32, TableDataType::Number(NumberDataType::Int32)) => { + Ok(Box::new(new_int32_iter(pages, num_rows, is_nullable, chunk_size))) + } + (PhysicalType::Int64, TableDataType::Number(NumberDataType::Int64)) => { + 
Ok(Box::new(new_int64_iter(pages, num_rows, is_nullable, chunk_size))) + } + (PhysicalType::ByteArray, TableDataType::String) => { + Ok(Box::new(StringIter::new(pages, num_rows, chunk_size))) + } + (PhysicalType::Int32, TableDataType::Decimal(DecimalDataType::Decimal64(_))) => { + unimplemented!("coming soon") + } + (PhysicalType::Int64, TableDataType::Decimal(DecimalDataType::Decimal64(decimal_size))) => { + Ok(Box::new(new_decimal64_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + // TODO: arrow 55.1.0 does not support Decimal64 yet, so we use Decimal128, but the storage format is Int64 + (PhysicalType::Int64, TableDataType::Decimal(DecimalDataType::Decimal128(decimal_size))) => { + Ok(Box::new(new_decimal64_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + (PhysicalType::FixedLenByteArray(_), TableDataType::Decimal(DecimalDataType::Decimal128(decimal_size))) => { + Ok(Box::new(new_decimal128_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + (PhysicalType::FixedLenByteArray(_), TableDataType::Decimal(DecimalDataType::Decimal256(decimal_size))) => { + Ok(Box::new(new_decimal256_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + (PhysicalType::Int32, TableDataType::Date) => { + Ok(Box::new(DateIter::new( + pages, + num_rows, + is_nullable, + IntegerMetadata, + chunk_size, + ))) + } + (physical_type, table_data_type) => Err(ErrorCode::StorageOther(format!( + "Unsupported combination: parquet_physical_type={:?}, field_data_type={:?}, nullable={}", + physical_type, table_data_type, is_nullable + ))), + } +} + +fn to_parquet_compression(meta_compression: &Compression) -> Result { + match meta_compression { + Compression::Lz4 => Err(ErrorCode::StorageOther( + "Legacy compression algorithm [Lz4] is no longer supported.", + )), + Compression::Lz4Raw => Ok(ParquetCompression::Lz4Raw), + Compression::Snappy => Ok(ParquetCompression::Snappy), + Compression::Zstd => Ok(ParquetCompression::Zstd), + Compression::Gzip => Ok(ParquetCompression::Gzip), + Compression::None => Ok(ParquetCompression::Uncompressed), + } +} diff --git a/src/common/experimental_parquet_reader/src/reader/decompressor.rs b/src/common/experimental_parquet_reader/src/reader/decompressor.rs new file mode 100644 index 0000000000000..7bb20c399ed95 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/decompressor.rs @@ -0,0 +1,148 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Decompressor that integrates with zero-copy PageReader + +use parquet2::compression::Compression; +use parquet2::error::Error; +use parquet2::page::DataPage; +use parquet2::page::DictPage; +use parquet2::page::Page; +use parquet2::FallibleStreamingIterator; + +use crate::reader::page_reader::PageReader; +use crate::reader::pages::BorrowedCompressedPage; + +pub struct Decompressor<'a> { + page_reader: PageReader<'a>, + decompression_buffer: Vec, + current_page: Option, + was_decompressed: bool, +} + +impl<'a> Decompressor<'a> { + pub fn new(page_reader: PageReader<'a>, decompression_buffer: Vec) -> Self { + Self { + page_reader, + decompression_buffer, + current_page: None, + was_decompressed: false, + } + } + + fn decompress_borrowed_page( + compressed_page: BorrowedCompressedPage<'_>, + uncompressed_buffer: &mut Vec, + ) -> parquet2::error::Result { + let uncompressed_size = compressed_page.uncompressed_size(); + uncompressed_buffer.reserve(uncompressed_size); + + #[allow(clippy::uninit_vec)] + unsafe { + uncompressed_buffer.set_len(uncompressed_size); + } + + if !compressed_page.is_compressed() { + // No decompression needed - copy directly from the borrowed slice + uncompressed_buffer.extend_from_slice(compressed_page.data()); + } else { + // Decompress directly into the buffer + match compressed_page.compression() { + Compression::Lz4Raw => { + let _decompressed_len = + lz4_flex::decompress_into(compressed_page.data(), uncompressed_buffer) + .map_err(|e| { + Error::OutOfSpec(format!("LZ4 decompression failed: {}", e)) + })?; + } + Compression::Zstd => { + zstd::bulk::decompress_to_buffer(compressed_page.data(), uncompressed_buffer) + .map(|_| ()) + .map_err(|e| { + Error::OutOfSpec(format!("Zstd decompression failed: {}", e)) + })?; + } + _ => { + return Err(Error::FeatureNotSupported(format!( + "Compression {:?} not supported", + compressed_page.compression() + ))); + } + } + }; + + // Create a DataPage from the decompressed data + // Note: We need to take ownership of the buffer data here + let page = match compressed_page { + BorrowedCompressedPage::Data(compressed_data_page) => Page::Data(DataPage::new( + compressed_data_page.header, + std::mem::take(uncompressed_buffer), + compressed_data_page.descriptor, + None, + )), + BorrowedCompressedPage::Dict(compressed_dict_page) => Page::Dict(DictPage::new( + std::mem::take(uncompressed_buffer), + compressed_dict_page.num_values, + compressed_dict_page.is_sorted, + )), + }; + + Ok(page) + } + + #[allow(dead_code)] + pub fn next_owned(&mut self) -> Result, Error> { + let page_tuple = self.page_reader.next_page()?; + + if let Some(page) = page_tuple { + self.was_decompressed = page.compression() != Compression::Uncompressed; + + let decompress_page = + Self::decompress_borrowed_page(page, &mut self.decompression_buffer)?; + + Ok(Some(decompress_page)) + } else { + Ok(None) + } + } +} + +impl<'a> FallibleStreamingIterator for Decompressor<'a> { + type Item = Page; + type Error = Error; + + fn advance(&mut self) -> Result<(), Self::Error> { + self.current_page = None; + let page_tuple = self.page_reader.next_page()?; + + if let Some(page) = page_tuple { + self.was_decompressed = page.compression() != Compression::Uncompressed; + + let decompress_page = + Self::decompress_borrowed_page(page, &mut self.decompression_buffer)?; + + self.current_page = Some(decompress_page); + } + + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + self.current_page.as_ref() + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} diff --git 
a/src/common/experimental_parquet_reader/src/reader/mod.rs b/src/common/experimental_parquet_reader/src/reader/mod.rs new file mode 100644 index 0000000000000..03cb9979b3337 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod column_reader; +pub mod decompressor; +pub mod page_reader; +pub mod pages; diff --git a/src/common/experimental_parquet_reader/src/reader/page_reader.rs b/src/common/experimental_parquet_reader/src/reader/page_reader.rs new file mode 100644 index 0000000000000..8e5a51fb5b242 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/page_reader.rs @@ -0,0 +1,178 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use parquet2::compression::Compression; +use parquet2::encoding::Encoding; +use parquet2::error::Error; +use parquet2::metadata::Descriptor; +use parquet2::page::DataPageHeader; +use parquet2::page::PageType; +use parquet2::page::ParquetPageHeader; +use parquet2::read::PageMetaData; +use parquet_format_safe::thrift::protocol::TCompactInputProtocol; + +use crate::reader::pages::BorrowedCompressedDataPage; +use crate::reader::pages::BorrowedCompressedDictPage; +use crate::reader::pages::BorrowedCompressedPage; + +/// "Zero-copy" Parquet page reader +pub struct PageReader<'a> { + raw_data_slice: &'a [u8], + compression: Compression, + seen_num_values: i64, + total_num_values: i64, + descriptor: Descriptor, + max_page_size: usize, +} + +impl<'a> PageReader<'a> { + pub fn new_with_page_meta( + raw_data: &'a [u8], + reader_meta: PageMetaData, + max_page_size: usize, + ) -> Self { + Self { + raw_data_slice: raw_data, + total_num_values: reader_meta.num_values, + compression: reader_meta.compression, + seen_num_values: 0, + descriptor: reader_meta.descriptor, + max_page_size, + } + } + + pub fn next_page(&mut self) -> parquet2::error::Result> { + if self.seen_num_values >= self.total_num_values { + return Ok(None); + }; + + let page_header = + read_page_header_from_slice(&mut self.raw_data_slice, self.max_page_size)?; + + self.seen_num_values += get_page_header(&page_header)? 
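// Descriptive note on the bookkeeping in this statement: only v1/v2 data-page
// headers report `num_values`; dictionary pages yield `None` from
// `get_page_header` and therefore do not advance the count. Once the running
// total reaches `total_num_values`, the guard at the top of `next_page`
// returns `Ok(None)` and iteration stops. The page body itself is never
// copied here: it is borrowed from `raw_data_slice` by the sub-slicing a few
// lines below, which is what makes this reader "zero-copy".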
+ .map(|x| x.num_values() as i64) + .unwrap_or_default(); + + let read_size: usize = page_header.compressed_page_size.try_into()?; + + if read_size > self.max_page_size { + return Err(Error::WouldOverAllocate); + } + + if self.raw_data_slice.len() < read_size { + return Err(Error::OutOfSpec( + "Not enough data in slice for page".to_string(), + )); + } + + let data_slice = &self.raw_data_slice[..read_size]; + self.raw_data_slice = &self.raw_data_slice[read_size..]; + + match page_header.type_.try_into()? { + PageType::DataPage => { + let header = page_header.data_page_header.ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 data header is empty" + .to_string(), + ) + })?; + Ok(Some(BorrowedCompressedPage::new_data_page( + BorrowedCompressedDataPage::new( + DataPageHeader::V1(header), + data_slice, + self.compression, + page_header.uncompressed_page_size.try_into()?, + self.descriptor.clone(), + ), + ))) + } + PageType::DataPageV2 => { + let header = page_header.data_page_header_v2.ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v2 data page but the v2 data header is empty" + .to_string(), + ) + })?; + Ok(Some(BorrowedCompressedPage::new_data_page( + BorrowedCompressedDataPage::new( + DataPageHeader::V2(header), + data_slice, + self.compression, + page_header.uncompressed_page_size.try_into()?, + self.descriptor.clone(), + ), + ))) + } + PageType::DictionaryPage => { + let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a dictionary page but the dictionary header is empty".to_string(), + ) + })?; + let num_values = dict_header.num_values.try_into()?; + let is_sorted = dict_header.is_sorted.unwrap_or(false); + + Ok(Some(BorrowedCompressedPage::Dict( + BorrowedCompressedDictPage::new( + data_slice, + self.compression, + page_header.uncompressed_page_size.try_into()?, + num_values, + is_sorted, + ), + ))) + } + } + } +} + +fn read_page_header_from_slice( + reader: &mut &[u8], + max_size: usize, +) -> parquet2::error::Result { + let mut prot = TCompactInputProtocol::new(reader, max_size); + let page_header = ParquetPageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) +} + +pub(crate) fn get_page_header( + header: &ParquetPageHeader, +) -> parquet2::error::Result> { + let type_ = header.type_.try_into()?; + Ok(match type_ { + PageType::DataPage => { + let header = header.data_page_header.clone().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 header is empty".to_string(), + ) + })?; + + let _: Encoding = header.encoding.try_into()?; + let _: Encoding = header.repetition_level_encoding.try_into()?; + let _: Encoding = header.definition_level_encoding.try_into()?; + + Some(DataPageHeader::V1(header)) + } + PageType::DataPageV2 => { + let header = header.data_page_header_v2.clone().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 header is empty".to_string(), + ) + })?; + let _: Encoding = header.encoding.try_into()?; + Some(DataPageHeader::V2(header)) + } + _ => None, + }) +} diff --git a/src/common/experimental_parquet_reader/src/reader/pages.rs b/src/common/experimental_parquet_reader/src/reader/pages.rs new file mode 100644 index 0000000000000..c78ea186acd75 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/pages.rs @@ -0,0 +1,122 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use 
this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Borrowed page types for zero-copy parquet reading +//! +//! This module provides page types that reference data without owning it, +//! enabling "zero-copy" reading when the source is a slice. + +use parquet2::compression::Compression; +use parquet2::metadata::Descriptor; +use parquet2::page::DataPageHeader; + +/// A compressed page that borrows its data from a slice +#[derive(Debug)] +pub enum BorrowedCompressedPage<'a> { + Data(Box>), + Dict(BorrowedCompressedDictPage<'a>), +} + +impl<'a> BorrowedCompressedPage<'a> { + pub fn new_data_page(data_page: BorrowedCompressedDataPage<'a>) -> Self { + Self::Data(Box::new(data_page)) + } +} + +/// A borrowed compressed data page +#[derive(Debug)] +pub struct BorrowedCompressedDataPage<'a> { + pub header: DataPageHeader, + pub buffer: &'a [u8], + pub compression: Compression, + pub uncompressed_page_size: usize, + pub descriptor: Descriptor, +} + +/// A borrowed compressed dictionary page +#[derive(Debug)] +pub struct BorrowedCompressedDictPage<'a> { + pub buffer: &'a [u8], + pub compression: Compression, + pub uncompressed_page_size: usize, + pub num_values: usize, + pub is_sorted: bool, +} + +impl<'a> BorrowedCompressedPage<'a> { + /// Get the compression type of this page + pub fn compression(&self) -> Compression { + match self { + BorrowedCompressedPage::Data(data_page) => data_page.compression, + BorrowedCompressedPage::Dict(dict_page) => dict_page.compression, + } + } + + /// Check if this page is compressed + pub fn is_compressed(&self) -> bool { + self.compression() != Compression::Uncompressed + } + + /// Get the uncompressed size of this page + pub fn uncompressed_size(&self) -> usize { + match self { + BorrowedCompressedPage::Data(data_page) => data_page.uncompressed_page_size, + BorrowedCompressedPage::Dict(dict_page) => dict_page.uncompressed_page_size, + } + } + + /// Get the compressed data as a slice + pub fn data(&self) -> &[u8] { + match self { + BorrowedCompressedPage::Data(data_page) => data_page.buffer, + BorrowedCompressedPage::Dict(dict_page) => dict_page.buffer, + } + } +} + +impl<'a> BorrowedCompressedDataPage<'a> { + pub fn new( + header: DataPageHeader, + buffer: &'a [u8], + compression: Compression, + uncompressed_page_size: usize, + descriptor: Descriptor, + ) -> Self { + Self { + header, + buffer, + compression, + uncompressed_page_size, + descriptor, + } + } +} + +impl<'a> BorrowedCompressedDictPage<'a> { + pub fn new( + buffer: &'a [u8], + compression: Compression, + uncompressed_page_size: usize, + num_values: usize, + is_sorted: bool, + ) -> Self { + Self { + buffer, + compression, + uncompressed_page_size, + num_values, + is_sorted, + } + } +} diff --git a/src/common/experimental_parquet_reader/src/util.rs b/src/common/experimental_parquet_reader/src/util.rs new file mode 100644 index 0000000000000..f693e2c8a044e --- /dev/null +++ b/src/common/experimental_parquet_reader/src/util.rs @@ -0,0 +1,101 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except 
in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression::types::NumberDataType; +use databend_common_expression::TableDataType; +use parquet2::schema::types::PhysicalType; +use parquet2::schema::types::PrimitiveType; +use parquet2::schema::Repetition; + +pub fn from_table_field_type(field_name: String, field_type: &TableDataType) -> PrimitiveType { + let (inner_type, is_nullable) = match field_type { + TableDataType::Nullable(inner) => (inner.as_ref(), true), + other => (other, false), + }; + + let mut parquet_primitive_type = match inner_type { + TableDataType::String => PrimitiveType::from_physical(field_name, PhysicalType::ByteArray), + TableDataType::Number(number_type) => match number_type { + NumberDataType::Int8 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::Int16 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::Int32 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::Int64 => PrimitiveType::from_physical(field_name, PhysicalType::Int64), + NumberDataType::UInt8 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::UInt16 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::UInt32 => PrimitiveType::from_physical(field_name, PhysicalType::Int64), + NumberDataType::UInt64 => PrimitiveType::from_physical(field_name, PhysicalType::Int64), + NumberDataType::Float32 => { + PrimitiveType::from_physical(field_name, PhysicalType::Float) + } + NumberDataType::Float64 => { + PrimitiveType::from_physical(field_name, PhysicalType::Double) + } + }, + TableDataType::Decimal(decimal_type) => { + let precision = decimal_type.precision(); + let _scale = decimal_type.scale(); + if precision <= 9 { + PrimitiveType::from_physical(field_name, PhysicalType::Int32) + } else if precision <= 18 { + PrimitiveType::from_physical(field_name, PhysicalType::Int64) + } else { + let len = decimal_length_from_precision(precision as usize); + PrimitiveType::from_physical(field_name, PhysicalType::FixedLenByteArray(len)) + } + } + TableDataType::Date => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + TableDataType::Nullable(_) => unreachable!("Nullable should have been unwrapped"), + t => unimplemented!("Unsupported type: {:?} ", t), + }; + + if !is_nullable { + parquet_primitive_type.field_info.repetition = Repetition::Required; + } + + parquet_primitive_type +} + +fn decimal_length_from_precision(precision: usize) -> usize { + (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize +} + +pub fn calculate_parquet_max_levels(data_type: &TableDataType) -> (i16, i16) { + match data_type { + TableDataType::Boolean => (0, 0), + TableDataType::Binary => (0, 0), + TableDataType::String => (0, 0), + TableDataType::Number(_) => (0, 0), + TableDataType::Decimal(_) => (0, 0), + TableDataType::Timestamp => (0, 0), + TableDataType::Date => (0, 0), + TableDataType::Interval => (0, 0), + TableDataType::Bitmap => (0, 0), + TableDataType::Variant => (0, 0), + TableDataType::Geometry => (0, 0), + 
TableDataType::Geography => (0, 0), + TableDataType::Vector(_) => (0, 0), + + TableDataType::Nullable(inner) => { + let (inner_def, inner_rep) = calculate_parquet_max_levels(inner); + (inner_def + 1, inner_rep) + } + + TableDataType::Null + | TableDataType::EmptyArray + | TableDataType::EmptyMap + | TableDataType::Array(_) + | TableDataType::Map(_) + | TableDataType::Tuple { .. } => unimplemented!(), + } +} diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 1f974c749493b..cffe84a1269cf 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -845,9 +845,9 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(2..=u64::MAX)), }), - ("use_parquet2", DefaultSettingValue { + ("use_experimental_parquet_reader", DefaultSettingValue { value: UserSettingValue::UInt64(0), - desc: "This setting is deprecated", + desc: "Use experimental parquet reader to deserialize parquet data of fuse table.", mode: SettingMode::Both, scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 4cfd2b0a2fea9..e1d56a1835268 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -660,12 +660,12 @@ impl Settings { self.try_get_u64("auto_compaction_segments_limit") } - pub fn get_use_parquet2(&self) -> Result { - Ok(self.try_get_u64("use_parquet2")? != 0) + pub fn get_use_experimental_parquet_reader(&self) -> Result { + Ok(self.try_get_u64("use_experimental_parquet_reader")? != 0) } - pub fn set_use_parquet2(&self, val: bool) -> Result<()> { - self.try_set_u64("use_parquet2", u64::from(val)) + pub fn set_use_experimental_parquet_reader(&self, val: bool) -> Result<()> { + self.try_set_u64("use_experimental_parquet_reader", u64::from(val)) } pub fn get_enable_replace_into_partitioning(&self) -> Result { diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 0601548e6e4bf..1b28b2a3578f5 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -20,6 +20,7 @@ databend-common-meta-app = { workspace = true } databend-common-meta-types = { workspace = true } databend-common-metrics = { workspace = true } databend-common-native = { workspace = true } +databend-common-parquet-reader-experimental = { workspace = true } databend-common-pipeline-core = { workspace = true } databend-common-pipeline-sinks = { workspace = true } @@ -62,6 +63,8 @@ match-template = { workspace = true } opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } + +parquet2 = { workspace = true } paste = { workspace = true } rand = { workspace = true } serde = { workspace = true } diff --git a/src/query/storages/fuse/src/io/read/block/block_reader.rs b/src/query/storages/fuse/src/io/read/block/block_reader.rs index 5cf347227f509..819e44376ae11 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use arrow_schema::Field; use arrow_schema::Schema; -use arrow_schema::SchemaRef; use databend_common_catalog::plan::Projection; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -37,14 +36,12 @@ use opendal::Operator; use crate::BlockReadResult; -// TODO: make 
BlockReader as a trait. #[derive(Clone)] pub struct BlockReader { pub(crate) ctx: Arc, pub(crate) operator: Operator, pub(crate) projection: Projection, pub(crate) projected_schema: TableSchemaRef, - pub(crate) arrow_schema: SchemaRef, pub(crate) project_indices: BTreeMap, pub(crate) project_column_nodes: Vec, pub(crate) default_vals: Vec, @@ -55,6 +52,7 @@ pub struct BlockReader { pub original_schema: TableSchemaRef, pub native_columns_reader: NativeColumnsReader, + pub use_experimental_parquet_reader: bool, } fn inner_project_field_default_values(default_vals: &[Scalar], paths: &[usize]) -> Result { @@ -141,12 +139,14 @@ impl BlockReader { let project_indices = Self::build_projection_indices(&project_column_nodes); + let use_experimental_parquet_reader = + ctx.get_settings().get_use_experimental_parquet_reader()?; + Ok(Arc::new(BlockReader { ctx, operator, projection, projected_schema, - arrow_schema: arrow_schema.into(), project_indices, project_column_nodes, default_vals, @@ -155,6 +155,7 @@ impl BlockReader { put_cache, original_schema: schema, native_columns_reader, + use_experimental_parquet_reader, })) } @@ -195,10 +196,6 @@ impl BlockReader { self.projected_schema.clone() } - pub fn arrow_schema(&self) -> SchemaRef { - self.arrow_schema.clone() - } - pub fn data_fields(&self) -> Vec { self.schema().fields().iter().map(DataField::from).collect() } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/deserialize_v2.rs b/src/query/storages/fuse/src/io/read/block/parquet/deserialize_v2.rs new file mode 100644 index 0000000000000..df69de058f7e3 --- /dev/null +++ b/src/query/storages/fuse/src/io/read/block/parquet/deserialize_v2.rs @@ -0,0 +1,260 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
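// ---------------------------------------------------------------------------
// Editor's illustrative sketch (not part of this change): how the pieces
// introduced above are meant to fit together. A parquet2 `Descriptor` is
// derived from a Databend `TableField` with the util helpers, a zero-copy
// `PageReader` walks the raw column-chunk bytes, and `Decompressor` streams
// decompressed pages via `FallibleStreamingIterator`. The function and
// parameter names are hypothetical; the `reader::*` module paths are an
// assumption based on `reader/mod.rs`, and the `PageMetaData` field set is
// assumed from parquet2 0.17.
#[allow(dead_code)]
fn sketch_stream_pages(
    field: &databend_common_expression::TableField,
    raw_column_chunk: &[u8],
    num_values: i64,
    compression: parquet2::compression::Compression,
    max_page_size: usize,
) -> parquet2::error::Result<usize> {
    use databend_common_parquet_reader_experimental::calculate_parquet_max_levels;
    use databend_common_parquet_reader_experimental::from_table_field_type;
    use databend_common_parquet_reader_experimental::reader::decompressor::Decompressor;
    use databend_common_parquet_reader_experimental::reader::page_reader::PageReader;
    use parquet2::metadata::Descriptor;
    use parquet2::read::PageMetaData;
    use parquet2::FallibleStreamingIterator;

    // Map the table field to a parquet primitive type plus definition and
    // repetition levels, mirroring what `deserialize_field_v2` does below.
    let (max_def_level, max_rep_level) = calculate_parquet_max_levels(&field.data_type);
    let primitive_type = from_table_field_type(field.name.clone(), &field.data_type);
    let descriptor = Descriptor {
        primitive_type,
        max_def_level,
        max_rep_level,
    };

    // Column-chunk level metadata; `column_start` is irrelevant here because
    // the reader operates on an in-memory slice rather than a file offset.
    let page_meta = PageMetaData {
        column_start: 0,
        num_values,
        compression,
        descriptor,
    };

    // Borrow pages from the slice and decompress each one into a reused buffer.
    let page_reader = PageReader::new_with_page_meta(raw_column_chunk, page_meta, max_page_size);
    let mut decompressor = Decompressor::new(page_reader, Vec::new());

    let mut pages = 0;
    while let Some(_page) = decompressor.next()? {
        pages += 1;
    }
    Ok(pages)
}
// ---------------------------------------------------------------------------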
+ +use std::collections::HashMap; +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; +use databend_common_expression::ColumnId; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchema; +use databend_common_expression::Scalar; +use databend_common_parquet_reader_experimental as v2_reader; +use databend_common_storage::ColumnNode; +use databend_storages_common_cache::CacheAccessor; +use databend_storages_common_cache::CacheManager; +use databend_storages_common_cache::SizedColumnArray; +use databend_storages_common_cache::TableDataCacheKey; +use databend_storages_common_table_meta::meta::ColumnMeta; +use databend_storages_common_table_meta::meta::Compression; +use parquet2::metadata::Descriptor; +use v2_reader::calculate_parquet_max_levels; +use v2_reader::data_chunk_to_col_iter; +use v2_reader::from_table_field_type; + +use super::BlockReader; +use crate::io::read::block::block_reader_merge_io::DataItem; + +pub struct FieldDeserializationContext<'a> { + pub(crate) column_metas: &'a HashMap, + pub(crate) column_chunks: &'a HashMap>, + pub(crate) num_rows: usize, + pub(crate) compression: &'a Compression, +} + +enum DeserializedColumn<'a> { + FromCache(&'a Arc), + Column((ColumnId, Column, usize)), +} + +impl BlockReader { + pub(crate) fn deserialize_v2( + &self, + block_path: &str, + num_rows: usize, + compression: &Compression, + column_metas: &HashMap, + column_chunks: HashMap, + ) -> Result { + if column_chunks.is_empty() { + return self.build_default_values_block(num_rows); + } + + let mut need_default_vals = Vec::with_capacity(self.project_column_nodes.len()); + let mut need_to_fill_default_val = false; + let mut deserialized_column_arrays = Vec::with_capacity(self.projection.len()); + let field_deserialization_ctx = FieldDeserializationContext { + column_metas, + column_chunks: &column_chunks, + num_rows, + compression, + }; + + for column_node in &self.project_column_nodes { + let deserialized_column = self + .deserialize_field_v2(&field_deserialization_ctx, column_node) + .map_err(|e| { + e.add_message(format!( + "failed to deserialize column: {:?}, location {} ", + column_node, block_path + )) + })?; + match deserialized_column { + None => { + need_to_fill_default_val = true; + need_default_vals.push(true); + } + Some(v) => { + deserialized_column_arrays.push((v, column_node.table_field.data_type())); + need_default_vals.push(false); + } + } + } + + let cache = if self.put_cache { + CacheManager::instance().get_table_data_array_cache() + } else { + None + }; + + let mut block_entries = Vec::with_capacity(deserialized_column_arrays.len()); + for (col, table_data_type) in deserialized_column_arrays { + // TODO we should cache deserialized data as Column (instead of arrow Array) + // Converting arrow array to column may be expensive + let entry = match col { + DeserializedColumn::FromCache(arrow_array) => { + BlockEntry::Column(Column::from_arrow_rs( + arrow_array.0.clone(), + &(&table_data_type.clone()).into(), + )?) 
+ } + DeserializedColumn::Column((column_id, col, size)) => { + if let Some(cache) = &cache { + let meta = column_metas.get(&column_id).unwrap(); + let (offset, len) = meta.offset_length(); + let key = TableDataCacheKey::new(block_path, column_id, offset, len); + let array = col.clone().into_arrow_rs(); + cache.insert(key.into(), (array, size)); + }; + BlockEntry::Column(col) + } + }; + block_entries.push(entry); + } + + // build data block + let data_block = if !need_to_fill_default_val { + assert_eq!(block_entries.len(), self.projected_schema.num_fields()); + DataBlock::new(block_entries, num_rows) + } else { + let mut default_vals = Vec::with_capacity(need_default_vals.len()); + for (i, need_default_val) in need_default_vals.iter().enumerate() { + if !need_default_val { + default_vals.push(None); + } else { + default_vals.push(Some(self.default_vals[i].clone())); + } + } + + create_with_opt_default_value( + block_entries, + &self.data_schema(), + &default_vals, + num_rows, + )? + }; + Ok(data_block) + } + + fn deserialize_field_v2<'a>( + &self, + deserialization_context: &'a FieldDeserializationContext, + column_node: &ColumnNode, + ) -> Result>> { + let is_nested = column_node.is_nested; + + if is_nested { + unimplemented!("Nested type is not supported now"); + } + + let column_chunks = deserialization_context.column_chunks; + let compression = deserialization_context.compression; + + let (max_def_level, max_rep_level) = + calculate_parquet_max_levels(&column_node.table_field.data_type); + + let parquet_primitive_type = from_table_field_type( + column_node.table_field.name.clone(), + &column_node.table_field.data_type, + ); + + let column_descriptor = Descriptor { + primitive_type: parquet_primitive_type, + max_def_level, + max_rep_level, + }; + + // Since we only support leaf column now + let leaf_column_id = 0; + let column_id = column_node.leaf_column_ids[leaf_column_id]; + + let Some(column_meta) = deserialization_context.column_metas.get(&column_id) else { + return Ok(None); + }; + let Some(chunk) = column_chunks.get(&column_id) else { + return Ok(None); + }; + + match chunk { + DataItem::RawData(data) => { + let field_uncompressed_size = data.len(); + let num_rows = deserialization_context.num_rows; + let field_name = column_node.field.name().to_owned(); + + let mut column_iter = data_chunk_to_col_iter( + column_meta, + data, + num_rows, + &column_descriptor, + column_node.table_field.clone(), + compression, + )?; + + let column = column_iter.next().transpose()?.ok_or_else(|| { + ErrorCode::StorageOther(format!("no array found for field {field_name}")) + })?; + + // Since we deserialize all the rows of this column, the iterator should be drained + assert!(column_iter.next().is_none()); + // Deserialized from raw bytes, and intended to be cached + Ok(Some(DeserializedColumn::Column(( + column_id, + column, + field_uncompressed_size, + )))) + } + DataItem::ColumnArray(column_array) => { + if is_nested { + return Err(ErrorCode::StorageOther( + "unexpected nested field: nested leaf field hits cached", + )); + } + // since it is not nested, this field contains only one column + Ok(Some(DeserializedColumn::FromCache(column_array))) + } + } + } +} + +fn create_with_opt_default_value( + block_entries: Vec, + schema: &DataSchema, + default_vals: &[Option], + num_rows: usize, +) -> Result { + let schema_fields = schema.fields(); + let mut block_entries_iter = block_entries.into_iter(); + + let mut entries = Vec::with_capacity(default_vals.len()); + for (i, default_val) in 
default_vals.iter().enumerate() { + let field = &schema_fields[i]; + let data_type = field.data_type(); + + let entry = match default_val { + Some(default_val) => { + BlockEntry::new_const_column(data_type.clone(), default_val.to_owned(), num_rows) + } + None => block_entries_iter + .next() + .expect("arrays should have enough elements"), + }; + + entries.push(entry); + } + + Ok(DataBlock::new(entries, num_rows)) +} diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index 781bb843c3f8f..ec74286470378 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -34,6 +34,8 @@ use databend_storages_common_table_meta::meta::Compression; mod adapter; mod deserialize; +mod deserialize_v2; + pub use adapter::RowGroupImplBuilder; pub use deserialize::column_chunks_to_record_batch; @@ -48,6 +50,33 @@ impl BlockReader { column_chunks: HashMap, compression: &Compression, block_path: &str, + ) -> databend_common_exception::Result { + if self.use_experimental_parquet_reader { + self.deserialize_v2( + block_path, + num_rows, + compression, + column_metas, + column_chunks, + ) + } else { + self.deserialize_using_arrow( + num_rows, + column_metas, + column_chunks, + compression, + block_path, + ) + } + } + + pub fn deserialize_using_arrow( + &self, + num_rows: usize, + column_metas: &HashMap, + column_chunks: HashMap, + compression: &Compression, + block_path: &str, ) -> databend_common_exception::Result { if column_chunks.is_empty() { return self.build_default_values_block(num_rows); diff --git a/tests/sqllogictests/suites/base/03_common/03_extra_new_reader.test b/tests/sqllogictests/suites/base/03_common/03_extra_new_reader.test new file mode 100644 index 0000000000000..bebe1c2f65942 --- /dev/null +++ b/tests/sqllogictests/suites/base/03_common/03_extra_new_reader.test @@ -0,0 +1,84 @@ +statement ok +create or replace database test_new_reader; + +statement ok +use test_new_reader; + +statement ok +SET use_parquet2= 1; + + +statement ok +create table t1 (a int not null); + +statement ok +insert into t1 values(1), (3), (5); + +query I +select * from t1 order by a; +---- +1 +3 +5 + +statement ok +create table t2 (a int); + +statement ok +insert into t2 values(1), (null), (5); + +query I +select * from t2 order by a; +---- +1 +5 +NULL + + +statement ok +create table t3 (a int); + +statement ok +create table r like t3 engine = random; + +statement ok +insert into t3 select * from r limit 100000; + +statement ok +select sum(a) from t3 ignore_result; + +statement ok +CREATE TABLE IF NOT EXISTS lineitem +( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +); + +statement ok +create table lr like lineitem engine = random; + +statement ok +insert into lineitem select * from lr limit 1000; + +statement ok +insert into lineitem select * from lr limit 1000; + +statement ok +select * from lineitem ignore_result; + + +
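The settings changes earlier in this diff repurpose the deprecated `use_parquet2` knob as `use_experimental_parquet_reader`, which is what gates the `deserialize_v2` path in `BlockReader`. Assuming that renamed setting, a session would presumably opt in (and back out) along these lines when reading an existing fuse table such as `lineitem` above; this is an illustrative snippet, not part of the test suite:

SET use_experimental_parquet_reader = 1;

SELECT count(*) FROM lineitem;

SET use_experimental_parquet_reader = 0;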