Skip to content
14 changes: 12 additions & 2 deletions src/query/service/src/physical_plans/physical_cache_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,19 @@ impl IPhysicalPlan for CacheScan {
max_block_size,
))
}
Some(HashJoinStateRef::NewHashJoinState(hash_join_state)) => {
Some(HashJoinStateRef::NewHashJoinState(hash_join_state, column_map)) => {
let mut column_offsets = Vec::with_capacity(column_indexes.len());
for index in column_indexes {
let Some(offset) = column_map.get(index) else {
return Err(ErrorCode::Internal(format!(
"Hash join cache column {} not found in build projection",
index
)));
};
column_offsets.push(*offset);
}
CacheSourceState::NewHashJoinCacheState(NewHashJoinCacheState::new(
column_indexes.clone(),
column_offsets,
hash_join_state.clone(),
))
}
Expand Down
26 changes: 20 additions & 6 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::pipelines::processors::transforms::RuntimeFiltersDesc;
use crate::pipelines::processors::transforms::TransformHashJoin;
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
use crate::sessions::QueryContext;

// Type aliases to simplify complex return types
type JoinConditionsResult = (
Expand Down Expand Up @@ -270,19 +271,29 @@ impl IPhysicalPlan for HashJoin {
let (enable_optimization, _) = builder.merge_into_get_optimization_flag(self);

if desc.single_to_inner.is_none()
&& (self.join_type == JoinType::Inner || self.join_type == JoinType::Left)
&& matches!(
self.join_type,
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti
)
&& experimental_new_join
&& !enable_optimization
&& !self.need_hold_hash_table
{
return self.build_new_join_pipeline(builder, desc);
}

// Create the join state with optimization flags
let state = self.build_state(builder)?;

if let Some((build_cache_index, _)) = self.build_side_cache_info {
if let Some((build_cache_index, _)) = &self.build_side_cache_info {
builder.hash_join_states.insert(
build_cache_index,
*build_cache_index,
HashJoinStateRef::OldHashJoinState(state.clone()),
);
}
Expand Down Expand Up @@ -413,15 +424,18 @@ impl HashJoin {
{
let state = factory.create_basic_state(0)?;

if let Some((build_cache_index, _)) = self.build_side_cache_info {
if let Some((build_cache_index, column_map)) = &self.build_side_cache_info {
builder.hash_join_states.insert(
build_cache_index,
HashJoinStateRef::NewHashJoinState(state.clone()),
*build_cache_index,
HashJoinStateRef::NewHashJoinState(state.clone(), column_map.clone()),
);
}
}

let mut sub_query_ctx = QueryContext::create_from(&builder.ctx);
std::mem::swap(&mut builder.ctx, &mut sub_query_ctx);
self.build.build_pipeline(builder)?;
std::mem::swap(&mut builder.ctx, &mut sub_query_ctx);
let mut build_sinks = builder.main_pipeline.take_sinks();

self.probe.build_pipeline(builder)?;
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_pipeline::core::ExecutionInfo;
use databend_common_pipeline::core::Pipeline;
use databend_common_pipeline::core::always_callback;
use databend_common_settings::Settings;
use databend_common_sql::IndexType;

use super::PipelineBuilderData;
use crate::interpreters::CreateTableInterpreter;
Expand All @@ -38,7 +39,7 @@ use crate::sessions::QueryContext;
// Reference to a hash join state that join plans register in
// `hash_join_states` under a build-side cache index.
// The new-join variant may also carry a map from column index to an offset;
// the cache scan looks columns up in that map and reports an internal error
// when an index is missing.
#[derive(Clone)]
pub enum HashJoinStateRef {
OldHashJoinState(Arc<HashJoinState>),
NewHashJoinState(Arc<BasicHashJoinState>),
NewHashJoinState(Arc<BasicHashJoinState>, HashMap<IndexType, usize>),
}

pub struct PipelineBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,14 @@ pub fn wrap_true_validity(
NullableColumn::new_column(col, validity).into()
}
}

/// Builds a new block from `input` in which every column has been passed
/// through `wrap_true_validity` with a constant all-`true` bitmap.
///
/// The returned block has the same number of rows as `input`.
pub fn wrap_nullable_block(input: &DataBlock) -> DataBlock {
    let num_rows = input.num_rows();
    // One shared bitmap is reused for every column.
    let all_valid = Bitmap::new_constant(true, num_rows);

    let mut wrapped = Vec::with_capacity(input.columns().len());
    for column in input.columns().iter() {
        wrapped.push(wrap_true_validity(column, num_rows, &all_valid));
    }

    DataBlock::new(wrapped, num_rows)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct HashJoinDesc {
pub(crate) probe_projections: ColumnSet,
pub(crate) probe_to_build: Vec<(usize, (bool, bool))>,
pub(crate) build_schema: DataSchemaRef,
pub(crate) probe_schema: DataSchemaRef,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -138,6 +139,7 @@ impl HashJoinDesc {
build_projection: join.build_projections.clone(),
probe_projections: join.probe_projections.clone(),
build_schema: join.build.output_schema()?,
probe_schema: join.probe.output_schema()?,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod transform_hash_join_build;
mod transform_hash_join_probe;
mod util;

pub use common::wrap_nullable_block;
pub use common::wrap_true_validity;
pub use desc::HashJoinDesc;
pub use desc::RuntimeFilterDesc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ use crate::pipelines::processors::transforms::BasicHashJoinState;
use crate::pipelines::processors::transforms::HashJoinHashTable;
use crate::pipelines::processors::transforms::InnerHashJoin;
use crate::pipelines::processors::transforms::Join;
use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
use crate::pipelines::processors::transforms::memory::AntiLeftHashJoin;
use crate::pipelines::processors::transforms::memory::AntiRightHashJoin;
use crate::pipelines::processors::transforms::memory::OuterRightHashJoin;
use crate::pipelines::processors::transforms::memory::SemiLeftHashJoin;
use crate::pipelines::processors::transforms::memory::SemiRightHashJoin;
use crate::pipelines::processors::transforms::memory::left_join::OuterLeftHashJoin;

pub trait GraceMemoryJoin: Join {
fn reset_memory(&mut self);
Expand Down Expand Up @@ -52,6 +57,14 @@ fn reset_basic_state(state: &BasicHashJoinState) {
state.build_queue.as_mut().clear();
}

if !state.scan_map.is_empty() {
state.scan_map.as_mut().clear();
}

if !state.scan_queue.is_empty() {
state.scan_queue.as_mut().clear();
}

*state.hash_table.as_mut() = HashJoinHashTable::Null;
}

Expand All @@ -68,3 +81,38 @@ impl GraceMemoryJoin for OuterLeftHashJoin {
reset_basic_state(&self.basic_state);
}
}

// `GraceMemoryJoin` implementation for `SemiLeftHashJoin`.
impl GraceMemoryJoin for SemiLeftHashJoin {
// Clears `performance_context`, then passes `basic_state` to
// `reset_basic_state`.
// NOTE(review): presumably called by the grace join to release in-memory
// state between rounds — confirm against the caller.
fn reset_memory(&mut self) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);
}
}

// `GraceMemoryJoin` implementation for `AntiLeftHashJoin`.
impl GraceMemoryJoin for AntiLeftHashJoin {
// Clears `performance_context`, then passes `basic_state` to
// `reset_basic_state`.
// NOTE(review): presumably called by the grace join to release in-memory
// state between rounds — confirm against the caller.
fn reset_memory(&mut self) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);
}
}

// `GraceMemoryJoin` implementation for `OuterRightHashJoin`.
impl GraceMemoryJoin for OuterRightHashJoin {
// Clears `performance_context`, then passes `basic_state` to
// `reset_basic_state`.
// NOTE(review): presumably called by the grace join to release in-memory
// state between rounds — confirm against the caller.
fn reset_memory(&mut self) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);
}
}

// `GraceMemoryJoin` implementation for `SemiRightHashJoin`.
impl GraceMemoryJoin for SemiRightHashJoin {
// Clears `performance_context`, then passes `basic_state` to
// `reset_basic_state`.
// NOTE(review): presumably called by the grace join to release in-memory
// state between rounds — confirm against the caller.
fn reset_memory(&mut self) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);
}
}

