Skip to content

Commit f9ddf2e

Browse files
authored
Merge pull request #10711 from xudong963/left_join_oom
fix: left join oom
2 parents 379bb89 + 09de206 commit f9ddf2e

File tree

6 files changed

+30
-21
lines changed

6 files changed

+30
-21
lines changed

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ impl PipelineBuilder {
829829
|| join.join_type == JoinType::Single)
830830
&& join.non_equi_conditions.is_empty()
831831
{
832-
self.main_pipeline.resize(1)?;
832+
// self.main_pipeline.resize(1)?;
833833
self.main_pipeline.add_transform(|input, output| {
834834
let transform = TransformLeftJoin::try_create(
835835
input,

src/query/service/src/pipelines/processors/transforms/hash_join/common.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -309,20 +309,22 @@ impl JoinHashTable {
309309
}
310310

311311
pub(crate) fn rest_block(&self) -> Result<DataBlock> {
312-
let rest_probe_blocks = self.hash_join_desc.join_state.rest_probe_blocks.read();
313-
if rest_probe_blocks.is_empty() {
312+
let mut rest_pairs = self.hash_join_desc.join_state.rest_pairs.write();
313+
if rest_pairs.0.is_empty() {
314314
return Ok(DataBlock::empty());
315315
}
316-
let probe_block = DataBlock::concat(&rest_probe_blocks)?;
317-
let rest_build_indexes = self.hash_join_desc.join_state.rest_build_indexes.read();
318-
let mut build_block = self.row_space.gather(&rest_build_indexes)?;
316+
let probe_block = DataBlock::concat(&rest_pairs.0)?;
317+
rest_pairs.0.clear();
318+
let mut build_block = self.row_space.gather(&rest_pairs.1)?;
319+
rest_pairs.1.clear();
319320
// For left join, wrap nullable for build block
320321
if matches!(
321322
self.hash_join_desc.join_type,
322323
JoinType::Left | JoinType::Single | JoinType::Full
323324
) {
324-
let validity = self.hash_join_desc.join_state.validity.read();
325-
let validity: Bitmap = (*validity).clone().into();
325+
let mut validity_state = self.hash_join_desc.join_state.validity.write();
326+
let validity: Bitmap = (*validity_state).clone().into();
327+
validity_state.clear();
326328
let num_rows = validity.len();
327329
let nullable_columns = if self.row_space.datablocks().is_empty() {
328330
build_block
@@ -342,7 +344,6 @@ impl JoinHashTable {
342344
};
343345
build_block = DataBlock::new(nullable_columns, num_rows);
344346
}
345-
346347
self.merge_eq_block(&build_block, &probe_block)
347348
}
348349

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,17 @@ pub struct JoinState {
4343
/// Record rows in build side that are matched with rows in probe side.
4444
/// It's order-sensitive, aligned with the order of rows in merged block.
4545
pub(crate) build_indexes: RwLock<Vec<RowPtr>>,
46-
pub(crate) rest_build_indexes: RwLock<Vec<RowPtr>>,
47-
pub(crate) rest_probe_blocks: RwLock<Vec<DataBlock>>,
46+
/// Rest build indexes and probe blocks
47+
pub(crate) rest_pairs: RwLock<(Vec<DataBlock>, Vec<RowPtr>)>,
4848
pub(crate) validity: RwLock<MutableBitmap>,
4949
}
5050

5151
impl JoinState {
5252
pub fn create() -> Result<Self> {
5353
Ok(JoinState {
5454
build_indexes: RwLock::new(Vec::with_capacity(JOIN_MAX_BLOCK_SIZE)),
55-
rest_build_indexes: RwLock::new(Vec::with_capacity(JOIN_MAX_BLOCK_SIZE)),
56-
rest_probe_blocks: RwLock::new(Vec::with_capacity(JOIN_MAX_BLOCK_SIZE)),
5755
validity: RwLock::new(MutableBitmap::with_capacity(JOIN_MAX_BLOCK_SIZE)),
56+
rest_pairs: Default::default(),
5857
})
5958
}
6059
}

src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,10 +243,9 @@ impl JoinHashTable {
243243
}
244244

245245
if !WITH_OTHER_CONJUNCT {
246-
let mut rest_build_indexes = self.hash_join_desc.join_state.rest_build_indexes.write();
247-
rest_build_indexes.extend(local_build_indexes);
248-
let mut rest_probe_blocks = self.hash_join_desc.join_state.rest_probe_blocks.write();
249-
rest_probe_blocks.push(probe_block);
246+
let mut rest_pairs = self.hash_join_desc.join_state.rest_pairs.write();
247+
rest_pairs.1.extend(local_build_indexes);
248+
rest_pairs.0.push(probe_block);
250249
let validity: Bitmap = validity.into();
251250
let mut validity_state = self.hash_join_desc.join_state.validity.write();
252251
validity_state.extend_from_bitmap(&validity);

src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,9 @@ impl JoinHashTable {
136136
probe_block = DataBlock::new(nullable_columns, validity.len());
137137
}
138138

139-
let mut rest_build_indexes = self.hash_join_desc.join_state.rest_build_indexes.write();
140-
let mut rest_probe_blocks = self.hash_join_desc.join_state.rest_probe_blocks.write();
141-
rest_probe_blocks.push(probe_block);
142-
rest_build_indexes.extend(local_build_indexes);
139+
let mut rest_pairs = self.hash_join_desc.join_state.rest_pairs.write();
140+
rest_pairs.0.push(probe_block);
141+
rest_pairs.1.extend(local_build_indexes);
143142

144143
Ok(probed_blocks)
145144
}

src/query/service/src/pipelines/processors/transforms/transform_left_join.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,17 @@ impl Compactor for LeftJoinCompactor {
4040
self.hash_join_state.interrupt();
4141
}
4242

43+
fn use_partial_compact() -> bool {
44+
true
45+
}
46+
47+
fn compact_partial(&mut self, blocks: &mut Vec<DataBlock>) -> Result<Vec<DataBlock>> {
48+
let res = self.hash_join_state.left_join_blocks(blocks)?;
49+
// Original blocks are already in res, so clear them.
50+
blocks.clear();
51+
Ok(res)
52+
}
53+
4354
// `compact_final` is called when all the blocks are pushed
4455
fn compact_final(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
4556
self.hash_join_state.left_join_blocks(blocks)

0 commit comments

Comments
 (0)