Skip to content

Commit 7d52145

Browse files
Preserve equivalence properties during projection pushdown (#17129)
1 parent a1431e2 commit 7d52145

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

datafusion/datasource/src/source.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::fmt;
2222
use std::fmt::{Debug, Formatter};
2323
use std::sync::Arc;
2424

25+
use datafusion_physical_expr::equivalence::ProjectionMapping;
2526
use datafusion_physical_plan::execution_plan::{
2627
Boundedness, EmissionType, SchedulingType,
2728
};
@@ -320,7 +321,39 @@ impl ExecutionPlan for DataSourceExec {
320321
&self,
321322
projection: &ProjectionExec,
322323
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
323-
self.data_source.try_swapping_with_projection(projection)
324+
match self.data_source.try_swapping_with_projection(projection)? {
325+
Some(new_plan) => {
326+
if let Some(new_data_source_exec) =
327+
new_plan.as_any().downcast_ref::<DataSourceExec>()
328+
{
329+
let projection_mapping = ProjectionMapping::try_new(
330+
projection.expr().iter().cloned(),
331+
&self.schema(),
332+
)?;
333+
334+
// Project the equivalence properties to the new schema
335+
let projected_eq_properties = self
336+
.cache
337+
.eq_properties
338+
.project(&projection_mapping, new_data_source_exec.schema());
339+
340+
let preserved_exec = DataSourceExec {
341+
data_source: Arc::clone(&new_data_source_exec.data_source),
342+
cache: PlanProperties::new(
343+
projected_eq_properties,
344+
new_data_source_exec.cache.partitioning.clone(),
345+
new_data_source_exec.cache.emission_type,
346+
new_data_source_exec.cache.boundedness,
347+
)
348+
.with_scheduling_type(new_data_source_exec.cache.scheduling_type),
349+
};
350+
Ok(Some(Arc::new(preserved_exec)))
351+
} else {
352+
Ok(Some(new_plan))
353+
}
354+
}
355+
None => Ok(None),
356+
}
324357
}
325358

326359
fn handle_child_pushdown_result(
1.23 KB
Binary file not shown.
1.24 KB
Binary file not shown.

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,3 +543,35 @@ query TT
543543
select val, part from t_pushdown where part = val AND part = 'a';
544544
----
545545
a a
546+
547+
statement ok
548+
COPY (
549+
SELECT
550+
'00000000000000000000000000000001' AS trace_id,
551+
'2023-10-01 00:00:00'::timestamptz AS start_timestamp,
552+
'prod' as deployment_environment
553+
)
554+
TO 'data/1.parquet';
555+
556+
statement ok
557+
COPY (
558+
SELECT
559+
'00000000000000000000000000000002' AS trace_id,
560+
'2024-10-01 00:00:00'::timestamptz AS start_timestamp,
561+
'staging' as deployment_environment
562+
)
563+
TO 'data/2.parquet';
564+
565+
statement ok
566+
CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/';
567+
568+
statement ok
569+
SET datafusion.execution.parquet.pushdown_filters = true;
570+
571+
query T
572+
SELECT deployment_environment
573+
FROM t1
574+
WHERE trace_id = '00000000000000000000000000000002'
575+
ORDER BY start_timestamp, trace_id;
576+
----
577+
staging

0 commit comments

Comments
 (0)