Skip to content

Commit 4c53d73

Browse files
authored
Merge branch 'main' into perf-factorial
2 parents 5e680a4 + 79f67b8 commit 4c53d73

File tree

4 files changed

+969
-156
lines changed

4 files changed

+969
-156
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod test {
3030
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
3131
use datafusion_execution::TaskContext;
3232
use datafusion_execution::config::SessionConfig;
33+
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
3334
use datafusion_expr_common::operator::Operator;
3435
use datafusion_functions_aggregate::count::count_udaf;
3536
use datafusion_physical_expr::Partitioning;
@@ -52,6 +53,7 @@ mod test {
5253
use datafusion_physical_plan::repartition::RepartitionExec;
5354
use datafusion_physical_plan::sorts::sort::SortExec;
5455
use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
56+
use datafusion_physical_plan::windows::{WindowAggExec, create_window_expr};
5557
use datafusion_physical_plan::{
5658
ExecutionPlan, ExecutionPlanProperties, execute_stream_partitioned,
5759
get_plan_string,
@@ -1154,4 +1156,105 @@ mod test {
11541156

11551157
Ok(())
11561158
}
1159+
1160+
#[tokio::test]
1161+
async fn test_statistic_by_partition_of_window_agg() -> Result<()> {
1162+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
1163+
1164+
let window_expr = create_window_expr(
1165+
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
1166+
"count".to_owned(),
1167+
&[col("id", &scan.schema())?],
1168+
&[], // no partition by
1169+
&[PhysicalSortExpr::new(
1170+
col("id", &scan.schema())?,
1171+
SortOptions::default(),
1172+
)],
1173+
Arc::new(WindowFrame::new(Some(false))),
1174+
scan.schema(),
1175+
false,
1176+
false,
1177+
None,
1178+
)?;
1179+
1180+
let window_agg: Arc<dyn ExecutionPlan> =
1181+
Arc::new(WindowAggExec::try_new(vec![window_expr], scan, true)?);
1182+
1183+
// Verify partition statistics are properly propagated (not unknown)
1184+
let statistics = (0..window_agg.output_partitioning().partition_count())
1185+
.map(|idx| window_agg.partition_statistics(Some(idx)))
1186+
.collect::<Result<Vec<_>>>()?;
1187+
1188+
assert_eq!(statistics.len(), 2);
1189+
1190+
// Window functions preserve input row counts and column statistics
1191+
// but add unknown statistics for the new window column
1192+
let expected_statistic_partition_1 = Statistics {
1193+
num_rows: Precision::Exact(2),
1194+
total_byte_size: Precision::Absent,
1195+
column_statistics: vec![
1196+
ColumnStatistics {
1197+
null_count: Precision::Exact(0),
1198+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1199+
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
1200+
sum_value: Precision::Absent,
1201+
distinct_count: Precision::Absent,
1202+
byte_size: Precision::Exact(8),
1203+
},
1204+
ColumnStatistics {
1205+
null_count: Precision::Exact(0),
1206+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1207+
DATE_2025_03_02,
1208+
))),
1209+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1210+
DATE_2025_03_01,
1211+
))),
1212+
sum_value: Precision::Absent,
1213+
distinct_count: Precision::Absent,
1214+
byte_size: Precision::Exact(8),
1215+
},
1216+
ColumnStatistics::new_unknown(), // window column
1217+
],
1218+
};
1219+
1220+
let expected_statistic_partition_2 = Statistics {
1221+
num_rows: Precision::Exact(2),
1222+
total_byte_size: Precision::Absent,
1223+
column_statistics: vec![
1224+
ColumnStatistics {
1225+
null_count: Precision::Exact(0),
1226+
max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
1227+
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1228+
sum_value: Precision::Absent,
1229+
distinct_count: Precision::Absent,
1230+
byte_size: Precision::Exact(8),
1231+
},
1232+
ColumnStatistics {
1233+
null_count: Precision::Exact(0),
1234+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1235+
DATE_2025_03_04,
1236+
))),
1237+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1238+
DATE_2025_03_03,
1239+
))),
1240+
sum_value: Precision::Absent,
1241+
distinct_count: Precision::Absent,
1242+
byte_size: Precision::Exact(8),
1243+
},
1244+
ColumnStatistics::new_unknown(), // window column
1245+
],
1246+
};
1247+
1248+
assert_eq!(statistics[0], expected_statistic_partition_1);
1249+
assert_eq!(statistics[1], expected_statistic_partition_2);
1250+
1251+
// Verify the statistics match actual execution results
1252+
let expected_stats = vec![
1253+
ExpectedStatistics::NonEmpty(3, 4, 2),
1254+
ExpectedStatistics::NonEmpty(1, 2, 2),
1255+
];
1256+
validate_statistics_with_data(window_agg, expected_stats, 0).await?;
1257+
1258+
Ok(())
1259+
}
11571260
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,16 @@ pub(super) struct ParquetOpener {
121121
}
122122

