Skip to content

Commit 6775dc3

Browse files
committed
fix replace into panic
1 parent b30a4a1 commit 6775dc3

File tree

4 files changed

+106
-22
lines changed

4 files changed

+106
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/storages/fuse/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,6 @@ tokio-rayon = "2.1.0"
5656
tracing = "0.1.36"
5757
typetag = "0.2.3"
5858
uuid = { version = "1.1.2", features = ["serde", "v4"] }
59+
60+
[dev-dependencies]
61+
rand = "0.8.5"

src/query/storages/fuse/src/operations/replace.rs

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use common_pipeline_core::pipe::PipeItem;
2525
use common_pipeline_core::processors::processor::ProcessorPtr;
2626
use common_pipeline_transforms::processors::transforms::create_dummy_item;
2727
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;
28+
use storages_common_table_meta::meta::Location;
2829
use storages_common_table_meta::meta::Statistics;
2930
use storages_common_table_meta::meta::TableSnapshot;
3031
use uuid::Uuid;
@@ -37,6 +38,7 @@ use crate::operations::merge_into::CommitSink;
3738
use crate::operations::merge_into::MergeIntoOperationAggregator;
3839
use crate::operations::merge_into::OnConflictField;
3940
use crate::operations::merge_into::TableMutationAggregator;
41+
use crate::operations::mutation::base_mutator::SegmentIndex;
4042
use crate::operations::replace_into::processor_replace_into::ReplaceIntoProcessor;
4143
use crate::pipelines::Pipeline;
4244
use crate::FuseTable;
@@ -259,28 +261,7 @@ impl FuseTable {
259261
on_conflicts: Vec<OnConflictField>,
260262
table_snapshot: &TableSnapshot,
261263
) -> Result<Vec<PipeItem>> {
262-
let segments = table_snapshot.segments.as_slice();
263-
let chunk_size = segments.len() / num_partition;
264-
// caller site should guarantee this
265-
assert!(chunk_size >= 1);
266-
267-
let mut chunks = vec![];
268-
let mut chunk = vec![];
269-
for (segment_idx, segment_location) in segments.iter().enumerate() {
270-
chunk.push((segment_idx, segment_location.clone()));
271-
if (segment_idx + 1) % chunk_size == 0 {
272-
chunks.push(std::mem::take(&mut chunk))
273-
}
274-
}
275-
276-
if !chunk.is_empty() {
277-
if chunks.len() == num_partition {
278-
chunks.last_mut().unwrap().append(&mut chunk);
279-
} else {
280-
chunks.push(std::mem::take(&mut chunk))
281-
}
282-
}
283-
264+
let chunks = Self::partition_segments(&table_snapshot.segments, num_partition);
284265
let read_settings = ReadSettings::from_ctx(&ctx)?;
285266
let mut items = vec![];
286267
for chunk_of_segment_locations in chunks {
@@ -299,6 +280,28 @@ impl FuseTable {
299280
Ok(items)
300281
}
301282

283+
fn partition_segments(
284+
segments: &[Location],
285+
num_partition: usize,
286+
) -> Vec<Vec<(SegmentIndex, Location)>> {
287+
let chunk_size = segments.len() / num_partition;
288+
// caller site guarantees this
289+
assert!(chunk_size >= 1);
290+
291+
let mut chunks = vec![];
292+
for (chunk_idx, chunk) in segments.chunks(chunk_size).enumerate() {
293+
let mut segment_chunk = (chunk_idx * chunk_size..)
294+
.zip(chunk.to_vec())
295+
.collect::<Vec<_>>();
296+
if chunks.len() < num_partition {
297+
chunks.push(segment_chunk);
298+
} else {
299+
chunks.last_mut().unwrap().append(&mut segment_chunk);
300+
}
301+
}
302+
chunks
303+
}
304+
302305
fn create_append_transform(&self, ctx: Arc<dyn TableContext>) -> AppendTransform {
303306
AppendTransform::try_create(
304307
ctx,
@@ -362,3 +365,42 @@ impl FuseTable {
362365
)
363366
}
364367
}
368+
369+
#[cfg(test)]
370+
mod tests {
371+
use super::*;
372+
373+
#[test]
374+
fn test_partition() -> Result<()> {
375+
use rand::Rng;
376+
377+
let mut rng = rand::thread_rng();
378+
379+
for _ in 0..100 {
380+
let number_segment: usize = rng.gen_range(1..100);
381+
382+
// do not matter, arbitrarily picked
383+
let format_version = 2;
384+
385+
let segments = (0..number_segment)
386+
.into_iter()
387+
.map(|idx| (format!("{idx}"), format_version))
388+
.collect::<Vec<_>>();
389+
390+
for _ in 0..100 {
391+
let num_partition: usize = if number_segment == 1 {
392+
1
393+
} else {
394+
rng.gen_range(1..number_segment)
395+
};
396+
397+
let chunks = FuseTable::partition_segments(&segments, num_partition);
398+
assert_eq!(chunks.len(), num_partition);
399+
for (idx, (segment_idx, _)) in chunks.clone().into_iter().flatten().enumerate() {
400+
assert_eq!(idx, segment_idx)
401+
}
402+
}
403+
}
404+
Ok(())
405+
}
406+
}

tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,44 @@ DROP TABLE test;
323323
statement ok
324324
drop stage test_stage
325325

326+
###############################################################
327+
# issue https://github.com/datafuselabs/databend/issues/10572 #
328+
###############################################################
329+
330+
# with 5 segments and max_threads set to 3, `replace into` leads to
331+
# ERROR HY000 (1105): Code: 1068, displayText = Cannot join handle from context's runtime
332+
333+
statement ok
334+
CREATE TABLE test(a int);
335+
336+
statement ok
337+
insert into test values(1);
338+
339+
statement ok
340+
insert into test values(2);
341+
342+
statement ok
343+
insert into test values(3);
344+
345+
statement ok
346+
insert into test values(4);
347+
348+
statement ok
349+
insert into test values(5);
350+
351+
statement ok
352+
set max_threads = 3;
353+
354+
# if not fixed: ERROR HY000 (1105): Code: 1068, displayText = Cannot join handle from context's runtime
355+
statement ok
356+
replace into test on(a) values(6);
357+
358+
statement ok
359+
drop table test
360+
361+
362+
#####################
363+
326364

327365
statement ok
328366
DROP DATABASE db_09_0023

0 commit comments

Comments
 (0)