@@ -1104,7 +1104,7 @@ impl ClusterSendExec {
11041104 ps
11051105 }
11061106
1107- fn issue_filters ( ps : & [ IdRow < Partition > ] ) -> Vec < ( u64 , RowRange ) > {
1107+ fn issue_filters ( ps : & [ IdRow < Partition > ] ) -> Vec < ( IdRow < Partition > , RowRange ) > {
11081108 if ps. is_empty ( ) {
11091109 return Vec :: new ( ) ;
11101110 }
@@ -1114,7 +1114,7 @@ impl ClusterSendExec {
11141114 if multi_id. is_none ( ) {
11151115 return ps
11161116 . iter ( )
1117- . map ( |p| ( p. get_id ( ) , RowRange :: default ( ) ) )
1117+ . map ( |p| ( p. clone ( ) , RowRange :: default ( ) ) )
11181118 . collect ( ) ;
11191119 }
11201120 let filter = RowRange {
@@ -1129,7 +1129,7 @@ impl ClusterSendExec {
11291129 } else {
11301130 filter. clone ( )
11311131 } ;
1132- r. push ( ( p. get_id ( ) , pf) )
1132+ r. push ( ( p. clone ( ) , pf) )
11331133 }
11341134 r
11351135 }
@@ -1138,7 +1138,8 @@ impl ClusterSendExec {
11381138 c : & dyn ConfigObj ,
11391139 logical : Vec < Vec < InlineCompoundPartition > > ,
11401140 ) -> Vec < ( String , ( Vec < ( u64 , RowRange ) > , Vec < InlineTableId > ) ) > {
1141- let mut m: HashMap < _ , ( Vec < ( u64 , RowRange ) > , Vec < InlineTableId > ) > = HashMap :: new ( ) ;
1141+ let mut m: HashMap < _ , ( Vec < ( IdRow < Partition > , RowRange ) > , Vec < InlineTableId > ) > =
1142+ HashMap :: new ( ) ;
11421143 for ps in & logical {
11431144 let inline_table_ids = ps
11441145 . iter ( )
@@ -1178,7 +1179,64 @@ impl ClusterSendExec {
11781179
11791180 let mut r = m. into_iter ( ) . collect_vec ( ) ;
11801181 r. sort_unstable_by ( |l, r| l. 0 . cmp ( & r. 0 ) ) ;
1181- r
1182+ r. into_iter ( )
1183+ . map ( |( worker, data) | {
1184+ let splitted = Self :: split_worker_parititons ( c, data) ;
1185+ splitted. into_iter ( ) . map ( move |data| ( worker. clone ( ) , data) )
1186+ } )
1187+ . flatten ( )
1188+ . collect_vec ( )
1189+ }
1190+
1191+ fn split_worker_parititons (
1192+ c : & dyn ConfigObj ,
1193+ partitions : ( Vec < ( IdRow < Partition > , RowRange ) > , Vec < InlineTableId > ) ,
1194+ ) -> Vec < ( Vec < ( u64 , RowRange ) > , Vec < InlineTableId > ) > {
1195+ if !partitions. 1 . is_empty ( )
1196+ || partitions
1197+ . 0
1198+ . iter ( )
1199+ . any ( |( p, _) | p. get_row ( ) . multi_partition_id ( ) . is_some ( ) )
1200+ {
1201+ return vec ! [ (
1202+ partitions
1203+ . 0
1204+ . into_iter( )
1205+ . map( |( p, range) | ( p. id, range) )
1206+ . collect_vec( ) ,
1207+ partitions. 1 ,
1208+ ) ] ;
1209+ }
1210+ let rows_split_threshold = c. partition_split_threshold ( ) * c. cluster_send_split_threshold ( ) ;
1211+ let file_size_split_threshold =
1212+ c. partition_size_split_threshold_bytes ( ) * c. cluster_send_split_threshold ( ) ;
1213+ let mut result = vec ! [ ] ;
1214+ let mut current_rows = 0 ;
1215+ let mut current_files_size = 0 ;
1216+ let mut current_chunk = vec ! [ ] ;
1217+ let ( partitions, _) = partitions;
1218+ for ( partition, range) in partitions {
1219+ let rows = partition. get_row ( ) . main_table_row_count ( ) ;
1220+ let file_size = partition. get_row ( ) . file_size ( ) . unwrap_or_default ( ) ;
1221+ if current_rows + rows > rows_split_threshold
1222+ || current_files_size + file_size > file_size_split_threshold
1223+ {
1224+ if !current_chunk. is_empty ( ) {
1225+ result. push ( ( std:: mem:: take ( & mut current_chunk) , vec ! [ ] ) ) ;
1226+ current_rows = 0 ;
1227+ current_files_size = 0 ;
1228+ }
1229+ }
1230+
1231+ current_rows += rows;
1232+ current_files_size += file_size;
1233+ current_chunk. push ( ( partition. id , range) ) ;
1234+ }
1235+ if !current_chunk. is_empty ( ) {
1236+ result. push ( ( current_chunk, vec ! [ ] ) ) ;
1237+ }
1238+
1239+ result
11821240 }
11831241
11841242 pub fn with_changed_schema (
0 commit comments