Skip to content

Commit 74944f3

Browse files
feat(physical-plan): add FilterExec constructor with built-in projection
Enables FilterExec creation with projection in a single call. Avoids duplicate compute_properties execution. Improves performance for windowed queries and pipelines.
1 parent 6954497 commit 74944f3

File tree

2 files changed

+453
-1
lines changed

2 files changed

+453
-1
lines changed

datafusion/physical-plan/src/filter.rs

Lines changed: 306 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -126,6 +126,64 @@ impl FilterExec {
126126
}
127127
}
128128

129+
/// Create a FilterExec with projection applied during construction.
130+
///
131+
/// This method is more efficient than calling `try_new().with_projection()`
132+
/// because it computes properties only once, avoiding redundant work.
133+
///
134+
/// # Arguments
135+
///
136+
/// * `predicate` - Boolean expression to filter rows
137+
/// * `input` - Input execution plan
138+
/// * `projection` - Optional column indices to project after filtering
139+
///
140+
/// # Example
141+
///
142+
/// ```ignore
143+
/// // Create a filter that selects rows where column 0 > 5 and projects columns [1, 3]
144+
/// let filter = FilterExec::try_new_with_projection(
145+
/// predicate,
146+
/// input_plan,
147+
/// Some(vec![1, 3])
148+
/// )?;
149+
/// ```
150+
#[expect(clippy::needless_pass_by_value)]
151+
pub fn try_new_with_projection(
152+
predicate: Arc<dyn PhysicalExpr>,
153+
input: Arc<dyn ExecutionPlan>,
154+
projection: Option<Vec<usize>>,
155+
) -> Result<Self> {
156+
match predicate.data_type(input.schema().as_ref())? {
157+
DataType::Boolean => {
158+
// Validate projection if provided
159+
if let Some(ref proj) = projection {
160+
can_project(&input.schema(), Some(proj))?;
161+
}
162+
163+
let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
164+
let cache = Self::compute_properties(
165+
&input,
166+
&predicate,
167+
default_selectivity,
168+
projection.as_ref(),
169+
)?;
170+
Ok(Self {
171+
predicate,
172+
input: Arc::clone(&input),
173+
metrics: ExecutionPlanMetricsSet::new(),
174+
default_selectivity,
175+
cache,
176+
projection,
177+
batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
178+
fetch: None,
179+
})
180+
}
181+
other => {
182+
plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
183+
}
184+
}
185+
}
186+
129187
pub fn with_default_selectivity(
130188
mut self,
131189
default_selectivity: u8,
@@ -1586,4 +1644,251 @@ mod tests {
15861644

15871645
Ok(())
15881646
}
1589-
}
1647+
1648+
#[tokio::test]
1649+
async fn test_try_new_with_projection_basic() -> Result<()> {
1650+
// Create a schema with multiple columns
1651+
let schema = Arc::new(Schema::new(vec![
1652+
Field::new("a", DataType::Int32, false),
1653+
Field::new("b", DataType::Int32, false),
1654+
Field::new("c", DataType::Int32, false),
1655+
]));
1656+
1657+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1658+
1659+
// Create a filter predicate: a > 10
1660+
let predicate = Arc::new(BinaryExpr::new(
1661+
Arc::new(Column::new("a", 0)),
1662+
Operator::Gt,
1663+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1664+
));
1665+
1666+
// Create filter with projection [0, 2] (columns a and c)
1667+
let projection = Some(vec![0, 2]);
1668+
let filter = FilterExec::try_new_with_projection(
1669+
Arc::clone(&predicate),
1670+
Arc::clone(&input),
1671+
projection.clone(),
1672+
)?;
1673+
1674+
// Verify projection is set correctly
1675+
assert_eq!(filter.projection(), Some(&vec![0, 2]));
1676+
1677+
// Verify schema contains only projected columns
1678+
let output_schema = filter.schema();
1679+
assert_eq!(output_schema.fields().len(), 2);
1680+
assert_eq!(output_schema.field(0).name(), "a");
1681+
assert_eq!(output_schema.field(1).name(), "c");
1682+
1683+
Ok(())
1684+
}
1685+
1686+
#[tokio::test]
1687+
async fn test_try_new_with_projection_none() -> Result<()> {
1688+
let schema = Arc::new(Schema::new(vec![
1689+
Field::new("a", DataType::Int32, false),
1690+
Field::new("b", DataType::Int32, false),
1691+
]));
1692+
1693+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1694+
1695+
let predicate = Arc::new(BinaryExpr::new(
1696+
Arc::new(Column::new("a", 0)),
1697+
Operator::Gt,
1698+
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1699+
));
1700+
1701+
// Create filter without projection
1702+
let filter = FilterExec::try_new_with_projection(
1703+
Arc::clone(&predicate),
1704+
Arc::clone(&input),
1705+
None,
1706+
)?;
1707+
1708+
// Verify no projection is set
1709+
assert_eq!(filter.projection(), None);
1710+
1711+
// Verify schema contains all columns
1712+
let output_schema = filter.schema();
1713+
assert_eq!(output_schema.fields().len(), 2);
1714+
1715+
Ok(())
1716+
}
1717+
1718+
#[tokio::test]
1719+
async fn test_try_new_with_projection_invalid() -> Result<()> {
1720+
let schema = Arc::new(Schema::new(vec![
1721+
Field::new("a", DataType::Int32, false),
1722+
Field::new("b", DataType::Int32, false),
1723+
]));
1724+
1725+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1726+
1727+
let predicate = Arc::new(BinaryExpr::new(
1728+
Arc::new(Column::new("a", 0)),
1729+
Operator::Gt,
1730+
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1731+
));
1732+
1733+
// Try to create filter with invalid projection (index out of bounds)
1734+
let result = FilterExec::try_new_with_projection(
1735+
Arc::clone(&predicate),
1736+
Arc::clone(&input),
1737+
Some(vec![0, 5]), // 5 is out of bounds
1738+
);
1739+
1740+
// Should return an error
1741+
assert!(result.is_err());
1742+
1743+
Ok(())
1744+
}
1745+
1746+
#[tokio::test]
1747+
async fn test_try_new_with_projection_vs_with_projection() -> Result<()> {
1748+
// This test verifies that try_new_with_projection produces the same result
1749+
// as try_new().with_projection(), but more efficiently
1750+
let schema = Arc::new(Schema::new(vec![
1751+
Field::new("a", DataType::Int32, false),
1752+
Field::new("b", DataType::Int32, false),
1753+
Field::new("c", DataType::Int32, false),
1754+
Field::new("d", DataType::Int32, false),
1755+
]));
1756+
1757+
let input = Arc::new(StatisticsExec::new(
1758+
Statistics {
1759+
num_rows: Precision::Inexact(1000),
1760+
total_byte_size: Precision::Inexact(4000),
1761+
column_statistics: vec![
1762+
ColumnStatistics {
1763+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1764+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1765+
..Default::default()
1766+
},
1767+
ColumnStatistics {
1768+
..Default::default()
1769+
},
1770+
ColumnStatistics {
1771+
..Default::default()
1772+
},
1773+
ColumnStatistics {
1774+
..Default::default()
1775+
},
1776+
],
1777+
},
1778+
Arc::clone(&schema),
1779+
));
1780+
1781+
let predicate = Arc::new(BinaryExpr::new(
1782+
Arc::new(Column::new("a", 0)),
1783+
Operator::Lt,
1784+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1785+
));
1786+
1787+
let projection = Some(vec![0, 2]);
1788+
1789+
// Method 1: try_new_with_projection (one call to compute_properties)
1790+
let filter1 = FilterExec::try_new_with_projection(
1791+
Arc::clone(&predicate),
1792+
Arc::clone(&input),
1793+
projection.clone(),
1794+
)?;
1795+
1796+
// Method 2: try_new().with_projection() (two calls to compute_properties)
1797+
let filter2 = FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&input))?
1798+
.with_projection(projection)?;
1799+
1800+
// Both methods should produce equivalent results
1801+
assert_eq!(filter1.schema(), filter2.schema());
1802+
assert_eq!(filter1.projection(), filter2.projection());
1803+
1804+
// Verify statistics are the same
1805+
let stats1 = filter1.partition_statistics(None)?;
1806+
let stats2 = filter2.partition_statistics(None)?;
1807+
assert_eq!(stats1.num_rows, stats2.num_rows);
1808+
assert_eq!(stats1.total_byte_size, stats2.total_byte_size);
1809+
1810+
Ok(())
1811+
}
1812+
1813+
#[tokio::test]
1814+
async fn test_try_new_with_projection_statistics() -> Result<()> {
1815+
// Test that statistics are correctly computed with projection
1816+
let schema = Arc::new(Schema::new(vec![
1817+
Field::new("a", DataType::Int32, false),
1818+
Field::new("b", DataType::Int32, false),
1819+
Field::new("c", DataType::Int32, false),
1820+
]));
1821+
1822+
let input = Arc::new(StatisticsExec::new(
1823+
Statistics {
1824+
num_rows: Precision::Inexact(1000),
1825+
total_byte_size: Precision::Inexact(12000),
1826+
column_statistics: vec![
1827+
ColumnStatistics {
1828+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1829+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1830+
..Default::default()
1831+
},
1832+
ColumnStatistics {
1833+
min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1834+
max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
1835+
..Default::default()
1836+
},
1837+
ColumnStatistics {
1838+
min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1839+
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1840+
..Default::default()
1841+
},
1842+
],
1843+
},
1844+
Arc::clone(&schema),
1845+
));
1846+
1847+
// Filter: a < 50, Project: [0, 2]
1848+
let predicate = Arc::new(BinaryExpr::new(
1849+
Arc::new(Column::new("a", 0)),
1850+
Operator::Lt,
1851+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1852+
));
1853+
1854+
let filter = FilterExec::try_new_with_projection(
1855+
Arc::clone(&predicate),
1856+
Arc::clone(&input),
1857+
Some(vec![0, 2]),
1858+
)?;
1859+
1860+
let statistics = filter.partition_statistics(None)?;
1861+
1862+
// Verify statistics reflect both filtering and projection
1863+
assert!(matches!(statistics.num_rows, Precision::Inexact(_)));
1864+
1865+
// Schema should only have 2 columns after projection
1866+
assert_eq!(filter.schema().fields().len(), 2);
1867+
1868+
Ok(())
1869+
}
1870+
1871+
#[test]
fn test_try_new_with_projection_predicate_validation() -> Result<()> {
    // A predicate that is not boolean-typed must be rejected.
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let input = Arc::new(EmptyExec::new(Arc::new(Schema::new(fields))));

    // A bare reference to column `a` evaluates to Int32, not Boolean.
    let invalid_predicate = Arc::new(Column::new("a", 0));

    let outcome =
        FilterExec::try_new_with_projection(invalid_predicate, input, Some(vec![0]));

    assert!(outcome.is_err());

    Ok(())
}

0 commit comments

Comments
 (0)