Skip to content

Commit 6abe7ea

Browse files
committed
Revert "fix: validate inter-file ordering in eq_properties() (apache#20329)"
This reverts commit 53b0ffb.
1 parent 95d0617 commit 6abe7ea

File tree

5 files changed

+47
-660
lines changed

5 files changed

+47
-660
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ mod test {
826826
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
827827
assert_snapshot!(
828828
plan_string,
829-
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
829+
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
830830
);
831831

832832
let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

datafusion/datasource/src/file_scan_config.rs

Lines changed: 44 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ impl DataSource for FileScanConfig {
736736
let schema = self.file_source.table_schema().table_schema();
737737
let mut eq_properties = EquivalenceProperties::new_with_orderings(
738738
Arc::clone(schema),
739-
self.validated_output_ordering(),
739+
self.output_ordering.clone(),
740740
)
741741
.with_constraints(self.constraints.clone());
742742

@@ -926,40 +926,6 @@ impl DataSource for FileScanConfig {
926926
}
927927

928928
impl FileScanConfig {
929-
/// Returns only the output orderings that are validated against actual
930-
/// file group statistics.
931-
///
932-
/// For example, individual files may be ordered by `col1 ASC`,
933-
/// but if we have files with these min/max statistics in a single partition / file group:
934-
///
935-
/// - file1: min(col1) = 10, max(col1) = 20
936-
/// - file2: min(col1) = 5, max(col1) = 15
937-
///
938-
/// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
939-
/// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
940-
///
941-
/// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
942-
///
943-
/// - file1: min(col1) = 20, max(col1) = 30
944-
/// - file2: min(col1) = 10, max(col1) = 15
945-
///
946-
/// On the other hand if we had:
947-
///
948-
/// - file1: min(col1) = 5, max(col1) = 15
949-
/// - file2: min(col1) = 16, max(col1) = 25
950-
///
951-
/// Then we know that reading file1 followed by file2 will produce ordered output,
952-
/// so `col1 ASC` would be retained.
953-
///
954-
/// Note that we are checking for ordering *within* *each* file group / partition,
955-
/// files in different partitions are read independently and do not affect each other's ordering.
956-
/// Merging of the multiple partition streams into a single ordered stream is handled
957-
/// upstream e.g. by `SortPreservingMergeExec`.
958-
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
959-
let schema = self.file_source.table_schema().table_schema();
960-
validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
961-
}
962-
963929
/// Get the file schema (schema of the files without partition columns)
964930
pub fn file_schema(&self) -> &SchemaRef {
965931
self.file_source.table_schema().file_schema()
@@ -1334,51 +1300,6 @@ fn ordered_column_indices_from_projection(
13341300
.collect::<Option<Vec<usize>>>()
13351301
}
13361302

1337-
/// Check whether a given ordering is valid for all file groups by verifying
1338-
/// that files within each group are sorted according to their min/max statistics.
1339-
///
1340-
/// For single-file (or empty) groups, the ordering is trivially valid.
1341-
/// For multi-file groups, we check that the min/max statistics for the sort
1342-
/// columns are in order and non-overlapping (or touching at boundaries).
1343-
///
1344-
/// `projection` maps projected column indices back to table-schema indices
1345-
/// when validating after projection; pass `None` when validating at
1346-
/// table-schema level.
1347-
fn is_ordering_valid_for_file_groups(
1348-
file_groups: &[FileGroup],
1349-
ordering: &LexOrdering,
1350-
schema: &SchemaRef,
1351-
projection: Option<&[usize]>,
1352-
) -> bool {
1353-
file_groups.iter().all(|group| {
1354-
if group.len() <= 1 {
1355-
return true; // single-file groups are trivially sorted
1356-
}
1357-
match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1358-
{
1359-
Ok(stats) => stats.is_sorted(),
1360-
Err(_) => false, // can't prove sorted → reject
1361-
}
1362-
})
1363-
}
1364-
1365-
/// Filters orderings to retain only those valid for all file groups,
1366-
/// verified via min/max statistics.
1367-
fn validate_orderings(
1368-
orderings: &[LexOrdering],
1369-
schema: &SchemaRef,
1370-
file_groups: &[FileGroup],
1371-
projection: Option<&[usize]>,
1372-
) -> Vec<LexOrdering> {
1373-
orderings
1374-
.iter()
1375-
.filter(|ordering| {
1376-
is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1377-
})
1378-
.cloned()
1379-
.collect()
1380-
}
1381-
13821303
/// The various listing tables does not attempt to read all files
13831304
/// concurrently, instead they will read files in sequence within a
13841305
/// partition. This is an important property as it allows plans to
@@ -1445,47 +1366,52 @@ fn get_projected_output_ordering(
14451366
let projected_orderings =
14461367
project_orderings(&base_config.output_ordering, projected_schema);
14471368

1448-
let indices = base_config
1449-
.file_source
1450-
.projection()
1451-
.as_ref()
1452-
.map(|p| ordered_column_indices_from_projection(p));
1453-
1454-
match indices {
1455-
Some(Some(indices)) => {
1456-
// Simple column projection — validate with statistics
1457-
validate_orderings(
1458-
&projected_orderings,
1459-
projected_schema,
1460-
&base_config.file_groups,
1461-
Some(indices.as_slice()),
1462-
)
1463-
}
1464-
None => {
1465-
// No projection — validate with statistics (no remapping needed)
1466-
validate_orderings(
1467-
&projected_orderings,
1468-
projected_schema,
1469-
&base_config.file_groups,
1470-
None,
1471-
)
1472-
}
1473-
Some(None) => {
1474-
// Complex projection (expressions, not simple columns) — can't
1475-
// determine column indices for statistics. Still valid if all
1476-
// file groups have at most one file.
1477-
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
1478-
projected_orderings
1479-
} else {
1480-
debug!(
1481-
"Skipping specified output orderings. \
1482-
Some file groups couldn't be determined to be sorted: {:?}",
1483-
base_config.file_groups
1484-
);
1485-
vec![]
1369+
let mut all_orderings = vec![];
1370+
for new_ordering in projected_orderings {
1371+
// Check if any file groups are not sorted
1372+
if base_config.file_groups.iter().any(|group| {
1373+
if group.len() <= 1 {
1374+
// File groups with <= 1 files are always sorted
1375+
return false;
14861376
}
1377+
1378+
let Some(indices) = base_config
1379+
.file_source
1380+
.projection()
1381+
.as_ref()
1382+
.map(|p| ordered_column_indices_from_projection(p))
1383+
else {
1384+
// Can't determine if ordered without a simple projection
1385+
return true;
1386+
};
1387+
1388+
let statistics = match MinMaxStatistics::new_from_files(
1389+
&new_ordering,
1390+
projected_schema,
1391+
indices.as_deref(),
1392+
group.iter(),
1393+
) {
1394+
Ok(statistics) => statistics,
1395+
Err(e) => {
1396+
log::trace!("Error fetching statistics for file group: {e}");
1397+
// we can't prove that it's ordered, so we have to reject it
1398+
return true;
1399+
}
1400+
};
1401+
1402+
!statistics.is_sorted()
1403+
}) {
1404+
debug!(
1405+
"Skipping specified output ordering {:?}. \
1406+
Some file groups couldn't be determined to be sorted: {:?}",
1407+
base_config.output_ordering[0], base_config.file_groups
1408+
);
1409+
continue;
14871410
}
1411+
1412+
all_orderings.push(new_ordering);
14881413
}
1414+
all_orderings
14891415
}
14901416

14911417
/// Convert type to a type suitable for use as a `ListingTable`

datafusion/datasource/src/statistics.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,11 @@ impl MinMaxStatistics {
266266
}
267267

268268
/// Check if the min/max statistics are in order and non-overlapping
269-
/// (or touching at boundaries)
270269
pub fn is_sorted(&self) -> bool {
271270
self.max_by_sort_order
272271
.iter()
273272
.zip(self.min_by_sort_order.iter().skip(1))
274-
.all(|(max, next_min)| max <= next_min)
273+
.all(|(max, next_min)| max < next_min)
275274
}
276275
}
277276

datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,4 @@ logical_plan
274274
02)--TableScan: test_table projection=[constant_col]
275275
physical_plan
276276
01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST]
277-
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], output_ordering=[constant_col@0 ASC NULLS LAST], file_type=parquet
277+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], file_type=parquet

0 commit comments

Comments
 (0)