Skip to content

Commit 2caf141

Browse files
feat(physical-plan): add FilterExecBuilder for efficient construction
Adds a FilterExecBuilder pattern with a fluent API. Allows setting projection, selectivity, batch_size, and fetch in one build. Refactors try_new to use the builder internally (reduces duplication). Ensures compute_properties executes only once. Fixes #19608.
1 parent 6954497 commit 2caf141

File tree

1 file changed

+329
-26
lines changed

1 file changed

+329
-26
lines changed

datafusion/physical-plan/src/filter.rs

Lines changed: 329 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -93,37 +93,106 @@ pub struct FilterExec {
9393
fetch: Option<usize>,
9494
}
9595

96+
/// Builder for [`FilterExec`] to set optional parameters
97+
pub struct FilterExecBuilder {
98+
predicate: Arc<dyn PhysicalExpr>,
99+
input: Arc<dyn ExecutionPlan>,
100+
projection: Option<Vec<usize>>,
101+
default_selectivity: u8,
102+
batch_size: usize,
103+
fetch: Option<usize>,
104+
}
105+
106+
impl FilterExecBuilder {
107+
/// Create a new builder with required parameters (predicate and input)
108+
pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
109+
Self {
110+
predicate,
111+
input,
112+
projection: None,
113+
default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
114+
batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
115+
fetch: None,
116+
}
117+
}
118+
119+
/// Set the projection
120+
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
121+
self.projection = projection;
122+
self
123+
}
124+
125+
/// Set the default selectivity
126+
pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
127+
self.default_selectivity = default_selectivity;
128+
self
129+
}
130+
131+
/// Set the batch size
132+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
133+
self.batch_size = batch_size;
134+
self
135+
}
136+
137+
/// Set the fetch limit
138+
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
139+
self.fetch = fetch;
140+
self
141+
}
142+
143+
/// Build the FilterExec, computing properties once with all configured parameters
144+
pub fn build(self) -> Result<FilterExec> {
145+
// Validate predicate type
146+
match self.predicate.data_type(self.input.schema().as_ref())? {
147+
DataType::Boolean => {}
148+
other => {
149+
return plan_err!(
150+
"Filter predicate must return BOOLEAN values, got {other:?}"
151+
)
152+
}
153+
}
154+
155+
// Validate selectivity
156+
if self.default_selectivity > 100 {
157+
return plan_err!(
158+
"Default filter selectivity value needs to be less than or equal to 100"
159+
);
160+
}
161+
162+
// Validate projection if provided
163+
if let Some(ref proj) = self.projection {
164+
can_project(&self.input.schema(), Some(proj))?;
165+
}
166+
167+
// Compute properties once with all parameters
168+
let cache = FilterExec::compute_properties(
169+
&self.input,
170+
&self.predicate,
171+
self.default_selectivity,
172+
self.projection.as_ref(),
173+
)?;
174+
175+
Ok(FilterExec {
176+
predicate: self.predicate,
177+
input: self.input,
178+
metrics: ExecutionPlanMetricsSet::new(),
179+
default_selectivity: self.default_selectivity,
180+
cache,
181+
projection: self.projection,
182+
batch_size: self.batch_size,
183+
fetch: self.fetch,
184+
})
185+
}
186+
}
187+
96188
impl FilterExec {
97-
/// Create a FilterExec on an input
189+
/// Create a FilterExec on an input using the builder pattern
98190
#[expect(clippy::needless_pass_by_value)]
99191
pub fn try_new(
100192
predicate: Arc<dyn PhysicalExpr>,
101193
input: Arc<dyn ExecutionPlan>,
102194
) -> Result<Self> {
103-
match predicate.data_type(input.schema().as_ref())? {
104-
DataType::Boolean => {
105-
let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
106-
let cache = Self::compute_properties(
107-
&input,
108-
&predicate,
109-
default_selectivity,
110-
None,
111-
)?;
112-
Ok(Self {
113-
predicate,
114-
input: Arc::clone(&input),
115-
metrics: ExecutionPlanMetricsSet::new(),
116-
default_selectivity,
117-
cache,
118-
projection: None,
119-
batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
120-
fetch: None,
121-
})
122-
}
123-
other => {
124-
plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
125-
}
126-
}
195+
FilterExecBuilder::new(predicate, input).build()
127196
}
128197

129198
pub fn with_default_selectivity(
@@ -1586,4 +1655,238 @@ mod tests {
15861655

15871656
Ok(())
15881657
}
1589-
}
1658+
1659+
#[tokio::test]
1660+
async fn test_builder_with_projection() -> Result<()> {
1661+
// Create a schema with multiple columns
1662+
let schema = Arc::new(Schema::new(vec![
1663+
Field::new("a", DataType::Int32, false),
1664+
Field::new("b", DataType::Int32, false),
1665+
Field::new("c", DataType::Int32, false),
1666+
]));
1667+
1668+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1669+
1670+
// Create a filter predicate: a > 10
1671+
let predicate = Arc::new(BinaryExpr::new(
1672+
Arc::new(Column::new("a", 0)),
1673+
Operator::Gt,
1674+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1675+
));
1676+
1677+
// Create filter with projection [0, 2] (columns a and c) using builder
1678+
let projection = Some(vec![0, 2]);
1679+
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
1680+
.with_projection(projection.clone())
1681+
.build()?;
1682+
1683+
// Verify projection is set correctly
1684+
assert_eq!(filter.projection(), Some(&vec![0, 2]));
1685+
1686+
// Verify schema contains only projected columns
1687+
let output_schema = filter.schema();
1688+
assert_eq!(output_schema.fields().len(), 2);
1689+
assert_eq!(output_schema.field(0).name(), "a");
1690+
assert_eq!(output_schema.field(1).name(), "c");
1691+
1692+
Ok(())
1693+
}
1694+
1695+
#[tokio::test]
1696+
async fn test_builder_without_projection() -> Result<()> {
1697+
let schema = Arc::new(Schema::new(vec![
1698+
Field::new("a", DataType::Int32, false),
1699+
Field::new("b", DataType::Int32, false),
1700+
]));
1701+
1702+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1703+
1704+
let predicate = Arc::new(BinaryExpr::new(
1705+
Arc::new(Column::new("a", 0)),
1706+
Operator::Gt,
1707+
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1708+
));
1709+
1710+
// Create filter without projection using builder
1711+
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
1712+
.build()?;
1713+
1714+
// Verify no projection is set
1715+
assert_eq!(filter.projection(), None);
1716+
1717+
// Verify schema contains all columns
1718+
let output_schema = filter.schema();
1719+
assert_eq!(output_schema.fields().len(), 2);
1720+
1721+
Ok(())
1722+
}
1723+
1724+
#[tokio::test]
1725+
async fn test_builder_invalid_projection() -> Result<()> {
1726+
let schema = Arc::new(Schema::new(vec![
1727+
Field::new("a", DataType::Int32, false),
1728+
Field::new("b", DataType::Int32, false),
1729+
]));
1730+
1731+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1732+
1733+
let predicate = Arc::new(BinaryExpr::new(
1734+
Arc::new(Column::new("a", 0)),
1735+
Operator::Gt,
1736+
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1737+
));
1738+
1739+
// Try to create filter with invalid projection (index out of bounds) using builder
1740+
let result = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
1741+
.with_projection(Some(vec![0, 5])) // 5 is out of bounds
1742+
.build();
1743+
1744+
// Should return an error
1745+
assert!(result.is_err());
1746+
1747+
Ok(())
1748+
}
1749+
1750+
#[tokio::test]
1751+
async fn test_builder_vs_with_projection() -> Result<()> {
1752+
// This test verifies that the builder with projection produces the same result
1753+
// as try_new().with_projection(), but more efficiently (one compute_properties call)
1754+
let schema = Arc::new(Schema::new(vec![
1755+
Field::new("a", DataType::Int32, false),
1756+
Field::new("b", DataType::Int32, false),
1757+
Field::new("c", DataType::Int32, false),
1758+
Field::new("d", DataType::Int32, false),
1759+
]));
1760+
1761+
let input = Arc::new(StatisticsExec::new(
1762+
Statistics {
1763+
num_rows: Precision::Inexact(1000),
1764+
total_byte_size: Precision::Inexact(4000),
1765+
column_statistics: vec![
1766+
ColumnStatistics {
1767+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1768+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1769+
..Default::default()
1770+
},
1771+
ColumnStatistics {
1772+
..Default::default()
1773+
},
1774+
ColumnStatistics {
1775+
..Default::default()
1776+
},
1777+
ColumnStatistics {
1778+
..Default::default()
1779+
},
1780+
],
1781+
},
1782+
Arc::clone(&schema),
1783+
));
1784+
1785+
let predicate = Arc::new(BinaryExpr::new(
1786+
Arc::new(Column::new("a", 0)),
1787+
Operator::Lt,
1788+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1789+
));
1790+
1791+
let projection = Some(vec![0, 2]);
1792+
1793+
// Method 1: Builder with projection (one call to compute_properties)
1794+
let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
1795+
.with_projection(projection.clone())
1796+
.build()?;
1797+
1798+
// Method 2: try_new().with_projection() (two calls to compute_properties)
1799+
let filter2 = FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&input))?
1800+
.with_projection(projection)?;
1801+
1802+
// Both methods should produce equivalent results
1803+
assert_eq!(filter1.schema(), filter2.schema());
1804+
assert_eq!(filter1.projection(), filter2.projection());
1805+
1806+
// Verify statistics are the same
1807+
let stats1 = filter1.partition_statistics(None)?;
1808+
let stats2 = filter2.partition_statistics(None)?;
1809+
assert_eq!(stats1.num_rows, stats2.num_rows);
1810+
assert_eq!(stats1.total_byte_size, stats2.total_byte_size);
1811+
1812+
Ok(())
1813+
}
1814+
1815+
#[tokio::test]
1816+
async fn test_builder_statistics_with_projection() -> Result<()> {
1817+
// Test that statistics are correctly computed when using builder with projection
1818+
let schema = Arc::new(Schema::new(vec![
1819+
Field::new("a", DataType::Int32, false),
1820+
Field::new("b", DataType::Int32, false),
1821+
Field::new("c", DataType::Int32, false),
1822+
]));
1823+
1824+
let input = Arc::new(StatisticsExec::new(
1825+
Statistics {
1826+
num_rows: Precision::Inexact(1000),
1827+
total_byte_size: Precision::Inexact(12000),
1828+
column_statistics: vec![
1829+
ColumnStatistics {
1830+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1831+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1832+
..Default::default()
1833+
},
1834+
ColumnStatistics {
1835+
min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
1836+
max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
1837+
..Default::default()
1838+
},
1839+
ColumnStatistics {
1840+
min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
1841+
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
1842+
..Default::default()
1843+
},
1844+
],
1845+
},
1846+
Arc::clone(&schema),
1847+
));
1848+
1849+
// Filter: a < 50, Project: [0, 2]
1850+
let predicate = Arc::new(BinaryExpr::new(
1851+
Arc::new(Column::new("a", 0)),
1852+
Operator::Lt,
1853+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1854+
));
1855+
1856+
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
1857+
.with_projection(Some(vec![0, 2]))
1858+
.build()?;
1859+
1860+
let statistics = filter.partition_statistics(None)?;
1861+
1862+
// Verify statistics reflect both filtering and projection
1863+
assert!(matches!(statistics.num_rows, Precision::Inexact(_)));
1864+
1865+
// Schema should only have 2 columns after projection
1866+
assert_eq!(filter.schema().fields().len(), 2);
1867+
1868+
Ok(())
1869+
}
1870+
1871+
#[test]
fn test_builder_predicate_validation() -> Result<()> {
    // The builder must reject a predicate that is not boolean-typed.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let input = Arc::new(EmptyExec::new(schema));

    // A bare reference to column `a` evaluates to Int32, not Boolean.
    let invalid_predicate = Arc::new(Column::new("a", 0));

    // The projection itself is valid, so any failure must come from the
    // predicate check.
    let result = FilterExecBuilder::new(invalid_predicate, input)
        .with_projection(Some(vec![0]))
        .build();

    let Some(err) = result.err() else {
        panic!("builder accepted a non-boolean predicate");
    };
    // Pin the failure to the predicate-type check rather than any error.
    assert!(
        err.to_string()
            .contains("Filter predicate must return BOOLEAN values"),
        "unexpected error: {err}"
    );

    Ok(())
}

0 commit comments

Comments
 (0)