4343
4444namespace doris ::vectorized {
4545
46- std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs>
47- VRowDistribution::_get_partition_function () {
46+ std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function () {
4847 return {_vpartition->get_part_func_ctx (), _vpartition->get_partition_function ()};
4948}
5049
5150Status VRowDistribution::_save_missing_values (
51+ const Block& input_block,
5252 std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
5353 int col_size, Block* block, std::vector<int64_t > filter,
5454 const std::vector<const NullMap*>& col_null_maps) {
5555 // de-duplication for new partitions but save all rows.
56- RETURN_IF_ERROR (_batching_block->add_rows (block , filter));
56+ RETURN_IF_ERROR (_batching_block->add_rows (&input_block , filter));
5757 std::vector<TNullableStringLiteral> cur_row_values;
5858 for (int row = 0 ; row < col_strs[0 ].size (); ++row) {
5959 cur_row_values.clear ();
@@ -78,7 +78,7 @@ Status VRowDistribution::_save_missing_values(
7878 if (_batching_block->rows () > _batch_size) {
7979 _deal_batched = true ;
8080 }
81-
81+ _batching_rows = _batching_block-> rows ();
8282 VLOG_NOTICE << " pushed some batching lines, now numbers = " << _batching_rows;
8383
8484 return Status::OK ();
@@ -205,7 +205,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
205205 return status;
206206}
207207
208- void VRowDistribution::_get_tablet_ids (vectorized:: Block* block, int32_t index_idx,
208+ void VRowDistribution::_get_tablet_ids (Block* block, int32_t index_idx,
209209 std::vector<int64_t >& tablet_ids) {
210210 tablet_ids.reserve (block->rows ());
211211 for (int row_idx = 0 ; row_idx < block->rows (); row_idx++) {
@@ -221,8 +221,7 @@ void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_i
221221 }
222222}
223223
224- void VRowDistribution::_filter_block_by_skip (vectorized::Block* block,
225- RowPartTabletIds& row_part_tablet_id) {
224+ void VRowDistribution::_filter_block_by_skip (Block* block, RowPartTabletIds& row_part_tablet_id) {
226225 auto & row_ids = row_part_tablet_id.row_ids ;
227226 auto & partition_ids = row_part_tablet_id.partition_ids ;
228227 auto & tablet_ids = row_part_tablet_id.tablet_ids ;
@@ -237,8 +236,7 @@ void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
237236}
238237
239238Status VRowDistribution::_filter_block_by_skip_and_where_clause (
240- vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause,
241- RowPartTabletIds& row_part_tablet_id) {
239+ Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
242240 // TODO
243241 // SCOPED_RAW_TIMER(&_stat.where_clause_ns);
244242 int result_index = -1 ;
@@ -250,25 +248,23 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
250248 auto & row_ids = row_part_tablet_id.row_ids ;
251249 auto & partition_ids = row_part_tablet_id.partition_ids ;
252250 auto & tablet_ids = row_part_tablet_id.tablet_ids ;
253- if (const auto * nullable_column =
254- vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
251+ if (const auto * nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
255252 for (size_t i = 0 ; i < block->rows (); i++) {
256253 if (nullable_column->get_bool_inline (i) && !_skip[i]) {
257254 row_ids.emplace_back (i);
258255 partition_ids.emplace_back (_partitions[i]->id );
259256 tablet_ids.emplace_back (_tablet_ids[i]);
260257 }
261258 }
262- } else if (const auto * const_column =
263- vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
259+ } else if (const auto * const_column = check_and_get_column<ColumnConst>(*filter_column)) {
264260 bool ret = const_column->get_bool (0 );
265261 if (!ret) {
266262 return Status::OK ();
267263 }
268264 // should we optimize?
269265 _filter_block_by_skip (block, row_part_tablet_id);
270266 } else {
271- const auto & filter = assert_cast<const vectorized:: ColumnUInt8&>(*filter_column).get_data ();
267+ const auto & filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data ();
272268 for (size_t i = 0 ; i < block->rows (); i++) {
273269 if (filter[i] != 0 && !_skip[i]) {
274270 row_ids.emplace_back (i);
@@ -284,7 +280,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
284280 return Status::OK ();
285281}
286282
287- Status VRowDistribution::_filter_block (vectorized:: Block* block,
283+ Status VRowDistribution::_filter_block (Block* block,
288284 std::vector<RowPartTabletIds>& row_part_tablet_ids) {
289285 for (int i = 0 ; i < _schema->indexes ().size (); i++) {
290286 _get_tablet_ids (block, i, _tablet_ids);
@@ -300,8 +296,7 @@ Status VRowDistribution::_filter_block(vectorized::Block* block,
300296}
301297
302298Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition (
303- vectorized::Block* block, bool has_filtered_rows,
304- std::vector<RowPartTabletIds>& row_part_tablet_ids) {
299+ Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) {
305300 auto num_rows = block->rows ();
306301
307302 RETURN_IF_ERROR (_tablet_finder->find_tablets (_state, block, num_rows, _partitions,
@@ -315,7 +310,7 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
315310 return Status::OK ();
316311}
317312
318- Status VRowDistribution::_deal_missing_map (vectorized:: Block* block,
313+ Status VRowDistribution::_deal_missing_map (const Block& input_block, Block* block,
319314 const std::vector<uint16_t >& partition_cols_idx,
320315 int64_t & rows_stat_val) {
321316 // for missing partition keys, calc the missing partition and save in _partitions_need_create
@@ -345,8 +340,8 @@ Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
345340 }
346341
347342 // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
348- RETURN_IF_ERROR (
349- _save_missing_values (col_strs, part_col_num, block, _missing_map, col_null_maps));
343+ RETURN_IF_ERROR (_save_missing_values (input_block, col_strs, part_col_num, block, _missing_map,
344+ col_null_maps));
350345
351346 size_t new_bt_rows = _batching_block->rows ();
352347 size_t new_bt_bytes = _batching_block->bytes ();
@@ -362,7 +357,7 @@ Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
362357}
363358
364359Status VRowDistribution::_generate_rows_distribution_for_auto_partition (
365- vectorized:: Block* block, const std::vector<uint16_t >& partition_cols_idx,
360+ const Block& input_block, Block* block, const std::vector<uint16_t >& partition_cols_idx,
366361 bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
367362 int64_t & rows_stat_val) {
368363 auto num_rows = block->rows ();
@@ -384,13 +379,13 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
384379 RETURN_IF_ERROR (_filter_block (block, row_part_tablet_ids));
385380
386381 if (!_missing_map.empty ()) {
387- RETURN_IF_ERROR (_deal_missing_map (block, partition_cols_idx, rows_stat_val));
382+ RETURN_IF_ERROR (_deal_missing_map (input_block, block, partition_cols_idx, rows_stat_val));
388383 }
389384 return Status::OK ();
390385}
391386
392387Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite (
393- vectorized:: Block* block, const std::vector<uint16_t >& partition_cols_idx,
388+ const Block& input_block, Block* block, const std::vector<uint16_t >& partition_cols_idx,
394389 bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
395390 int64_t & rows_stat_val) {
396391 auto num_rows = block->rows ();
@@ -413,7 +408,8 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
413408
414409 // allow and really need to create during auto-detect-overwriting.
415410 if (!_missing_map.empty ()) {
416- RETURN_IF_ERROR (_deal_missing_map (block, partition_cols_idx, rows_stat_val));
411+ RETURN_IF_ERROR (
412+ _deal_missing_map (input_block, block, partition_cols_idx, rows_stat_val));
417413 }
418414 } else {
419415 RETURN_IF_ERROR (_tablet_finder->find_tablets (_state, block, num_rows, _partitions,
@@ -467,20 +463,19 @@ void VRowDistribution::_reset_row_part_tablet_ids(
467463}
468464
469465Status VRowDistribution::generate_rows_distribution (
470- vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
471- int64_t & filtered_rows, bool & has_filtered_rows,
466+ Block& input_block, std::shared_ptr<Block>& block,
472467 std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t & rows_stat_val) {
473468 auto input_rows = input_block.rows ();
474469 _reset_row_part_tablet_ids (row_part_tablet_ids, input_rows);
475470
476- int64_t prev_filtered_rows =
477- _block_convertor-> num_filtered_rows () + _tablet_finder-> num_filtered_rows ();
471+ // we store the batching block with value of `input_block`. so just do all of these again.
472+ bool has_filtered_rows = false ; // validate may filter some rows. pass for setting _skip
478473 RETURN_IF_ERROR (_block_convertor->validate_and_convert_block (
479474 _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));
480475
481476 // batching block rows which need new partitions. deal together at finish.
482477 if (!_batching_block) [[unlikely]] {
483- std::unique_ptr<Block> tmp_block = block-> create_same_struct_block (0 );
478+ std::unique_ptr<Block> tmp_block = input_block. create_same_struct_block (0 );
484479 _batching_block = MutableBlock::create_unique (std::move (*tmp_block));
485480 }
486481
@@ -498,7 +493,7 @@ Status VRowDistribution::generate_rows_distribution(
498493 // we just calc left range here. leave right to FE to avoid dup calc.
499494 RETURN_IF_ERROR (part_funcs[i]->execute (part_ctxs[i].get (), block.get (), &result_idx));
500495
501- VLOG_DEBUG << " Partition-calculated block:" << block->dump_data (0 , 1 );
496+ VLOG_DEBUG << " Partition-calculated block:\n " << block->dump_data (0 , 1 );
502497 DCHECK (result_idx != -1 );
503498
504499 partition_cols_idx.push_back (result_idx);
@@ -511,20 +506,18 @@ Status VRowDistribution::generate_rows_distribution(
511506 Status st = Status::OK ();
512507 if (_vpartition->is_auto_detect_overwrite () && !_deal_batched) {
513508 // when overwrite, no auto create partition allowed.
514- st = _generate_rows_distribution_for_auto_overwrite (block.get (), partition_cols_idx ,
515- has_filtered_rows, row_part_tablet_ids ,
516- rows_stat_val);
509+ st = _generate_rows_distribution_for_auto_overwrite (input_block, block.get (),
510+ partition_cols_idx, has_filtered_rows ,
511+ row_part_tablet_ids, rows_stat_val);
517512 } else if (_vpartition->is_auto_partition () && !_deal_batched) {
518- st = _generate_rows_distribution_for_auto_partition (block.get (), partition_cols_idx ,
519- has_filtered_rows, row_part_tablet_ids ,
520- rows_stat_val);
513+ st = _generate_rows_distribution_for_auto_partition (input_block, block.get (),
514+ partition_cols_idx, has_filtered_rows ,
515+ row_part_tablet_ids, rows_stat_val);
521516 } else { // not auto partition
522517 st = _generate_rows_distribution_for_non_auto_partition (block.get (), has_filtered_rows,
523518 row_part_tablet_ids);
524519 }
525520
526- filtered_rows = _block_convertor->num_filtered_rows () + _tablet_finder->num_filtered_rows () -
527- prev_filtered_rows;
528521 return st;
529522}
530523
0 commit comments