Skip to content

Commit f7a4a5d

Browse files
authored
fix(query): double panic if broken state in block builder (#17091)
* fix(query): fix call double finish in transform accumulating * fix(query): fix DataBlock corrupted, column length mismatch panic * fix(query): remove useless file
1 parent d8b7d4d commit f7a4a5d

File tree

3 files changed

+8
-2
lines changed

3 files changed

+8
-2
lines changed

src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ impl<T: AccumulatingTransform + 'static> Drop for AccumulatingTransformer<T> {
6666
fn drop(&mut self) {
6767
drop_guard(move || {
6868
if !self.called_on_finish {
69+
self.called_on_finish = true;
6970
self.inner.on_finish(false).unwrap();
7071
}
7172
})

src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,5 +223,6 @@ pub fn sort_merge(
223223
for block in data_blocks {
224224
processor.transform(block)?;
225225
}
226+
226227
processor.on_finish(true)
227228
}

src/query/storages/stage/src/read/row_based/processors/block_builder.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,11 @@ impl AccumulatingTransform for BlockBuilder {
148148
Ok(blocks)
149149
}
150150

151-
fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
152-
self.flush_block(true)
151+
fn on_finish(&mut self, output: bool) -> Result<Vec<DataBlock>> {
152+
if output {
153+
return self.flush_block(true);
154+
}
155+
156+
Ok(vec![])
153157
}
154158
}

0 commit comments

Comments
 (0)