Skip to content

Commit 27de50d

Browse files
authored
fix: Reverse row selection should respect the row group index (#19557)
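## Which issue does this PR close?

- Closes [#19535](#19535)

## Rationale for this change

Reverse row selection should respect the row group indexes chosen by the access plan. When some row groups are skipped (for example, scanning only row groups 0, 2 and 3), the reversed `RowSelection` must be mapped onto the row groups that are actually scanned. This PR fixes that issue.

## What changes are included in this PR?

- `PreparedAccessPlan::reverse` now captures the original (non-reversed) row group indexes and passes them to `reverse_row_selection`, so the reversed selection respects the row group index.
- `PreparedAccessPlan`, its fields, `from_access_plan` and `reverse` are now `pub(crate)`.
- A new test, `test_reverse_scan_with_non_contiguous_row_groups`, covers a reverse scan over non-contiguous row groups.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

No.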
1 parent 1ce4b51 commit 27de50d

File tree

2 files changed

+851
-133
lines changed

2 files changed

+851
-133
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,16 @@ pub(super) struct ParquetOpener {
121121
}
122122

123123
/// Represents a prepared access plan with optional row selection
124-
struct PreparedAccessPlan {
124+
pub(crate) struct PreparedAccessPlan {
125125
/// Row group indexes to read
126-
row_group_indexes: Vec<usize>,
126+
pub(crate) row_group_indexes: Vec<usize>,
127127
/// Optional row selection for filtering within row groups
128-
row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
128+
pub(crate) row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
129129
}
130130

131131
impl PreparedAccessPlan {
132132
/// Create a new prepared access plan from a ParquetAccessPlan
133-
fn from_access_plan(
133+
pub(crate) fn from_access_plan(
134134
access_plan: ParquetAccessPlan,
135135
rg_metadata: &[RowGroupMetaData],
136136
) -> Result<Self> {
@@ -144,17 +144,23 @@ impl PreparedAccessPlan {
144144
}
145145

146146
/// Reverse the access plan for reverse scanning
147-
fn reverse(
147+
pub(crate) fn reverse(
148148
mut self,
149149
file_metadata: &parquet::file::metadata::ParquetMetaData,
150150
) -> Result<Self> {
151+
// Get the row group indexes before reversing
152+
let row_groups_to_scan = self.row_group_indexes.clone();
153+
151154
// Reverse the row group indexes
152155
self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();
153156

154157
// If we have a row selection, reverse it to match the new row group order
155158
if let Some(row_selection) = self.row_selection {
156-
self.row_selection =
157-
Some(reverse_row_selection(&row_selection, file_metadata)?);
159+
self.row_selection = Some(reverse_row_selection(
160+
&row_selection,
161+
file_metadata,
162+
&row_groups_to_scan, // Pass the original (non-reversed) row group indexes
163+
)?);
158164
}
159165

160166
Ok(self)
@@ -964,7 +970,7 @@ mod test {
964970
use std::sync::Arc;
965971

966972
use super::{ConstantColumns, constant_columns_from_stats};
967-
use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
973+
use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
968974
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
969975
use bytes::{BufMut, BytesMut};
970976
use datafusion_common::{
@@ -1851,4 +1857,101 @@ mod test {
18511857
"Reverse scan should reverse row group order while maintaining correct RowSelection for each group"
18521858
);
18531859
}
1860+
1861+
#[tokio::test]
1862+
async fn test_reverse_scan_with_non_contiguous_row_groups() {
1863+
use parquet::file::properties::WriterProperties;
1864+
1865+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1866+
1867+
// Create 4 batches (4 row groups)
1868+
let batch0 = record_batch!(("a", Int32, vec![Some(1), Some(2)])).unwrap();
1869+
let batch1 = record_batch!(("a", Int32, vec![Some(3), Some(4)])).unwrap();
1870+
let batch2 = record_batch!(("a", Int32, vec![Some(5), Some(6)])).unwrap();
1871+
let batch3 = record_batch!(("a", Int32, vec![Some(7), Some(8)])).unwrap();
1872+
1873+
let props = WriterProperties::builder()
1874+
.set_max_row_group_size(2)
1875+
.build();
1876+
1877+
let data_len = write_parquet_batches(
1878+
Arc::clone(&store),
1879+
"test.parquet",
1880+
vec![batch0.clone(), batch1, batch2, batch3],
1881+
Some(props),
1882+
)
1883+
.await;
1884+
1885+
let schema = batch0.schema();
1886+
1887+
use crate::ParquetAccessPlan;
1888+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1889+
1890+
// KEY: Skip RG1 (non-contiguous!)
1891+
// Only scan row groups: [0, 2, 3]
1892+
let mut access_plan = ParquetAccessPlan::new(vec![
1893+
RowGroupAccess::Scan, // RG0
1894+
RowGroupAccess::Skip, // RG1 - SKIPPED!
1895+
RowGroupAccess::Scan, // RG2
1896+
RowGroupAccess::Scan, // RG3
1897+
]);
1898+
1899+
// Add RowSelection for each scanned row group
1900+
// RG0: select first row (1), skip second (2)
1901+
access_plan.scan_selection(
1902+
0,
1903+
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
1904+
);
1905+
// RG1: skipped, no selection needed
1906+
// RG2: select first row (5), skip second (6)
1907+
access_plan.scan_selection(
1908+
2,
1909+
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
1910+
);
1911+
// RG3: select first row (7), skip second (8)
1912+
access_plan.scan_selection(
1913+
3,
1914+
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
1915+
);
1916+
1917+
let file = PartitionedFile::new(
1918+
"test.parquet".to_string(),
1919+
u64::try_from(data_len).unwrap(),
1920+
)
1921+
.with_extensions(Arc::new(access_plan));
1922+
1923+
let make_opener = |reverse_scan: bool| {
1924+
ParquetOpenerBuilder::new()
1925+
.with_store(Arc::clone(&store))
1926+
.with_schema(Arc::clone(&schema))
1927+
.with_projection_indices(&[0])
1928+
.with_reverse_row_groups(reverse_scan)
1929+
.build()
1930+
};
1931+
1932+
// Forward scan: RG0(1), RG2(5), RG3(7)
1933+
// Note: RG1 is completely skipped
1934+
let opener = make_opener(false);
1935+
let stream = opener.open(file.clone()).unwrap().await.unwrap();
1936+
let forward_values = collect_int32_values(stream).await;
1937+
1938+
assert_eq!(
1939+
forward_values,
1940+
vec![1, 5, 7],
1941+
"Forward scan with non-contiguous row groups"
1942+
);
1943+
1944+
// Reverse scan: RG3(7), RG2(5), RG0(1)
1945+
// WITHOUT the bug fix, this would return WRONG values
1946+
// because the RowSelection would be incorrectly mapped
1947+
let opener = make_opener(true);
1948+
let stream = opener.open(file).unwrap().await.unwrap();
1949+
let reverse_values = collect_int32_values(stream).await;
1950+
1951+
assert_eq!(
1952+
reverse_values,
1953+
vec![7, 5, 1],
1954+
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
1955+
);
1956+
}
18541957
}

0 commit comments

Comments
 (0)