Skip to content

Commit 3614687

Browse files
committed
raw
1 parent b2c29ac commit 3614687

File tree

10 files changed

+345
-18
lines changed

10 files changed

+345
-18
lines changed

datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,13 @@ impl FileSource for ParquetSource {
734734
.with_updated_node(source))
735735
}
736736

737+
fn with_new_state(
738+
&self,
739+
_state: Arc<dyn Any + Send + Sync>,
740+
) -> Option<Arc<dyn FileSource>> {
741+
todo!()
742+
}
743+
737744
/// Try to optimize the scan to produce data in the requested sort order.
738745
///
739746
/// This method receives:

datafusion/datasource/src/file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ pub trait FileSource: Send + Sync {
148148
))
149149
}
150150

151+
fn with_new_state(
152+
&self,
153+
_state: Arc<dyn Any + Send + Sync>,
154+
) -> Option<Arc<dyn FileSource>> {
155+
None
156+
}
157+
151158
/// Try to create a new FileSource that can produce data in the specified sort order.
152159
///
153160
/// This method attempts to optimize data retrieval to match the requested ordering.

datafusion/datasource/src/source.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ use datafusion_physical_plan::filter_pushdown::{
7474
/// ```text
7575
/// ┌─────────────────────┐ -----► execute path
7676
/// │ │ ┄┄┄┄┄► init path
77-
/// │ DataSourceExec │
78-
/// │ │
77+
/// │ DataSourceExec │
78+
/// │ │
7979
/// └───────▲─────────────┘
8080
/// ┊ │
8181
/// ┊ │
@@ -192,6 +192,13 @@ pub trait DataSource: Send + Sync + Debug {
192192
))
193193
}
194194

195+
fn with_new_state(
196+
&self,
197+
_state: Arc<dyn Any + Send + Sync>,
198+
) -> Option<Arc<dyn DataSource>> {
199+
None
200+
}
201+
195202
/// Try to create a new DataSource that produces data in the specified sort order.
196203
///
197204
/// # Arguments

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ pub struct DynamicFilterPhysicalExpr {
7474
/// But this can have overhead in production, so it's only included in our tests.
7575
data_type: Arc<RwLock<Option<DataType>>>,
7676
nullable: Arc<RwLock<Option<bool>>>,
77+
/// Used to:
78+
///
79+
/// * Reset this filter state to the initial.
80+
/// * Compare origins of two different filters.
81+
///
82+
/// Typically a filter is created by some node and then pushed into its children during plan optimization.
83+
/// Then, when the plan is re-executed (e.g., a recursive query) the filter must be re-set and a new
84+
/// independent [`Self::inner`] must be created and pushed into the child node. To do so, the parent node keeps
85+
/// [`Self::origin`] within a new filter instance to provide the ability for the child node to establish
86+
/// a correspondence between stored filter and its new pushed version.
87+
origin: Arc<dyn PhysicalExpr>,
7788
}
7889

7990
#[derive(Debug)]
@@ -173,10 +184,11 @@ impl DynamicFilterPhysicalExpr {
173184
Self {
174185
children,
175186
remapped_children: None, // Initially no remapped children
176-
inner: Arc::new(RwLock::new(Inner::new(inner))),
187+
inner: Arc::new(RwLock::new(Inner::new(Arc::clone(&inner)))),
177188
state_watch,
178189
data_type: Arc::new(RwLock::new(None)),
179190
nullable: Arc::new(RwLock::new(None)),
191+
origin: inner,
180192
}
181193
}
182194

@@ -324,6 +336,33 @@ impl DynamicFilterPhysicalExpr {
324336
Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
325337
}
326338

