12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use std:: collections:: HashMap ;
15
16
use std:: collections:: VecDeque ;
16
17
use std:: sync:: Arc ;
17
18
use std:: time:: Instant ;
@@ -21,8 +22,10 @@ use databend_common_catalog::table::Table;
21
22
use databend_common_catalog:: table_context:: TableContext ;
22
23
use databend_common_exception:: Result ;
23
24
use databend_common_expression:: BlockMetaInfoDowncast ;
25
+ use databend_common_expression:: ComputedExpr ;
24
26
use databend_common_expression:: DataBlock ;
25
27
use databend_common_expression:: TableDataType ;
28
+ use databend_common_expression:: TableSchema ;
26
29
use databend_common_metrics:: storage:: metrics_inc_block_virtual_column_write_bytes;
27
30
use databend_common_metrics:: storage:: metrics_inc_block_virtual_column_write_milliseconds;
28
31
use databend_common_metrics:: storage:: metrics_inc_block_virtual_column_write_nums;
@@ -31,18 +34,26 @@ use databend_common_pipeline_sources::AsyncSource;
31
34
use databend_common_pipeline_sources:: AsyncSourcer ;
32
35
use databend_common_pipeline_transforms:: processors:: AsyncTransform ;
33
36
use databend_common_pipeline_transforms:: processors:: TransformPipelineHelper ;
37
+ use databend_common_sql:: executor:: physical_plans:: MutationKind ;
34
38
use databend_common_storages_fuse:: io:: write_data;
35
39
use databend_common_storages_fuse:: io:: BlockReader ;
36
40
use databend_common_storages_fuse:: io:: MetaReaders ;
37
41
use databend_common_storages_fuse:: io:: VirtualColumnBuilder ;
38
42
use databend_common_storages_fuse:: io:: WriteSettings ;
43
+ use databend_common_storages_fuse:: operations:: BlockMetaIndex ;
44
+ use databend_common_storages_fuse:: operations:: CommitSink ;
45
+ use databend_common_storages_fuse:: operations:: MutationGenerator ;
46
+ use databend_common_storages_fuse:: operations:: MutationLogEntry ;
47
+ use databend_common_storages_fuse:: operations:: MutationLogs ;
48
+ use databend_common_storages_fuse:: operations:: TableMutationAggregator ;
39
49
use databend_common_storages_fuse:: FuseStorageFormat ;
40
50
use databend_common_storages_fuse:: FuseTable ;
41
51
use databend_storages_common_cache:: LoadParams ;
42
52
use databend_storages_common_io:: ReadSettings ;
43
53
use databend_storages_common_table_meta:: meta:: BlockMeta ;
44
54
use databend_storages_common_table_meta:: meta:: ExtendedBlockMeta ;
45
55
use databend_storages_common_table_meta:: meta:: Location ;
56
+ use databend_storages_common_table_meta:: meta:: Statistics ;
46
57
use opendal:: Operator ;
47
58
48
59
// The big picture of refresh virtual column into pipeline:
@@ -63,38 +74,33 @@ use opendal::Operator;
63
74
pub async fn do_refresh_virtual_column (
64
75
ctx : Arc < dyn TableContext > ,
65
76
fuse_table : & FuseTable ,
66
- segment_locs : Option < Vec < Location > > ,
67
77
pipeline : & mut Pipeline ,
68
78
) -> Result < ( ) > {
69
- let snapshot_opt = fuse_table. read_table_snapshot ( ) . await ?;
70
- let snapshot = if let Some ( val) = snapshot_opt {
71
- val
72
- } else {
79
+ let Some ( snapshot) = fuse_table. read_table_snapshot ( ) . await ? else {
73
80
// no snapshot
74
81
return Ok ( ( ) ) ;
75
82
} ;
76
-
77
83
let table_schema = & fuse_table. get_table_info ( ) . meta . schema ;
78
84
79
85
// Collect source fields used by virtual columns.
86
+ let mut fields = Vec :: new ( ) ;
80
87
let mut field_indices = Vec :: new ( ) ;
81
88
for ( i, f) in table_schema. fields ( ) . iter ( ) . enumerate ( ) {
82
- if f. data_type ( ) . remove_nullable ( ) != TableDataType :: Variant {
89
+ if f. data_type ( ) . remove_nullable ( ) != TableDataType :: Variant
90
+ || matches ! ( f. computed_expr( ) , Some ( ComputedExpr :: Virtual ( _) ) )
91
+ {
83
92
continue ;
84
93
}
94
+ fields. push ( f. clone ( ) ) ;
85
95
field_indices. push ( i) ;
86
96
}
87
97
88
- if field_indices. is_empty ( ) {
89
- // no source variant column
90
- return Ok ( ( ) ) ;
91
- }
92
-
93
- let table_info = & fuse_table. get_table_info ( ) ;
94
- let Some ( virtual_column_builder) = VirtualColumnBuilder :: try_create ( ctx. clone ( ) , table_info)
95
- else {
96
- return Ok ( ( ) ) ;
97
- } ;
98
+ let source_schema = Arc :: new ( TableSchema {
99
+ fields,
100
+ ..fuse_table. schema ( ) . as_ref ( ) . clone ( )
101
+ } ) ;
102
+ let virtual_column_builder =
103
+ VirtualColumnBuilder :: try_create ( ctx. clone ( ) , fuse_table, source_schema) ?;
98
104
99
105
let projection = Projection :: Columns ( field_indices) ;
100
106
let block_reader =
@@ -108,16 +114,11 @@ pub async fn do_refresh_virtual_column(
108
114
109
115
let operator = fuse_table. get_operator_ref ( ) ;
110
116
111
- // If no segment locations are specified, iterates through all segments
112
- let segment_locs = if let Some ( segment_locs) = segment_locs {
113
- segment_locs
114
- } else {
115
- snapshot. segments . clone ( )
116
- } ;
117
-
118
- // Read source variant columns and extract inner fields as virtual columns.
117
+ // Iterate through all segments and collect the blocks that don't have virtual block meta.
118
+ let segment_locs = snapshot. segments . clone ( ) ;
119
119
let mut block_metas = VecDeque :: new ( ) ;
120
- for ( location, ver) in segment_locs {
120
+ let mut block_meta_index_map = HashMap :: new ( ) ;
121
+ for ( segment_idx, ( location, ver) ) in segment_locs. into_iter ( ) . enumerate ( ) {
121
122
let segment_info = segment_reader
122
123
. read ( & LoadParams {
123
124
location : location. to_string ( ) ,
@@ -127,10 +128,15 @@ pub async fn do_refresh_virtual_column(
127
128
} )
128
129
. await ?;
129
130
130
- for block_meta in segment_info. block_metas ( ) ? {
131
+ for ( block_idx , block_meta) in segment_info. block_metas ( ) ?. into_iter ( ) . enumerate ( ) {
131
132
if block_meta. virtual_block_meta . is_some ( ) {
132
133
continue ;
133
134
}
135
+ let index = BlockMetaIndex {
136
+ segment_idx,
137
+ block_idx,
138
+ } ;
139
+ block_meta_index_map. insert ( block_meta. location . clone ( ) , index) ;
134
140
block_metas. push_back ( block_meta) ;
135
141
}
136
142
}
@@ -139,6 +145,7 @@ pub async fn do_refresh_virtual_column(
139
145
return Ok ( ( ) ) ;
140
146
}
141
147
148
+ // Read source blocks.
142
149
let settings = ReadSettings :: from_ctx ( & ctx) ?;
143
150
pipeline. add_source (
144
151
|output| {
@@ -153,31 +160,52 @@ pub async fn do_refresh_virtual_column(
153
160
1 ,
154
161
) ?;
155
162
163
+ // Extract inner fields as virtual columns and write virtual block data.
156
164
let block_nums = block_metas. len ( ) ;
157
165
let max_threads = ctx. get_settings ( ) . get_max_threads ( ) ? as usize ;
158
166
let max_threads = std:: cmp:: min ( block_nums, max_threads) ;
159
167
pipeline. try_resize ( max_threads) ?;
160
168
pipeline. add_async_transformer ( || {
161
169
VirtualColumnTransform :: new (
170
+ operator. clone ( ) ,
162
171
write_settings. clone ( ) ,
172
+ block_meta_index_map. clone ( ) ,
163
173
virtual_column_builder. clone ( ) ,
164
- operator. clone ( ) ,
165
174
)
166
175
} ) ;
167
176
168
- let base_snapshot = fuse_table. read_table_snapshot ( ) . await ?;
169
- let table_meta_timestamps = ctx. get_table_meta_timestamps ( fuse_table, base_snapshot. clone ( ) ) ?;
170
-
171
- fuse_table. do_commit (
172
- ctx,
173
- pipeline,
174
- None ,
175
- vec ! [ ] ,
176
- false ,
177
- None ,
178
- None ,
179
- table_meta_timestamps,
180
- ) ?;
177
+ pipeline. try_resize ( 1 ) ?;
178
+ let table_meta_timestamps =
179
+ ctx. get_table_meta_timestamps ( fuse_table, Some ( snapshot. clone ( ) ) ) ?;
180
+ pipeline. add_async_accumulating_transformer ( || {
181
+ TableMutationAggregator :: create (
182
+ fuse_table,
183
+ ctx. clone ( ) ,
184
+ vec ! [ ] ,
185
+ vec ! [ ] ,
186
+ vec ! [ ] ,
187
+ Statistics :: default ( ) ,
188
+ MutationKind :: Update ,
189
+ table_meta_timestamps,
190
+ )
191
+ } ) ;
192
+
193
+ let prev_snapshot_id = snapshot. snapshot_id ;
194
+ let snapshot_gen = MutationGenerator :: new ( Some ( snapshot) , MutationKind :: Update ) ;
195
+ pipeline. add_sink ( |input| {
196
+ CommitSink :: try_create (
197
+ fuse_table,
198
+ ctx. clone ( ) ,
199
+ None ,
200
+ vec ! [ ] ,
201
+ snapshot_gen. clone ( ) ,
202
+ input,
203
+ None ,
204
+ Some ( prev_snapshot_id) ,
205
+ None ,
206
+ table_meta_timestamps,
207
+ )
208
+ } ) ?;
181
209
182
210
Ok ( ( ) )
183
211
}
@@ -238,21 +266,24 @@ impl AsyncSource for VirtualColumnSource {
238
266
239
267
/// `VirtualColumnTransform` is used to generate virtual columns for each block.
240
268
pub struct VirtualColumnTransform {
269
+ operator : Operator ,
241
270
write_settings : WriteSettings ,
271
+ block_meta_index_map : HashMap < Location , BlockMetaIndex > ,
242
272
virtual_column_builder : VirtualColumnBuilder ,
243
- operator : Operator ,
244
273
}
245
274
246
275
impl VirtualColumnTransform {
247
276
pub fn new (
277
+ operator : Operator ,
248
278
write_settings : WriteSettings ,
279
+ block_meta_index_map : HashMap < Location , BlockMetaIndex > ,
249
280
virtual_column_builder : VirtualColumnBuilder ,
250
- operator : Operator ,
251
281
) -> Self {
252
282
Self {
283
+ operator,
253
284
write_settings,
285
+ block_meta_index_map,
254
286
virtual_column_builder,
255
- operator,
256
287
}
257
288
}
258
289
}
@@ -301,13 +332,23 @@ impl AsyncTransform for VirtualColumnTransform {
301
332
}
302
333
}
303
334
335
+ let block_meta_index = self
336
+ . block_meta_index_map
337
+ . remove ( & block_meta. location )
338
+ . unwrap ( ) ;
304
339
let extended_block_meta = ExtendedBlockMeta {
305
340
block_meta : block_meta. clone ( ) ,
306
341
draft_virtual_block_meta : Some ( virtual_column_state. draft_virtual_block_meta ) ,
307
342
} ;
308
343
309
- let new_block = DataBlock :: new ( vec ! [ ] , 0 ) ;
310
- let new_block = new_block. add_meta ( Some ( extended_block_meta. boxed ( ) ) ) ?;
344
+ let entry = MutationLogEntry :: ReplacedBlock {
345
+ index : block_meta_index,
346
+ block_meta : Arc :: new ( extended_block_meta) ,
347
+ } ;
348
+ let meta = MutationLogs {
349
+ entries : vec ! [ entry] ,
350
+ } ;
351
+ let new_block = DataBlock :: empty_with_meta ( Box :: new ( meta) ) ;
311
352
Ok ( new_block)
312
353
}
313
354
}
0 commit comments