123123
/// Represents a prepared access plan with optional row selection
124-
struct PreparedAccessPlan {
124+
pub(crate) struct PreparedAccessPlan {
125125
/// Row group indexes to read
126-
row_group_indexes: Vec<usize>,
126+
pub(crate) row_group_indexes: Vec<usize>,
127127
/// Optional row selection for filtering within row groups
128-
row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
128+
pub(crate) row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
129129
}
130130

131131
impl PreparedAccessPlan {
132132
/// Create a new prepared access plan from a ParquetAccessPlan
133-
fn from_access_plan(
133+
pub(crate) fn from_access_plan(
134134
access_plan: ParquetAccessPlan,
135135
rg_metadata: &[RowGroupMetaData],
136136
) -> Result<Self> {
@@ -144,17 +144,23 @@ impl PreparedAccessPlan {
144144
}
145145

146146
/// Reverse the access plan for reverse scanning
147-
fn reverse(
147+
pub(crate) fn reverse(
148148
mut self,
149149
file_metadata: &parquet::file::metadata::ParquetMetaData,
150150
) -> Result<Self> {
151+
// Get the row group indexes before reversing
152+
let row_groups_to_scan = self.row_group_indexes.clone();
153+
151154
// Reverse the row group indexes
152155
self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();
153156

154157
// If we have a row selection, reverse it to match the new row group order
155158
if let Some(row_selection) = self.row_selection {
156-
self.row_selection =
157-
Some(reverse_row_selection(&row_selection, file_metadata)?);
159+
self.row_selection = Some(reverse_row_selection(
160+
&row_selection,
161+
file_metadata,
162+
&row_groups_to_scan, // Pass the original (non-reversed) row group indexes
163+
)?);
158164
}
159165

160166
Ok(self)
@@ -964,7 +970,7 @@ mod test {
964970
use std::sync::Arc;
965971

966972
use super::{ConstantColumns, constant_columns_from_stats};
967-
use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
973+
use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
968974
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
969975
use bytes::{BufMut, BytesMut};
970976
use datafusion_common::{
@@ -1851,4 +1857,101 @@ mod test {
18511857
"Reverse scan should reverse row group order while maintaining correct RowSelection for each group"
18521858
);
18531859
}
1860+
1861+
#[tokio::test]
1862+
async fn test_reverse_scan_with_non_contiguous_row_groups() {
1863+
use parquet::file::properties::WriterProperties;
1864+
1865+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1866+
1867+
// Create 4 batches (4 row groups)
1868+
let batch0 = record_batch!(("a", Int32, vec![Some(1), Some(2)])).unwrap();
1869+
let batch1 = record_batch!(("a", Int32, vec![Some(3), Some(4)])).unwrap();
1870+
let batch2 = record_batch!(("a", Int32, vec![Some(5), Some(6)])).unwrap();
1871+
let batch3 = record_batch!(("a", Int32, vec![Some(7), Some(8)])).unwrap();
1872+
1873+
let props = WriterProperties::builder()
1874+
.set_max_row_group_size(2)
1875+
.build();
1876+
1877+
let data_len = write_parquet_batches(
1878+
Arc::clone(&store),
1879+
"test.parquet",
1880+
vec![batch0.clone(), batch1, batch2, batch3],
1881+
Some(props),
1882+
)
1883+
.await;
1884+
1885+
let schema = batch0.schema();
1886+
1887+
use crate::ParquetAccessPlan;
1888+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1889+
1890+
// KEY: Skip RG1 (non-contiguous!)
1891+
// Only scan row groups: [0, 2, 3]
1892+
let mut access_plan = ParquetAccessPlan::new(vec![
1893+
RowGroupAccess::Scan, // RG0
1894+
RowGroupAccess::Skip, // RG1 - SKIPPED!
1895+
RowGroupAccess::Scan, // RG2
1896+
RowGroupAccess::Scan, // RG3
1897+
]);
1898+
1899+
// Add RowSelection for each scanned row group
1900+
// RG0: select first row (1), skip second (2)
1901+
access_plan.scan_selection(
1902+
0,
1903+
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
1904+
);
1905+
// RG1: skipped, no selection needed
1906+
// RG2: select first row (5), skip second (6)
1907+
access_plan.scan_selection(
1908+
2,
1909+
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
1910+
);
1911+
// RG3: select first row (7), skip second (8)
1912+
access_plan.scan_selection(
1913+
3,
1914+
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
1915+
);
1916+
1917+
let file = PartitionedFile::new(
1918+
"test.parquet".to_string(),
1919+
u64::try_from(data_len).unwrap(),
1920+
)
1921+
.with_extensions(Arc::new(access_plan));
1922+
1923+
let make_opener = |reverse_scan: bool| {
1924+
ParquetOpenerBuilder::new()
1925+
.with_store(Arc::clone(&store))
1926+
.with_schema(Arc::clone(&schema))
1927+
.with_projection_indices(&[0])
1928+
.with_reverse_row_groups(reverse_scan)
1929+
.build()
1930+
};
1931+
1932+
// Forward scan: RG0(1), RG2(5), RG3(7)
1933+
// Note: RG1 is completely skipped
1934+
let opener = make_opener(false);
1935+
let stream = opener.open(file.clone()).unwrap().await.unwrap();
1936+
let forward_values = collect_int32_values(stream).await;
1937+
1938+
assert_eq!(
1939+
forward_values,
1940+
vec![1, 5, 7],
1941+
"Forward scan with non-contiguous row groups"
1942+
);
1943+
1944+
// Reverse scan: RG3(7), RG2(5), RG0(1)
1945+
// WITHOUT the bug fix, this would return WRONG values
1946+
// because the RowSelection would be incorrectly mapped
1947+
let opener = make_opener(true);
1948+
let stream = opener.open(file).unwrap().await.unwrap();
1949+
let reverse_values = collect_int32_values(stream).await;
1950+
1951+
assert_eq!(
1952+
reverse_values,
1953+
vec![7, 5, 1],
1954+
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
1955+
);
1956+
}
18541957
}

0 commit comments

Comments
 (0)