339+
/// Check if this filter and `other` have the same origin.
340+
///
341+
/// Please, check [`Self::origin`] doc-comment for the details.
342+
pub fn has_same_origin(&self, other: &DynamicFilterPhysicalExpr) -> bool {
343+
Arc::ptr_eq(&self.origin, &other.origin)
344+
}
345+
346+
/// Reset this filter to the initial state.
347+
///
348+
/// # Origin
349+
///
350+
/// Returned filter has the same origin, check [`Self::origin`] doc-comment
351+
/// for the details.
352+
///
353+
pub fn reset_state(&self) -> Arc<DynamicFilterPhysicalExpr> {
354+
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
355+
Arc::new(Self {
356+
children: self.children.clone(),
357+
remapped_children: self.remapped_children.clone(),
358+
inner: Arc::new(RwLock::new(Inner::new(Arc::clone(&self.origin)))),
359+
state_watch,
360+
data_type: Arc::new(RwLock::new(None)),
361+
nullable: Arc::new(RwLock::new(None)),
362+
origin: Arc::clone(&self.origin),
363+
})
364+
}
365+
327366
fn render(
328367
&self,
329368
f: &mut std::fmt::Formatter<'_>,
@@ -369,6 +408,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
369408
state_watch: self.state_watch.clone(),
370409
data_type: Arc::clone(&self.data_type),
371410
nullable: Arc::clone(&self.nullable),
411+
origin: Arc::clone(&self.origin),
372412
}))
373413
}
374414

@@ -864,4 +904,17 @@ mod test {
864904
"Hash should be stable after update (identity-based)"
865905
);
866906
}
907+
908+
#[test]
909+
fn test_dynamic_filter_reset_state() {
910+
let origin = lit(true) as Arc<dyn PhysicalExpr>;
911+
let filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&origin));
912+
filter.update(lit(false) as _).unwrap();
913+
filter.mark_complete();
914+
let reset_filter = filter.reset_state();
915+
assert!(!Arc::ptr_eq(&filter.inner, &reset_filter.inner));
916+
assert!(filter.has_same_origin(&reset_filter));
917+
assert!(Arc::ptr_eq(&reset_filter.current().unwrap(), &origin));
918+
assert!(!reset_filter.inner.read().is_complete);
919+
}
867920
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,15 @@ impl AggregateExec {
11041104
_ => Precision::Absent,
11051105
}
11061106
}
1107+
1108+
fn reset_dynamic_filter(&self) -> Option<Arc<AggrDynFilter>> {
1109+
self.dynamic_filter.as_ref().map(|f| {
1110+
Arc::new(AggrDynFilter {
1111+
filter: f.filter.reset_state(),
1112+
supported_accumulators_info: f.supported_accumulators_info.clone(),
1113+
})
1114+
})
1115+
}
11071116
}
11081117

11091118
impl DisplayAs for AggregateExec {
@@ -1300,6 +1309,13 @@ impl ExecutionPlan for AggregateExec {
13001309
Ok(Arc::new(me))
13011310
}
13021311

1312+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1313+
let mut me = AggregateExec::clone(&self);
1314+
me.dynamic_filter = self.reset_dynamic_filter();
1315+
me.metrics = ExecutionPlanMetricsSet::new();
1316+
Ok(Arc::new(me))
1317+
}
1318+
13031319
fn execute(
13041320
&self,
13051321
partition: usize,
@@ -1466,6 +1482,14 @@ impl ExecutionPlan for AggregateExec {
14661482

14671483
Ok(result)
14681484
}
1485+
1486+
fn dynamic_filters(&self) -> Vec<Arc<DynamicFilterPhysicalExpr>> {
1487+
if let Some(expr) = self.dynamic_filter.as_ref() {
1488+
vec![Arc::clone(&expr.filter)]
1489+
} else {
1490+
vec![]
1491+
}
1492+
}
14691493
}
14701494

