diff --git a/src/common/base/src/base/barrier.rs b/src/common/base/src/base/barrier.rs new file mode 100644 index 0000000000000..a5d3b13d327eb --- /dev/null +++ b/src/common/base/src/base/barrier.rs @@ -0,0 +1,111 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Mutex; +use std::sync::PoisonError; + +use tokio::sync::watch; + +#[derive(Debug)] +struct BarrierState { + waker: watch::Sender, + arrived: usize, + generation: usize, + + n: usize, +} + +pub struct Barrier { + state: Mutex, + wait: watch::Receiver, +} + +impl Barrier { + pub fn new(mut n: usize) -> Barrier { + let (waker, wait) = watch::channel(0); + + if n == 0 { + n = 1; + } + + Barrier { + state: Mutex::new(BarrierState { + n, + waker, + arrived: 0, + generation: 1, + }), + wait, + } + } + + pub async fn wait(&self) -> BarrierWaitResult { + let (generation, is_leader) = { + let locked = self.state.lock(); + let mut state = locked.unwrap_or_else(PoisonError::into_inner); + + let is_leader = state.arrived == 0; + let generation = state.generation; + state.arrived += 1; + + if state.arrived == state.n { + state + .waker + .send(state.generation) + .expect("there is at least one receiver"); + state.arrived = 0; + state.generation += 1; + return BarrierWaitResult(is_leader); + } + + (generation, is_leader) + }; + + let mut wait = self.wait.clone(); + + loop { + let _ = wait.changed().await; + + if *wait.borrow() >= generation { + break; + } + } + + BarrierWaitResult(is_leader) + } + + pub fn reduce_quorum(&self, n: usize) { + let locked = self.state.lock(); + let mut state = locked.unwrap_or_else(PoisonError::into_inner); + state.n -= n; + + if state.arrived >= state.n { + state + .waker + .send(state.generation) + .expect("there is at least one receiver"); + state.arrived = 0; + state.generation += 1; + } + } +} + +#[derive(Debug, Clone)] +pub struct BarrierWaitResult(bool); + +impl BarrierWaitResult { + pub fn is_leader(&self) -> bool { + self.0 + } +} diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index e11bacaa1993a..02769806bca0d 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod barrier; mod build_info; mod dma; mod drop_callback; @@ -30,6 +31,7 @@ mod take_mut; mod uniq_id; mod watch_notify; +pub use barrier::Barrier; pub use build_info::*; pub use dma::*; pub use drop_callback::DropCallback; diff --git a/src/query/service/src/physical_plans/physical_cache_scan.rs b/src/query/service/src/physical_plans/physical_cache_scan.rs index 6988ab483fbdf..e55663a9d7d1f 100644 --- a/src/query/service/src/physical_plans/physical_cache_scan.rs +++ b/src/query/service/src/physical_plans/physical_cache_scan.rs @@ -85,9 +85,19 @@ impl IPhysicalPlan for CacheScan { max_block_size, )) } - Some(HashJoinStateRef::NewHashJoinState(hash_join_state)) => { + Some(HashJoinStateRef::NewHashJoinState(hash_join_state, column_map)) => { + let mut column_offsets = Vec::with_capacity(column_indexes.len()); + for index in column_indexes { + let Some(offset) = column_map.get(index) else { + return Err(ErrorCode::Internal(format!( + "Hash join cache column {} not found in build projection", + index + ))); + }; + column_offsets.push(*offset); + } CacheSourceState::NewHashJoinCacheState(NewHashJoinCacheState::new( - column_indexes.clone(), + column_offsets, hash_join_state.clone(), )) } diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 9a2e5802f105f..6a93b57d6f9a5 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -66,6 +66,7 @@ use crate::pipelines::processors::transforms::RuntimeFiltersDesc; use crate::pipelines::processors::transforms::TransformHashJoin; use crate::pipelines::processors::transforms::TransformHashJoinBuild; use crate::pipelines::processors::transforms::TransformHashJoinProbe; +use crate::sessions::QueryContext; // Type aliases to simplify complex return types type JoinConditionsResult = ( @@ -270,9 +271,19 @@ impl IPhysicalPlan for HashJoin { let (enable_optimization, _) = builder.merge_into_get_optimization_flag(self); if desc.single_to_inner.is_none() - && (self.join_type == JoinType::Inner || self.join_type == JoinType::Left) + && matches!( + self.join_type, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::Right + | JoinType::RightSemi + | JoinType::RightAnti + ) && experimental_new_join && !enable_optimization + && !self.need_hold_hash_table { return self.build_new_join_pipeline(builder, desc); } @@ -280,9 +291,9 @@ impl IPhysicalPlan for HashJoin { // Create the join state with optimization flags let state = self.build_state(builder)?; - if let Some((build_cache_index, _)) = self.build_side_cache_info { + if let Some((build_cache_index, _)) = &self.build_side_cache_info { builder.hash_join_states.insert( - build_cache_index, + *build_cache_index, HashJoinStateRef::OldHashJoinState(state.clone()), ); } @@ -413,15 +424,18 @@ impl HashJoin { { let state = factory.create_basic_state(0)?; - if let Some((build_cache_index, _)) = self.build_side_cache_info { + if let Some((build_cache_index, column_map)) = &self.build_side_cache_info { builder.hash_join_states.insert( - build_cache_index, - HashJoinStateRef::NewHashJoinState(state.clone()), + *build_cache_index, + HashJoinStateRef::NewHashJoinState(state.clone(), column_map.clone()), ); } } + let mut sub_query_ctx = QueryContext::create_from(&builder.ctx); + std::mem::swap(&mut builder.ctx, &mut sub_query_ctx); self.build.build_pipeline(builder)?; + std::mem::swap(&mut builder.ctx, &mut sub_query_ctx); let mut build_sinks = builder.main_pipeline.take_sinks(); self.probe.build_pipeline(builder)?; @@ -440,7 +454,8 @@ impl HashJoin { debug_assert_eq!(build_sinks.len(), probe_sinks.len()); - let stage_sync_barrier = Arc::new(Barrier::new(output_len)); + let barrier = databend_common_base::base::Barrier::new(output_len); + let stage_sync_barrier = Arc::new(barrier); let mut join_sinks = Vec::with_capacity(output_len * 2); let mut join_pipe_items = Vec::with_capacity(output_len); for (build_sink, probe_sink) in build_sinks.into_iter().zip(probe_sinks.into_iter()) { diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index d21e5498aadc8..96cc04db288cd 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -23,6 +23,7 @@ use databend_common_pipeline::core::ExecutionInfo; use databend_common_pipeline::core::Pipeline; use databend_common_pipeline::core::always_callback; use databend_common_settings::Settings; +use databend_common_sql::IndexType; use super::PipelineBuilderData; use crate::interpreters::CreateTableInterpreter; @@ -38,7 +39,7 @@ use crate::sessions::QueryContext; #[derive(Clone)] pub enum HashJoinStateRef { OldHashJoinState(Arc), - NewHashJoinState(Arc), + NewHashJoinState(Arc, HashMap), } pub struct PipelineBuilder { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index 1b4e3239edb1d..7d8dd7ddfd17a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -198,3 +198,14 @@ pub fn wrap_true_validity( NullableColumn::new_column(col, validity).into() } } + +pub fn wrap_nullable_block(input: &DataBlock) -> DataBlock { + let input_num_rows = input.num_rows(); + let true_validity = Bitmap::new_constant(true, input_num_rows); + let nullable_columns = input + .columns() + .iter() + .map(|c| wrap_true_validity(c, input_num_rows, &true_validity)) + .collect::>(); + DataBlock::new(nullable_columns, input_num_rows) +} diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index 1bd24c4750501..0ba8718d03872 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -63,6 +63,7 @@ pub struct HashJoinDesc { pub(crate) probe_projections: ColumnSet, pub(crate) probe_to_build: Vec<(usize, (bool, bool))>, pub(crate) build_schema: DataSchemaRef, + pub(crate) probe_schema: DataSchemaRef, } #[derive(Debug, Clone)] @@ -138,6 +139,7 @@ impl HashJoinDesc { build_projection: join.build_projections.clone(), probe_projections: join.probe_projections.clone(), build_schema: join.build.output_schema()?, + probe_schema: join.probe.output_schema()?, }) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs index 32b46295ccb1f..d8285950a70e1 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs @@ -30,6 +30,7 @@ mod transform_hash_join_build; mod transform_hash_join_probe; mod util; +pub use common::wrap_nullable_block; pub use common::wrap_true_validity; pub use desc::HashJoinDesc; pub use desc::RuntimeFilterDesc; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs index 45b7d3409f9f3..e53d7b61d5ef8 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs @@ -18,7 +18,12 @@ use crate::pipelines::processors::transforms::BasicHashJoinState; use crate::pipelines::processors::transforms::HashJoinHashTable; use crate::pipelines::processors::transforms::InnerHashJoin; use crate::pipelines::processors::transforms::Join; -use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; +use crate::pipelines::processors::transforms::memory::AntiLeftHashJoin; +use crate::pipelines::processors::transforms::memory::AntiRightHashJoin; +use crate::pipelines::processors::transforms::memory::OuterRightHashJoin; +use crate::pipelines::processors::transforms::memory::SemiLeftHashJoin; +use crate::pipelines::processors::transforms::memory::SemiRightHashJoin; +use crate::pipelines::processors::transforms::memory::left_join::OuterLeftHashJoin; pub trait GraceMemoryJoin: Join { fn reset_memory(&mut self); @@ -52,6 +57,14 @@ fn reset_basic_state(state: &BasicHashJoinState) { state.build_queue.as_mut().clear(); } + if !state.scan_map.is_empty() { + state.scan_map.as_mut().clear(); + } + + if !state.scan_queue.is_empty() { + state.scan_queue.as_mut().clear(); + } + *state.hash_table.as_mut() = HashJoinHashTable::Null; } @@ -68,3 +81,38 @@ impl GraceMemoryJoin for OuterLeftHashJoin { reset_basic_state(&self.basic_state); } } + +impl GraceMemoryJoin for SemiLeftHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} + +impl GraceMemoryJoin for AntiLeftHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} + +impl GraceMemoryJoin for OuterRightHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} + +impl GraceMemoryJoin for SemiRightHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} + +impl GraceMemoryJoin for AntiRightHashJoin { + fn reset_memory(&mut self) { + self.performance_context.clear(); + reset_basic_state(&self.basic_state); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs index 873a4139b1e8d..6ff2fd160179b 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs @@ -31,7 +31,12 @@ use crate::pipelines::processors::transforms::BasicHashJoinState; use crate::pipelines::processors::transforms::GraceHashJoin; use crate::pipelines::processors::transforms::InnerHashJoin; use crate::pipelines::processors::transforms::Join; -use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; +use crate::pipelines::processors::transforms::memory::AntiLeftHashJoin; +use crate::pipelines::processors::transforms::memory::AntiRightHashJoin; +use crate::pipelines::processors::transforms::memory::OuterRightHashJoin; +use crate::pipelines::processors::transforms::memory::SemiLeftHashJoin; +use crate::pipelines::processors::transforms::memory::SemiRightHashJoin; +use crate::pipelines::processors::transforms::memory::left_join::OuterLeftHashJoin; use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState; use crate::sessions::QueryContext; @@ -140,6 +145,41 @@ impl HashJoinFactory { self.desc.clone(), self.create_basic_state(id)?, )?)), + JoinType::LeftAnti => Ok(Box::new(AntiLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), + JoinType::LeftSemi => Ok(Box::new(SemiLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), + JoinType::Right => Ok(Box::new(OuterRightHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), + JoinType::RightSemi => Ok(Box::new(SemiRightHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), + JoinType::RightAnti => Ok(Box::new(AntiRightHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?)), _ => unreachable!(), } } @@ -184,6 +224,101 @@ impl HashJoinFactory { 0, )?)) } + JoinType::LeftAnti => { + let left_anti_hash_join = AntiLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + left_anti_hash_join, + 0, + )?)) + } + JoinType::LeftSemi => { + let left_semi_hash_join = SemiLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + left_semi_hash_join, + 0, + )?)) + } + JoinType::Right => { + let right_hash_join = OuterRightHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + right_hash_join, + 0, + )?)) + } + JoinType::RightSemi => { + let semi_right_hash_join = SemiRightHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + semi_right_hash_join, + 0, + )?)) + } + JoinType::RightAnti => { + let anti_right_hash_join = AntiRightHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(GraceHashJoin::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_grace_state(id + 1)?, + anti_right_hash_join, + 0, + )?)) + } _ => unreachable!(), } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hashtable/basic.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hashtable/basic.rs index 87c04267935ce..79a7d1c4b409c 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hashtable/basic.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hashtable/basic.rs @@ -15,6 +15,7 @@ use databend_common_exception::Result; use databend_common_hashtable::RowPtr; +#[derive(Debug)] pub struct ProbedRows { pub unmatched: Vec, pub matched_probe: Vec, diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs index 6aaae9cdc3738..9c7896c55ad4d 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic.rs @@ -49,6 +49,7 @@ pub struct BasicHashJoin { pub(crate) method: HashMethodKind, pub(crate) function_ctx: FunctionContext, pub(crate) state: Arc, + pub(crate) scan_blocks: Vec, } impl BasicHashJoin { @@ -69,6 +70,7 @@ impl BasicHashJoin { method, function_ctx, squash_block: SquashBlocks::new(block_size, block_bytes), + scan_blocks: vec![], }) } pub(crate) fn add_block(&mut self, mut data: Option) -> Result<()> { @@ -85,15 +87,36 @@ impl BasicHashJoin { let chunk_index = self.state.chunks.len(); self.state.chunks.as_mut().push(squashed_block); self.state.build_queue.as_mut().push_back(chunk_index); + self.state.scan_map.as_mut().push(vec![]); + self.state.scan_queue.as_mut().push_back(chunk_index); } Ok(()) } - pub(crate) fn final_build(&mut self) -> Result> { + pub(crate) fn final_build(&mut self) -> Result> { self.init_memory_hash_table(); let Some(chunk_index) = self.steal_chunk_index() else { + if SCAN_MAP { + if let Some(null_keys) = self.squash_block.finalize()? { + self.scan_blocks.push(null_keys); + } + + if !self.scan_blocks.is_empty() { + let locked = self.state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + for scan_block in std::mem::take(&mut self.scan_blocks) { + let chunk_index = self.state.chunks.len(); + let scan_map = vec![0; scan_block.num_rows()]; + + self.state.chunks.as_mut().push(scan_block); + self.state.scan_map.as_mut().push(scan_map); + self.state.scan_queue.as_mut().push_back(chunk_index); + } + } + } + return Ok(None); }; @@ -110,9 +133,17 @@ impl BasicHashJoin { chunk_block = chunk_block.project(&self.desc.build_projection); if let Some(bitmap) = self.desc.build_valids_by_keys(&keys_block)? { - keys_block = keys_block.filter_with_bitmap(&bitmap)?; + if bitmap.true_count() != bitmap.len() { + keys_block = keys_block.filter_with_bitmap(&bitmap)?; + + if SCAN_MAP { + let null_keys = chunk_block.clone().filter_with_bitmap(&(!(&bitmap)))?; + + if let Some(null_keys) = self.squash_block.add_block(null_keys)? { + self.scan_blocks.push(null_keys); + } + } - if bitmap.null_count() != bitmap.len() { chunk_block = chunk_block.filter_with_bitmap(&bitmap)?; } } @@ -125,6 +156,13 @@ impl BasicHashJoin { // restore storage block { let chunks = self.state.chunks.as_mut(); + + if SCAN_MAP { + let mut scan_map = vec![0; chunk_block.num_rows()]; + let scan_maps = self.state.scan_map.as_mut(); + std::mem::swap(&mut scan_maps[chunk_index], &mut scan_map); + } + std::mem::swap(&mut chunks[chunk_index], &mut chunk_block); } @@ -182,7 +220,9 @@ impl BasicHashJoin { if !matches!(self.state.hash_table.deref(), HashJoinHashTable::Null) { return; } - let unique_entry = matches!(self.desc.join_type, JoinType::InnerAny | JoinType::LeftAny); + let unique_entry = matches!(self.desc.join_type, JoinType::InnerAny | JoinType::LeftAny) + || (matches!(self.desc.join_type, JoinType::LeftSemi | JoinType::LeftAnti) + && self.desc.other_predicate.is_none()); let locked = self.state.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs index 5f6684ba7188c..d3af14b4051f2 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/basic_state.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::Mutex; +use std::sync::PoisonError; use databend_common_expression::ColumnVec; use databend_common_expression::DataBlock; @@ -37,6 +38,9 @@ pub struct BasicHashJoinState { pub hash_table: CStyleCell, pub packets: CStyleCell>, + pub scan_map: CStyleCell>>, + pub scan_queue: CStyleCell>, + level: usize, factory: Arc, } @@ -55,8 +59,16 @@ impl BasicHashJoinState { arenas: CStyleCell::new(Vec::new()), hash_table: CStyleCell::new(HashJoinHashTable::Null), packets: CStyleCell::new(Vec::new()), + scan_map: CStyleCell::new(Vec::new()), + scan_queue: CStyleCell::new(VecDeque::new()), } } + + pub fn steal_scan_chunk_index(&self) -> Option<(usize, usize)> { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.scan_queue.as_mut().pop_front().map(|x| (x, 0)) + } } impl Drop for BasicHashJoinState { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs index abc81674601f8..697f68c19ffa2 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs @@ -18,20 +18,18 @@ use std::sync::PoisonError; use databend_common_base::base::ProgressValues; use databend_common_catalog::table_context::TableContext; -use databend_common_column::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; use databend_common_expression::FilterExecutor; use databend_common_expression::FunctionContext; use databend_common_expression::HashMethodKind; -use databend_common_expression::types::NullableColumn; use databend_common_expression::with_join_hash_method; use crate::pipelines::processors::HashJoinDesc; use crate::pipelines::processors::transforms::HashJoinHashTable; use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::memory::left_join::final_result_block; use crate::pipelines::processors::transforms::merge_join_runtime_filter_packets; use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeData; use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; @@ -90,7 +88,7 @@ impl Join for InnerHashJoin { } fn final_build(&mut self) -> Result> { - self.basic_hash_join.final_build() + self.basic_hash_join.final_build::() } fn add_runtime_filter_packet(&self, packet: JoinRuntimeFilterPacket) { @@ -219,49 +217,17 @@ impl<'a> JoinStream for InnerHashJoinStream<'a> { } }; - let mut result_block = match (probe_block, build_block) { - (Some(mut probe_block), Some(build_block)) => { - probe_block.merge_block(build_block); - probe_block - } - (Some(probe_block), None) => probe_block, - (None, Some(build_block)) => build_block, - (None, None) => DataBlock::new(vec![], self.probed_rows.matched_build.len()), - }; - - if !self.desc.probe_to_build.is_empty() { - for (index, (is_probe_nullable, is_build_nullable)) in - self.desc.probe_to_build.iter() - { - let entry = match (is_probe_nullable, is_build_nullable) { - (true, true) | (false, false) => result_block.get_by_offset(*index).clone(), - (true, false) => { - result_block.get_by_offset(*index).clone().remove_nullable() - } - (false, true) => { - let entry = result_block.get_by_offset(*index); - let col = entry.to_column(); - - match col.is_null() || col.is_nullable() { - true => entry.clone(), - false => BlockEntry::from(NullableColumn::new_column( - col, - Bitmap::new_constant(true, result_block.num_rows()), - )), - } - } - }; - - result_block.add_entry(entry); - } - } - - return Ok(Some(result_block)); + return Ok(Some(final_result_block( + &self.desc, + probe_block, + build_block, + self.probed_rows.matched_build.len(), + ))); } } } -struct InnerHashJoinFilterStream<'a> { +pub struct InnerHashJoinFilterStream<'a> { inner: Box, filter_executor: &'a mut FilterExecutor, } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join.rs similarity index 97% rename from src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs rename to src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join.rs index 9c0dc8aa08aef..9300903633e4b 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/outer_left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join.rs @@ -91,7 +91,7 @@ impl Join for OuterLeftHashJoin { } fn final_build(&mut self) -> Result> { - self.basic_hash_join.final_build() + self.basic_hash_join.final_build::() } fn probe_block(&mut self, data: DataBlock) -> Result> { @@ -110,7 +110,7 @@ impl Join for OuterLeftHashJoin { .map(|x| x.data_type().clone()) .collect::>(); - let build_block = null_build_block(&types, data.num_rows()); + let build_block = null_block(&types, data.num_rows()); let probe_block = Some(data.project(&self.desc.probe_projections)); let result_block = final_result_block(&self.desc, probe_block, build_block, num_rows); return Ok(Box::new(OneBlockJoinStream(Some(result_block)))); @@ -210,7 +210,7 @@ impl<'a, const CONJUNCT: bool> JoinStream for OuterLeftHashJoinStream<'a, CONJUN }; let types = &self.join_state.column_types; - let build_block = null_build_block(types, unmatched_row_id.len()); + let build_block = null_block(types, unmatched_row_id.len()); return Ok(Some(final_result_block( &self.desc, @@ -316,7 +316,7 @@ impl<'a, const CONJUNCT: bool> OuterLeftHashJoinStream<'a, CONJUNCT> { } } -fn final_result_block( +pub fn final_result_block( desc: &HashJoinDesc, probe_block: Option, build_block: Option, @@ -357,7 +357,7 @@ fn final_result_block( result_block } -fn null_build_block(types: &[DataType], num_rows: usize) -> Option { +pub fn null_block(types: &[DataType], num_rows: usize) -> Option { match types.is_empty() { true => None, false => { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join_anti.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join_anti.rs new file mode 100644 index 0000000000000..633eaac235027 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join_anti.rs @@ -0,0 +1,305 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_base::hints::assume; +use databend_common_catalog::table_context::TableContext; +use databend_common_column::bitmap::Bitmap; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::FilterExecutor; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_expression::with_join_hash_method; + +use crate::pipelines::processors::HashJoinDesc; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::memory::basic::BasicHashJoin; +use crate::pipelines::processors::transforms::memory::left_join::final_result_block; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeData; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbedRows; +use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::new_hash_join::join::OneBlockJoinStream; +use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; +use crate::sessions::QueryContext; + +pub struct AntiLeftHashJoin { + pub(crate) basic_hash_join: BasicHashJoin, + + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, +} + +impl AntiLeftHashJoin { + pub fn create( + ctx: &QueryContext, + function_ctx: FunctionContext, + method: HashMethodKind, + desc: Arc, + state: Arc, + ) -> Result { + let settings = ctx.get_settings(); + let block_size = settings.get_max_block_size()? as usize; + + let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); + + let basic_hash_join = BasicHashJoin::create( + ctx, + function_ctx.clone(), + method, + desc.clone(), + state.clone(), + )?; + + Ok(AntiLeftHashJoin { + desc, + basic_hash_join, + function_ctx, + basic_state: state, + performance_context: context, + }) + } +} + +impl Join for AntiLeftHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + self.basic_hash_join.add_block(data) + } + + fn final_build(&mut self) -> Result> { + self.basic_hash_join.final_build::() + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + if data.is_empty() { + return Ok(Box::new(EmptyJoinStream)); + } + + if *self.basic_state.build_rows == 0 { + let result_block = data.project(&self.desc.probe_projections); + return Ok(Box::new(OneBlockJoinStream(Some(result_block)))); + } + + self.basic_hash_join.finalize_chunks(); + + let probe_keys = self.desc.probe_key(&data, &self.function_ctx)?; + + let mut keys = DataBlock::new(probe_keys, data.num_rows()); + let valids = match self.desc.from_correlated_subquery { + true => None, + false => self.desc.build_valids_by_keys(&keys)?, + }; + + self.desc.remove_keys_nullable(&mut keys); + let probe_block = data.project(&self.desc.probe_projections); + + let join_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); + table.probe(probe_data) + } + HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + "Aborted query, because the hash table is uninitialized.", + )), + })?; + + match &mut self.performance_context.filter_executor { + None => Ok(LeftAntiHashJoinStream::create( + probe_block, + join_stream, + &mut self.performance_context.probe_result, + )), + Some(filter_executor) => Ok(LeftAntiFilterHashJoinStream::create( + probe_block, + self.basic_state.clone(), + join_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + filter_executor, + )), + } + } +} + +struct LeftAntiHashJoinStream<'a> { + probe_data_block: Option, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, +} + +unsafe impl<'a> Send for LeftAntiHashJoinStream<'a> {} +unsafe impl<'a> Sync for LeftAntiHashJoinStream<'a> {} + +impl<'a> LeftAntiHashJoinStream<'a> { + pub fn create( + probe_data_block: DataBlock, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, + ) -> Box { + Box::new(LeftAntiHashJoinStream { + probed_rows, + probe_data_block: Some(probe_data_block), + probe_keys_stream, + }) + } +} + +impl<'a> JoinStream for LeftAntiHashJoinStream<'a> { + fn next(&mut self) -> Result> { + let Some(probe_data_block) = self.probe_data_block.take() else { + return Ok(None); + }; + + let num_rows = probe_data_block.num_rows(); + let mut selected = vec![false; num_rows]; + + loop { + self.probed_rows.clear(); + let max_rows = self.probed_rows.matched_probe.capacity(); + self.probe_keys_stream.advance(self.probed_rows, max_rows)?; + + if self.probed_rows.is_empty() { + let bitmap = Bitmap::from_trusted_len_iter(selected.into_iter()); + return Ok(Some(probe_data_block.filter_with_bitmap(&bitmap)?)); + } + + for idx in &self.probed_rows.unmatched { + let idx = *idx as usize; + assume(idx < selected.len()); + selected[idx] = true; + } + } + } +} + +struct LeftAntiFilterHashJoinStream<'a> { + desc: Arc, + probe_data_block: Option, + join_state: Arc, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, + filter_executor: &'a mut FilterExecutor, +} + +unsafe impl<'a> Send for LeftAntiFilterHashJoinStream<'a> {} +unsafe impl<'a> Sync for LeftAntiFilterHashJoinStream<'a> {} + +impl<'a> LeftAntiFilterHashJoinStream<'a> { + pub fn create( + probe_data_block: DataBlock, + join_state: Arc, + probe_keys_stream: Box, + desc: Arc, + probed_rows: &'a mut ProbedRows, + filter_executor: &'a mut FilterExecutor, + ) -> Box { + Box::new(LeftAntiFilterHashJoinStream { + desc, + join_state, + probed_rows, + filter_executor, + probe_keys_stream, + probe_data_block: Some(probe_data_block), + }) + } +} + +impl<'a> JoinStream for LeftAntiFilterHashJoinStream<'a> { + fn next(&mut self) -> Result> { + let Some(probe_data_block) = self.probe_data_block.take() else { + return Ok(None); + }; + + let num_rows = probe_data_block.num_rows(); + let mut selected = vec![true; num_rows]; + + loop { + self.probed_rows.clear(); + let max_rows = self.probed_rows.matched_probe.capacity(); + self.probe_keys_stream.advance(self.probed_rows, max_rows)?; + + if self.probed_rows.is_empty() { + break; + } + + if self.probed_rows.is_all_unmatched() { + continue; + } + + let probe_block = match probe_data_block.num_columns() { + 0 => None, + _ => Some(DataBlock::take( + &probe_data_block, + &self.probed_rows.matched_probe, + )?), + }; + + let build_block = match self.join_state.columns.is_empty() { + true => None, + false => { + let row_ptrs = self.probed_rows.matched_build.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + let result_block = final_result_block( + &self.desc, + probe_block, + build_block, + self.probed_rows.matched_build.len(), + ); + + let selected_rows = self.filter_executor.select(&result_block)?; + + if selected_rows == result_block.num_rows() { + for probe_idx in &self.probed_rows.matched_probe { + assume((*probe_idx as usize) < selected.len()); + selected[*probe_idx as usize] = false; + } + } else if selected_rows != 0 { + let selection = self.filter_executor.true_selection(); + for idx in selection[..selected_rows].iter() { + assume((*idx as usize) < self.probed_rows.matched_probe.len()); + let idx = self.probed_rows.matched_probe[*idx as usize]; + assume((idx as usize) < selected.len()); + selected[idx as usize] = false; + } + } + } + + let bitmap = Bitmap::from_trusted_len_iter(selected.into_iter()); + match bitmap.true_count() { + 0 => Ok(None), + _ => Ok(Some(probe_data_block.filter_with_bitmap(&bitmap)?)), + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join_semi.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join_semi.rs new file mode 100644 index 0000000000000..e55be04a6eb0f --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/left_join_semi.rs @@ -0,0 +1,326 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_base::hints::assume; +use databend_common_catalog::table_context::TableContext; +use databend_common_column::bitmap::Bitmap; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::DataBlock; +use databend_common_expression::FilterExecutor; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_expression::types::NullableColumn; +use databend_common_expression::with_join_hash_method; + +use crate::pipelines::processors::HashJoinDesc; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::memory::basic::BasicHashJoin; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeData; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbedRows; +use crate::pipelines::processors::transforms::new_hash_join::join::EmptyJoinStream; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; +use crate::sessions::QueryContext; + +pub struct SemiLeftHashJoin { + pub(crate) basic_hash_join: BasicHashJoin, + + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, +} + +impl SemiLeftHashJoin { + pub fn create( + ctx: &QueryContext, + function_ctx: FunctionContext, + method: HashMethodKind, + desc: Arc, + state: Arc, + ) -> Result { + let settings = ctx.get_settings(); + let block_size = settings.get_max_block_size()? as usize; + + let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); + + let basic_hash_join = BasicHashJoin::create( + ctx, + function_ctx.clone(), + method, + desc.clone(), + state.clone(), + )?; + + Ok(SemiLeftHashJoin { + desc, + basic_hash_join, + function_ctx, + basic_state: state, + performance_context: context, + }) + } +} + +impl Join for SemiLeftHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + self.basic_hash_join.add_block(data) + } + + fn final_build(&mut self) -> Result> { + self.basic_hash_join.final_build::() + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + if data.is_empty() || *self.basic_state.build_rows == 0 { + return Ok(Box::new(EmptyJoinStream)); + } + + self.basic_hash_join.finalize_chunks(); + + let probe_keys = self.desc.probe_key(&data, &self.function_ctx)?; + + let mut keys = DataBlock::new(probe_keys, data.num_rows()); + let valids = match self.desc.from_correlated_subquery { + true => None, + false => self.desc.build_valids_by_keys(&keys)?, + }; + + self.desc.remove_keys_nullable(&mut keys); + let probe_block = data.project(&self.desc.probe_projections); + + let join_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(keys, valids, probe_hash_statistics); + table.probe_matched(probe_data) + } + HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + "Aborted query, because the hash table is uninitialized.", + )), + })?; + + match &mut self.performance_context.filter_executor { + None => Ok(LeftSemiHashJoinStream::create( + probe_block, + join_stream, + &mut self.performance_context.probe_result, + )), + Some(filter_executor) => Ok(LeftSemiFilterHashJoinStream::create( + probe_block, + self.basic_state.clone(), + join_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + filter_executor, + )), + } + } +} + +struct LeftSemiHashJoinStream<'a> { + probe_data_block: DataBlock, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, +} + +unsafe impl<'a> Send for LeftSemiHashJoinStream<'a> {} +unsafe impl<'a> Sync for LeftSemiHashJoinStream<'a> {} + +impl<'a> LeftSemiHashJoinStream<'a> { + pub fn create( + probe_data_block: DataBlock, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, + ) -> Box { + Box::new(LeftSemiHashJoinStream { + probed_rows, + probe_data_block, + probe_keys_stream, + }) + } +} + +impl<'a> JoinStream for LeftSemiHashJoinStream<'a> { + fn next(&mut self) -> Result> { + loop { + self.probed_rows.clear(); + let max_rows = self.probed_rows.matched_probe.capacity(); + self.probe_keys_stream.advance(self.probed_rows, max_rows)?; + + if self.probed_rows.is_empty() { + return Ok(None); + } + + if self.probed_rows.is_all_unmatched() { + continue; + } + + return Ok(Some(DataBlock::take( + &self.probe_data_block, + &self.probed_rows.matched_probe, + )?)); + } + } +} + +struct LeftSemiFilterHashJoinStream<'a> { + desc: Arc, + probe_data_block: Option, + join_state: Arc, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, + filter_executor: &'a mut FilterExecutor, +} + +unsafe impl<'a> Send for LeftSemiFilterHashJoinStream<'a> {} +unsafe impl<'a> Sync for LeftSemiFilterHashJoinStream<'a> {} + +impl<'a> LeftSemiFilterHashJoinStream<'a> { + pub fn create( + probe_data_block: DataBlock, + join_state: Arc, + probe_keys_stream: Box, + desc: Arc, + probed_rows: &'a mut ProbedRows, + filter_executor: &'a mut FilterExecutor, + ) -> Box { + Box::new(LeftSemiFilterHashJoinStream { + desc, + join_state, + probed_rows, + filter_executor, + probe_keys_stream, + probe_data_block: Some(probe_data_block), + }) + } +} + +impl<'a> JoinStream for LeftSemiFilterHashJoinStream<'a> { + fn next(&mut self) -> Result> { + let Some(probe_data_block) = self.probe_data_block.take() else { + return Ok(None); + }; + + let num_rows = probe_data_block.num_rows(); + let mut selected = vec![false; num_rows]; + + loop { + self.probed_rows.clear(); + let max_rows = self.probed_rows.matched_probe.capacity(); + self.probe_keys_stream.advance(self.probed_rows, max_rows)?; + + if self.probed_rows.is_empty() { + break; + } + + if self.probed_rows.is_all_unmatched() { + continue; + } + + let probe_block = match probe_data_block.num_columns() { + 0 => None, + _ => Some(DataBlock::take( + &probe_data_block, + &self.probed_rows.matched_probe, + )?), + }; + + let build_block = match self.join_state.columns.is_empty() { + true => None, + false => { + let row_ptrs = self.probed_rows.matched_build.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + let mut result_block = match (probe_block, build_block) { + (Some(mut probe_block), Some(build_block)) => { + probe_block.merge_block(build_block); + probe_block + } + (Some(probe_block), None) => probe_block, + (None, Some(build_block)) => build_block, + (None, None) => DataBlock::new(vec![], self.probed_rows.matched_build.len()), + }; + + if !self.desc.probe_to_build.is_empty() { + for (index, (is_probe_nullable, is_build_nullable)) in + self.desc.probe_to_build.iter() + { + let entry = match (is_probe_nullable, is_build_nullable) { + (true, true) | (false, false) => result_block.get_by_offset(*index).clone(), + (true, false) => { + result_block.get_by_offset(*index).clone().remove_nullable() + } + (false, true) => { + let entry = result_block.get_by_offset(*index); + let col = entry.to_column(); + + match col.is_null() || col.is_nullable() { + true => entry.clone(), + false => BlockEntry::from(NullableColumn::new_column( + col, + Bitmap::new_constant(true, result_block.num_rows()), + )), + } + } + }; + + result_block.add_entry(entry); + } + } + + let selected_rows = self.filter_executor.select(&result_block)?; + + if selected_rows == result_block.num_rows() { + for probe_idx in &self.probed_rows.matched_probe { + assume((*probe_idx as usize) < selected.len()); + selected[*probe_idx as usize] = true; + } + } else if selected_rows != 0 { + let selection = self.filter_executor.true_selection(); + for idx in selection[..selected_rows].iter() { + assume((*idx as usize) < self.probed_rows.matched_probe.len()); + let idx = self.probed_rows.matched_probe[*idx as usize]; + assume((idx as usize) < selected.len()); + selected[idx as usize] = true; + } + } + } + + let bitmap = Bitmap::from_trusted_len_iter(selected.into_iter()); + + match bitmap.true_count() { + 0 => Ok(None), + _ => Ok(Some(probe_data_block.filter_with_bitmap(&bitmap)?)), + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs index 4979c37245fca..bd37514325aa2 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/mod.rs @@ -15,7 +15,17 @@ mod basic; mod basic_state; mod inner_join; -pub mod outer_left_join; +pub mod left_join; +mod left_join_anti; +mod left_join_semi; +mod right_join; +mod right_join_anti; +mod right_join_semi; pub use basic_state::BasicHashJoinState; pub use inner_join::InnerHashJoin; +pub use left_join_anti::AntiLeftHashJoin; +pub use left_join_semi::SemiLeftHashJoin; +pub use right_join::OuterRightHashJoin; +pub use right_join_anti::AntiRightHashJoin; +pub use right_join_semi::SemiRightHashJoin; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join.rs new file mode 100644 index 0000000000000..f4bba52268937 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join.rs @@ -0,0 +1,387 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_base::hints::assume; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::FilterExecutor; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_expression::types::DataType; +use databend_common_expression::with_join_hash_method; +use databend_common_hashtable::RowPtr; + +use crate::pipelines::processors::HashJoinDesc; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::memory::basic::BasicHashJoin; +use crate::pipelines::processors::transforms::memory::left_join::final_result_block; +use crate::pipelines::processors::transforms::memory::left_join::null_block; +use crate::pipelines::processors::transforms::merge_join_runtime_filter_packets; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeData; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbedRows; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; +use crate::pipelines::processors::transforms::wrap_nullable_block; +use crate::sessions::QueryContext; + +pub struct OuterRightHashJoin { + pub(crate) basic_hash_join: BasicHashJoin, + + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, + + pub(crate) finished: bool, +} + +impl OuterRightHashJoin { + pub fn create( + ctx: &QueryContext, + function_ctx: FunctionContext, + method: HashMethodKind, + desc: Arc, + state: Arc, + ) -> Result { + let settings = ctx.get_settings(); + let block_size = settings.get_max_block_size()? as usize; + + let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); + + let basic_hash_join = BasicHashJoin::create( + ctx, + function_ctx.clone(), + method, + desc.clone(), + state.clone(), + )?; + + Ok(OuterRightHashJoin { + desc, + basic_hash_join, + function_ctx, + basic_state: state, + performance_context: context, + finished: false, + }) + } +} + +impl Join for OuterRightHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + self.basic_hash_join.add_block(data) + } + + fn final_build(&mut self) -> Result> { + self.basic_hash_join.final_build::() + } + + fn build_runtime_filter(&self) -> Result { + let packets = std::mem::take(self.basic_state.packets.as_mut()); + merge_join_runtime_filter_packets(packets) + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + self.basic_hash_join.finalize_chunks(); + + let mut probe_keys = { + let nullable_block = wrap_nullable_block(&data); + let probe_keys = self.desc.probe_key(&nullable_block, &self.function_ctx)?; + DataBlock::new(probe_keys, data.num_rows()) + }; + + let valids = self.desc.build_valids_by_keys(&probe_keys)?; + + self.desc.remove_keys_nullable(&mut probe_keys); + let probe_block = data.project(&self.desc.probe_projections); + + let probe_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(probe_keys, valids, probe_hash_statistics); + table.probe_matched(probe_data) + } + HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + "Aborted query, because the hash table is uninitialized.", + )), + })?; + + match self.performance_context.filter_executor.as_mut() { + None => Ok(OuterRightHashJoinStream::::create( + probe_block, + self.basic_state.clone(), + probe_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + None, + )), + Some(filter_executor) => Ok(OuterRightHashJoinStream::::create( + probe_block, + self.basic_state.clone(), + probe_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + Some(filter_executor), + )), + } + } + + fn final_probe(&mut self) -> Result>> { + self.basic_hash_join.finalize_chunks(); + + if self.finished { + return Ok(None); + } + + self.finished = true; + let max_rows = self + .performance_context + .probe_result + .matched_probe + .capacity(); + + Ok(Some(OuterRightHashJoinFinalStream::create( + max_rows, + self.desc.clone(), + self.basic_state.clone(), + ))) + } +} + +struct OuterRightHashJoinStream<'a, const CONJUNCT: bool> { + desc: Arc, + probe_data_block: DataBlock, + join_state: Arc, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, + filter_executor: Option<&'a mut FilterExecutor>, +} + +unsafe impl<'a, const CONJUNCT: bool> Send for OuterRightHashJoinStream<'a, CONJUNCT> {} +unsafe impl<'a, const CONJUNCT: bool> Sync for OuterRightHashJoinStream<'a, CONJUNCT> {} + +impl<'a, const CONJUNCT: bool> JoinStream for OuterRightHashJoinStream<'a, CONJUNCT> { + fn next(&mut self) -> Result> { + loop { + self.probed_rows.clear(); + let max_rows = self.probed_rows.matched_probe.capacity(); + self.probe_keys_stream.advance(self.probed_rows, max_rows)?; + + if self.probed_rows.is_empty() { + return Ok(None); + } + + if self.probed_rows.matched_probe.is_empty() { + continue; + } + + let probe_block = match self.probe_data_block.num_columns() { + 0 => None, + _ => Some(wrap_nullable_block(&DataBlock::take( + &self.probe_data_block, + &self.probed_rows.matched_probe, + )?)), + }; + + let build_block = match self.join_state.columns.is_empty() { + true => None, + false => { + let row_ptrs = self.probed_rows.matched_build.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + let data_block = final_result_block( + &self.desc, + probe_block, + build_block, + self.probed_rows.matched_build.len(), + ); + + if !CONJUNCT { + for row_ptr in &self.probed_rows.matched_build { + let row_idx = row_ptr.row_index as usize; + let chunk_idx = row_ptr.chunk_index as usize; + self.join_state.scan_map.as_mut()[chunk_idx][row_idx] = 1; + } + + return Ok(Some(data_block)); + } + + let Some(filter_executor) = self.filter_executor.as_mut() else { + for row_ptr in &self.probed_rows.matched_build { + let row_idx = row_ptr.row_index as usize; + let chunk_idx = row_ptr.chunk_index as usize; + self.join_state.scan_map.as_mut()[chunk_idx][row_idx] = 1; + } + + return Ok(Some(data_block)); + }; + + if !data_block.is_empty() { + let res_rows = filter_executor.select(&data_block)?; + + if res_rows == 0 { + continue; + } + + let true_sel = filter_executor.true_selection(); + + for idx in true_sel.iter().take(res_rows) { + let row_ptr = self.probed_rows.matched_build[*idx as usize]; + let row_idx = row_ptr.row_index as usize; + let chunk_idx = row_ptr.chunk_index as usize; + self.join_state.scan_map.as_mut()[chunk_idx][row_idx] = 1; + } + + let num_rows = data_block.num_rows(); + return Ok(Some(filter_executor.take(data_block, num_rows, res_rows)?)); + } + } + } +} + +impl<'a, const CONJUNCT: bool> OuterRightHashJoinStream<'a, CONJUNCT> { + pub fn create( + probe_data_block: DataBlock, + join_state: Arc, + probe_keys_stream: Box, + desc: Arc, + probed_rows: &'a mut ProbedRows, + filter_executor: Option<&'a mut FilterExecutor>, + ) -> Box { + Box::new(OuterRightHashJoinStream::<'a, CONJUNCT> { + desc, + join_state, + probed_rows, + probe_data_block, + probe_keys_stream, + filter_executor, + }) + } +} + +struct OuterRightHashJoinFinalStream<'a> { + max_rows: usize, + desc: Arc, + join_state: Arc, + scan_idx: Vec, + scan_progress: Option<(usize, usize)>, + types: Vec, + _marker: std::marker::PhantomData<&'a ()>, +} + +impl<'a> JoinStream for OuterRightHashJoinFinalStream<'a> { + fn next(&mut self) -> Result> { + while let Some((chunk_idx, row_idx)) = self.scan_progress.take() { + let scan_map = &self.join_state.scan_map[chunk_idx]; + let remain_rows = self.max_rows - self.scan_idx.len(); + let remain_rows = std::cmp::min(remain_rows, scan_map.len() - row_idx); + + for idx in (row_idx..scan_map.len()).take(remain_rows) { + assume(idx < scan_map.len()); + assume(self.scan_idx.len() < self.scan_idx.capacity()); + + if scan_map[idx] == 0 { + let row_ptr = RowPtr::new(chunk_idx as u32, idx as u32); + self.scan_idx.push(row_ptr); + } + } + + let new_row_idx = row_idx + remain_rows; + self.scan_progress = match new_row_idx >= scan_map.len() { + true => self.join_state.steal_scan_chunk_index(), + false => Some((chunk_idx, new_row_idx)), + }; + + if self.scan_idx.len() >= self.max_rows { + break; + } + } + + if self.scan_idx.is_empty() { + return Ok(None); + } + + let num_rows = self.scan_idx.len(); + let probe_block = match self.types.len() { + 0 => None, + _ => null_block(&self.types, num_rows), + }; + + let build_block = match self.join_state.columns.is_empty() { + true => None, + false => { + let row_ptrs = self.scan_idx.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + self.scan_idx.clear(); + Ok(Some(final_result_block( + &self.desc, + probe_block, + build_block, + num_rows, + ))) + } +} + +impl<'a> OuterRightHashJoinFinalStream<'a> { + pub fn create( + max_rows: usize, + desc: Arc, + join_state: Arc, + ) -> Box { + let scan_progress = join_state.steal_scan_chunk_index(); + let mut types = vec![]; + for (i, field) in desc.probe_schema.fields().iter().enumerate() { + if desc.probe_projections.contains(&i) { + types.push(field.data_type().clone()); + } + } + + Box::new(OuterRightHashJoinFinalStream::<'a> { + desc, + types, + max_rows, + join_state, + scan_progress, + scan_idx: Vec::with_capacity(max_rows), + _marker: Default::default(), + }) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join_anti.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join_anti.rs new file mode 100644 index 0000000000000..24c36b0a4f824 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join_anti.rs @@ -0,0 +1,237 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_base::hints::assume; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_expression::with_join_hash_method; +use databend_common_hashtable::RowPtr; + +use crate::pipelines::processors::HashJoinDesc; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::memory::basic::BasicHashJoin; +use crate::pipelines::processors::transforms::memory::right_join_semi::SemiRightHashJoinStream; +use crate::pipelines::processors::transforms::merge_join_runtime_filter_packets; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeData; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; +use crate::sessions::QueryContext; + +pub struct AntiRightHashJoin { + pub(crate) basic_hash_join: BasicHashJoin, + + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, + + pub(crate) finished: bool, +} + +impl AntiRightHashJoin { + pub fn create( + ctx: &QueryContext, + function_ctx: FunctionContext, + method: HashMethodKind, + desc: Arc, + state: Arc, + ) -> Result { + let settings = ctx.get_settings(); + let block_size = settings.get_max_block_size()? as usize; + + let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); + + let basic_hash_join = BasicHashJoin::create( + ctx, + function_ctx.clone(), + method, + desc.clone(), + state.clone(), + )?; + + Ok(AntiRightHashJoin { + desc, + basic_hash_join, + function_ctx, + basic_state: state, + performance_context: context, + finished: false, + }) + } +} + +impl Join for AntiRightHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + self.basic_hash_join.add_block(data) + } + + fn final_build(&mut self) -> Result> { + self.basic_hash_join.final_build::() + } + + fn build_runtime_filter(&self) -> Result { + let packets = std::mem::take(self.basic_state.packets.as_mut()); + merge_join_runtime_filter_packets(packets) + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + self.basic_hash_join.finalize_chunks(); + + let probe_keys = self.desc.probe_key(&data, &self.function_ctx)?; + let mut probe_keys = DataBlock::new(probe_keys, data.num_rows()); + + let valids = self.desc.build_valids_by_keys(&probe_keys)?; + + self.desc.remove_keys_nullable(&mut probe_keys); + let probe_block = data.project(&self.desc.probe_projections); + + let probe_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(probe_keys, valids, probe_hash_statistics); + table.probe_matched(probe_data) + } + HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + "Aborted query, because the hash table is uninitialized.", + )), + })?; + + match self.performance_context.filter_executor.as_mut() { + None => Ok(SemiRightHashJoinStream::::create( + probe_block, + self.basic_state.clone(), + probe_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + None, + )), + Some(filter_executor) => Ok(SemiRightHashJoinStream::::create( + probe_block, + self.basic_state.clone(), + probe_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + Some(filter_executor), + )), + } + } + + fn final_probe(&mut self) -> Result>> { + self.basic_hash_join.finalize_chunks(); + + if self.finished { + return Ok(None); + } + + self.finished = true; + let max_rows = self + .performance_context + .probe_result + .matched_probe + .capacity(); + + Ok(Some(AntiRightHashJoinFinalStream::create( + max_rows, + self.basic_state.clone(), + ))) + } +} + +struct AntiRightHashJoinFinalStream<'a> { + max_rows: usize, + join_state: Arc, + scan_idx: Vec, + scan_progress: Option<(usize, usize)>, + _marker: std::marker::PhantomData<&'a ()>, +} + +impl<'a> JoinStream for AntiRightHashJoinFinalStream<'a> { + fn next(&mut self) -> Result> { + while let Some((chunk_idx, row_idx)) = self.scan_progress.take() { + let scan_map = &self.join_state.scan_map[chunk_idx]; + let remain_rows = self.max_rows - self.scan_idx.len(); + let remain_rows = std::cmp::min(remain_rows, scan_map.len() - row_idx); + + for idx in (row_idx..scan_map.len()).take(remain_rows) { + assume(idx < scan_map.len()); + assume(self.scan_idx.len() < self.scan_idx.capacity()); + + if scan_map[idx] == 0 { + let row_ptr = RowPtr::new(chunk_idx as u32, idx as u32); + self.scan_idx.push(row_ptr); + } + } + + let new_row_idx = row_idx + remain_rows; + self.scan_progress = match new_row_idx >= scan_map.len() { + true => self.join_state.steal_scan_chunk_index(), + false => Some((chunk_idx, new_row_idx)), + }; + + if self.scan_idx.len() >= self.max_rows { + break; + } + } + + if self.scan_idx.is_empty() { + return Ok(None); + } + + let build_block = match self.join_state.columns.is_empty() { + true => Some(DataBlock::new(vec![], self.scan_idx.len())), + false => { + let row_ptrs = self.scan_idx.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + self.scan_idx.clear(); + Ok(build_block) + } +} + +impl<'a> AntiRightHashJoinFinalStream<'a> { + pub fn create( + max_rows: usize, + join_state: Arc, + ) -> Box { + let scan_progress = join_state.steal_scan_chunk_index(); + + Box::new(AntiRightHashJoinFinalStream::<'a> { + max_rows, + join_state, + scan_progress, + scan_idx: Vec::with_capacity(max_rows), + _marker: Default::default(), + }) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join_semi.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join_semi.rs new file mode 100644 index 0000000000000..c1b6a563f0331 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/right_join_semi.rs @@ -0,0 +1,355 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_base::hints::assume; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::FilterExecutor; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; +use databend_common_expression::with_join_hash_method; +use databend_common_hashtable::RowPtr; + +use crate::pipelines::processors::HashJoinDesc; +use crate::pipelines::processors::transforms::BasicHashJoinState; +use crate::pipelines::processors::transforms::HashJoinHashTable; +use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::memory::basic::BasicHashJoin; +use crate::pipelines::processors::transforms::memory::left_join::final_result_block; +use crate::pipelines::processors::transforms::merge_join_runtime_filter_packets; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::ProbeData; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbeStream; +use crate::pipelines::processors::transforms::new_hash_join::hashtable::basic::ProbedRows; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::new_hash_join::performance::PerformanceContext; +use crate::sessions::QueryContext; + +pub struct SemiRightHashJoin { + pub(crate) basic_hash_join: BasicHashJoin, + + pub(crate) desc: Arc, + pub(crate) function_ctx: FunctionContext, + pub(crate) basic_state: Arc, + pub(crate) performance_context: PerformanceContext, + + pub(crate) finished: bool, +} + +impl SemiRightHashJoin { + pub fn create( + ctx: &QueryContext, + function_ctx: FunctionContext, + method: HashMethodKind, + desc: Arc, + state: Arc, + ) -> Result { + let settings = ctx.get_settings(); + let block_size = settings.get_max_block_size()? as usize; + + let context = PerformanceContext::create(block_size, desc.clone(), function_ctx.clone()); + + let basic_hash_join = BasicHashJoin::create( + ctx, + function_ctx.clone(), + method, + desc.clone(), + state.clone(), + )?; + + Ok(SemiRightHashJoin { + desc, + basic_hash_join, + function_ctx, + basic_state: state, + performance_context: context, + finished: false, + }) + } +} + +impl Join for SemiRightHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + self.basic_hash_join.add_block(data) + } + + fn final_build(&mut self) -> Result> { + self.basic_hash_join.final_build::() + } + + fn build_runtime_filter(&self) -> Result { + let packets = std::mem::take(self.basic_state.packets.as_mut()); + merge_join_runtime_filter_packets(packets) + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + self.basic_hash_join.finalize_chunks(); + + let probe_keys = self.desc.probe_key(&data, &self.function_ctx)?; + let mut probe_keys = DataBlock::new(probe_keys, data.num_rows()); + + let valids = self.desc.build_valids_by_keys(&probe_keys)?; + + self.desc.remove_keys_nullable(&mut probe_keys); + let probe_block = data.project(&self.desc.probe_projections); + + let probe_stream = with_join_hash_method!(|T| match self.basic_state.hash_table.deref() { + HashJoinHashTable::T(table) => { + let probe_hash_statistics = &mut self.performance_context.probe_hash_statistics; + probe_hash_statistics.clear(probe_block.num_rows()); + + let probe_data = ProbeData::new(probe_keys, valids, probe_hash_statistics); + table.probe_matched(probe_data) + } + HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery( + "Aborted query, because the hash table is uninitialized.", + )), + })?; + + match self.performance_context.filter_executor.as_mut() { + None => Ok(SemiRightHashJoinStream::::create( + probe_block, + self.basic_state.clone(), + probe_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + None, + )), + Some(filter_executor) => Ok(SemiRightHashJoinStream::::create( + probe_block, + self.basic_state.clone(), + probe_stream, + self.desc.clone(), + &mut self.performance_context.probe_result, + Some(filter_executor), + )), + } + } + + fn final_probe(&mut self) -> Result>> { + self.basic_hash_join.finalize_chunks(); + + if self.finished { + return Ok(None); + } + + self.finished = true; + let max_rows = self + .performance_context + .probe_result + .matched_probe + .capacity(); + + Ok(Some(SemiRightHashJoinFinalStream::create( + max_rows, + self.basic_state.clone(), + ))) + } +} + +pub struct SemiRightHashJoinStream<'a, const CONJUNCT: bool> { + desc: Arc, + probe_data_block: DataBlock, + join_state: Arc, + probe_keys_stream: Box, + probed_rows: &'a mut ProbedRows, + filter_executor: Option<&'a mut FilterExecutor>, +} + +unsafe impl<'a, const CONJUNCT: bool> Send for SemiRightHashJoinStream<'a, CONJUNCT> {} +unsafe impl<'a, const CONJUNCT: bool> Sync for SemiRightHashJoinStream<'a, CONJUNCT> {} + +impl<'a, const CONJUNCT: bool> JoinStream for SemiRightHashJoinStream<'a, CONJUNCT> { + fn next(&mut self) -> Result> { + loop { + self.probed_rows.clear(); + let max_rows = self.probed_rows.matched_probe.capacity(); + self.probe_keys_stream.advance(self.probed_rows, max_rows)?; + + if self.probed_rows.is_empty() { + return Ok(None); + } + + if self.probed_rows.matched_probe.is_empty() { + continue; + } + + if !CONJUNCT { + for row_ptr in &self.probed_rows.matched_build { + let row_idx = row_ptr.row_index as usize; + let chunk_idx = row_ptr.chunk_index as usize; + self.join_state.scan_map.as_mut()[chunk_idx][row_idx] = 1; + } + + continue; + } + + let Some(filter_executor) = self.filter_executor.as_mut() else { + for row_ptr in &self.probed_rows.matched_build { + let row_idx = row_ptr.row_index as usize; + let chunk_idx = row_ptr.chunk_index as usize; + self.join_state.scan_map.as_mut()[chunk_idx][row_idx] = 1; + } + + continue; + }; + + let probe_block = match self.probe_data_block.num_columns() { + 0 => None, + _ => Some(DataBlock::take( + &self.probe_data_block, + &self.probed_rows.matched_probe, + )?), + }; + + let build_block = match self.join_state.columns.is_empty() { + true => None, + false => { + let row_ptrs = self.probed_rows.matched_build.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + let result_block = final_result_block( + &self.desc, + probe_block, + build_block, + self.probed_rows.matched_build.len(), + ); + + if !result_block.is_empty() { + let result_count = filter_executor.select(&result_block)?; + + if result_count == 0 { + continue; + } + + let true_sel = filter_executor.true_selection(); + + for idx in true_sel.iter().take(result_count) { + let row_ptr = self.probed_rows.matched_build[*idx as usize]; + let row_idx = row_ptr.row_index as usize; + let chunk_idx = row_ptr.chunk_index as usize; + self.join_state.scan_map.as_mut()[chunk_idx][row_idx] = 1; + } + } + } + } +} + +impl<'a, const CONJUNCT: bool> SemiRightHashJoinStream<'a, CONJUNCT> { + pub fn create( + probe_data_block: DataBlock, + join_state: Arc, + probe_keys_stream: Box, + desc: Arc, + probed_rows: &'a mut ProbedRows, + filter_executor: Option<&'a mut FilterExecutor>, + ) -> Box { + Box::new(SemiRightHashJoinStream::<'a, CONJUNCT> { + desc, + join_state, + probed_rows, + probe_data_block, + probe_keys_stream, + filter_executor, + }) + } +} + +struct SemiRightHashJoinFinalStream<'a> { + max_rows: usize, + join_state: Arc, + scan_idx: Vec, + scan_progress: Option<(usize, usize)>, + _marker: std::marker::PhantomData<&'a ()>, +} + +impl<'a> JoinStream for SemiRightHashJoinFinalStream<'a> { + fn next(&mut self) -> Result> { + while let Some((chunk_idx, row_idx)) = self.scan_progress.take() { + let scan_map = &self.join_state.scan_map[chunk_idx]; + let remain_rows = self.max_rows - self.scan_idx.len(); + let remain_rows = std::cmp::min(remain_rows, scan_map.len() - row_idx); + + for idx in (row_idx..scan_map.len()).take(remain_rows) { + assume(idx < scan_map.len()); + assume(self.scan_idx.len() < self.scan_idx.capacity()); + + if scan_map[idx] == 1 { + let row_ptr = RowPtr::new(chunk_idx as u32, idx as u32); + self.scan_idx.push(row_ptr); + } + } + + let new_row_idx = row_idx + remain_rows; + self.scan_progress = match new_row_idx >= scan_map.len() { + true => self.join_state.steal_scan_chunk_index(), + false => Some((chunk_idx, new_row_idx)), + }; + + if self.scan_idx.len() >= self.max_rows { + break; + } + } + + if self.scan_idx.is_empty() { + return Ok(None); + } + + let build_block = match self.join_state.columns.is_empty() { + true => Some(DataBlock::new(vec![], self.scan_idx.len())), + false => { + let row_ptrs = self.scan_idx.as_slice(); + Some(DataBlock::take_column_vec( + self.join_state.columns.as_slice(), + self.join_state.column_types.as_slice(), + row_ptrs, + row_ptrs.len(), + )) + } + }; + + self.scan_idx.clear(); + Ok(build_block) + } +} + +impl<'a> SemiRightHashJoinFinalStream<'a> { + pub fn create( + max_rows: usize, + join_state: Arc, + ) -> Box { + let scan_progress = join_state.steal_scan_chunk_index(); + + Box::new(SemiRightHashJoinFinalStream::<'a> { + max_rows, + join_state, + scan_progress, + scan_idx: Vec::with_capacity(max_rows), + _marker: Default::default(), + }) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index 76df5ca47280f..21197fb981232 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -18,6 +18,7 @@ use std::fmt::Formatter; use std::sync::Arc; use std::time::Instant; +use databend_common_base::base::Barrier; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline::core::Event; @@ -26,7 +27,6 @@ use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Processor; use databend_common_pipeline::core::ProcessorPtr; use databend_common_sql::ColumnSet; -use tokio::sync::Barrier; use crate::pipelines::processors::transforms::RuntimeFilterLocalBuilder; use crate::pipelines::processors::transforms::new_hash_join::join::Join; @@ -100,10 +100,12 @@ impl Processor for TransformHashJoin { self.build_port.finish(); self.probe_port.finish(); - return match &self.stage { - Stage::Finished => Ok(Event::Finished), - _ => Ok(Event::Async), - }; + if !matches!(self.stage, Stage::Finished) { + self.stage = Stage::Finished; + self.stage_sync_barrier.reduce_quorum(1); + } + + return Ok(Event::Finished); } if !self.joined_port.can_push() { diff --git a/src/query/service/src/pipelines/processors/transforms/transform_cache_scan.rs b/src/query/service/src/pipelines/processors/transforms/transform_cache_scan.rs index 09fc6aef05270..224c106b0f1b8 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_cache_scan.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_cache_scan.rs @@ -23,6 +23,7 @@ use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::ProcessorPtr; use databend_common_pipeline::sources::AsyncSource; use databend_common_pipeline::sources::AsyncSourcer; +use databend_common_sql::IndexType; use crate::pipelines::processors::HashJoinState; use crate::pipelines::processors::transforms::BasicHashJoinState; @@ -112,18 +113,18 @@ impl HashJoinCacheState { pub struct NewHashJoinCacheState { idx: usize, memory_state: Arc, - column_indexes: Vec, + column_offsets: Vec, } impl NewHashJoinCacheState { pub fn new( - column_indexes: Vec, + column_offsets: Vec, memory_state: Arc, ) -> NewHashJoinCacheState { NewHashJoinCacheState { idx: 0, memory_state, - column_indexes, + column_offsets, } } fn next_data_block(&mut self) -> Option { @@ -131,10 +132,10 @@ impl NewHashJoinCacheState { return None; } - let mut columns = Vec::with_capacity(self.column_indexes.len()); + let mut columns = Vec::with_capacity(self.column_offsets.len()); let num_rows = self.memory_state.chunks[self.idx].num_rows(); - for column_index in self.column_indexes.iter() { - let column = self.memory_state.chunks[self.idx].get_by_offset(*column_index); + for offset in self.column_offsets.iter() { + let column = self.memory_state.chunks[self.idx].get_by_offset(*offset); columns.push(column.clone()); } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 7efd9f212e9e4..a143014f67554 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1477,7 +1477,7 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=1)), }), ("enable_experimental_new_join", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(1), desc: "Enables the experimental new join implement", mode: SettingMode::Both, scope: SettingScope::Both,