6 changes: 2 additions & 4 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -510,13 +510,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
auto input_rows = block->rows();

if (input_rows > 0) {
bool has_filtered_rows = false;
int64_t filtered_rows = 0;
local_state._number_input_rows += input_rows;

RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
*block, convert_block, filtered_rows, has_filtered_rows,
local_state._row_part_tablet_ids, local_state._number_input_rows));
*block, convert_block, local_state._row_part_tablet_ids,
local_state._number_input_rows));
if (local_state._row_distribution.batching_rows() > 0) {
SCOPED_TIMER(local_state._send_new_partition_timer);
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
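For context, the caller update above follows from the interface change in be/src/vec/sink/vrow_distribution.h further below: the filtered_rows / has_filtered_rows out-parameters appear unused by the callers shown in this PR and are dropped, leaving the filtered-row bookkeeping inside VRowDistribution. A before/after sketch of the prototype (taken from the header diff; not a standalone compilable excerpt):

// Before this PR: callers had to supply out-parameters they never read afterwards.
Status generate_rows_distribution(vectorized::Block& input_block,
                                  std::shared_ptr<vectorized::Block>& block,
                                  int64_t& filtered_rows, bool& has_filtered_rows,
                                  std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                  int64_t& rows_stat_val);

// After this PR: only the arguments the distribution logic actually needs.
Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
                                  std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                  int64_t& rows_stat_val);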
69 changes: 31 additions & 38 deletions be/src/vec/sink/vrow_distribution.cpp
@@ -43,17 +43,17 @@

namespace doris::vectorized {

std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs>
VRowDistribution::_get_partition_function() {
std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() {
return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
}

Status VRowDistribution::_save_missing_values(
const Block& input_block,
std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
int col_size, Block* block, std::vector<int64_t> filter,
const std::vector<const NullMap*>& col_null_maps) {
// de-duplication for new partitions but save all rows.
RETURN_IF_ERROR(_batching_block->add_rows(block, filter));
RETURN_IF_ERROR(_batching_block->add_rows(&input_block, filter));
std::vector<TNullableStringLiteral> cur_row_values;
for (int row = 0; row < col_strs[0].size(); ++row) {
cur_row_values.clear();
@@ -78,7 +78,7 @@ Status VRowDistribution::_save_missing_values(
if (_batching_block->rows() > _batch_size) {
_deal_batched = true;
}

_batching_rows = _batching_block->rows();
VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;

return Status::OK();
@@ -205,7 +205,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
return status;
}

void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx,
void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx,
std::vector<int64_t>& tablet_ids) {
tablet_ids.reserve(block->rows());
for (int row_idx = 0; row_idx < block->rows(); row_idx++) {
@@ -221,8 +221,7 @@ void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_i
}
}

void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
RowPartTabletIds& row_part_tablet_id) {
void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) {
auto& row_ids = row_part_tablet_id.row_ids;
auto& partition_ids = row_part_tablet_id.partition_ids;
auto& tablet_ids = row_part_tablet_id.tablet_ids;
@@ -237,8 +236,7 @@ void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
}

Status VRowDistribution::_filter_block_by_skip_and_where_clause(
vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause,
RowPartTabletIds& row_part_tablet_id) {
Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
// TODO
//SCOPED_RAW_TIMER(&_stat.where_clause_ns);
int result_index = -1;
@@ -250,25 +248,23 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
auto& row_ids = row_part_tablet_id.row_ids;
auto& partition_ids = row_part_tablet_id.partition_ids;
auto& tablet_ids = row_part_tablet_id.tablet_ids;
if (const auto* nullable_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
for (size_t i = 0; i < block->rows(); i++) {
if (nullable_column->get_bool_inline(i) && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
tablet_ids.emplace_back(_tablet_ids[i]);
}
}
} else if (const auto* const_column =
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
} else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
bool ret = const_column->get_bool(0);
if (!ret) {
return Status::OK();
}
// should we optimize?
_filter_block_by_skip(block, row_part_tablet_id);
} else {
const auto& filter = assert_cast<const vectorized::ColumnUInt8&>(*filter_column).get_data();
const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
for (size_t i = 0; i < block->rows(); i++) {
if (filter[i] != 0 && !_skip[i]) {
row_ids.emplace_back(i);
@@ -284,7 +280,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
return Status::OK();
}

Status VRowDistribution::_filter_block(vectorized::Block* block,
Status VRowDistribution::_filter_block(Block* block,
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
for (int i = 0; i < _schema->indexes().size(); i++) {
_get_tablet_ids(block, i, _tablet_ids);
@@ -300,8 +296,7 @@ Status VRowDistribution::_filter_block(vectorized::Block* block,
}

Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) {
auto num_rows = block->rows();

RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
@@ -315,7 +310,7 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
return Status::OK();
}

Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
const std::vector<uint16_t>& partition_cols_idx,
int64_t& rows_stat_val) {
// for missing partition keys, calc the missing partition and save in _partitions_need_create
@@ -345,8 +340,8 @@ Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
}

// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
RETURN_IF_ERROR(
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
col_null_maps));

size_t new_bt_rows = _batching_block->rows();
size_t new_bt_bytes = _batching_block->bytes();
@@ -362,7 +357,7 @@ Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
}

Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val) {
auto num_rows = block->rows();
@@ -384,13 +379,13 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));

if (!_missing_map.empty()) {
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
RETURN_IF_ERROR(_deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
}
return Status::OK();
}

Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val) {
auto num_rows = block->rows();
@@ -413,7 +408,8 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(

// allow and really need to create during auto-detect-overwriting.
if (!_missing_map.empty()) {
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
RETURN_IF_ERROR(
_deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
@@ -467,20 +463,19 @@ void VRowDistribution::_reset_row_part_tablet_ids(
}

Status VRowDistribution::generate_rows_distribution(
vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
int64_t& filtered_rows, bool& has_filtered_rows,
Block& input_block, std::shared_ptr<Block>& block,
std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) {
auto input_rows = input_block.rows();
_reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);

int64_t prev_filtered_rows =
_block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
// we store the batching block with value of `input_block`. so just do all of these again.
bool has_filtered_rows = false; // validate may filter some rows. pass for setting _skip
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
_state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));

// batching block rows which need new partitions. deal together at finish.
if (!_batching_block) [[unlikely]] {
std::unique_ptr<Block> tmp_block = block->create_same_struct_block(0);
std::unique_ptr<Block> tmp_block = input_block.create_same_struct_block(0);
_batching_block = MutableBlock::create_unique(std::move(*tmp_block));
}

@@ -498,7 +493,7 @@ Status VRowDistribution::generate_rows_distribution(
// we just calc left range here. leave right to FE to avoid dup calc.
RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx));

VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(0, 1);
VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1);
DCHECK(result_idx != -1);

partition_cols_idx.push_back(result_idx);
@@ -511,20 +506,18 @@ Status VRowDistribution::generate_rows_distribution(
Status st = Status::OK();
if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
// when overwrite, no auto create partition allowed.
st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
has_filtered_rows, row_part_tablet_ids,
rows_stat_val);
st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
partition_cols_idx, has_filtered_rows,
row_part_tablet_ids, rows_stat_val);
} else if (_vpartition->is_auto_partition() && !_deal_batched) {
st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
has_filtered_rows, row_part_tablet_ids,
rows_stat_val);
st = _generate_rows_distribution_for_auto_partition(input_block, block.get(),
partition_cols_idx, has_filtered_rows,
row_part_tablet_ids, rows_stat_val);
} else { // not auto partition
st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
row_part_tablet_ids);
}

filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() -
prev_filtered_rows;
return st;
}

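The substantive change above in vrow_distribution.cpp is that rows waiting for auto-created partitions are now buffered from the original input_block (and _batching_block is lazily created with the input block's structure), so the buffered rows can be validated and converted again when the batch is flushed. A minimal sketch of that flow, with an illustrative helper name rather than the actual member functions:

// Sketch only: buffer the rows whose target partitions do not exist yet, copying
// them from the *original* input block (not the converted block), so they can be
// re-validated and re-converted once the new partitions have been created.
Status buffer_rows_for_new_partitions(Block& input_block,
                                      std::unique_ptr<MutableBlock>& batching_block,
                                      const std::vector<int64_t>& missing_row_ids) {
    if (!batching_block) {
        // Lazily create the buffer with the same structure as the input block.
        std::unique_ptr<Block> empty = input_block.create_same_struct_block(0);
        batching_block = MutableBlock::create_unique(std::move(*empty));
    }
    // add_rows(block, filter) appends only the rows listed in missing_row_ids.
    return batching_block->add_rows(&input_block, missing_row_ids);
}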
34 changes: 17 additions & 17 deletions be/src/vec/sink/vrow_distribution.h
@@ -127,55 +127,55 @@ class VRowDistribution {
// mv where clause
// v1 needs index->node->row_ids - tabletids
// v2 needs index,tablet->rowids
Status generate_rows_distribution(vectorized::Block& input_block,
std::shared_ptr<vectorized::Block>& block,
int64_t& filtered_rows, bool& has_filtered_rows,
Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
// have 2 ways remind to deal batching block:
// 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true.
// 2. in caller, after last block and before close, set _deal_batched = true.
bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using #_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();

// for auto partition
std::unique_ptr<MutableBlock> _batching_block;
std::unique_ptr<MutableBlock> _batching_block; // same structure as input_block
bool _deal_batched = false; // If true, send batched block before any block's append.

private:
std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> _get_partition_function();
std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();

Status _save_missing_values(std::vector<std::vector<std::string>>& col_strs, int col_size,
Status _save_missing_values(const Block& input_block,
std::vector<std::vector<std::string>>& col_strs, int col_size,
Block* block, std::vector<int64_t> filter,
const std::vector<const NullMap*>& col_null_maps);

void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
std::vector<int64_t>& tablet_ids);
void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);

void _filter_block_by_skip(vectorized::Block* block, RowPartTabletIds& row_part_tablet_id);
void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);

Status _filter_block_by_skip_and_where_clause(vectorized::Block* block,
const vectorized::VExprContextSPtr& where_clause,
Status _filter_block_by_skip_and_where_clause(Block* block,
const VExprContextSPtr& where_clause,
RowPartTabletIds& row_part_tablet_id);

Status _filter_block(vectorized::Block* block,
std::vector<RowPartTabletIds>& row_part_tablet_ids);
Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);

Status _generate_rows_distribution_for_auto_partition(
vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
// the whole process to deal missing rows. will call _save_missing_values
Status _deal_missing_map(vectorized::Block* block,
Status _deal_missing_map(const Block& input_block, Block* block,
const std::vector<uint16_t>& partition_cols_idx,
int64_t& rows_stat_val);

Status _generate_rows_distribution_for_non_auto_partition(
vectorized::Block* block, bool has_filtered_rows,
Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids);

Status _generate_rows_distribution_for_auto_overwrite(
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
Status _replace_overwriting_partition();
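With the narrowed signature above, the call sites updated in this PR all reduce to the same pattern; roughly (identifiers as in exchange_sink_operator.cpp, error handling abbreviated):

// Sketch of a caller after the signature change.
std::shared_ptr<vectorized::Block> convert_block;
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
        *block, convert_block, local_state._row_part_tablet_ids,
        local_state._number_input_rows));
if (local_state._row_distribution.batching_rows() > 0) {
    // Some rows were buffered for partitions that do not exist yet.
    RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
}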
5 changes: 1 addition & 4 deletions be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1977,8 +1977,6 @@ Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input
SCOPED_RAW_TIMER(&_send_data_ns);

std::shared_ptr<vectorized::Block> block;
bool has_filtered_rows = false;
int64_t filtered_rows = 0;
_number_input_rows += rows;
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
@@ -1989,8 +1987,7 @@

_row_distribution_watch.start();
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
_number_input_rows));
input_block, block, _row_part_tablet_ids, _number_input_rows));

ChannelDistributionPayloadVec channel_to_payload;

6 changes: 1 addition & 5 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -481,17 +481,13 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
DorisMetrics::instance()->load_rows->increment(input_rows);
DorisMetrics::instance()->load_bytes->increment(input_bytes);

bool has_filtered_rows = false;
int64_t filtered_rows = 0;

SCOPED_RAW_TIMER(&_send_data_ns);
// This is just for passing compilation.
_row_distribution_watch.start();

std::shared_ptr<vectorized::Block> block;
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
_number_input_rows));
input_block, block, _row_part_tablet_ids, _number_input_rows));
RowsForTablet rows_for_tablet;
_generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);

19 changes: 19 additions & 0 deletions regression-test/suites/partition_p1/auto_partition/ddl/iot.sql
@@ -0,0 +1,19 @@
INSERT OVERWRITE TABLE auto_inc_target PARTITION (*)
SELECT '2099-12-01' AS close_account_month,
ROW_NUMBER() OVER () AS auto_increment_id,
0 AS close_account_status,
`_id`,
main.`transaction_id` AS transaction_id,
CURRENT_TIMESTAMP() AS insert_time
FROM (
SELECT `_id`,
`transaction_id`
FROM auto_inc_src
WHERE dt = '2025-12-22'
) main
LEFT JOIN (
SELECT DISTINCT transaction_id
FROM auto_inc_target
WHERE close_account_status = 1
) et ON main.transaction_id = et.transaction_id
WHERE et.transaction_id IS NULL
11 changes: 11 additions & 0 deletions regression-test/suites/partition_p1/auto_partition/ddl/src.sql
@@ -0,0 +1,11 @@
CREATE TABLE `auto_inc_src` (
`dt` date NOT NULL COMMENT "statistics date",
`_id` varchar(255) NULL COMMENT "table ID",
`transaction_id` varchar(255) NULL COMMENT "transaction ID",
`insert_time` datetime NULL COMMENT "data warehouse update time"
)
DUPLICATE KEY(`dt`)
DISTRIBUTED BY HASH(`transaction_id`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);