
Commit 1b4d54e (parent: 882f464)

refactor: move segment generation into TableMutationAggregator for MERGE INTO (#17800)

fix: MERGE INTO generating too many segments

26 files changed: +106 −282 lines
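The crux of the fix: MERGE INTO pipelines no longer serialize their own segments; appended blocks now accumulate in TableMutationAggregator, which packs them into segments once, when the pipeline finishes. A minimal sketch of that packing rule, with assumed numbers and a simplified signature (the real logic is generate_append_segments in the transform_mutation_aggregator.rs diff below):

// Sketch only: mirrors the arithmetic in generate_append_segments.
fn segment_layout(merged_blocks: usize, block_per_segment: usize) -> (usize, usize) {
    // Always emit at least one segment.
    let segments_num = (merged_blocks / block_per_segment).max(1);
    // Spread blocks evenly across the segments, rounding up.
    let chunk_size = merged_blocks.div_ceil(segments_num);
    (segments_num, chunk_size)
}

fn main() {
    // 2_500 accumulated blocks at a 1_000-block threshold -> 2 segments of
    // 1_250 blocks each, rather than each sub-pipeline flushing its own
    // small segments.
    assert_eq!(segment_layout(2_500, 1_000), (2, 1_250));
    // A small mutation still produces exactly one segment.
    assert_eq!(segment_layout(10, 1_000), (1, 10));
}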
Lines changed: 3 additions & 3 deletions

@@ -1,3 +1,3 @@
-set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
-set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
-set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
+merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
+merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
+merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
src/query/service/src/pipelines/builders/builder_mutation.rs

Lines changed: 0 additions & 54 deletions

@@ -34,7 +34,6 @@ use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
 use databend_common_sql::binder::MutationStrategy;
 use databend_common_sql::executor::physical_plans::Mutation;
 use databend_common_sql::executor::physical_plans::MutationKind;
-use databend_common_storages_fuse::operations::new_serialize_segment_pipe_item;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::operations::UnMatchedExprs;
 use databend_common_storages_fuse::FuseTable;
@@ -61,14 +60,6 @@ impl PipelineBuilder {
         let io_request_semaphore =
             Arc::new(Semaphore::new(self.settings.get_max_threads()? as usize));

-        let serialize_segment_transform = new_serialize_segment_pipe_item(
-            InputPort::create(),
-            OutputPort::create(),
-            table,
-            block_thresholds,
-            merge_into.table_meta_timestamps,
-        )?;
-
         // For row_id port, create rowid_aggregate_mutator
         // For matched data port and unmatched port, do serialize
         let serialize_len = match merge_into.strategy {
@@ -131,51 +122,6 @@
             pipe_items,
         ));

-        // The complete pipeline:
-        // aggregate_mutator port                 aggregate_mutator port
-        // serialize_block port0
-        // serialize_block port1        ======>   serialize_block port
-        // .......
-        let mut ranges = Vec::with_capacity(self.main_pipeline.output_len());
-        // row id port
-        let row_id_offset = if merge_into.need_match {
-            ranges.push(vec![0]);
-            1
-        } else {
-            0
-        };
-
-        // Resize data ports
-        debug_assert!(serialize_len > 0);
-        let mut vec = Vec::with_capacity(self.main_pipeline.output_len());
-        for idx in 0..serialize_len {
-            vec.push(idx + row_id_offset);
-        }
-        ranges.push(vec);
-        self.main_pipeline.resize_partial_one(ranges)?;
-
-        let pipe_items = {
-            let mut vec = Vec::with_capacity(2);
-            if merge_into.need_match {
-                // row_id port
-                vec.push(create_dummy_item());
-            }
-            // data port
-            vec.push(serialize_segment_transform);
-            vec
-        };
-
-        // The complete pipeline:
-        // output_port0: MutationLogs(row_id)
-        // output_port1: MutationLogs(data)
-        // 1. FullOperation and MatchedOnly: same as above
-        // 2. InsertOnly: no output_port0
-        let output_len = pipe_items.iter().map(|item| item.outputs_port.len()).sum();
-        self.main_pipeline.add_pipe(Pipe::create(
-            self.main_pipeline.output_len(),
-            output_len,
-            pipe_items,
-        ));
         Ok(())
     }
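As context for the deleted block above: a standalone sketch (hypothetical helper, not the pipeline API) of the port regrouping that resize_partial_one was given, which became unnecessary once no SerializeSegment stage follows the serializers:

// Sketch only: reproduces the ranges the deleted code built.
fn resize_ranges(need_match: bool, serialize_len: usize) -> Vec<Vec<usize>> {
    let mut ranges = Vec::new();
    let row_id_offset = if need_match {
        ranges.push(vec![0]); // the row-id port kept its own lane
        1
    } else {
        0
    };
    // all data ports collapsed into the single downstream port
    ranges.push((0..serialize_len).map(|idx| idx + row_id_offset).collect());
    ranges
}

fn main() {
    // need_match with three serialize ports: [[0], [1, 2, 3]]
    assert_eq!(resize_ranges(true, 3), vec![vec![0], vec![1, 2, 3]]);
    // insert-only with two serialize ports: [[0, 1]]
    assert_eq!(resize_ranges(false, 2), vec![vec![0, 1]]);
}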

src/query/service/src/pipelines/builders/builder_replace_into.rs

Lines changed: 3 additions & 17 deletions

@@ -49,7 +49,6 @@ use databend_common_sql::BindContext;
 use databend_common_sql::Metadata;
 use databend_common_sql::MetadataRef;
 use databend_common_sql::NameResolutionContext;
-use databend_common_storages_fuse::operations::new_serialize_segment_pipe_item;
 use databend_common_storages_fuse::operations::BroadcastProcessor;
 use databend_common_storages_fuse::operations::ReplaceIntoProcessor;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
@@ -134,13 +133,6 @@
         let mut block_builder = serialize_block_transform.get_block_builder();
         block_builder.source_schema = table.schema_with_stream();

-        let serialize_segment_transform = new_serialize_segment_pipe_item(
-            InputPort::create(),
-            OutputPort::create(),
-            table,
-            *block_thresholds,
-            replace.table_meta_timestamps,
-        )?;
         if !*need_insert {
             if segment_partition_num == 0 {
                 return Ok(());
@@ -204,15 +196,9 @@
         };

         // 4. connect with MergeIntoOperationAggregators
-        if segment_partition_num == 0 {
-            let dummy_item = create_dummy_item();
-            self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![
-                serialize_segment_transform,
-                dummy_item,
-            ]));
-        } else {
+        if segment_partition_num != 0 {
             //      ┌──────────────────┐               ┌────────────────┐
-            // ────►│  SerializeBlock  ├──────────────►│SerializeSegment│
+            // ────►│  SerializeBlock  ├──────────────►│ DummyTransform │
             //      └──────────────────┘               └────────────────┘
             //
             //      ┌───────────────────┐              ┌──────────────────────┐
@@ -230,7 +216,7 @@
             let item_size = segment_partition_num + 1;
             let mut pipe_items = Vec::with_capacity(item_size);
             // setup the dummy transform
-            pipe_items.push(serialize_segment_transform);
+            pipe_items.push(create_dummy_item());

             let max_threads = self.settings.get_max_threads()?;
             let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion

@@ -680,7 +680,7 @@ impl DefaultSettings {
         }),
         ("enable_experimental_merge_into", DefaultSettingValue {
             value: UserSettingValue::UInt64(1),
-            desc: "Enables the experimental feature for 'MERGE INTO'.",
+            desc: "Deprecated setting",
             mode: SettingMode::Both,
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=1)),

src/query/settings/src/settings_getter_setter.rs

Lines changed: 0 additions & 4 deletions

@@ -554,10 +554,6 @@ impl Settings {
         Ok(self.try_get_u64("enable_distributed_copy_into")? != 0)
     }

-    pub fn get_enable_experimental_merge_into(&self) -> Result<bool> {
-        Ok(self.try_get_u64("enable_experimental_merge_into")? != 0)
-    }
-
     pub fn get_enable_distributed_merge_into(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_distributed_merge_into")? != 0)
     }

src/query/storages/fuse/src/operations/common/meta/mutation_log.rs

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ pub enum MutationLogEntry {
         format_version: FormatVersion,
         summary: Statistics,
     },
-    ReclusterAppendBlock {
+    AppendBlock {
         block_meta: Arc<BlockMeta>,
     },
     DeletedBlock {

src/query/storages/fuse/src/operations/common/processors/mod.rs

Lines changed: 0 additions & 1 deletion

@@ -26,6 +26,5 @@ pub use transform_block_writer::TransformBlockWriter;
 pub use transform_merge_commit_meta::TransformMergeCommitMeta;
 pub use transform_mutation_aggregator::TableMutationAggregator;
 pub use transform_serialize_block::TransformSerializeBlock;
-pub use transform_serialize_segment::new_serialize_segment_pipe_item;
 pub use transform_serialize_segment::new_serialize_segment_processor;
 pub use transform_serialize_segment::TransformSerializeSegment;

src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs

Lines changed: 58 additions & 58 deletions

@@ -26,7 +26,6 @@ use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchemaRef;
-use databend_common_metrics::storage::metrics_inc_recluster_write_block_nums;
 use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_storages_common_table_meta::meta::BlockMeta;
@@ -69,8 +68,7 @@ pub struct TableMutationAggregator {

     default_cluster_key_id: Option<u32>,
     base_segments: Vec<Location>,
-    // Used for recluster.
-    recluster_merged_blocks: Vec<Arc<BlockMeta>>,
+    merged_blocks: Vec<Arc<BlockMeta>>,
     set_hilbert_level: bool,

     mutations: HashMap<SegmentIndex, BlockMutations>,
@@ -103,6 +101,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator {

     #[async_backtrace::framed]
     async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        self.generate_append_segments().await?;
         let mut new_segment_locs = Vec::new();
         new_segment_locs.extend(self.appended_segments.clone());

@@ -114,7 +113,35 @@
                 },
                 self.schema.clone(),
             )),
-            MutationKind::Recluster => self.apply_recluster(&mut new_segment_locs).await?,
+            MutationKind::Recluster => {
+                let mut new_segments = std::mem::take(&mut self.appended_segments);
+                let new_segments_len = new_segments.len();
+                let removed_segments_len = self.removed_segment_indexes.len();
+                let replaced_segments_len = new_segments_len.min(removed_segments_len);
+                let mut appended_segments = Vec::new();
+                let mut replaced_segments = HashMap::with_capacity(replaced_segments_len);
+                if new_segments_len > removed_segments_len {
+                    // The remain new segments will be appended.
+                    let appended = new_segments.split_off(removed_segments_len);
+                    for location in appended.into_iter().rev() {
+                        appended_segments.push(location);
+                    }
+                }
+
+                for (i, location) in new_segments.into_iter().enumerate() {
+                    // The old segments will be replaced with the news.
+                    replaced_segments.insert(self.removed_segment_indexes[i], location);
+                }
+
+                ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
+                    appended_segments,
+                    removed_segment_indexes: self.removed_segment_indexes[replaced_segments_len..]
+                        .to_vec(),
+                    replaced_segments,
+                    removed_statistics: self.removed_statistics.clone(),
+                    merged_statistics: std::mem::take(&mut self.appended_statistics),
+                })
+            }
             _ => self.apply_mutation(&mut new_segment_locs).await?,
         };

@@ -131,7 +158,7 @@ impl TableMutationAggregator {
         table: &FuseTable,
         ctx: Arc<dyn TableContext>,
         base_segments: Vec<Location>,
-        recluster_merged_blocks: Vec<Arc<BlockMeta>>,
+        merged_blocks: Vec<Arc<BlockMeta>>,
         removed_segment_indexes: Vec<usize>,
         removed_statistics: Statistics,
         kind: MutationKind,
@@ -158,7 +185,7 @@
             mutations: HashMap::new(),
             appended_segments: vec![],
             base_segments,
-            recluster_merged_blocks,
+            merged_blocks,
             appended_statistics: Statistics::default(),
             removed_segment_indexes,
             removed_statistics,
@@ -197,9 +224,8 @@
                     }
                 }
             }
-            MutationLogEntry::ReclusterAppendBlock { block_meta } => {
-                metrics_inc_recluster_write_block_nums();
-                self.recluster_merged_blocks.push(block_meta);
+            MutationLogEntry::AppendBlock { block_meta } => {
+                self.merged_blocks.push(block_meta);
             }
             MutationLogEntry::DeletedBlock { index } => {
                 self.mutations
@@ -215,7 +241,6 @@
                     self.default_cluster_key_id,
                 );
             }
-            MutationLogEntry::DoNothing => (),
             MutationLogEntry::AppendSegment {
                 segment_location,
                 format_version,
@@ -251,25 +276,26 @@
                     self.default_cluster_key_id,
                 );
             }
+            MutationLogEntry::DoNothing => (),
         }
     }

-    async fn apply_recluster(
-        &mut self,
-        new_segment_locs: &mut Vec<Location>,
-    ) -> Result<ConflictResolveContext> {
-        // safe to unwrap.
-        let default_cluster_key_id = self.default_cluster_key_id.unwrap();
-        // sort ascending.
-        self.recluster_merged_blocks.sort_by(|a, b| {
-            sort_by_cluster_stats(&a.cluster_stats, &b.cluster_stats, default_cluster_key_id)
-        });
+    async fn generate_append_segments(&mut self) -> Result<()> {
+        if self.merged_blocks.is_empty() {
+            return Ok(());
+        }
+
+        if let Some(id) = self.default_cluster_key_id {
+            // sort ascending.
+            self.merged_blocks
+                .sort_by(|a, b| sort_by_cluster_stats(&a.cluster_stats, &b.cluster_stats, id));
+        }

         let mut tasks = Vec::new();
-        let merged_blocks = std::mem::take(&mut self.recluster_merged_blocks);
+        let merged_blocks = std::mem::take(&mut self.merged_blocks);
         let segments_num = (merged_blocks.len() / self.thresholds.block_per_segment).max(1);
         let chunk_size = merged_blocks.len().div_ceil(segments_num);
-        let default_cluster_key = Some(default_cluster_key_id);
+        let default_cluster_key = self.default_cluster_key_id;
         let thresholds = self.thresholds;
         let set_hilbert_level = self.set_hilbert_level;
         let kind = self.kind;
@@ -297,8 +323,7 @@
         }

         let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
-
-        let mut new_segments = execute_futures_in_parallel(
+        let new_segments = execute_futures_in_parallel(
             tasks,
             threads_nums,
             threads_nums * 2,
@@ -308,41 +333,16 @@
         .into_iter()
         .collect::<Result<Vec<_>>>()?;

-        let new_segments_len = new_segments.len();
-        let removed_segments_len = self.removed_segment_indexes.len();
-        let replaced_segments_len = new_segments_len.min(removed_segments_len);
-        let mut merged_statistics = Statistics::default();
-        let mut appended_segments = Vec::new();
-        let mut replaced_segments = HashMap::with_capacity(replaced_segments_len);
-        if new_segments_len > removed_segments_len {
-            // The remain new segments will be appended.
-            let appended = new_segments.split_off(removed_segments_len);
-            for (location, stats) in appended.into_iter().rev() {
-                let segment_loc = (location, SegmentInfo::VERSION);
-                new_segment_locs.push(segment_loc.clone());
-                appended_segments.push(segment_loc);
-                merge_statistics_mut(&mut merged_statistics, &stats, self.default_cluster_key_id);
-            }
-        }
-
-        for (i, (location, stats)) in new_segments.into_iter().enumerate() {
-            // The old segments will be replaced with the news.
-            let segment_loc = (location, SegmentInfo::VERSION);
-            new_segment_locs.push(segment_loc.clone());
-            replaced_segments.insert(self.removed_segment_indexes[i], segment_loc);
-            merge_statistics_mut(&mut merged_statistics, &stats, self.default_cluster_key_id);
+        for (location, stats) in new_segments {
+            merge_statistics_mut(
+                &mut self.appended_statistics,
+                &stats,
+                self.default_cluster_key_id,
+            );
+            self.appended_segments
+                .push((location, SegmentInfo::VERSION));
         }
-
-        let conflict_resolve_context =
-            ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
-                appended_segments,
-                removed_segment_indexes: self.removed_segment_indexes[replaced_segments_len..]
-                    .to_vec(),
-                replaced_segments,
-                removed_statistics: self.removed_statistics.clone(),
-                merged_statistics,
-            });
-        Ok(conflict_resolve_context)
+        Ok(())
     }

     async fn apply_mutation(
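The Recluster branch inlined into on_finish keeps the old replace-then-append rule. A self-contained sketch with segments reduced to plain ids (hypothetical split helper; the real code works on Location values): the first min(new, removed) fresh segments replace removed ones by index, and any surplus is appended in reverse order.

// Sketch only: mirrors the replace/append split in the Recluster branch.
use std::collections::HashMap;

fn split(mut new_segments: Vec<u64>, removed_indexes: &[usize]) -> (Vec<u64>, HashMap<usize, u64>) {
    let replaced_len = new_segments.len().min(removed_indexes.len());
    let mut appended = Vec::new();
    if new_segments.len() > removed_indexes.len() {
        // Surplus new segments are appended, reversed as in the diff.
        for seg in new_segments.split_off(removed_indexes.len()).into_iter().rev() {
            appended.push(seg);
        }
    }
    let mut replaced = HashMap::with_capacity(replaced_len);
    for (i, seg) in new_segments.into_iter().enumerate() {
        // Each remaining new segment replaces a removed one by index.
        replaced.insert(removed_indexes[i], seg);
    }
    (appended, replaced)
}

fn main() {
    // 3 new segments, 2 removed: segments 10 and 11 replace indexes 5 and 9,
    // segment 12 is appended.
    let (appended, replaced) = split(vec![10, 11, 12], &[5, 9]);
    assert_eq!(appended, vec![12]);
    assert_eq!(replaced, HashMap::from([(5, 10), (9, 11)]));
}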

src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs

Lines changed: 8 additions & 4 deletions

@@ -24,6 +24,7 @@ use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::ComputedExpr;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchema;
+use databend_common_metrics::storage::metrics_inc_recluster_write_block_nums;
 use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -359,12 +360,15 @@ impl Processor for TransformSerializeBlock {
                 }
             }

-            if matches!(self.kind, MutationKind::Recluster) {
-                Self::mutation_logs(MutationLogEntry::ReclusterAppendBlock {
+            if matches!(self.kind, MutationKind::Insert) {
+                DataBlock::empty_with_meta(Box::new(block_meta))
+            } else {
+                if matches!(self.kind, MutationKind::Recluster) {
+                    metrics_inc_recluster_write_block_nums();
+                }
+                Self::mutation_logs(MutationLogEntry::AppendBlock {
                     block_meta: Arc::new(block_meta),
                 })
-            } else {
-                DataBlock::empty_with_meta(Box::new(block_meta))
             }
         };
         self.output_data = Some(mutation_log_data_block);