Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 115 additions & 8 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
use datafusion_physical_expr_common::physical_expr::DynHash;

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -103,20 +103,25 @@ impl Inner {

impl Hash for DynamicFilterPhysicalExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
let inner = self.current().expect("Failed to get current expression");
inner.dyn_hash(state);
// Use pointer identity of the inner Arc for stable hashing.
// This is stable across update() calls and consistent with Eq.
// See issue #19641 for details on why content-based hashing violates
// the Hash/Eq contract when the underlying expression can change.
Arc::as_ptr(&self.inner).hash(state);
self.children.dyn_hash(state);
self.remapped_children.dyn_hash(state);
}
}

impl PartialEq for DynamicFilterPhysicalExpr {
fn eq(&self, other: &Self) -> bool {
let inner = self.current().expect("Failed to get current expression");
let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
let other_children = other.remapped_children.as_ref().unwrap_or(&other.children);
let other = other.current().expect("Failed to get current expression");
inner.dyn_eq(other.as_any()) && our_children == other_children
// Two dynamic filters are equal if they share the same inner source
// AND have the same children configuration.
// This is consistent with Hash using Arc::as_ptr.
// See issue #19641 for details on the Hash/Eq contract violation fix.
Arc::ptr_eq(&self.inner, &other.inner)
&& self.children == other.children
&& self.remapped_children == other.remapped_children
}
}

Expand Down Expand Up @@ -753,4 +758,106 @@ mod test {
"Filter should still be used with multiple consumers"
);
}

/// Test that verifies the Hash/Eq contract is now satisfied (issue #19641 fix).
///
/// After the fix, Hash uses Arc::as_ptr(&self.inner) which is stable across
/// update() calls, fixing the HashMap key instability issue.
#[test]
fn test_hash_stable_after_update() {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Create filter with initial value
let filter =
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);

// Compute hash BEFORE update
let mut hasher_before = DefaultHasher::new();
filter.hash(&mut hasher_before);
let hash_before = hasher_before.finish();

// Update changes the underlying expression
filter
.update(lit(false) as Arc<dyn PhysicalExpr>)
.expect("Update should succeed");

// Compute hash AFTER update
let mut hasher_after = DefaultHasher::new();
filter.hash(&mut hasher_after);
let hash_after = hasher_after.finish();

// FIXED: Hash should now be STABLE after update() because we use
// Arc::as_ptr for identity-based hashing instead of expression content.
assert_eq!(
hash_before, hash_after,
"Hash should be stable after update() - fix for issue #19641"
);

// Self-equality should still hold
assert!(filter.eq(&filter), "Self-equality should hold");
}

/// Test that verifies separate DynamicFilterPhysicalExpr instances
/// with the same expression are NOT equal (identity-based comparison).
#[test]
fn test_identity_based_equality() {
// Create two separate filters with identical initial expressions
let filter1 =
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
let filter2 =
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);

// Different instances should NOT be equal even with same expression
// because they have independent inner Arcs (different update lifecycles)
assert!(
!filter1.eq(&filter2),
"Different instances should not be equal (identity-based)"
);

// Self-equality should hold
assert!(filter1.eq(&filter1), "Self-equality should hold");
}

/// Test that hash is stable for the same filter instance.
/// After the fix, hash uses Arc::as_ptr which is pointer-based.
#[test]
fn test_hash_stable_for_same_instance() {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

let filter =
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);

// Compute hash twice for the same instance
let hash1 = {
let mut h = DefaultHasher::new();
filter.hash(&mut h);
h.finish()
};
let hash2 = {
let mut h = DefaultHasher::new();
filter.hash(&mut h);
h.finish()
};

assert_eq!(hash1, hash2, "Same instance should have stable hash");

// Update the expression
filter
.update(lit(false) as Arc<dyn PhysicalExpr>)
.expect("Update should succeed");

// Hash should STILL be the same (identity-based)
let hash3 = {
let mut h = DefaultHasher::new();
filter.hash(&mut h);
h.finish()
};

assert_eq!(
hash1, hash3,
"Hash should be stable after update (identity-based)"
);
}
}