Skip to content

Commit 32bbd7a

Browse files
committed
unify in one watch
1 parent 278c3bb commit 32bbd7a

File tree

1 file changed

+43
-20
lines changed

1 file changed

+43
-20
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@ use datafusion_common::{
2828
use datafusion_expr::ColumnarValue;
2929
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
3030

31+
/// State of a dynamic filter, tracking both updates and completion.
32+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33+
enum FilterState {
34+
/// Filter is in progress and may receive more updates.
35+
InProgress { generation: u64 },
36+
/// Filter is complete and will not receive further updates.
37+
Complete { generation: u64 },
38+
}
39+
40+
impl FilterState {
41+
fn generation(&self) -> u64 {
42+
match self {
43+
FilterState::InProgress { generation }
44+
| FilterState::Complete { generation } => *generation,
45+
}
46+
}
47+
}
48+
3149
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
3250
///
3351
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
@@ -45,10 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
4563
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
4664
/// The source of dynamic filters.
4765
inner: Arc<RwLock<Inner>>,
48-
/// Broadcasts completion state changes to all waiters.
49-
completion_watch: watch::Sender<bool>,
50-
/// Broadcasts update changes to all waiters.
51-
update_watch: watch::Sender<u64>,
66+
/// Broadcasts filter state (updates and completion) to all waiters.
67+
state_watch: watch::Sender<FilterState>,
5268
/// For testing purposes track the data type and nullability to make sure they don't change.
5369
/// If they do, there's a bug in the implementation.
5470
/// But this can have overhead in production, so it's only included in our tests.
@@ -139,14 +155,12 @@ impl DynamicFilterPhysicalExpr {
139155
children: Vec<Arc<dyn PhysicalExpr>>,
140156
inner: Arc<dyn PhysicalExpr>,
141157
) -> Self {
142-
let (completion_watch, _) = watch::channel(false);
143-
let (update_watch, _) = watch::channel(1u64);
158+
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
144159
Self {
145160
children,
146161
remapped_children: None, // Initially no remapped children
147162
inner: Arc::new(RwLock::new(Inner::new(inner))),
148-
completion_watch,
149-
update_watch,
163+
state_watch,
150164
data_type: Arc::new(RwLock::new(None)),
151165
nullable: Arc::new(RwLock::new(None)),
152166
}
@@ -220,8 +234,10 @@ impl DynamicFilterPhysicalExpr {
220234
};
221235
drop(current); // Release the lock before broadcasting
222236

223-
// Broadcast the new generation to all waiters
224-
let _ = self.update_watch.send(new_generation);
237+
// Broadcast the new state to all waiters
238+
let _ = self.state_watch.send(FilterState::InProgress {
239+
generation: new_generation,
240+
});
225241
Ok(())
226242
}
227243

@@ -230,18 +246,24 @@ impl DynamicFilterPhysicalExpr {
230246
/// This signals that all expected updates have been received.
231247
/// Waiters using [`Self::wait_complete`] will be notified.
232248
pub fn mark_complete(&self) {
249+
let current_generation = self.inner.read().generation;
233250
// Broadcast completion to all waiters
234-
let _ = self.completion_watch.send(true);
251+
let _ = self.state_watch.send(FilterState::Complete {
252+
generation: current_generation,
253+
});
235254
}
236255

237256
/// Wait asynchronously for any update to this filter.
238257
///
239-
/// This method will return when [`Self::update`] is called.
258+
/// This method will return when [`Self::update`] is called and the generation increases.
240259
/// It does not guarantee that the filter is complete.
241260
pub async fn wait_update(&self) {
242-
let mut rx = self.update_watch.subscribe();
243-
// Wait for the generation to change
244-
let _ = rx.changed().await;
261+
let mut rx = self.state_watch.subscribe();
262+
// Get the current generation
263+
let current_gen = rx.borrow_and_update().generation();
264+
265+
// Wait until generation increases
266+
let _ = rx.wait_for(|state| state.generation() > current_gen).await;
245267
}
246268

247269
/// Wait asynchronously until this dynamic filter is marked as complete.
@@ -252,9 +274,11 @@ impl DynamicFilterPhysicalExpr {
252274
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
253275
/// the filter is fully complete with no more updates expected.
254276
pub async fn wait_complete(&self) {
255-
let mut rx = self.completion_watch.subscribe();
256-
// Wait until the completion flag becomes true
257-
let _ = rx.wait_for(|&complete| complete).await;
277+
let mut rx = self.state_watch.subscribe();
278+
// Wait until the state becomes Complete
279+
let _ = rx
280+
.wait_for(|state| matches!(state, FilterState::Complete { .. }))
281+
.await;
258282
}
259283

260284
fn render(
@@ -299,8 +323,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
299323
children: self.children.clone(),
300324
remapped_children: Some(children),
301325
inner: Arc::clone(&self.inner),
302-
completion_watch: self.completion_watch.clone(),
303-
update_watch: self.update_watch.clone(),
326+
state_watch: self.state_watch.clone(),
304327
data_type: Arc::clone(&self.data_type),
305328
nullable: Arc::clone(&self.nullable),
306329
}))

0 commit comments

Comments
 (0)