@@ -57,6 +57,8 @@ struct Inner {
5757 /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
5858 generation : u64 ,
5959 expr : Arc < dyn PhysicalExpr > ,
60+ /// Flag indicating whether all updates have been received and the filter is complete.
61+ is_complete : bool ,
6062}
6163
6264impl Inner {
@@ -66,6 +68,7 @@ impl Inner {
6668 // This is not currently used anywhere but it seems useful to have this simple distinction.
6769 generation : 1 ,
6870 expr,
71+ is_complete : false ,
6972 }
7073 }
7174
@@ -207,10 +210,50 @@ impl DynamicFilterPhysicalExpr {
207210 * current = Inner {
208211 generation : current. generation + 1 ,
209212 expr : new_expr,
213+ is_complete : current. is_complete ,
210214 } ;
211215 Ok ( ( ) )
212216 }
213217
218+ /// Mark this dynamic filter as complete.
219+ ///
220+ /// This signals that all expected updates have been received and the filter
221+ /// is ready to be pushed down to external data sources. This is particularly
222+ /// important for partitioned hash joins where multiple partitions contribute
223+ /// to the filter - external data sources must wait for all partitions to
224+ /// report before pushing down the filter to avoid incorrect results.
225+ ///
226+ /// # Example
227+ ///
228+ /// ```ignore
229+ /// // After all partitions have reported their bounds:
230+ /// dynamic_filter.mark_complete();
231+ ///
232+ /// // External data source can now safely check:
233+ /// if dynamic_filter.is_complete() {
234+ /// // Safe to push down the filter
235+ /// let filter_expr = dynamic_filter.current()?;
236+ /// push_filter_to_external_source(filter_expr);
237+ /// }
238+ /// ```
239+ pub fn mark_complete ( & self ) {
240+ let mut current = self . inner . write ( ) ;
241+ current. is_complete = true ;
242+ }
243+
244+ /// Check if this dynamic filter is complete.
245+ ///
246+ /// Returns `true` if all expected updates have been received via [`Self::mark_complete`].
247+ /// External data sources should wait for this to return `true` before pushing
248+ /// down the filter to ensure correctness.
249+ ///
250+ /// # Returns
251+ ///
252+ /// `true` if the filter is complete and safe to push down, `false` otherwise.
253+ pub fn is_complete ( & self ) -> bool {
254+ self . inner . read ( ) . is_complete
255+ }
256+
214257 fn render (
215258 & self ,
216259 f : & mut std:: fmt:: Formatter < ' _ > ,
0 commit comments