Skip to content

Commit 2f17d02

Browse files
committed
1
1 parent 8866161 commit 2f17d02

File tree

11 files changed

+253
-68
lines changed

11 files changed

+253
-68
lines changed

be/src/pipeline/exec/exchange_sink_operator.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -510,13 +510,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
510510
auto input_rows = block->rows();
511511

512512
if (input_rows > 0) {
513-
bool has_filtered_rows = false;
514-
int64_t filtered_rows = 0;
515513
local_state._number_input_rows += input_rows;
516514

517515
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
518-
*block, convert_block, filtered_rows, has_filtered_rows,
519-
local_state._row_part_tablet_ids, local_state._number_input_rows));
516+
*block, convert_block, local_state._row_part_tablet_ids,
517+
local_state._number_input_rows));
520518
if (local_state._row_distribution.batching_rows() > 0) {
521519
SCOPED_TIMER(local_state._send_new_partition_timer);
522520
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));

be/src/vec/sink/vrow_distribution.cpp

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,17 @@
4343

4444
namespace doris::vectorized {
4545

46-
std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs>
47-
VRowDistribution::_get_partition_function() {
46+
std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() {
4847
return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
4948
}
5049

5150
Status VRowDistribution::_save_missing_values(
51+
const Block& input_block,
5252
std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
5353
int col_size, Block* block, std::vector<int64_t> filter,
5454
const std::vector<const NullMap*>& col_null_maps) {
5555
// de-duplication for new partitions but save all rows.
56-
RETURN_IF_ERROR(_batching_block->add_rows(block, filter));
56+
RETURN_IF_ERROR(_batching_block->add_rows(&input_block, filter));
5757
std::vector<TNullableStringLiteral> cur_row_values;
5858
for (int row = 0; row < col_strs[0].size(); ++row) {
5959
cur_row_values.clear();
@@ -78,7 +78,7 @@ Status VRowDistribution::_save_missing_values(
7878
if (_batching_block->rows() > _batch_size) {
7979
_deal_batched = true;
8080
}
81-
81+
_batching_rows = _batching_block->rows();
8282
VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;
8383

8484
return Status::OK();
@@ -205,7 +205,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
205205
return status;
206206
}
207207

208-
void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx,
208+
void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx,
209209
std::vector<int64_t>& tablet_ids) {
210210
tablet_ids.reserve(block->rows());
211211
for (int row_idx = 0; row_idx < block->rows(); row_idx++) {
@@ -221,8 +221,7 @@ void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_i
221221
}
222222
}
223223

224-
void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
225-
RowPartTabletIds& row_part_tablet_id) {
224+
void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) {
226225
auto& row_ids = row_part_tablet_id.row_ids;
227226
auto& partition_ids = row_part_tablet_id.partition_ids;
228227
auto& tablet_ids = row_part_tablet_id.tablet_ids;
@@ -237,8 +236,7 @@ void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
237236
}
238237

239238
Status VRowDistribution::_filter_block_by_skip_and_where_clause(
240-
vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause,
241-
RowPartTabletIds& row_part_tablet_id) {
239+
Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
242240
// TODO
243241
//SCOPED_RAW_TIMER(&_stat.where_clause_ns);
244242
int result_index = -1;
@@ -250,25 +248,23 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
250248
auto& row_ids = row_part_tablet_id.row_ids;
251249
auto& partition_ids = row_part_tablet_id.partition_ids;
252250
auto& tablet_ids = row_part_tablet_id.tablet_ids;
253-
if (const auto* nullable_column =
254-
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
251+
if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
255252
for (size_t i = 0; i < block->rows(); i++) {
256253
if (nullable_column->get_bool_inline(i) && !_skip[i]) {
257254
row_ids.emplace_back(i);
258255
partition_ids.emplace_back(_partitions[i]->id);
259256
tablet_ids.emplace_back(_tablet_ids[i]);
260257
}
261258
}
262-
} else if (const auto* const_column =
263-
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
259+
} else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
264260
bool ret = const_column->get_bool(0);
265261
if (!ret) {
266262
return Status::OK();
267263
}
268264
// should we optimize?
269265
_filter_block_by_skip(block, row_part_tablet_id);
270266
} else {
271-
const auto& filter = assert_cast<const vectorized::ColumnUInt8&>(*filter_column).get_data();
267+
const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
272268
for (size_t i = 0; i < block->rows(); i++) {
273269
if (filter[i] != 0 && !_skip[i]) {
274270
row_ids.emplace_back(i);
@@ -284,7 +280,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
284280
return Status::OK();
285281
}
286282

287-
Status VRowDistribution::_filter_block(vectorized::Block* block,
283+
Status VRowDistribution::_filter_block(Block* block,
288284
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
289285
for (int i = 0; i < _schema->indexes().size(); i++) {
290286
_get_tablet_ids(block, i, _tablet_ids);
@@ -300,8 +296,7 @@ Status VRowDistribution::_filter_block(vectorized::Block* block,
300296
}
301297

302298
Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
303-
vectorized::Block* block, bool has_filtered_rows,
304-
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
299+
Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) {
305300
auto num_rows = block->rows();
306301

307302
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
@@ -315,7 +310,7 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
315310
return Status::OK();
316311
}
317312

318-
Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
313+
Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
319314
const std::vector<uint16_t>& partition_cols_idx,
320315
int64_t& rows_stat_val) {
321316
// for missing partition keys, calc the missing partition and save in _partitions_need_create
@@ -345,8 +340,8 @@ Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
345340
}
346341

347342
// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
348-
RETURN_IF_ERROR(
349-
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
343+
RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
344+
col_null_maps));
350345

351346
size_t new_bt_rows = _batching_block->rows();
352347
size_t new_bt_bytes = _batching_block->bytes();
@@ -362,7 +357,7 @@ Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
362357
}
363358

364359
Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
365-
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
360+
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
366361
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
367362
int64_t& rows_stat_val) {
368363
auto num_rows = block->rows();
@@ -384,13 +379,13 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
384379
RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
385380

386381
if (!_missing_map.empty()) {
387-
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
382+
RETURN_IF_ERROR(_deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
388383
}
389384
return Status::OK();
390385
}
391386

392387
Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
393-
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
388+
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
394389
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
395390
int64_t& rows_stat_val) {
396391
auto num_rows = block->rows();
@@ -413,7 +408,8 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
413408

414409
// allow and really need to create during auto-detect-overwriting.
415410
if (!_missing_map.empty()) {
416-
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
411+
RETURN_IF_ERROR(
412+
_deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
417413
}
418414
} else {
419415
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
@@ -467,20 +463,19 @@ void VRowDistribution::_reset_row_part_tablet_ids(
467463
}
468464

469465
Status VRowDistribution::generate_rows_distribution(
470-
vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
471-
int64_t& filtered_rows, bool& has_filtered_rows,
466+
Block& input_block, std::shared_ptr<Block>& block,
472467
std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) {
473468
auto input_rows = input_block.rows();
474469
_reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);
475470

476-
int64_t prev_filtered_rows =
477-
_block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
471+
// we store the batching block with value of `input_block`. so just do all of these again.
472+
bool has_filtered_rows = false; // validate may filter some rows. pass for setting _skip
478473
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
479474
_state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));
480475

481476
// batching block rows which need new partitions. deal together at finish.
482477
if (!_batching_block) [[unlikely]] {
483-
std::unique_ptr<Block> tmp_block = block->create_same_struct_block(0);
478+
std::unique_ptr<Block> tmp_block = input_block.create_same_struct_block(0);
484479
_batching_block = MutableBlock::create_unique(std::move(*tmp_block));
485480
}
486481

@@ -498,7 +493,7 @@ Status VRowDistribution::generate_rows_distribution(
498493
// we just calc left range here. leave right to FE to avoid dup calc.
499494
RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx));
500495

501-
VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(0, 1);
496+
VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1);
502497
DCHECK(result_idx != -1);
503498

