Skip to content

Commit 2e81662

Browse files
authored
Merge pull request #10593 from dantengsky/fix-replace-into-panic
fix: replace into panic
2 parents 9e8eb63 + 9984fac commit 2e81662

File tree

4 files changed

+111
-22
lines changed

4 files changed

+111
-22
lines changed

src/query/service/tests/it/storages/fuse/operations/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ mod optimize;
2525
mod purge_drop;
2626
mod purge_truncate;
2727
mod read_plan;
28+
mod replace_into;
2829
mod table_analyze;
2930
mod truncate;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::Result;
16+
use common_storages_fuse::FuseTable;
17+
18+
#[test]
19+
fn test_partition() -> Result<()> {
20+
use rand::Rng;
21+
let mut rng = rand::thread_rng();
22+
for _ in 0..100 {
23+
let number_segment: usize = rng.gen_range(1..100);
24+
25+
// do not matter, arbitrarily picked
26+
let format_version = 2;
27+
28+
let segments = (0..number_segment)
29+
.map(|idx| (format!("{idx}"), format_version))
30+
.collect::<Vec<_>>();
31+
32+
for _ in 0..100 {
33+
let num_partition: usize = if number_segment == 1 {
34+
1
35+
} else {
36+
rng.gen_range(1..number_segment)
37+
};
38+
39+
let chunks = FuseTable::partition_segments(&segments, num_partition);
40+
assert_eq!(chunks.len(), num_partition);
41+
for (idx, (segment_idx, _)) in chunks.clone().into_iter().flatten().enumerate() {
42+
assert_eq!(idx, segment_idx)
43+
}
44+
}
45+
}
46+
Ok(())
47+
}

src/query/storages/fuse/src/operations/replace.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use common_pipeline_core::pipe::PipeItem;
2525
use common_pipeline_core::processors::processor::ProcessorPtr;
2626
use common_pipeline_transforms::processors::transforms::create_dummy_item;
2727
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;
28+
use storages_common_table_meta::meta::Location;
2829
use storages_common_table_meta::meta::Statistics;
2930
use storages_common_table_meta::meta::TableSnapshot;
3031
use uuid::Uuid;
@@ -37,6 +38,7 @@ use crate::operations::merge_into::CommitSink;
3738
use crate::operations::merge_into::MergeIntoOperationAggregator;
3839
use crate::operations::merge_into::OnConflictField;
3940
use crate::operations::merge_into::TableMutationAggregator;
41+
use crate::operations::mutation::base_mutator::SegmentIndex;
4042
use crate::operations::replace_into::processor_replace_into::ReplaceIntoProcessor;
4143
use crate::pipelines::Pipeline;
4244
use crate::FuseTable;
@@ -259,28 +261,7 @@ impl FuseTable {
259261
on_conflicts: Vec<OnConflictField>,
260262
table_snapshot: &TableSnapshot,
261263
) -> Result<Vec<PipeItem>> {
262-
let segments = table_snapshot.segments.as_slice();
263-
let chunk_size = segments.len() / num_partition;
264-
// caller site should guarantee this
265-
assert!(chunk_size >= 1);
266-
267-
let mut chunks = vec![];
268-
let mut chunk = vec![];
269-
for (segment_idx, segment_location) in segments.iter().enumerate() {
270-
chunk.push((segment_idx, segment_location.clone()));
271-
if (segment_idx + 1) % chunk_size == 0 {
272-
chunks.push(std::mem::take(&mut chunk))
273-
}
274-
}
275-
276-
if !chunk.is_empty() {
277-
if chunks.len() == num_partition {
278-
chunks.last_mut().unwrap().append(&mut chunk);
279-
} else {
280-
chunks.push(std::mem::take(&mut chunk))
281-
}
282-
}
283-
264+
let chunks = Self::partition_segments(&table_snapshot.segments, num_partition);
284265
let read_settings = ReadSettings::from_ctx(&ctx)?;
285266
let mut items = vec![];
286267
for chunk_of_segment_locations in chunks {
@@ -299,6 +280,28 @@ impl FuseTable {
299280
Ok(items)
300281
}
301282

283+
pub fn partition_segments(
284+
segments: &[Location],
285+
num_partition: usize,
286+
) -> Vec<Vec<(SegmentIndex, Location)>> {
287+
let chunk_size = segments.len() / num_partition;
288+
// caller site guarantees this
289+
assert!(chunk_size >= 1);
290+
291+
let mut chunks = vec![];
292+
for (chunk_idx, chunk) in segments.chunks(chunk_size).enumerate() {
293+
let mut segment_chunk = (chunk_idx * chunk_size..)
294+
.zip(chunk.to_vec())
295+
.collect::<Vec<_>>();
296+
if chunks.len() < num_partition {
297+
chunks.push(segment_chunk);
298+
} else {
299+
chunks.last_mut().unwrap().append(&mut segment_chunk);
300+
}
301+
}
302+
chunks
303+
}
304+
302305
fn create_append_transform(&self, ctx: Arc<dyn TableContext>) -> AppendTransform {
303306
AppendTransform::try_create(
304307
ctx,

tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,44 @@ DROP TABLE test;
323323
statement ok
324324
drop stage test_stage
325325

326+
###############################################################
327+
# issue https://github.com/datafuselabs/databend/issues/10572 #
328+
###############################################################
329+
330+
# with 5 segments and max_threads set to 3, `replace into` leads to
331+
# ERROR HY000 (1105): Code: 1068, displayText = Cannot join handle from context's runtime
332+
333+
statement ok
334+
CREATE TABLE test(a int);
335+
336+
statement ok
337+
insert into test values(1);
338+
339+
statement ok
340+
insert into test values(2);
341+
342+
statement ok
343+
insert into test values(3);
344+
345+
statement ok
346+
insert into test values(4);
347+
348+
statement ok
349+
insert into test values(5);
350+
351+
statement ok
352+
set max_threads = 3;
353+
354+
# if not fixed: ERROR HY000 (1105): Code: 1068, displayText = Cannot join handle from context's runtime
355+
statement ok
356+
replace into test on(a) values(6);
357+
358+
statement ok
359+
drop table test
360+
361+
362+
#####################
363+
326364

327365
statement ok
328366
DROP DATABASE db_09_0023

0 commit comments

Comments
 (0)