@@ -20,19 +20,21 @@ use databend_common_catalog::table_context::TableContext;
20
20
use databend_common_exception:: Result ;
21
21
use databend_common_pipeline_core:: processors:: ProcessorPtr ;
22
22
use databend_common_pipeline_transforms:: MemorySettings ;
23
+ use databend_common_pipeline_transforms:: TransformPipelineHelper ;
23
24
use databend_common_sql:: executor:: physical_plans:: HilbertPartition ;
24
25
use databend_common_sql:: executor:: physical_plans:: MutationKind ;
26
+ use databend_common_storages_fuse:: io:: StreamBlockProperties ;
25
27
use databend_common_storages_fuse:: operations:: TransformBlockWriter ;
26
28
use databend_common_storages_fuse:: operations:: TransformSerializeBlock ;
27
29
use databend_common_storages_fuse:: statistics:: ClusterStatsGenerator ;
28
30
use databend_common_storages_fuse:: FuseTable ;
29
31
use databend_storages_common_cache:: TempDirManager ;
30
32
31
33
use crate :: pipelines:: memory_settings:: MemorySettingsExt ;
32
- use crate :: pipelines:: processors:: transforms:: CompactStrategy ;
33
- use crate :: pipelines:: processors:: transforms:: HilbertPartitionExchange ;
34
- use crate :: pipelines:: processors:: transforms:: TransformHilbertCollect ;
35
- use crate :: pipelines:: processors:: transforms:: TransformWindowPartitionCollect ;
34
+ use crate :: pipelines:: processors:: transforms:: CompactPartitionStrategy ;
35
+ use crate :: pipelines:: processors:: transforms:: ReclusterPartitionExchange ;
36
+ use crate :: pipelines:: processors:: transforms:: ReclusterPartitionStrategy ;
37
+ use crate :: pipelines:: processors:: transforms:: TransformPartitionCollect ;
36
38
use crate :: pipelines:: PipelineBuilder ;
37
39
use crate :: spillers:: SpillerDiskConfig ;
38
40
@@ -49,7 +51,7 @@ impl PipelineBuilder {
49
51
50
52
self . main_pipeline . exchange (
51
53
num_processors,
52
- HilbertPartitionExchange :: create ( partition. range_start , partition. range_width ) ,
54
+ ReclusterPartitionExchange :: create ( partition. range_start , partition. range_width ) ,
53
55
) ;
54
56
55
57
let settings = self . ctx . get_settings ( ) ;
@@ -66,9 +68,15 @@ impl PipelineBuilder {
66
68
let processor_id = AtomicUsize :: new ( 0 ) ;
67
69
68
70
if enable_stream_writer {
71
+ let properties = StreamBlockProperties :: try_create (
72
+ self . ctx . clone ( ) ,
73
+ table,
74
+ partition. table_meta_timestamps ,
75
+ ) ?;
76
+
69
77
self . main_pipeline . add_transform ( |input, output| {
70
78
Ok ( ProcessorPtr :: create ( Box :: new (
71
- TransformHilbertCollect :: new (
79
+ TransformPartitionCollect :: new (
72
80
self . ctx . clone ( ) ,
73
81
input,
74
82
output,
@@ -78,28 +86,24 @@ impl PipelineBuilder {
78
86
partition. range_width ,
79
87
window_spill_settings. clone ( ) ,
80
88
disk_spill. clone ( ) ,
81
- partition. rows_per_block ,
82
- partition. bytes_per_block ,
89
+ ReclusterPartitionStrategy :: new ( properties. clone ( ) ) ,
83
90
) ?,
84
91
) ) )
85
92
} ) ?;
86
93
87
- self . main_pipeline . add_transform ( |input , output | {
88
- TransformBlockWriter :: try_create (
94
+ self . main_pipeline . add_async_accumulating_transformer ( | | {
95
+ TransformBlockWriter :: create (
89
96
self . ctx . clone ( ) ,
90
- input,
91
- output,
92
97
MutationKind :: Recluster ,
93
98
table,
94
- partition. table_meta_timestamps ,
95
99
false ,
96
- Some ( partition. bytes_per_block ) ,
97
100
)
98
- } )
101
+ } ) ;
102
+ Ok ( ( ) )
99
103
} else {
100
104
self . main_pipeline . add_transform ( |input, output| {
101
105
Ok ( ProcessorPtr :: create ( Box :: new (
102
- TransformWindowPartitionCollect :: new (
106
+ TransformPartitionCollect :: new (
103
107
self . ctx . clone ( ) ,
104
108
input,
105
109
output,
@@ -109,24 +113,26 @@ impl PipelineBuilder {
109
113
partition. range_width ,
110
114
window_spill_settings. clone ( ) ,
111
115
disk_spill. clone ( ) ,
112
- CompactStrategy :: new ( partition. rows_per_block , partition. bytes_per_block ) ,
116
+ CompactPartitionStrategy :: new (
117
+ partition. rows_per_block ,
118
+ partition. bytes_per_block ,
119
+ ) ,
113
120
) ?,
114
121
) ) )
115
122
} ) ?;
116
123
117
- self . main_pipeline
118
- . add_transform ( |transform_input_port, transform_output_port| {
119
- let proc = TransformSerializeBlock :: try_create (
120
- self . ctx . clone ( ) ,
121
- transform_input_port,
122
- transform_output_port,
123
- table,
124
- ClusterStatsGenerator :: default ( ) ,
125
- MutationKind :: Recluster ,
126
- partition. table_meta_timestamps ,
127
- ) ?;
128
- proc. into_processor ( )
129
- } )
124
+ self . main_pipeline . add_transform ( |input, output| {
125
+ let proc = TransformSerializeBlock :: try_create (
126
+ self . ctx . clone ( ) ,
127
+ input,
128
+ output,
129
+ table,
130
+ ClusterStatsGenerator :: default ( ) ,
131
+ MutationKind :: Recluster ,
132
+ partition. table_meta_timestamps ,
133
+ ) ?;
134
+ proc. into_processor ( )
135
+ } )
130
136
}
131
137
}
132
138
}
0 commit comments