14711495
fn create_schema(

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::filter_pushdown::{
2222
};
2323
pub use crate::metrics::Metric;
2424
pub use crate::ordering::InputOrderMode;
25+
use crate::recursive_query::RecursiveQueryExec;
2526
use crate::sort_pushdown::SortOrderPushdownResult;
2627
pub use crate::stream::EmptyRecordBatchStream;
2728

@@ -31,12 +32,15 @@ pub use datafusion_common::utils::project_schema;
3132
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
3233
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
3334
pub use datafusion_expr::{Accumulator, ColumnarValue};
35+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
3436
pub use datafusion_physical_expr::window::WindowExpr;
3537
pub use datafusion_physical_expr::{
3638
Distribution, Partitioning, PhysicalExpr, expressions,
3739
};
40+
use parking_lot::Mutex;
3841

3942
use std::any::Any;
43+
use std::cell::RefCell;
4044
use std::fmt::Debug;
4145
use std::sync::Arc;
4246

@@ -680,6 +684,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
680684
/// in order to wire up the working table used during recursive-CTE execution.
681685
/// Similar patterns can be followed by custom nodes that need late-bound
682686
/// dependencies or shared state.
687+
///
688+
/// Also, this method is used to update dynamic filters in the plan when
689+
/// its state is re-set. So if the node supports dynamic filtering it must
690+
/// implement this method to be reusable. Please, check [`reset_plan_states`]
691+
/// for the details.
692+
///
683693
fn with_new_state(
684694
&self,
685695
_state: Arc<dyn Any + Send + Sync>,
@@ -722,6 +732,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
722732
) -> Option<Arc<dyn ExecutionPlan>> {
723733
None
724734
}
735+
736+
/// Returns dynamic filters owned by the plan.
737+
fn dynamic_filters(&self) -> Vec<Arc<DynamicFilterPhysicalExpr>> {
738+
vec![]
739+
}
725740
}
726741

727742
/// [`ExecutionPlan`] Invariant Level
@@ -1398,6 +1413,43 @@ pub fn check_not_null_constraints(
13981413
Ok(batch)
13991414
}
14001415

