@@ -21,10 +21,11 @@ use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
2121
2222use ahash:: RandomState ;
2323use arrow:: {
24- array:: UInt64Array ,
24+ array:: { ArrayRef , UInt64Array } ,
2525 datatypes:: { DataType , Schema } ,
26+ record_batch:: RecordBatch ,
2627} ;
27- use datafusion_common:: hash_utils:: create_hashes;
28+ use datafusion_common:: hash_utils:: { create_hashes, with_hashes } ;
2829use datafusion_common:: { Result , internal_datafusion_err, internal_err} ;
2930use datafusion_expr:: ColumnarValue ;
3031use datafusion_physical_expr_common:: physical_expr:: {
@@ -113,6 +114,15 @@ impl HashExpr {
113114 pub fn description ( & self ) -> & str {
114115 & self . description
115116 }
117+
118+ /// Evaluate the columns to be hashed.
119+ fn evaluate_on_columns ( & self , batch : & RecordBatch ) -> Result < Vec < ArrayRef > > {
120+ let num_rows = batch. num_rows ( ) ;
121+ self . on_columns
122+ . iter ( )
123+ . map ( |c| c. evaluate ( batch) ?. into_array ( num_rows) )
124+ . collect ( )
125+ }
116126}
117127
118128impl std:: fmt:: Debug for HashExpr {
@@ -180,18 +190,11 @@ impl PhysicalExpr for HashExpr {
180190 Ok ( false )
181191 }
182192
183- fn evaluate (
184- & self ,
185- batch : & arrow:: record_batch:: RecordBatch ,
186- ) -> Result < ColumnarValue > {
193+ fn evaluate ( & self , batch : & RecordBatch ) -> Result < ColumnarValue > {
187194 let num_rows = batch. num_rows ( ) ;
188195
189196 // Evaluate columns
190- let keys_values = self
191- . on_columns
192- . iter ( )
193- . map ( |c| c. evaluate ( batch) ?. into_array ( num_rows) )
194- . collect :: < Result < Vec < _ > > > ( ) ?;
197+ let keys_values = self . evaluate_on_columns ( batch) ?;
195198
196199 // Compute hashes
197200 let mut hashes_buffer = vec ! [ 0 ; num_rows] ;
@@ -326,12 +329,24 @@ impl PhysicalExpr for HashTableLookupExpr {
326329 Ok ( false )
327330 }
328331
329- fn evaluate (
330- & self ,
331- batch : & arrow:: record_batch:: RecordBatch ,
332- ) -> Result < ColumnarValue > {
332+ fn evaluate ( & self , batch : & RecordBatch ) -> Result < ColumnarValue > {
333333 let num_rows = batch. num_rows ( ) ;
334334
335+ // Optimization: if hash_expr is HashExpr, compute hashes directly into callback
336+ // to avoid redundant allocations and copies.
337+ if let Some ( hash_expr) = self . hash_expr . as_any ( ) . downcast_ref :: < HashExpr > ( ) {
338+ let keys_values = hash_expr. evaluate_on_columns ( batch) ?;
339+
340+ return with_hashes (
341+ & keys_values,
342+ hash_expr. random_state . random_state ( ) ,
343+ |hashes| {
344+ let array = self . hash_map . contain_hashes ( hashes) ;
345+ Ok ( ColumnarValue :: Array ( Arc :: new ( array) ) )
346+ } ,
347+ ) ;
348+ }
349+
335350 // Evaluate hash expression to get hash values
336351 let hash_array = self . hash_expr . evaluate ( batch) ?. into_array ( num_rows) ?;
337352 let hash_array = hash_array. as_any ( ) . downcast_ref :: < UInt64Array > ( ) . ok_or (
0 commit comments