Skip to content

Commit 2fc00e5

Browse files
more wip optims
1 parent e755677 commit 2fc00e5

File tree

8 files changed

+690
-175
lines changed

8 files changed

+690
-175
lines changed

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
//! Implementation of `InList` expressions: [`InListExpr`]
1919
20-
mod array_filter;
21-
mod primitive;
20+
mod byte_filter;
21+
mod nested_filter;
22+
mod primitive_filter;
2223
mod result;
24+
mod static_filter;
2325
mod strategy;
2426
mod transform;
2527

@@ -41,7 +43,7 @@ use datafusion_common::{
4143
};
4244
use datafusion_expr::{ColumnarValue, expr_vec_fmt};
4345

44-
use array_filter::StaticFilter;
46+
use static_filter::StaticFilter;
4547
use strategy::instantiate_static_filter;
4648

4749
/// InList
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Hash-based filter for byte array types (Utf8, Binary, and their variants)
19+
20+
use std::marker::PhantomData;
21+
22+
use ahash::RandomState;
23+
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
24+
use arrow::buffer::NullBuffer;
25+
use arrow::datatypes::ByteArrayType;
26+
use datafusion_common::Result;
27+
use datafusion_common::hash_utils::with_hashes;
28+
use hashbrown::HashTable;
29+
30+
use super::result::{build_in_list_result, handle_dictionary};
31+
use super::static_filter::StaticFilter;
32+
33+
// =============================================================================
34+
// BYTE ACCESS TRAIT
35+
// =============================================================================
36+
37+
/// Trait abstracting byte array access for GenericByteArray types.
38+
pub(crate) trait ByteAccess: 'static {
39+
type Native: PartialEq + AsRef<[u8]> + ?Sized;
40+
41+
/// Get a value from the array at the given index (unchecked for performance).
42+
///
43+
/// # Safety
44+
/// `idx` must be in bounds for the array.
45+
unsafe fn get_unchecked(arr: &dyn Array, idx: usize) -> &Self::Native;
46+
47+
/// Get the null buffer from the array, if any.
48+
fn nulls(arr: &dyn Array) -> Option<&NullBuffer>;
49+
}
50+
51+
/// Marker type for GenericByteArray access (Utf8, LargeUtf8, Binary, LargeBinary).
52+
pub(crate) struct ByteArrayAccess<T: ByteArrayType>(PhantomData<T>);
53+
54+
impl<T> ByteAccess for ByteArrayAccess<T>
55+
where
56+
T: ByteArrayType + 'static,
57+
T::Native: PartialEq,
58+
{
59+
type Native = T::Native;
60+
61+
#[inline(always)]
62+
unsafe fn get_unchecked(arr: &dyn Array, idx: usize) -> &Self::Native {
63+
unsafe { arr.as_bytes::<T>().value_unchecked(idx) }
64+
}
65+
66+
#[inline(always)]
67+
fn nulls(arr: &dyn Array) -> Option<&NullBuffer> {
68+
arr.as_bytes::<T>().nulls()
69+
}
70+
}
71+
72+
// =============================================================================
73+
// BYTE FILTER
74+
// =============================================================================
75+
76+
/// Hash-based filter for byte array types (Utf8, Binary, and their variants).
77+
///
78+
/// Uses HashTable with batch hashing via `with_hashes` for SIMD-optimized
79+
/// hash computation. Stores indices into the haystack array for O(1) lookup.
80+
pub(crate) struct ByteFilter<A: ByteAccess> {
81+
/// The haystack array containing values to match against.
82+
in_array: ArrayRef,
83+
/// HashTable storing indices into `in_array` for O(1) lookup.
84+
table: HashTable<usize>,
85+
/// Random state for consistent hashing between haystack and needles.
86+
state: RandomState,
87+
_phantom: PhantomData<A>,
88+
}
89+
90+
impl<A: ByteAccess> ByteFilter<A> {
91+
pub(crate) fn try_new(in_array: ArrayRef) -> Result<Self> {
92+
let state = RandomState::new();
93+
let mut table = HashTable::new();
94+
95+
// Build haystack table using batch hashing
96+
with_hashes([in_array.as_ref()], &state, |hashes| {
97+
for i in 0..in_array.len() {
98+
if in_array.is_valid(i) {
99+
let hash = hashes[i];
100+
101+
// Only insert if not already present (deduplication)
102+
// SAFETY: i is in bounds and we checked validity
103+
let val: &[u8] =
104+
unsafe { A::get_unchecked(in_array.as_ref(), i) }.as_ref();
105+
if table
106+
.find(hash, |&idx| {
107+
let stored: &[u8] =
108+
unsafe { A::get_unchecked(in_array.as_ref(), idx) }
109+
.as_ref();
110+
stored == val
111+
})
112+
.is_none()
113+
{
114+
table.insert_unique(hash, i, |&idx| hashes[idx]);
115+
}
116+
}
117+
}
118+
Ok::<_, datafusion_common::DataFusionError>(())
119+
})?;
120+
121+
Ok(Self {
122+
in_array,
123+
table,
124+
state,
125+
_phantom: PhantomData,
126+
})
127+
}
128+
}
129+
130+
impl<A: ByteAccess> StaticFilter for ByteFilter<A> {
131+
fn null_count(&self) -> usize {
132+
self.in_array.null_count()
133+
}
134+
135+
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
136+
handle_dictionary!(self, v, negated);
137+
138+
let needle_nulls = A::nulls(v);
139+
let haystack_has_nulls = self.in_array.null_count() > 0;
140+
141+
// Batch hash all needle values using SIMD-optimized hashing
142+
with_hashes([v], &self.state, |needle_hashes| {
143+
Ok(build_in_list_result(
144+
v.len(),
145+
needle_nulls,
146+
haystack_has_nulls,
147+
negated,
148+
#[inline(always)]
149+
|i| {
150+
// SAFETY: i is in bounds from build_in_list_result iteration
151+
let needle_val: &[u8] = unsafe { A::get_unchecked(v, i) }.as_ref();
152+
let hash = needle_hashes[i];
153+
// Look up using pre-computed hash, compare via index into haystack
154+
self.table
155+
.find(hash, |&idx| {
156+
let haystack_val: &[u8] =
157+
unsafe { A::get_unchecked(self.in_array.as_ref(), idx) }
158+
.as_ref();
159+
haystack_val == needle_val
160+
})
161+
.is_some()
162+
},
163+
))
164+
})
165+
}
166+
}

0 commit comments

Comments
 (0)