504499
partition_cols_idx.push_back(result_idx);
@@ -511,20 +506,18 @@ Status VRowDistribution::generate_rows_distribution(
511506
Status st = Status::OK();
512507
if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
513508
// when overwrite, no auto create partition allowed.
514-
st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
515-
has_filtered_rows, row_part_tablet_ids,
516-
rows_stat_val);
509+
st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
510+
partition_cols_idx, has_filtered_rows,
511+
row_part_tablet_ids, rows_stat_val);
517512
} else if (_vpartition->is_auto_partition() && !_deal_batched) {
518-
st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
519-
has_filtered_rows, row_part_tablet_ids,
520-
rows_stat_val);
513+
st = _generate_rows_distribution_for_auto_partition(input_block, block.get(),
514+
partition_cols_idx, has_filtered_rows,
515+
row_part_tablet_ids, rows_stat_val);
521516
} else { // not auto partition
522517
st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
523518
row_part_tablet_ids);
524519
}
525520

526-
filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() -
527-
prev_filtered_rows;
528521
return st;
529522
}
530523

be/src/vec/sink/vrow_distribution.h

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -127,55 +127,55 @@ class VRowDistribution {
127127
// mv where clause
128128
// v1 needs index->node->row_ids - tabletids
129129
// v2 needs index,tablet->rowids
130-
Status generate_rows_distribution(vectorized::Block& input_block,
131-
std::shared_ptr<vectorized::Block>& block,
132-
int64_t& filtered_rows, bool& has_filtered_rows,
130+
Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
133131
std::vector<RowPartTabletIds>& row_part_tablet_ids,
134132
int64_t& rows_stat_val);
133+
// have 2 ways remind to deal batching block:
134+
// 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true.
135+
// 2. in caller, after last block and before close, set _deal_batched = true.
135136
bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
136137
size_t batching_rows() const { return _batching_rows; }
137138
// create partitions when need for auto-partition table using #_partitions_need_create.
138139
Status automatic_create_partition();
139140
void clear_batching_stats();
140141

141142
// for auto partition
142-
std::unique_ptr<MutableBlock> _batching_block;
143+
std::unique_ptr<MutableBlock> _batching_block; // same structure with input_block
143144
bool _deal_batched = false; // If true, send batched block before any block's append.
144145

145146
private:
146-
std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> _get_partition_function();
147+
std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();
147148

148-
Status _save_missing_values(std::vector<std::vector<std::string>>& col_strs, int col_size,
149+
Status _save_missing_values(const Block& input_block,
150+
std::vector<std::vector<std::string>>& col_strs, int col_size,
149151
Block* block, std::vector<int64_t> filter,
150152
const std::vector<const NullMap*>& col_null_maps);
151153

152-
void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
153-
std::vector<int64_t>& tablet_ids);
154+
void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);
154155