// `GraceMemoryJoin` implementation for `AntiRightHashJoin`.
impl GraceMemoryJoin for AntiRightHashJoin {
// Clears `performance_context`, then passes `basic_state` to
// `reset_basic_state`.
// NOTE(review): presumably called by the grace join to release in-memory
// state between rounds — confirm against the caller.
fn reset_memory(&mut self) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ use crate::pipelines::processors::transforms::BasicHashJoinState;
use crate::pipelines::processors::transforms::GraceHashJoin;
use crate::pipelines::processors::transforms::InnerHashJoin;
use crate::pipelines::processors::transforms::Join;
use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
use crate::pipelines::processors::transforms::memory::AntiLeftHashJoin;
use crate::pipelines::processors::transforms::memory::AntiRightHashJoin;
use crate::pipelines::processors::transforms::memory::OuterRightHashJoin;
use crate::pipelines::processors::transforms::memory::SemiLeftHashJoin;
use crate::pipelines::processors::transforms::memory::SemiRightHashJoin;
use crate::pipelines::processors::transforms::memory::left_join::OuterLeftHashJoin;
use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell;
use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -140,6 +145,41 @@ impl HashJoinFactory {
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
JoinType::LeftAnti => Ok(Box::new(AntiLeftHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
JoinType::LeftSemi => Ok(Box::new(SemiLeftHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
JoinType::Right => Ok(Box::new(OuterRightHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
JoinType::RightSemi => Ok(Box::new(SemiRightHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
JoinType::RightAnti => Ok(Box::new(AntiRightHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -184,6 +224,101 @@ impl HashJoinFactory {
0,
)?))
}
JoinType::LeftAnti => {
let left_anti_hash_join = AntiLeftHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

Ok(Box::new(GraceHashJoin::create(
self.ctx.clone(),
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
left_anti_hash_join,
0,
)?))
}
JoinType::LeftSemi => {
let left_semi_hash_join = SemiLeftHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

Ok(Box::new(GraceHashJoin::create(
self.ctx.clone(),
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
left_semi_hash_join,
0,
)?))
}
JoinType::Right => {
let right_hash_join = OuterRightHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

Ok(Box::new(GraceHashJoin::create(
self.ctx.clone(),
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
right_hash_join,
0,
)?))
}
JoinType::RightSemi => {
let semi_right_hash_join = SemiRightHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

Ok(Box::new(GraceHashJoin::create(
self.ctx.clone(),
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
semi_right_hash_join,
0,
)?))
}
JoinType::RightAnti => {
let anti_right_hash_join = AntiRightHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

Ok(Box::new(GraceHashJoin::create(
self.ctx.clone(),
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
anti_right_hash_join,
0,
)?))
}
_ => unreachable!(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use databend_common_exception::Result;
use databend_common_hashtable::RowPtr;

#[derive(Debug)]
pub struct ProbedRows {
pub unmatched: Vec<u64>,
pub matched_probe: Vec<u64>,
Expand Down
Loading
Loading