diff --git a/src/common/hashtable/src/hashjoin_hashtable.rs b/src/common/hashtable/src/hashjoin_hashtable.rs
index 0fec13dc8a4d2..f51240e914343 100644
--- a/src/common/hashtable/src/hashjoin_hashtable.rs
+++ b/src/common/hashtable/src/hashjoin_hashtable.rs
@@ -129,7 +129,7 @@ impl<K: Keyable, A: Allocator + Clone> HashJoinHashTable<K, A> {
         hashtable
     }
 
-    pub fn insert(&mut self, key: K, entry_ptr: *mut RawEntry<K>) {
+    pub fn insert(&self, key: K, entry_ptr: *mut RawEntry<K>) {
         let hash = key.hash();
         let index = (hash >> self.hash_shift) as usize;
         let new_header = new_header(entry_ptr as u64, hash);
diff --git a/src/common/hashtable/src/hashjoin_string_hashtable.rs b/src/common/hashtable/src/hashjoin_string_hashtable.rs
index 2779bb08f9964..2d664058d04a2 100644
--- a/src/common/hashtable/src/hashjoin_string_hashtable.rs
+++ b/src/common/hashtable/src/hashjoin_string_hashtable.rs
@@ -63,7 +63,7 @@ impl<A: Allocator + Clone> HashJoinStringHashTable<A> {
         hashtable
     }
 
-    pub fn insert(&mut self, key: &[u8], entry_ptr: *mut StringRawEntry) {
+    pub fn insert(&self, key: &[u8], entry_ptr: *mut StringRawEntry) {
         let hash = hash_join_fast_string_hash(key);
         let index = (hash >> self.hash_shift) as usize;
         let new_header = new_header(entry_ptr as u64, hash);
diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs
index d0196c8f82e79..effccbea3cdd7 100644
--- a/src/query/expression/src/block.rs
+++ b/src/query/expression/src/block.rs
@@ -644,8 +644,8 @@ impl DataBlock {
     }
 
     #[inline]
-    pub fn remove_column(&mut self, index: usize) {
-        self.entries.remove(index);
+    pub fn remove_column(&mut self, index: usize) -> BlockEntry {
+        self.entries.remove(index)
     }
 
     #[inline]
diff --git a/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs b/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs
index eb344ca28e14d..14c94a5dfd368 100644
--- a/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs
+++ b/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs
@@ -78,7 +78,9 @@ where T: Clone + Default
     }
 }
 
-trait FixedKey: FastHash + 'static + Sized + Clone + Default + Eq + Debug + Sync + Send {
+pub trait FixedKey:
+    FastHash + 'static + Sized + Clone + Default + Eq + Debug + Sync + Send
+{
     fn downcast(keys_state: &KeysState) -> Option<&Buffer<Self>>;
 
     fn downcast_owned(keys_state: KeysState) -> Option<Buffer<Self>>;
diff --git a/src/query/expression/src/projected_block.rs b/src/query/expression/src/projected_block.rs
index d0bd49b2934fd..a14b878f222a1 100644
--- a/src/query/expression/src/projected_block.rs
+++ b/src/query/expression/src/projected_block.rs
@@ -48,6 +48,16 @@ impl<'a> ProjectedBlock<'a> {
         }
     }
 
+    pub fn num_rows(&self) -> usize {
+        if self.entries.is_empty() {
+            return 0;
+        }
+
+        let num_rows = self.entries[0].len();
+        debug_assert!(self.entries.iter().all(|c| c.len() == num_rows));
+        num_rows
+    }
+
     pub fn slice<I>(&self, index: I) -> ProjectedBlock<'_>
     where I: SliceIndex<[usize], Output = [usize]> + SliceIndex<[BlockEntry], Output = [BlockEntry]> {
diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs
index d7684a3d87788..3a82340d825a6 100644
--- a/src/query/service/src/lib.rs
+++ b/src/query/service/src/lib.rs
@@ -41,7 +41,9 @@
 #![allow(clippy::diverging_sub_expression)]
 #![allow(clippy::arc_with_non_send_sync)]
 #![feature(debug_closure_helpers)]
-
+#![feature(associated_type_defaults)]
+#![feature(mapped_lock_guards)]
+#![feature(unsafe_cell_access)]
 extern crate core;
 
 pub mod auth;
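The `remove_column` change turns removal into a take: the caller now gets the removed `BlockEntry` back, which is how the hash-table append step later peels join-key columns off a build block. A minimal sketch of the pattern with a toy stand-in type (not the real `DataBlock` API):

// Illustrative only: a toy block showing the remove-and-return pattern.
struct ToyBlock {
    entries: Vec<String>, // stand-in for Vec<BlockEntry>
}

impl ToyBlock {
    // Mirrors the new `remove_column`: removing a column yields the removed entry.
    fn remove_column(&mut self, index: usize) -> String {
        self.entries.remove(index)
    }
}

fn main() {
    let mut block = ToyBlock {
        entries: vec!["payload".into(), "key0".into(), "key1".into()],
    };
    // Peel the trailing key columns off, as the append step below does.
    let mut keys = Vec::new();
    while block.entries.len() > 1 {
        keys.push(block.remove_column(1));
    }
    assert_eq!(keys, vec!["key0", "key1"]);
}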
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
index 9d70cf417688c..a88cb20c3c899 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
@@ -67,7 +67,9 @@ pub struct FixedKeyHashJoinHashTable<T: FixedKey + HashtableKeyable> {
     pub(crate) hash_method: HashMethodFixedKeys<T>,
 }
 
+#[derive(Default)]
 pub enum HashJoinHashTable {
+    #[default]
     Null,
     Serializer(SerializerHashJoinHashTable),
     SingleBinary(SingleBinaryHashJoinHashTable),
diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs
index 1d231572fe49f..ed10b3623b760 100644
--- a/src/query/service/src/pipelines/processors/transforms/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -17,6 +17,7 @@ pub mod aggregator;
 mod broadcast;
 mod hash_join;
 mod materialized_cte;
+pub mod new_hash_join;
 pub(crate) mod range_join;
 mod runtime_pool;
 pub mod sort;
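In `common.rs` below, `PartialBlocks::compact_block` only splits out a right-sized block once the buffered rows or bytes reach `target * 2 / 3 * 2`, which in integer arithmetic is roughly 4/3 of the target; below that it simply concatenates everything it holds. A standalone sketch of that trigger with hypothetical numbers:

// Illustrative sketch of the compaction trigger; the real targets come from
// JoinSettings, the numbers here are assumptions.
fn should_split(current_rows: usize, target_rows: usize) -> bool {
    // Integer form of `current >= target * 4 / 3`, matching `rows * 2 / 3 * 2`.
    current_rows >= target_rows * 2 / 3 * 2
}

fn main() {
    let target = 65_536;
    assert!(!should_split(80_000, target)); // below 87_380: concat everything
    assert!(should_split(90_000, target)); // at or above 87_380: split a full block
}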
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/common.rs
new file mode 100644
index 0000000000000..09737f1c5cbbd
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/common.rs
@@ -0,0 +1,345 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cell::SyncUnsafeCell;
+use std::collections::VecDeque;
+use std::ops::Deref;
+use std::sync::Mutex;
+use std::sync::MutexGuard;
+use std::sync::PoisonError;
+
+use databend_common_column::bitmap::Bitmap;
+use databend_common_exception::Result;
+use databend_common_expression::arrow::and_validities;
+use databend_common_expression::BlockEntry;
+use databend_common_expression::DataBlock;
+use databend_common_expression::Evaluator;
+use databend_common_functions::BUILTIN_FUNCTIONS;
+
+use crate::pipelines::processors::transforms::new_hash_join::JoinParams;
+
+pub struct PartialBlock {
+    avg_bytes: usize,
+    block: DataBlock,
+}
+
+impl PartialBlock {
+    pub fn new(block: DataBlock) -> PartialBlock {
+        let avg_bytes = block.memory_size().div_ceil(block.num_rows());
+        PartialBlock { avg_bytes, block }
+    }
+}
+
+#[derive(Default)]
+pub struct PartialBlocks {
+    pub current_rows: usize,
+    pub current_bytes: usize,
+    pub blocks: VecDeque<PartialBlock>,
+}
+
+impl PartialBlocks {
+    pub fn new() -> PartialBlocks {
+        PartialBlocks {
+            current_rows: 0,
+            current_bytes: 0,
+            blocks: VecDeque::new(),
+        }
+    }
+
+    pub fn add_block(&mut self, block: DataBlock) {
+        self.current_rows += block.num_rows();
+        self.current_bytes += block.memory_size();
+        self.blocks.push_back(PartialBlock::new(block));
+    }
+
+    pub fn compact_block(&mut self, rows: usize, bytes: usize) -> Result<DataBlock> {
+        // Split out a right-sized block only once we hold at least 4/3 of the
+        // target rows or bytes; otherwise just concatenate everything we have.
+        if self.current_rows >= rows * 2 / 3 * 2 {
+            return self.compact_block_inner(rows, bytes);
+        }
+
+        if self.current_bytes >= bytes * 2 / 3 * 2 {
+            return self.compact_block_inner(rows, bytes);
+        }
+
+        let blocks = std::mem::take(&mut self.blocks)
+            .into_iter()
+            .map(|block| block.block)
+            .collect::<Vec<_>>();
+
+        DataBlock::concat(&blocks)
+    }
+
+    fn compact_block_inner(&mut self, rows: usize, bytes: usize) -> Result<DataBlock> {
+        let mut blocks = vec![];
+
+        let mut current_rows = 0;
+        let mut current_bytes = 0;
+
+        while let Some(mut block) = self.blocks.pop_front() {
+            if current_rows + block.block.num_rows() >= rows {
+                let compact_block = block.block.slice(0..rows - current_rows);
+                let remain_block = block
+                    .block
+                    .slice(rows - current_rows..block.block.num_rows());
+
+                blocks.push(compact_block);
+
+                if !remain_block.is_empty() {
+                    block.block = remain_block;
+                    self.blocks.push_front(block);
+                }
+
+                break;
+            }
+
+            if current_bytes + block.block.memory_size() >= bytes {
+                let compact_bytes = bytes - current_bytes;
+                let estimated_rows = compact_bytes / block.avg_bytes;
+
+                let compact_block = block.block.slice(0..estimated_rows);
+                let remain_block = block.block.slice(estimated_rows..block.block.num_rows());
+
+                blocks.push(compact_block);
+
+                if !remain_block.is_empty() {
+                    block.block = remain_block;
+                    self.blocks.push_front(block);
+                }
+
+                break;
+            }
+
+            current_rows += block.block.num_rows();
+            current_bytes += block.block.memory_size();
+            blocks.push(block.block);
+        }
+
+        DataBlock::concat(&blocks)
+    }
+}
+
+/// Compact small blocks into larger blocks that meet the max_rows and max_bytes requirements.
+pub fn compact_blocks(
+    blocks: impl IntoIterator<Item = DataBlock>,
+    rows: usize,
+    bytes: usize,
+) -> Result<Vec<DataBlock>> {
+    let mut compacted_blocks = Vec::new();
+    let mut current_blocks = Vec::new();
+    let mut current_rows = 0;
+    let mut current_bytes = 0;
+
+    for block in blocks {
+        let block_rows = block.num_rows();
+        let block_bytes = block.memory_size();
+
+        // Check if adding this block would exceed the limits.
+        if (current_rows + block_rows >= rows) || (current_bytes + block_bytes >= bytes) {
+            // If we have accumulated blocks, compact them.
+            if !current_blocks.is_empty() {
+                let compacted = DataBlock::concat(&current_blocks)?;
+                // Free memory quickly.
+                current_blocks.clear();
+
+                if !compacted.is_empty() {
+                    compacted_blocks.push(compacted);
+                }
+
+                current_rows = 0;
+                current_bytes = 0;
+            }
+
+            // If the current block itself meets the requirements, add it directly.
+            if block_rows >= rows || block_bytes >= bytes {
+                compacted_blocks.push(block);
+            } else {
+                // Otherwise, start a new accumulation with this block.
+                current_blocks.push(block);
+                current_rows = block_rows;
+                current_bytes = block_bytes;
+            }
+        } else {
+            // Add the block to the current accumulation.
+            current_blocks.push(block);
+            current_rows += block_rows;
+            current_bytes += block_bytes;
+        }
+    }
+
+    // Handle remaining blocks.
+    if !current_blocks.is_empty() {
+        let compacted = DataBlock::concat(&current_blocks)?;
+        // Free memory quickly.
+        current_blocks.clear();
+
+        if !compacted.is_empty() {
+            compacted_blocks.push(compacted);
+        }
+    }
+
+    Ok(compacted_blocks)
+}
+
+pub fn build_join_keys(block: DataBlock, params: &JoinParams) -> Result<DataBlock> {
+    let build_keys = &params.build_keys;
+
+    let evaluator = Evaluator::new(&block, &params.func_ctx, &BUILTIN_FUNCTIONS);
+    let keys_entries: Vec<BlockEntry> = build_keys
+        .iter()
+        .map(|expr| {
+            Ok(evaluator
+                .run(expr)?
+                .convert_to_full_column(expr.data_type(), block.num_rows())
+                .into())
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // Project the data block.
+    let column_nums = block.num_columns();
+    let mut block_entries = Vec::with_capacity(params.build_projections.len());
+
+    for index in 0..column_nums {
+        if !params.build_projections.contains(&index) {
+            continue;
+        }
+
+        block_entries.push(block.get_by_offset(index).clone());
+    }
+
+    let mut projected_block = DataBlock::new(block_entries, block.num_rows());
+    // After computing complex join key expressions, drop the unneeded columns as
+    // early as possible so their memory can be released.
+    drop(block);
+
+    let is_null_equal = &params.is_null_equals;
+    let mut valids = None;
+
+    for (entry, null_equals) in keys_entries.iter().zip(is_null_equal.iter()) {
+        if !null_equals {
+            let (is_all_null, column_valids) = entry.as_column().unwrap().validity();
+
+            if is_all_null {
+                valids = Some(Bitmap::new_constant(false, projected_block.num_rows()));
+                break;
+            }
+
+            valids = and_validities(valids, column_valids.cloned());
+
+            if let Some(bitmap) = valids.as_ref() {
+                if bitmap.null_count() == bitmap.len() {
+                    break;
+                }
+
+                if bitmap.null_count() == 0 {
+                    valids = None;
+                }
+            }
+        }
+    }
+
+    for (entry, null_equals) in keys_entries.into_iter().zip(is_null_equal.iter()) {
+        projected_block.add_entry(match !null_equals && entry.data_type().is_nullable() {
+            true => entry.remove_nullable(),
+            false => entry,
+        });
+    }
+
+    if let Some(bitmap) = valids {
+        if bitmap.null_count() != bitmap.len() {
+            return projected_block.filter_with_bitmap(&bitmap);
+        }
+    }
+
+    Ok(projected_block)
+}
+
+pub struct IgnorePanicMutex<T> {
+    inner: Mutex<T>,
+}
+
+impl<T> IgnorePanicMutex<T> {
+    pub fn new(inner: T) -> Self {
+        Self {
+            inner: Mutex::new(inner),
+        }
+    }
+
+    pub fn into_inner(self) -> T {
+        self.inner
+            .into_inner()
+            .unwrap_or_else(PoisonError::into_inner)
+    }
+
+    pub fn get_mut(&mut self) -> &mut T {
+        self.inner.get_mut().unwrap_or_else(PoisonError::into_inner)
+    }
+
+    pub fn lock(&self) -> MutexGuard<'_, T> {
+        self.inner.lock().unwrap_or_else(PoisonError::into_inner)
+    }
+}
+
+/// A C-style cell that provides interior mutability without runtime borrow checking.
+///
+/// This is a thin wrapper around `SyncUnsafeCell` that allows shared mutable access
+/// to the inner value without the overhead of `RefCell` or `Mutex`. It's designed
+/// for performance-critical scenarios where the caller can guarantee memory safety.
+///
+/// # Safety
+///
+/// - The caller must ensure that there are no data races when accessing the inner value
+/// - Multiple mutable references to the same data must not exist simultaneously
+/// - This should only be used when you can statically guarantee exclusive access
+///   or when protected by external synchronization mechanisms
+///
+/// # Use Cases
+///
+/// - High-performance hash join operations where contention is managed externally
+/// - Single-threaded contexts where `RefCell`'s runtime checks are unnecessary overhead
+/// - Data structures that implement their own synchronization protocols
+pub struct CStyleCell<T> {
+    inner: SyncUnsafeCell<T>,
+}
+
+impl<T> CStyleCell<T> {
+    pub fn new(inner: T) -> Self {
+        Self {
+            inner: SyncUnsafeCell::new(inner),
+        }
+    }
+
+    pub fn into_inner(self) -> T {
+        self.inner.into_inner()
+    }
+
+    /// Returns a mutable reference to the inner value.
+    ///
+    /// # Safety
+    ///
+    /// The caller must ensure that no other references (mutable or immutable)
+    /// to the inner value exist when this method is called, and that the
+    /// returned reference is not used concurrently with other accesses.
+    #[allow(clippy::mut_from_ref)]
+    pub fn as_mut(&self) -> &mut T {
+        unsafe { &mut *self.inner.get() }
+    }
+}
+
+impl<T> Deref for CStyleCell<T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { &*self.inner.get() }
+    }
+}
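The contract `common.rs` establishes is: mutate a `CStyleCell` only while holding the shared `IgnorePanicMutex`, so the cell itself can skip borrow checking. A toy sketch of that guarded-cell pattern using only std types (stand-ins, not the real wrappers):

use std::cell::UnsafeCell;
use std::sync::Mutex;

// Toy stand-ins for CStyleCell/IgnorePanicMutex; illustrative only.
struct Cell<T>(UnsafeCell<T>);
unsafe impl<T: Send> Sync for Cell<T> {}

struct State {
    mutex: Mutex<()>,        // the single synchronization point
    build_rows: Cell<usize>, // mutated exclusively under `mutex`
}

impl State {
    fn add_rows(&self, rows: usize) {
        let _guard = self.mutex.lock().unwrap();
        // Safety: every writer takes `mutex` first, so no aliasing mutation.
        unsafe { *self.build_rows.0.get() += rows };
    }
}

fn main() {
    let state = State {
        mutex: Mutex::new(()),
        build_rows: Cell(UnsafeCell::new(0)),
    };
    state.add_rows(42);
    let _guard = state.mutex.lock().unwrap();
    assert_eq!(unsafe { *state.build_rows.0.get() }, 42);
}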
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/interfaces.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/interfaces.rs
new file mode 100644
index 0000000000000..533da4ee6c037
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/interfaces.rs
@@ -0,0 +1,223 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_expression::Expr;
+use databend_common_expression::FunctionContext;
+use databend_common_sql::ColumnSet;
+
+pub enum ProbeData {
+    Next,
+    DataBlock(DataBlock),
+}
+
+pub struct Progress {
+    pub total_rows: usize,
+    pub total_bytes: usize,
+    pub progressed_rows: usize,
+    pub progressed_bytes: usize,
+}
+
+/// Core Join trait that abstracts over different join implementations.
+/// Provides a unified interface for both regular hash join and grace hash join.
+pub trait Join: Send + Sync + 'static {
+    fn as_any(&self) -> &dyn Any;
+
+    fn add_block(&self, block: DataBlock) -> Result<TryCompleteStream<Progress>>;
+
+    fn finish_build(&self) -> Result<TryCompleteStream<Progress>>;
+
+    fn probe(&self, block: DataBlock) -> Result<TryCompleteStream<ProbeData>>;
+
+    fn finish_probe(&self) -> Result<TryCompleteStream<ProbeData>>;
+}
+
+// /// Convertible join trait for joins that can be converted from one type to another.
+// /// Convert this join implementation to another type.
+// pub trait ConvertibleJoin: Join {
+//     fn convert_to(self) -> Result<Box<dyn Join>>;
+// }
+
+/// Join configuration for setting up different join types.
+#[derive(Clone)]
+pub struct JoinSettings {
+    pub max_block_rows: usize,
+    pub max_block_bytes: usize,
+    // pub join_type: JoinType,
+    // pub build_keys: Vec<Expr>,
+    // pub probe_keys: Vec<Expr>,
+    // pub hash_method: HashMethodKind,
+    // pub func_ctx: Arc<FunctionContext>,
+    // pub memory_settings: MemorySettings,
+    // pub is_null_equal: Vec<bool>,
+    // pub max_block_size: usize,
+}
+
+impl JoinSettings {
+    pub fn check_threshold(&self, block: &DataBlock) -> bool {
+        block.num_rows() >= self.max_block_rows || block.memory_size() >= self.max_block_bytes
+    }
+}
+
+pub struct JoinParams {
+    pub build_keys: Vec<Expr>,
+    pub probe_keys: Vec<Expr>,
+    pub is_null_equals: Vec<bool>,
+
+    pub build_projections: ColumnSet,
+
+    pub func_ctx: Arc<FunctionContext>,
+}
+
+pub trait ITryCompleteStream<T>: Send + 'static {
+    fn next_try_complete(&mut self) -> Result<Option<TryCompleteFuture<T>>>;
+}
+
+#[async_trait::async_trait]
+pub trait ITryCompleteFuture<T>: Send + 'static {
+    fn try_complete(&mut self) -> Result<Option<T>>;
+
+    async fn async_complete(&mut self) -> Result<T> {
+        unreachable!()
+    }
+}
+
+pub type TryCompleteStream<T> = Box<dyn ITryCompleteStream<T>>;
+
+/// A wrapper struct that implements Future for ITryCompleteFuture.
+/// This allows an ITryCompleteFuture to be polled as a regular Future.
+pub struct TryCompleteFuture<T> {
+    inner: Option<Box<dyn ITryCompleteFuture<T>>>,
+    async_future: Option<Pin<Box<dyn Future<Output = Result<T>> + Send + 'static>>>,
+}
+
+impl<T: Send + 'static> TryCompleteFuture<T> {
+    pub fn new<F: ITryCompleteFuture<T>>(inner: F) -> Self {
+        Self {
+            inner: Some(Box::new(inner)),
+            async_future: None,
+        }
+    }
+
+    pub fn try_complete(&mut self) -> Result<Option<T>> {
+        if let Some(inner) = &mut self.inner {
+            if let Some(res) = inner.try_complete()? {
+                self.inner = None;
+                return Ok(Some(res));
+            }
+
+            let mut inner = self.inner.take().unwrap();
+            let fut = async move { inner.async_complete().await };
+            self.async_future = Some(Box::pin(fut));
+        }
+
+        Ok(None)
+    }
+}
+
+impl<T: Send + 'static> Future for TryCompleteFuture<T> {
+    type Output = Result<T>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.as_mut().get_mut();
+
+        // If we have an async future already running, poll it.
+        if let Some(ref mut fut) = this.async_future {
+            match fut.as_mut().poll(cx) {
+                Poll::Ready(result) => {
+                    this.async_future = None;
+                    return Poll::Ready(result);
+                }
+                Poll::Pending => return Poll::Pending,
+            }
+        }
+
+        // Try synchronous completion first.
+        if let Some(ref mut inner) = this.inner {
+            match inner.try_complete() {
+                Ok(Some(result)) => {
+                    this.inner = None;
+                    return Poll::Ready(Ok(result));
+                }
+                Ok(None) => {
+                    // Move to async completion.
+                    if let Some(mut inner) = this.inner.take() {
+                        let fut = async move { inner.async_complete().await };
+                        this.async_future = Some(Box::pin(fut));
+                        // Immediately poll the new future.
+                        return self.poll(cx);
+                    }
+                }
+                Err(e) => {
+                    this.inner = None;
+                    return Poll::Ready(Err(e));
+                }
+            }
+        }
+
+        Poll::Pending
+    }
+}
+
+pub struct NoneTryCompleteStream<T>(PhantomData<T>);
+
+impl<T: Send + 'static> NoneTryCompleteStream<T> {
+    pub fn create() -> TryCompleteStream<T> {
+        Box::new(NoneTryCompleteStream(PhantomData))
+    }
+}
+
+impl<T: Send + 'static> ITryCompleteStream<T> for NoneTryCompleteStream<T> {
+    fn next_try_complete(&mut self) -> Result<Option<TryCompleteFuture<T>>> {
+        Ok(None)
+    }
+}
+
+pub struct FlattenTryCompleteStream<T> {
+    stream: VecDeque<TryCompleteStream<T>>,
+}
+
+impl<T: Send + 'static> FlattenTryCompleteStream<T> {
+    pub fn create(stream: Vec<TryCompleteStream<T>>) -> TryCompleteStream<T> {
+        Box::new(FlattenTryCompleteStream {
+            stream: VecDeque::from(stream),
+        })
+    }
+}
+
+impl<T: Send + 'static> ITryCompleteStream<T> for FlattenTryCompleteStream<T> {
+    fn next_try_complete(&mut self) -> Result<Option<TryCompleteFuture<T>>> {
+        while let Some(stream) = self.stream.front_mut() {
+            if let Some(v) = stream.next_try_complete()? {
+                return Ok(Some(v));
+            }
+
+            self.stream.pop_front();
+        }
+
+        Ok(None)
+    }
+}
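The intended `ITryCompleteFuture` contract is: attempt the cheap synchronous path first, and only fall back to the async path when the sync attempt yields nothing. A self-contained toy analogue of that two-phase shape (hypothetical names, std only):

// Toy analogue of ITryCompleteFuture's two-phase contract; illustrative only.
trait TwoPhaseTask<T> {
    /// Fast path: return Some(value) if the work finished synchronously.
    fn try_complete(&mut self) -> Option<T>;
    /// Slow path: only entered after try_complete returned None.
    fn block_complete(&mut self) -> T;
}

struct CachedRead {
    cached: Option<String>,
}

impl TwoPhaseTask<String> for CachedRead {
    fn try_complete(&mut self) -> Option<String> {
        self.cached.take() // hit: finish without yielding
    }
    fn block_complete(&mut self) -> String {
        "loaded from slow storage".to_string() // miss: do the expensive part
    }
}

fn drive<T>(task: &mut dyn TwoPhaseTask<T>) -> T {
    match task.try_complete() {
        Some(v) => v,
        None => task.block_complete(),
    }
}

fn main() {
    let mut hit = CachedRead { cached: Some("from cache".into()) };
    assert_eq!(drive(&mut hit), "from cache");
    let mut miss = CachedRead { cached: None };
    assert_eq!(drive(&mut miss), "loaded from slow storage");
}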
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_join.rs
new file mode 100644
index 0000000000000..61de3339748fc
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_join.rs
@@ -0,0 +1,122 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+
+use crate::pipelines::processors::transforms::new_hash_join::compact_blocks;
+use crate::pipelines::processors::transforms::new_hash_join::memory::memory_hash_table_allocator::MemoryHashTableAllocator;
+use crate::pipelines::processors::transforms::new_hash_join::memory::memory_hash_table_append::HashTableAppendScheduler;
+use crate::pipelines::processors::transforms::new_hash_join::CStyleCell;
+use crate::pipelines::processors::transforms::new_hash_join::FlattenTryCompleteStream;
+use crate::pipelines::processors::transforms::new_hash_join::IgnorePanicMutex;
+use crate::pipelines::processors::transforms::new_hash_join::Join;
+use crate::pipelines::processors::transforms::new_hash_join::JoinParams;
+use crate::pipelines::processors::transforms::new_hash_join::JoinSettings;
+use crate::pipelines::processors::transforms::new_hash_join::NoneTryCompleteStream;
+use crate::pipelines::processors::transforms::new_hash_join::ProbeData;
+use crate::pipelines::processors::transforms::new_hash_join::Progress;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteStream;
+use crate::pipelines::processors::transforms::HashJoinHashTable;
+
+pub struct MemoryHashJoin {
+    state: Arc<MemoryHashJoinState>,
+}
+
+impl Join for MemoryHashJoin {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn add_block(&self, block: DataBlock) -> Result<TryCompleteStream<Progress>> {
+        if self.state.settings.check_threshold(&block) {
+            let _guard = self.state.mutex.lock();
+
+            *self.state.build_rows.as_mut() += block.num_rows();
+            self.state.chunks.as_mut().push(block);
+            return Ok(NoneTryCompleteStream::create());
+        }
+
+        // Undersized blocks are buffered here and compacted in finish_build.
+        let _guard = self.state.mutex.lock();
+        *self.state.build_rows.as_mut() += block.num_rows();
+        self.state.small_chunks.as_mut().push_back(block);
+        Ok(NoneTryCompleteStream::create())
+    }
+
+    fn finish_build(&self) -> Result<TryCompleteStream<Progress>> {
+        // The compact process may still OOM, but the probability is very low.
+        let _guard = self.state.mutex.lock();
+
+        // 1. Compact undersized blocks into uniformly sized blocks.
+        self.state.compact_small_blocks()?;
+
+        // 2. Initialize the task queue for parallel building.
+        self.state.init_working_queue();
+
+        // Large memory operations need to be performed under the protection of a stream.
+        Ok(FlattenTryCompleteStream::create(vec![
+            // 3. Initialize the hash table with a fixed memory size.
+            MemoryHashTableAllocator::create(self.state.clone()),
+            // 4. Populate the hash table with the collected data chunks.
+            HashTableAppendScheduler::create(self.state.clone()),
+        ]))
+    }
+
+    fn probe(&self, _block: DataBlock) -> Result<TryCompleteStream<ProbeData>> {
+        todo!()
+    }
+
+    fn finish_probe(&self) -> Result<TryCompleteStream<ProbeData>> {
+        Ok(NoneTryCompleteStream::create())
+    }
+}
+
+pub struct MemoryHashJoinState {
+    pub params: JoinParams,
+    pub settings: JoinSettings,
+
+    pub mutex: IgnorePanicMutex<()>,
+    pub build_rows: CStyleCell<usize>,
+    pub chunks: CStyleCell<Vec<DataBlock>>,
+    pub small_chunks: CStyleCell<VecDeque<DataBlock>>,
+
+    pub working_queue: CStyleCell<VecDeque<usize>>,
+
+    pub hash_table: CStyleCell<HashJoinHashTable>,
+    pub arenas: CStyleCell<Vec<Vec<u8>>>,
+}
+
+impl MemoryHashJoinState {
+    pub fn compact_small_blocks(&self) -> Result<()> {
+        if self.small_chunks.is_empty() {
+            return Ok(());
+        }
+
+        let small_blocks = std::mem::take(self.small_chunks.as_mut());
+        let compacted_blocks = compact_blocks(
+            small_blocks,
+            self.settings.max_block_rows,
+            self.settings.max_block_bytes,
+        )?;
+
+        let chunks = self.chunks.as_mut();
+        chunks.extend(compacted_blocks);
+        Ok(())
+    }
+}
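In `memory_hash_table.rs` below, every `insert` writes raw entries into the caller's `arena` and publishes pointers into that buffer to the hash table, so the single up-front `reserve` must cover all entries: any `Vec` reallocation after the first pointer is handed out would invalidate it. A toy illustration of reserve-before-publish:

// Illustrative only: why the arena must reserve all capacity before any
// entry pointer is published.
fn main() {
    let mut arena: Vec<u8> = Vec::new();
    arena.reserve(2 * 8); // reserve for every entry up front
    let first = arena.as_mut_ptr(); // pointer published to the hash table
    arena.extend_from_slice(&1u64.to_ne_bytes());
    arena.extend_from_slice(&2u64.to_ne_bytes());
    // No reallocation happened after `first` was taken, so it is still valid.
    assert_eq!(first, arena.as_mut_ptr());
}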
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table.rs
new file mode 100644
index 0000000000000..e0f8889bae989
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table.rs
@@ -0,0 +1,185 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_expression::Column;
+use databend_common_expression::FixedKey;
+use databend_common_expression::HashMethod;
+use databend_common_expression::KeysState;
+use databend_common_expression::ProjectedBlock;
+use databend_common_hashtable::HashtableKeyable;
+use databend_common_hashtable::RawEntry;
+use databend_common_hashtable::RowPtr;
+use databend_common_hashtable::StringRawEntry;
+use databend_common_hashtable::STRING_EARLY_SIZE;
+
+use crate::pipelines::processors::transforms::FixedKeyHashJoinHashTable;
+use crate::pipelines::processors::transforms::SerializerHashJoinHashTable;
+use crate::pipelines::processors::transforms::SingleBinaryHashJoinHashTable;
+
+pub trait JoinHashTable: Send + Sync + 'static {
+    fn insert(&self, keys: ProjectedBlock, chunk: usize, arena: &mut Vec<u8>) -> Result<()>;
+}
+
+impl JoinHashTable for SerializerHashJoinHashTable {
+    fn insert(&self, keys: ProjectedBlock, chunk: usize, arena: &mut Vec<u8>) -> Result<()> {
+        let num_rows = keys.num_rows();
+        let keys_state = self.hash_method.build_keys_state(keys, num_rows)?;
+        let build_keys_iter = self.hash_method.build_keys_iter(&keys_state)?;
+
+        // Total byte size of the concatenated key data.
+        let space_size = match &keys_state {
+            KeysState::Column(Column::Bitmap(col)) => col.data().len(),
+            KeysState::Column(Column::Binary(col)) => col.data().len(),
+            KeysState::Column(Column::Variant(col)) => col.data().len(),
+            KeysState::Column(Column::String(col)) => col.total_bytes_len(),
+            _ => unreachable!(),
+        };
+
+        static ENTRY_SIZE: usize = std::mem::size_of::<StringRawEntry>();
+        arena.reserve(num_rows * ENTRY_SIZE + space_size);
+
+        let (mut raw_entry_ptr, mut string_local_space_ptr) = unsafe {
+            (
+                std::mem::transmute::<*mut u8, *mut StringRawEntry>(arena.as_mut_ptr()),
+                arena.as_mut_ptr().add(num_rows * ENTRY_SIZE),
+            )
+        };
+
+        for (row_index, key) in build_keys_iter.enumerate() {
+            let row_ptr = RowPtr {
+                chunk_index: chunk as u32,
+                row_index: row_index as u32,
+            };
+
+            // # Safety
+            // The memory address of `raw_entry_ptr` is valid.
+            // string_offset + key.len() <= space_size.
+            unsafe {
+                (*raw_entry_ptr).row_ptr = row_ptr;
+                (*raw_entry_ptr).length = key.len() as u32;
+                (*raw_entry_ptr).next = 0;
+                (*raw_entry_ptr).key = string_local_space_ptr;
+                // The size of `early` is STRING_EARLY_SIZE (4).
+                std::ptr::copy_nonoverlapping(
+                    key.as_ptr(),
+                    (*raw_entry_ptr).early.as_mut_ptr(),
+                    std::cmp::min(STRING_EARLY_SIZE, key.len()),
+                );
+                std::ptr::copy_nonoverlapping(key.as_ptr(), string_local_space_ptr, key.len());
+                string_local_space_ptr = string_local_space_ptr.add(key.len());
+            }
+
+            self.hash_table.insert(key, raw_entry_ptr);
+            raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
+        }
+
+        Ok(())
+    }
+}
+
+impl JoinHashTable for SingleBinaryHashJoinHashTable {
+    fn insert(&self, keys: ProjectedBlock, chunk: usize, arena: &mut Vec<u8>) -> Result<()> {
+        let num_rows = keys.num_rows();
+        let keys_state = self.hash_method.build_keys_state(keys, num_rows)?;
+        let build_keys_iter = self.hash_method.build_keys_iter(&keys_state)?;
+
+        // Total byte size of the concatenated key data.
+        let space_size = match &keys_state {
+            KeysState::Column(Column::Bitmap(col)) => col.data().len(),
+            KeysState::Column(Column::Binary(col)) => col.data().len(),
+            KeysState::Column(Column::Variant(col)) => col.data().len(),
+            KeysState::Column(Column::String(col)) => col.total_bytes_len(),
+            _ => unreachable!(),
+        };
+
+        static ENTRY_SIZE: usize = std::mem::size_of::<StringRawEntry>();
+        arena.reserve(num_rows * ENTRY_SIZE + space_size);
+
+        let (mut raw_entry_ptr, mut string_local_space_ptr) = unsafe {
+            (
+                std::mem::transmute::<*mut u8, *mut StringRawEntry>(arena.as_mut_ptr()),
+                arena.as_mut_ptr().add(num_rows * ENTRY_SIZE),
+            )
+        };
+
+        for (row_index, key) in build_keys_iter.enumerate() {
+            let row_ptr = RowPtr {
+                chunk_index: chunk as u32,
+                row_index: row_index as u32,
+            };
+
+            // # Safety
+            // The memory address of `raw_entry_ptr` is valid.
+            // string_offset + key.len() <= space_size.
+            unsafe {
+                (*raw_entry_ptr).row_ptr = row_ptr;
+                (*raw_entry_ptr).length = key.len() as u32;
+                (*raw_entry_ptr).next = 0;
+                (*raw_entry_ptr).key = string_local_space_ptr;
+                // The size of `early` is STRING_EARLY_SIZE (4).
+                std::ptr::copy_nonoverlapping(
+                    key.as_ptr(),
+                    (*raw_entry_ptr).early.as_mut_ptr(),
+                    std::cmp::min(STRING_EARLY_SIZE, key.len()),
+                );
+                std::ptr::copy_nonoverlapping(key.as_ptr(), string_local_space_ptr, key.len());
+                string_local_space_ptr = string_local_space_ptr.add(key.len());
+            }
+
+            self.hash_table.insert(key, raw_entry_ptr);
+            raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
+        }
+
+        Ok(())
+    }
+}
+
+impl<T: FixedKey + HashtableKeyable> JoinHashTable for FixedKeyHashJoinHashTable<T> {
+    fn insert(&self, keys: ProjectedBlock, chunk: usize, arena: &mut Vec<u8>) -> Result<()> {
+        let num_rows = keys.num_rows();
+        let keys_state = self.hash_method.build_keys_state(keys, num_rows)?;
+        let build_keys_iter = self.hash_method.build_keys_iter(&keys_state)?;
+
+        let entry_size = std::mem::size_of::<RawEntry<T>>();
+        arena.reserve(num_rows * entry_size);
+
+        let mut raw_entry_ptr =
+            unsafe { std::mem::transmute::<*mut u8, *mut RawEntry<T>>(arena.as_mut_ptr()) };
+
+        for (row_index, key) in build_keys_iter.enumerate() {
+            let row_ptr = RowPtr {
+                chunk_index: chunk as u32,
+                row_index: row_index as u32,
+            };
+
+            // # Safety
+            // The memory address of `raw_entry_ptr` is valid.
+            unsafe {
+                *raw_entry_ptr = RawEntry {
+                    row_ptr,
+                    key: *key,
+                    next: 0,
+                }
+            }
+
+            self.hash_table.insert(*key, raw_entry_ptr);
+            raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table_allocator.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table_allocator.rs
new file mode 100644
index 0000000000000..55e3c010b5241
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table_allocator.rs
@@ -0,0 +1,101 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::ops::Deref;
+use std::sync::Arc;
+
+use databend_common_hashtable::BinaryHashJoinHashMap;
+use databend_common_hashtable::HashJoinHashMap;
+use ethnum::U256;
+
+use crate::pipelines::processors::transforms::new_hash_join::memory::memory_hash_join::MemoryHashJoinState;
+use crate::pipelines::processors::transforms::new_hash_join::ITryCompleteStream;
+use crate::pipelines::processors::transforms::new_hash_join::Progress;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteFuture;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteStream;
+use crate::pipelines::processors::transforms::FixedKeyHashJoinHashTable;
+use crate::pipelines::processors::transforms::HashJoinHashTable;
+use crate::pipelines::processors::transforms::SerializerHashJoinHashTable;
+use crate::pipelines::processors::transforms::SingleBinaryHashJoinHashTable;
+
+pub struct MemoryHashTableAllocator {
+    state: Arc<MemoryHashJoinState>,
+}
+
+impl MemoryHashTableAllocator {
+    pub fn create(state: Arc<MemoryHashJoinState>) -> TryCompleteStream<Progress> {
+        Box::new(MemoryHashTableAllocator { state })
+    }
+}
+
+impl ITryCompleteStream<Progress> for MemoryHashTableAllocator {
+    fn next_try_complete(
+        &mut self,
+    ) -> databend_common_exception::Result<Option<TryCompleteFuture<Progress>>> {
+        let _guard = self.state.mutex.lock();
+        let rows = *self.state.build_rows.deref();
+
+        *self.state.hash_table.as_mut() = match std::mem::take(self.state.hash_table.as_mut()) {
+            HashJoinHashTable::Null => HashJoinHashTable::Null,
+            HashJoinHashTable::Serializer(v) => {
+                HashJoinHashTable::Serializer(SerializerHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: BinaryHashJoinHashMap::with_build_row_num(rows),
+                })
+            }
+            HashJoinHashTable::SingleBinary(v) => {
+                HashJoinHashTable::SingleBinary(SingleBinaryHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: BinaryHashJoinHashMap::with_build_row_num(rows),
+                })
+            }
+            HashJoinHashTable::KeysU8(v) => HashJoinHashTable::KeysU8(FixedKeyHashJoinHashTable {
+                hash_method: v.hash_method,
+                hash_table: HashJoinHashMap::<u8>::with_build_row_num(rows),
+            }),
+            HashJoinHashTable::KeysU16(v) => {
+                HashJoinHashTable::KeysU16(FixedKeyHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: HashJoinHashMap::<u16>::with_build_row_num(rows),
+                })
+            }
+            HashJoinHashTable::KeysU32(v) => {
+                HashJoinHashTable::KeysU32(FixedKeyHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: HashJoinHashMap::<u32>::with_build_row_num(rows),
+                })
+            }
+            HashJoinHashTable::KeysU64(v) => {
+                HashJoinHashTable::KeysU64(FixedKeyHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: HashJoinHashMap::<u64>::with_build_row_num(rows),
+                })
+            }
+            HashJoinHashTable::KeysU128(v) => {
+                HashJoinHashTable::KeysU128(FixedKeyHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: HashJoinHashMap::<u128>::with_build_row_num(rows),
+                })
+            }
+            HashJoinHashTable::KeysU256(v) => {
+                HashJoinHashTable::KeysU256(FixedKeyHashJoinHashTable {
+                    hash_method: v.hash_method,
+                    hash_table: HashJoinHashMap::<U256>::with_build_row_num(rows),
+                })
+            }
+        };
+
+        Ok(None)
+    }
+}
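`MemoryHashTableAllocator` is a stream that does all of its work in the first `next_try_complete` call and then reports exhaustion with `Ok(None)`, so it contributes no futures for the processor to await. A toy one-shot stream with the same shape (the `next_power_of_two` sizing is an assumption for illustration, not necessarily what `with_build_row_num` does):

// Toy one-shot stream: performs its side effect on the first pull, then is exhausted.
struct OneShotAlloc {
    build_rows: usize,
    table: Option<Vec<u64>>, // stand-in for the real hash table
}

impl OneShotAlloc {
    fn next_try_complete(&mut self) -> Option<()> {
        if self.table.is_none() {
            // Size the table once, from the exact build row count.
            self.table = Some(vec![0; self.build_rows.next_power_of_two()]);
        }
        None // exhausted: the driver moves on to the append stream
    }
}

fn main() {
    let mut alloc = OneShotAlloc { build_rows: 1000, table: None };
    assert!(alloc.next_try_complete().is_none());
    assert_eq!(alloc.table.as_ref().unwrap().len(), 1024);
}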
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table_append.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table_append.rs
new file mode 100644
index 0000000000000..6a2e64100403b
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/memory_hash_table_append.rs
@@ -0,0 +1,119 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::ops::Deref;
+use std::sync::Arc;
+
+use databend_common_expression::ProjectedBlock;
+
+use crate::pipelines::processors::transforms::new_hash_join::memory::memory_hash_join::MemoryHashJoinState;
+use crate::pipelines::processors::transforms::new_hash_join::memory::memory_hash_table::JoinHashTable;
+use crate::pipelines::processors::transforms::new_hash_join::ITryCompleteFuture;
+use crate::pipelines::processors::transforms::new_hash_join::ITryCompleteStream;
+use crate::pipelines::processors::transforms::new_hash_join::Progress;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteFuture;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteStream;
+use crate::pipelines::processors::transforms::HashJoinHashTable;
+
+pub struct HashTableAppendScheduler {
+    state: Arc<MemoryHashJoinState>,
+}
+
+impl HashTableAppendScheduler {
+    pub fn create(state: Arc<MemoryHashJoinState>) -> TryCompleteStream<Progress> {
+        Box::new(HashTableAppendScheduler { state })
+    }
+
+    fn steal_task(&self) -> Option<usize> {
+        let _guard = self.state.mutex.lock();
+        self.state.working_queue.as_mut().pop_front()
+    }
+}
+
+impl ITryCompleteStream<Progress> for HashTableAppendScheduler {
+    fn next_try_complete(
+        &mut self,
+    ) -> databend_common_exception::Result<Option<TryCompleteFuture<Progress>>> {
+        let Some(chunk_num) = self.steal_task() else {
+            return Ok(None);
+        };
+
+        Ok(Some(HashTableAppendBlock::create(
+            chunk_num,
+            self.state.clone(),
+        )))
+    }
+}
+
+pub struct HashTableAppendBlock {
+    chunk_num: usize,
+    state: Arc<MemoryHashJoinState>,
+}
+
+impl HashTableAppendBlock {
+    pub fn create(num: usize, state: Arc<MemoryHashJoinState>) -> TryCompleteFuture<Progress> {
+        TryCompleteFuture::new(HashTableAppendBlock {
+            chunk_num: num,
+            state,
+        })
+    }
+}
+
+impl ITryCompleteFuture<Progress> for HashTableAppendBlock {
+    fn try_complete(&mut self) -> databend_common_exception::Result<Option<Progress>> {
+        let block = &mut self.state.chunks.as_mut()[self.chunk_num];
+
+        // The join key columns were appended to the end of the block by
+        // build_join_keys; peel them off before inserting.
+        let key_num = self.state.params.build_keys.len();
+        let mut group_keys = Vec::with_capacity(key_num);
+
+        let block_column_num = block.num_columns() - key_num;
+        while block.num_columns() > block_column_num {
+            group_keys.push(block.remove_column(block_column_num));
+        }
+
+        let mut arena = Vec::new();
+        let keys = ProjectedBlock::from(&group_keys);
+
+        match self.state.hash_table.deref() {
+            HashJoinHashTable::Null => Ok(()),
+            HashJoinHashTable::KeysU8(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::KeysU16(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::KeysU32(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::KeysU64(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::KeysU128(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::KeysU256(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::Serializer(v) => v.insert(keys, self.chunk_num, &mut arena),
+            HashJoinHashTable::SingleBinary(v) => v.insert(keys, self.chunk_num, &mut arena),
+        }?;
+
+        {
+            let _guard = self.state.mutex.lock();
+            self.state.arenas.as_mut().push(arena);
+        }
+
+        Ok(Some(Progress {
+            total_rows: *self.state.build_rows.deref(),
+            total_bytes: 0,
+            progressed_rows: 0,
+            progressed_bytes: 0,
+        }))
+    }
+}
+
+impl MemoryHashJoinState {
+    pub fn init_working_queue(&self) {
+        let working_queue = self.working_queue.as_mut();
+        working_queue.extend(0..self.chunks.len());
+    }
+}
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs
new file mode 100644
index 0000000000000..230043ffb5fb0
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs
@@ -0,0 +1,18 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod memory_hash_join;
+mod memory_hash_table;
+mod memory_hash_table_allocator;
+mod memory_hash_table_append;
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs
new file mode 100644
index 0000000000000..2b3095b34d6bc
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/mod.rs
@@ -0,0 +1,22 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod common;
+mod interfaces;
+
+mod memory;
+mod transform_hash_join;
+
+pub use common::*;
+pub use interfaces::*;
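`transform_hash_join.rs` below drives the build phase by pulling tasks from the finish_build stream and completing each one, hopping to async only when a task cannot finish synchronously. A toy drain loop showing that shape (closures stand in for streams and futures):

// Toy drain loop for a "stream of try-complete tasks" build phase; illustrative only.
fn drain<T>(mut next_task: impl FnMut() -> Option<Box<dyn FnMut() -> Option<T>>>) -> Vec<T> {
    let mut progress = Vec::new();
    while let Some(mut task) = next_task() {
        if let Some(p) = task() {
            progress.push(p); // the task finished synchronously
        }
        // In the real processor, a task returning None is handed to
        // async_process and awaited instead of busy-looping here.
    }
    progress
}

fn main() {
    let mut chunks = vec![10usize, 20, 30].into_iter();
    let done = drain(|| {
        chunks
            .next()
            .map(|rows| Box::new(move || Some(rows)) as Box<dyn FnMut() -> Option<usize>>)
    });
    assert_eq!(done, vec![10, 20, 30]);
}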
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
new file mode 100644
index 0000000000000..3b93060cbe363
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
@@ -0,0 +1,235 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_core::processors::Event;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::Processor;
+
+use crate::pipelines::processors::transforms::new_hash_join::build_join_keys;
+use crate::pipelines::processors::transforms::new_hash_join::Join;
+use crate::pipelines::processors::transforms::new_hash_join::JoinParams;
+use crate::pipelines::processors::transforms::new_hash_join::ProbeData;
+use crate::pipelines::processors::transforms::new_hash_join::Progress;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteFuture;
+use crate::pipelines::processors::transforms::new_hash_join::TryCompleteStream;
+
+enum HashJoinStream<T> {
+    Stream(TryCompleteStream<T>),
+    Future((TryCompleteFuture<T>, TryCompleteStream<T>)),
+}
+
+pub struct TransformHashJoin {
+    build_input: Arc<InputPort>,
+    probe_input: Arc<InputPort>,
+    joined_output: Arc<OutputPort>,
+
+    output_data: Option<DataBlock>,
+    build_input_data: Option<DataBlock>,
+    probe_input_data: Option<DataBlock>,
+
+    build_finish: bool,
+    probe_finish: bool,
+    build_stream: Option<HashJoinStream<Progress>>,
+    probe_stream: Option<HashJoinStream<ProbeData>>,
+
+    join: Arc<dyn Join>,
+
+    params: Arc<JoinParams>,
+}
+
+#[async_trait::async_trait]
+impl Processor for TransformHashJoin {
+    fn name(&self) -> String {
+        String::from("HashJoin")
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn event(&mut self) -> Result<Event> {
+        if self.joined_output.is_finished() {
+            self.build_input.finish();
+            self.probe_input.finish();
+            return Ok(Event::Finished);
+        }
+
+        if !self.joined_output.can_push() {
+            self.build_input.set_not_need_data();
+            self.probe_input.set_not_need_data();
+            return Ok(Event::NeedConsume);
+        }
+
+        if let Some(output_data) = self.output_data.take() {
+            self.joined_output.push_data(Ok(output_data));
+            return Ok(Event::NeedConsume);
+        }
+
+        if let Some(stream) = self.build_stream.as_ref() {
+            return match stream {
+                HashJoinStream::Stream(_) => Ok(Event::Sync),
+                HashJoinStream::Future((_, _)) => Ok(Event::Async),
+            };
+        }
+
+        if self.build_input_data.is_some() {
+            return Ok(Event::Sync);
+        }
+
+        if self.build_input.has_data() {
+            let build_data = self.build_input.pull_data().unwrap()?;
+            self.build_input_data = Some(build_data);
+            return Ok(Event::Sync);
+        }
+
+        if !self.build_input.is_finished() {
+            self.build_input.set_need_data();
+            return Ok(Event::NeedData);
+        }
+
+        if !self.build_finish {
+            self.build_finish = true;
+            let stream = self.join.finish_build()?;
+            self.build_stream = Some(HashJoinStream::Stream(stream));
+            return Ok(Event::Sync);
+        }
+
+        if let Some(stream) = self.probe_stream.as_ref() {
+            return match stream {
+                HashJoinStream::Stream(_) => Ok(Event::Sync),
+                HashJoinStream::Future((_, _)) => Ok(Event::Async),
+            };
+        }
+
+        if self.probe_input_data.is_some() {
+            return Ok(Event::Sync);
+        }
+
+        if self.probe_input.has_data() {
+            let probe_data = self.probe_input.pull_data().unwrap()?;
+            self.probe_input_data = Some(probe_data);
+            return Ok(Event::Sync);
+        }
+
+        if self.probe_input.is_finished() {
+            if !self.probe_finish {
+                self.probe_finish = true;
+                self.probe_stream = Some(HashJoinStream::Stream(self.join.finish_probe()?));
+                return Ok(Event::Sync);
+            }
+
+            self.joined_output.finish();
+            return Ok(Event::Finished);
+        }
+
+        self.probe_input.set_need_data();
+        Ok(Event::NeedData)
+    }
+
+    fn process(&mut self) -> Result<()> {
+        if let Some(data_block) = self.build_input_data.take() {
+            assert!(self.build_stream.is_none());
+            let data_block = build_join_keys(data_block, &self.params)?;
+            // TODO: fast stream compact
+            self.build_stream = Some(HashJoinStream::Stream(self.join.add_block(data_block)?));
+        }
+
+        if let Some(build_stream) = self.build_stream.take() {
+            let HashJoinStream::Stream(mut stream) = build_stream else {
+                return Err(ErrorCode::Internal("Expect build stream"));
+            };
+
+            while let Some(mut try_future) = stream.next_try_complete()? {
+                if let Some(_build_progress) = try_future.try_complete()? {
+                    continue;
+                }
+
+                self.build_stream = Some(HashJoinStream::Future((try_future, stream)));
+                return Ok(());
+            }
+        }
+
+        if let Some(data_block) = self.probe_input_data.take() {
+            assert!(self.probe_stream.is_none());
+
+            // TODO: build the join keys and project the probe data block.
+            self.probe_stream = Some(HashJoinStream::Stream(self.join.probe(data_block)?));
+        }
+
+        if let Some(probe_stream) = self.probe_stream.take() {
+            let HashJoinStream::Stream(mut stream) = probe_stream else {
+                return Err(ErrorCode::Internal("Expect probe stream"));
+            };
+
+            while let Some(mut try_future) = stream.next_try_complete()? {
+                if let Some(probe_data) = try_future.try_complete()? {
+                    match probe_data {
+                        ProbeData::Next => {
+                            continue;
+                        }
+                        ProbeData::DataBlock(joined_data) => {
+                            self.output_data = Some(joined_data);
+                            self.probe_stream = Some(HashJoinStream::Stream(stream));
+                            return Ok(());
+                        }
+                    }
+                }
+
+                self.probe_stream = Some(HashJoinStream::Future((try_future, stream)));
+                return Ok(());
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn async_process(&mut self) -> Result<()> {
+        if let Some(build_stream) = self.build_stream.take() {
+            let HashJoinStream::Future((try_future, stream)) = build_stream else {
+                return Err(ErrorCode::Internal("Expect build future"));
+            };
+
+            let _build_progress = try_future.await?;
+            self.build_stream = Some(HashJoinStream::Stream(stream));
+            return Ok(());
+        }
+
+        if let Some(probe_stream) = self.probe_stream.take() {
+            let HashJoinStream::Future((try_future, stream)) = probe_stream else {
+                return Err(ErrorCode::Internal("Expect probe future"));
+            };
+
+            match try_future.await? {
+                ProbeData::Next => {
+                    self.probe_stream = Some(HashJoinStream::Stream(stream));
+                }
+                ProbeData::DataBlock(joined_data) => {
+                    self.output_data = Some(joined_data);
+                    self.probe_stream = Some(HashJoinStream::Stream(stream));
+                }
+            }
+
+            return Ok(());
+        }
+
+        Ok(())
+    }
+}
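Taken together, the contract of the `Join` trait as `TransformHashJoin` uses it is: push every build block, drain the finish_build stream, then feed probe blocks and emit whatever the probe streams yield. A toy end-to-end driver mirroring that protocol (stand-in types, not the real API):

// Toy driver mirroring the add_block / finish_build / probe protocol; illustrative only.
#[derive(Default)]
struct ToyJoin {
    build: Vec<i32>,
}

impl ToyJoin {
    fn add_block(&mut self, block: Vec<i32>) {
        self.build.extend(block); // buffer the build side
    }
    fn finish_build(&mut self) {
        self.build.sort_unstable(); // stand-in for "allocate + append hash table"
    }
    fn probe(&self, block: Vec<i32>) -> Vec<i32> {
        // Emit the probe values that have a build-side match.
        block
            .into_iter()
            .filter(|v| self.build.binary_search(v).is_ok())
            .collect()
    }
}

fn main() {
    let mut join = ToyJoin::default();
    join.add_block(vec![3, 1]);
    join.add_block(vec![2]);
    join.finish_build();
    assert_eq!(join.probe(vec![2, 5, 3]), vec![2, 3]);
}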