@@ -33,6 +33,7 @@ use common_catalog::table_context::StageAttachment;
3333use common_catalog:: table_context:: TableContext ;
3434use common_exception:: ErrorCode ;
3535use common_exception:: Result ;
36+ use common_expression:: BlockThresholds ;
3637use common_expression:: DataBlock ;
3738use common_expression:: FunctionContext ;
3839use common_io:: prelude:: FormatSettings ;
@@ -71,15 +72,22 @@ use common_settings::ChangeValue;
7172use common_settings:: Settings ;
7273use common_storage:: DataOperator ;
7374use common_storage:: StageFileInfo ;
75+ use common_storages_fuse:: io:: SegmentWriter ;
76+ use common_storages_fuse:: io:: TableMetaLocationGenerator ;
7477use common_storages_fuse:: operations:: AppendOperationLogEntry ;
78+ use common_storages_fuse:: statistics:: reducers:: reduce_block_metas;
7579use common_storages_fuse:: FuseTable ;
7680use common_storages_fuse:: FUSE_TBL_SNAPSHOT_PREFIX ;
7781use databend_query:: sessions:: QueryContext ;
7882use futures:: TryStreamExt ;
83+ use rand:: thread_rng;
84+ use rand:: Rng ;
7985use storages_common_table_meta:: meta:: SegmentInfo ;
8086use storages_common_table_meta:: meta:: Statistics ;
8187use walkdir:: WalkDir ;
8288
89+ use crate :: storages:: fuse:: block_writer:: BlockWriter ;
90+ use crate :: storages:: fuse:: operations:: mutation:: CompactSegmentTestFixture ;
8391use crate :: storages:: fuse:: table_test_fixture:: execute_query;
8492use crate :: storages:: fuse:: table_test_fixture:: TestFixture ;
8593
@@ -301,6 +309,48 @@ async fn test_abort_on_error() -> Result<()> {
301309 Ok ( ( ) )
302310}
303311
312+ #[ tokio:: test( flavor = "multi_thread" ) ]
313+ async fn test_merge_segments ( ) -> common_exception:: Result < ( ) > {
314+ let fixture = TestFixture :: new ( ) . await ;
315+ let ctx = fixture. ctx ( ) ;
316+
317+ let operator = ctx. get_data_operator ( ) ?. operator ( ) ;
318+ let data_accessor = operator. clone ( ) ;
319+ let location_gen = TableMetaLocationGenerator :: with_prefix ( "test/" . to_owned ( ) ) ;
320+ let block_writer = BlockWriter :: new ( & data_accessor, & location_gen) ;
321+ let segment_writer = SegmentWriter :: new ( & data_accessor, & location_gen) ;
322+
323+ let mut rand = thread_rng ( ) ;
324+ let number_of_segments: usize = rand. gen_range ( 1 ..10 ) ;
325+ let mut block_number_of_segments = Vec :: with_capacity ( number_of_segments) ;
326+ let mut rows_per_blocks = Vec :: with_capacity ( number_of_segments) ;
327+ for _ in 0 ..number_of_segments {
328+ block_number_of_segments. push ( rand. gen_range ( 10 ..30 ) ) ;
329+ rows_per_blocks. push ( rand. gen_range ( 1 ..8 ) ) ;
330+ }
331+
332+ let threshold = BlockThresholds {
333+ max_rows_per_block : 5 ,
334+ min_rows_per_block : 4 ,
335+ max_bytes_per_block : 1024 ,
336+ } ;
337+
338+ let ( locations, block_metas, segment_infos) = CompactSegmentTestFixture :: gen_segments (
339+ & block_writer,
340+ & segment_writer,
341+ & block_number_of_segments,
342+ & rows_per_blocks,
343+ threshold,
344+ )
345+ . await ?;
346+
347+ let expect = reduce_block_metas ( & block_metas, threshold) ?;
348+ let iter = locations. iter ( ) . zip ( segment_infos. iter ( ) ) ;
349+ let ( _, results) = FuseTable :: merge_segments ( iter) ?;
350+ assert_eq ! ( expect, results) ;
351+ Ok ( ( ) )
352+ }
353+
304354struct CtxDelegation {
305355 ctx : Arc < dyn TableContext > ,
306356 catalog : Arc < FakedCatalog > ,
0 commit comments