1416+
/// Unions dynamic filters collected from plan nodes during the state-reset traversal.
1417+
/// Passed into [`ExecutionPlan::with_new_state`] to actualize node filters.
1418+
#[derive(Default)]
1419+
pub struct DynamicFilters {
1420+
filters: Mutex<Vec<Arc<DynamicFilterPhysicalExpr>>>,
1421+
}
1422+
1423+
impl DynamicFilters {
1424+
fn extend(&self, iter: impl Iterator<Item = Arc<DynamicFilterPhysicalExpr>>) {
1425+
self.filters.lock().extend(iter);
1426+
}
1427+
1428+
/// Look up the filter with the same origin as the passed filter `expr`.
1429+
fn filter_with_same_origin_as(
1430+
&self,
1431+
expr: &DynamicFilterPhysicalExpr,
1432+
) -> Result<Arc<DynamicFilterPhysicalExpr>> {
1433+
for filter in self.filters.lock().iter() {
1434+
if filter.has_same_origin(expr) {
1435+
return Ok(Arc::clone(filter));
1436+
}
1437+
}
1438+
internal_err!("updated dynamic filter is not found")
1439+
}
1440+
1441+
/// Update each dynamic filter sub-expression to an actual version.
1442+
///
1443+
/// * If dynamic filter with the same origin is not found, then an error is returned.
1444+
/// * If `expr` does not contain dynamic filters and there are no updates, then `Ok(None)` is returned.
1445+
pub fn actualize_dynamic_filter(
1446+
&self,
1447+
expr: &DynamicFilterPhysicalExpr,
1448+
) -> Result<Option<Arc<DynamicFilterPhysicalExpr>>> {
1449+
todo!()
1450+
}
1451+
}
1452+
14011453
/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
14021454
///
14031455
/// Some plans will change their internal states after execution, making them unable to be executed again.
@@ -1407,18 +1459,61 @@ pub fn check_not_null_constraints(
14071459
/// However, if the data of the left table is derived from the work table, it will become outdated
14081460
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
14091461
///
1410-
/// # Limitations
1462+
/// # Dynamic filters
1463+
///
1464+
/// Dynamic filters are reset during the state-reset traversal and must be updated in the nodes which poll them.
1465+
/// To be able to do it the method [`ExecutionPlan::with_new_state`] should be implemented for the node
1466+
/// that supports dynamic filtering. The node should down-cast input state into [`DynamicFilters`] and
1467+
/// look up the filter with the same origin as its own to obtain the current filter version.
14111468
///
1412-
/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
1469+
/// # Work table
14131470
///
1414-
/// * uses dynamic filters,
1415-
/// * represents a recursive query.
1471+
/// The work table is reset during the state-reset traversal and must be updated in the nodes which write into it.
1472+
/// To be able to do it the method [`ExecutionPlan::with_new_state`] should be implemented for the node
1473+
/// that uses work table.
14161474
///
14171475
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1418-
plan.transform_up(|plan| {
1419-
let new_plan = Arc::clone(&plan).reset_state()?;
1420-
Ok(Transformed::yes(new_plan))
1421-
})
1476+
let dynamic_filters = Arc::new(DynamicFilters::default());
1477+
let work_tables = RefCell::new(vec![]);
1478+
1479+
plan.transform_down_up(
1480+
// Collect dynamic filters from nodes.
1481+
|plan| {
1482+
// Note: here we reset the node's state before its children's state is reset, to be able
1483+
// to collect and push new versions of the dynamic filters/work table.
1484+
// Children will actually be replaced on the way up from the bottom.
1485+
let mut new_plan = Arc::clone(&plan).reset_state()?;
1486+
1487+
// Try to inject updated dynamic filters.
1488+
if let Some(plan) = new_plan.with_new_state(Arc::clone(&dynamic_filters) as _)
1489+
{
1490+
new_plan = plan;
1491+
}
1492+
1493+
// Try to inject updated work table.
1494+
if let Some(work_table) = work_tables.borrow_mut().last()
1495+
&& let Some(plan) = new_plan.with_new_state(Arc::clone(work_table))
1496+
{
1497+
new_plan = plan;
1498+
}
1499+
1500+
dynamic_filters.extend(new_plan.dynamic_filters().into_iter());
1501+
if let Some(plan) = new_plan.as_any().downcast_ref::<RecursiveQueryExec>() {
1502+
work_tables
1503+
.borrow_mut()
1504+
.push(Arc::clone(plan.work_table()) as _)
1505+
}
1506+
1507+
Ok(Transformed::yes(new_plan))
1508+
},
1509+
|plan| {
1510+
if plan.as_any().is::<RecursiveQueryExec>() {
1511+
work_tables.borrow_mut().pop();
1512+
}
1513+
// Here we must return [`Transformed::yes`] to actually replace children.
1514+
Ok(Transformed::yes(plan))
1515+
},
1516+
)
14221517
.data()
14231518
}
14241519

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,15 @@ impl HashJoinExec {
847847
reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
848848
}
849849
}
850+
851+
fn reset_dynamic_filter(&self) -> Option<HashJoinExecDynamicFilter> {
852+
self.dynamic_filter
853+
.as_ref()
854+
.map(|f| HashJoinExecDynamicFilter {
855+
filter: f.filter.reset_state(),
856+
build_accumulator: OnceLock::new(),
857+
})
858+
}
850859
}
851860

852861
impl DisplayAs for HashJoinExec {
@@ -1046,7 +1055,7 @@ impl ExecutionPlan for HashJoinExec {
10461055
null_aware: self.null_aware,
10471056
cache: self.cache.clone(),
10481057
// Reset dynamic filter and bounds accumulator to initial state
1049-
dynamic_filter: None,
1058+
dynamic_filter: self.reset_dynamic_filter(),
10501059
}))
10511060
}
10521061

@@ -1384,6 +1393,14 @@ impl ExecutionPlan for HashJoinExec {
13841393
}
13851394
Ok(result)
13861395
}
1396+
1397+
fn dynamic_filters(&self) -> Vec<Arc<DynamicFilterPhysicalExpr>> {
1398+
if let Some(expr) = self.dynamic_filter.as_ref() {
1399+
vec![Arc::clone(&expr.filter)]
1400+
} else {
1401+
vec![]
1402+
}
1403+
}
13871404
}
13881405

13891406
/// Accumulator for collecting min/max bounds from build-side data during hash join.

0 commit comments

Comments
 (0)