155-
void _filter_block_by_skip(vectorized::Block* block, RowPartTabletIds& row_part_tablet_id);
156+
void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);
156157

157-
Status _filter_block_by_skip_and_where_clause(vectorized::Block* block,
158-
const vectorized::VExprContextSPtr& where_clause,
158+
Status _filter_block_by_skip_and_where_clause(Block* block,
159+
const VExprContextSPtr& where_clause,
159160
RowPartTabletIds& row_part_tablet_id);
160161

161-
Status _filter_block(vectorized::Block* block,
162-
std::vector<RowPartTabletIds>& row_part_tablet_ids);
162+
Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);
163163

164164
Status _generate_rows_distribution_for_auto_partition(
165-
vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
165+
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
166166
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
167167
int64_t& rows_stat_val);
168168
// the whole process to deal missing rows. will call _save_missing_values
169-
Status _deal_missing_map(vectorized::Block* block,
169+
Status _deal_missing_map(const Block& input_block, Block* block,
170170
const std::vector<uint16_t>& partition_cols_idx,
171171
int64_t& rows_stat_val);
172172

173173
Status _generate_rows_distribution_for_non_auto_partition(
174-
vectorized::Block* block, bool has_filtered_rows,
174+
Block* block, bool has_filtered_rows,
175175
std::vector<RowPartTabletIds>& row_part_tablet_ids);
176176

177177
Status _generate_rows_distribution_for_auto_overwrite(
178-
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
178+
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
179179
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
180180
int64_t& rows_stat_val);
181181
Status _replace_overwriting_partition();

be/src/vec/sink/writer/vtablet_writer.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,8 +1977,6 @@ Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input
19771977
SCOPED_RAW_TIMER(&_send_data_ns);
19781978

19791979
std::shared_ptr<vectorized::Block> block;
1980-
bool has_filtered_rows = false;
1981-
int64_t filtered_rows = 0;
19821980
_number_input_rows += rows;
19831981
// update incrementally so that FE can get the progress.
19841982
// the real 'num_rows_load_total' will be set when sink being closed.
@@ -1989,8 +1987,7 @@ Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input
19891987

19901988
_row_distribution_watch.start();
19911989
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
1992-
input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
1993-
_number_input_rows));
1990+
input_block, block, _row_part_tablet_ids, _number_input_rows));
19941991

19951992
ChannelDistributionPayloadVec channel_to_payload;
19961993

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -481,17 +481,13 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
481481
DorisMetrics::instance()->load_rows->increment(input_rows);
482482
DorisMetrics::instance()->load_bytes->increment(input_bytes);
483483

484-
bool has_filtered_rows = false;
485-
int64_t filtered_rows = 0;
486-
487484
SCOPED_RAW_TIMER(&_send_data_ns);
488485
// This is just for passing compilation.
489486
_row_distribution_watch.start();
490487

491488
std::shared_ptr<vectorized::Block> block;
492489
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
493-
input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
494-
_number_input_rows));
490+
input_block, block, _row_part_tablet_ids, _number_input_rows));
495491
RowsForTablet rows_for_tablet;
496492
_generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
497493

debug/amazon_finance.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
global-type-constraints:
2+
varchar:
3+
range: {min: 0, max: 2}
4+
5+
tables:
6+
dwd_finance_order_expense_detail_amazon_d:
7+
dt:
8+
range: {min: "2025-11-16", max: "2025-12-22"}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
INSERT OVERWRITE TABLE auto_inc_target PARTITION (*)
2+
SELECT '2099-12-01' AS close_account_month,
3+
ROW_NUMBER() OVER () AS auto_increment_id,
4+
0 AS close_account_status,
5+
`_id`,
6+
main.`transaction_id` AS transaction_id,
7+
CURRENT_TIMESTAMP() AS insert_time
8+
FROM (
9+
SELECT `_id`,
10+
`transaction_id`
11+
FROM auto_inc_src
12+
WHERE dt = '2025-12-22'
13+
) main
14+
LEFT JOIN (
15+
SELECT DISTINCT transaction_id
16+
FROM auto_inc_target
17+
WHERE close_account_status = 1
18+
) et ON main.transaction_id = et.transaction_id
19+
WHERE et.transaction_id IS NULL

0 commit comments

Comments
 (0)