Skip to content

Commit a55b77e

Browse files
authored
fix: DynamicFilterPhysicalExpr violates Hash/Eq contract (#19659)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Closes #19641.

## Rationale for this change

`DynamicFilterPhysicalExpr` violates the `Hash`/`Eq` contract because the `Hash` and `PartialEq` implementations each call `self.current()`, which acquires separate `RwLock::read()` locks. This allows the underlying expression to change between `hash()` and `eq()` calls via `update()`, causing:

- HashMap key instability (keys "disappear" after update)
- Potential infinite loops during HashMap operations
- Corrupted HashMap state during concurrent access

<!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. -->

## What changes are included in this PR?

Replaced content-based Hash/Eq with identity-based implementations:

- Hash: Uses `Arc::as_ptr(&self.inner)` instead of hashing the mutable expression content
- PartialEq: Uses `Arc::ptr_eq(&self.inner, &other.inner)` instead of comparing expression content via locks

Both implementations also take `children` and `remapped_children` into account, so `Hash` and `Eq` remain consistent with each other.

<!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. -->

## Are these changes tested?

Yes

<!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

## Are there any user-facing changes?

<!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. -->
<!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent b9a3b9f commit a55b77e

File tree

1 file changed: +115 additions, -8 deletions

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 115 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use datafusion_common::{
2626
tree_node::{Transformed, TransformedResult, TreeNode},
2727
};
2828
use datafusion_expr::ColumnarValue;
29-
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
29+
use datafusion_physical_expr_common::physical_expr::DynHash;
3030

3131
/// State of a dynamic filter, tracking both updates and completion.
3232
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -103,20 +103,25 @@ impl Inner {
103103

104104
impl Hash for DynamicFilterPhysicalExpr {
105105
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
106-
let inner = self.current().expect("Failed to get current expression");
107-
inner.dyn_hash(state);
106+
// Use pointer identity of the inner Arc for stable hashing.
107+
// This is stable across update() calls and consistent with Eq.
108+
// See issue #19641 for details on why content-based hashing violates
109+
// the Hash/Eq contract when the underlying expression can change.
110+
Arc::as_ptr(&self.inner).hash(state);
108111
self.children.dyn_hash(state);
109112
self.remapped_children.dyn_hash(state);
110113
}
111114
}
112115

113116
impl PartialEq for DynamicFilterPhysicalExpr {
114117
fn eq(&self, other: &Self) -> bool {
115-
let inner = self.current().expect("Failed to get current expression");
116-
let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
117-
let other_children = other.remapped_children.as_ref().unwrap_or(&other.children);
118-
let other = other.current().expect("Failed to get current expression");
119-
inner.dyn_eq(other.as_any()) && our_children == other_children
118+
// Two dynamic filters are equal if they share the same inner source
119+
// AND have the same children configuration.
120+
// This is consistent with Hash using Arc::as_ptr.
121+
// See issue #19641 for details on the Hash/Eq contract violation fix.
122+
Arc::ptr_eq(&self.inner, &other.inner)
123+
&& self.children == other.children
124+
&& self.remapped_children == other.remapped_children
120125
}
121126
}
122127

@@ -753,4 +758,106 @@ mod test {
753758
"Filter should still be used with multiple consumers"
754759
);
755760
}
761+
762+
/// Test that verifies the Hash/Eq contract is now satisfied (issue #19641 fix).
763+
///
764+
/// After the fix, Hash uses Arc::as_ptr(&self.inner) which is stable across
765+
/// update() calls, fixing the HashMap key instability issue.
766+
#[test]
767+
fn test_hash_stable_after_update() {
768+
use std::collections::hash_map::DefaultHasher;
769+
use std::hash::{Hash, Hasher};
770+
771+
// Create filter with initial value
772+
let filter =
773+
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
774+
775+
// Compute hash BEFORE update
776+
let mut hasher_before = DefaultHasher::new();
777+
filter.hash(&mut hasher_before);
778+
let hash_before = hasher_before.finish();
779+
780+
// Update changes the underlying expression
781+
filter
782+
.update(lit(false) as Arc<dyn PhysicalExpr>)
783+
.expect("Update should succeed");
784+
785+
// Compute hash AFTER update
786+
let mut hasher_after = DefaultHasher::new();
787+
filter.hash(&mut hasher_after);
788+
let hash_after = hasher_after.finish();
789+
790+
// FIXED: Hash should now be STABLE after update() because we use
791+
// Arc::as_ptr for identity-based hashing instead of expression content.
792+
assert_eq!(
793+
hash_before, hash_after,
794+
"Hash should be stable after update() - fix for issue #19641"
795+
);
796+
797+
// Self-equality should still hold
798+
assert!(filter.eq(&filter), "Self-equality should hold");
799+
}
800+
801+
/// Test that verifies separate DynamicFilterPhysicalExpr instances
802+
/// with the same expression are NOT equal (identity-based comparison).
803+
#[test]
804+
fn test_identity_based_equality() {
805+
// Create two separate filters with identical initial expressions
806+
let filter1 =
807+
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
808+
let filter2 =
809+
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
810+
811+
// Different instances should NOT be equal even with same expression
812+
// because they have independent inner Arcs (different update lifecycles)
813+
assert!(
814+
!filter1.eq(&filter2),
815+
"Different instances should not be equal (identity-based)"
816+
);
817+
818+
// Self-equality should hold
819+
assert!(filter1.eq(&filter1), "Self-equality should hold");
820+
}
821+
822+
/// Test that hash is stable for the same filter instance.
823+
/// After the fix, hash uses Arc::as_ptr which is pointer-based.
824+
#[test]
825+
fn test_hash_stable_for_same_instance() {
826+
use std::collections::hash_map::DefaultHasher;
827+
use std::hash::{Hash, Hasher};
828+
829+
let filter =
830+
DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
831+
832+
// Compute hash twice for the same instance
833+
let hash1 = {
834+
let mut h = DefaultHasher::new();
835+
filter.hash(&mut h);
836+
h.finish()
837+
};
838+
let hash2 = {
839+
let mut h = DefaultHasher::new();
840+
filter.hash(&mut h);
841+
h.finish()
842+
};
843+
844+
assert_eq!(hash1, hash2, "Same instance should have stable hash");
845+
846+
// Update the expression
847+
filter
848+
.update(lit(false) as Arc<dyn PhysicalExpr>)
849+
.expect("Update should succeed");
850+
851+
// Hash should STILL be the same (identity-based)
852+
let hash3 = {
853+
let mut h = DefaultHasher::new();
854+
filter.hash(&mut h);
855+
h.finish()
856+
};
857+
858+
assert_eq!(
859+
hash1, hash3,
860+
"Hash should be stable after update (identity-based)"
861+
);
862+
}
756863
}

0 commit comments

Comments
 (0)