diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 04e23b82ac96b1..add4e0482bfb2e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -510,13 +510,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     auto input_rows = block->rows();
     if (input_rows > 0) {
-        bool has_filtered_rows = false;
-        int64_t filtered_rows = 0;
         local_state._number_input_rows += input_rows;
 
         RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
-                *block, convert_block, filtered_rows, has_filtered_rows,
-                local_state._row_part_tablet_ids, local_state._number_input_rows));
+                *block, convert_block, local_state._row_part_tablet_ids,
+                local_state._number_input_rows));
 
         if (local_state._row_distribution.batching_rows() > 0) {
             SCOPED_TIMER(local_state._send_new_partition_timer);
             RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
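
Note on the call-site change: `generate_rows_distribution()` no longer reports `filtered_rows` / `has_filtered_rows` back to the sink. The counters they were derived from still live on the block convertor and tablet finder, so a caller that needs the per-call delta can rebuild it itself. A minimal sketch, reusing the arithmetic this patch deletes from vrow_distribution.cpp (it assumes a writer such as VTabletWriter that owns `_block_convertor` and `_tablet_finder`; this is not part of the patch):

    int64_t prev_filtered_rows =
            _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
            input_block, block, _row_part_tablet_ids, _number_input_rows));
    int64_t filtered_rows = _block_convertor->num_filtered_rows() +
                            _tablet_finder->num_filtered_rows() - prev_filtered_rows;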
diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp
index 1f0bdd96b857bd..5c10a1c1c68048 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -43,17 +43,17 @@
 
 namespace doris::vectorized {
 
-std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs>
-VRowDistribution::_get_partition_function() {
+std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() {
     return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
 }
 
 Status VRowDistribution::_save_missing_values(
+        const Block& input_block,
         std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
         int col_size, Block* block, std::vector<int64_t> filter,
         const std::vector<const NullMap*>& col_null_maps) {
     // de-duplication for new partitions but save all rows.
-    RETURN_IF_ERROR(_batching_block->add_rows(block, filter));
+    RETURN_IF_ERROR(_batching_block->add_rows(&input_block, filter));
     std::vector<TNullableStringLiteral> cur_row_values;
     for (int row = 0; row < col_strs[0].size(); ++row) {
         cur_row_values.clear();
@@ -78,7 +78,7 @@ Status VRowDistribution::_save_missing_values(
     if (_batching_block->rows() > _batch_size) {
         _deal_batched = true;
     }
-
+    _batching_rows = _batching_block->rows();
     VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;
 
     return Status::OK();
@@ -205,7 +205,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
     return status;
 }
 
-void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx,
+void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx,
                                        std::vector<int64_t>& tablet_ids) {
     tablet_ids.reserve(block->rows());
     for (int row_idx = 0; row_idx < block->rows(); row_idx++) {
@@ -221,8 +221,7 @@ void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_i
     }
 }
 
-void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
-                                             RowPartTabletIds& row_part_tablet_id) {
+void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) {
     auto& row_ids = row_part_tablet_id.row_ids;
     auto& partition_ids = row_part_tablet_id.partition_ids;
     auto& tablet_ids = row_part_tablet_id.tablet_ids;
@@ -237,8 +236,7 @@ void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
 }
 
 Status VRowDistribution::_filter_block_by_skip_and_where_clause(
-        vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause,
-        RowPartTabletIds& row_part_tablet_id) {
+        Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
     // TODO
     //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
     int result_index = -1;
@@ -250,8 +248,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
     auto& row_ids = row_part_tablet_id.row_ids;
     auto& partition_ids = row_part_tablet_id.partition_ids;
     auto& tablet_ids = row_part_tablet_id.tablet_ids;
-    if (const auto* nullable_column =
-                vectorized::check_and_get_column<ColumnNullable>(*filter_column)) {
+    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
         for (size_t i = 0; i < block->rows(); i++) {
             if (nullable_column->get_bool_inline(i) && !_skip[i]) {
                 row_ids.emplace_back(i);
@@ -259,8 +256,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
                 tablet_ids.emplace_back(_tablet_ids[i]);
             }
         }
-    } else if (const auto* const_column =
-                       vectorized::check_and_get_column<ColumnConst>(*filter_column)) {
+    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
         bool ret = const_column->get_bool(0);
         if (!ret) {
             return Status::OK();
@@ -268,7 +264,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
         // should we optimize?
         _filter_block_by_skip(block, row_part_tablet_id);
     } else {
-        const auto& filter = assert_cast<const vectorized::ColumnUInt8&>(*filter_column).get_data();
+        const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
         for (size_t i = 0; i < block->rows(); i++) {
             if (filter[i] != 0 && !_skip[i]) {
                 row_ids.emplace_back(i);
@@ -284,7 +280,7 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
     return Status::OK();
 }
 
-Status VRowDistribution::_filter_block(vectorized::Block* block,
+Status VRowDistribution::_filter_block(Block* block,
                                        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
     for (int i = 0; i < _schema->indexes().size(); i++) {
         _get_tablet_ids(block, i, _tablet_ids);
@@ -300,8 +296,7 @@ Status VRowDistribution::_filter_block(vectorized::Block* block,
 }
 
 Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
-        vectorized::Block* block, bool has_filtered_rows,
-        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+        Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) {
     auto num_rows = block->rows();
 
     RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
@@ -315,7 +310,7 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
     return Status::OK();
 }
 
-Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
+Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
                                            const std::vector<uint16_t>& partition_cols_idx,
                                            int64_t& rows_stat_val) {
     // for missing partition keys, calc the missing partition and save in _partitions_need_create
@@ -345,8 +340,8 @@
     }
 
     // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
-    RETURN_IF_ERROR(
-            _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
+    RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
+                                         col_null_maps));
 
     size_t new_bt_rows = _batching_block->rows();
     size_t new_bt_bytes = _batching_block->bytes();
@@ -362,7 +357,7 @@
 }
 
 Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
-        vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
         bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
         int64_t& rows_stat_val) {
     auto num_rows = block->rows();
@@ -384,13 +379,13 @@
     RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
 
     if (!_missing_map.empty()) {
-        RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
+        RETURN_IF_ERROR(_deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
     }
     return Status::OK();
 }
 
 Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
-        vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
         bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
         int64_t& rows_stat_val) {
     auto num_rows = block->rows();
@@ -413,7 +408,8 @@
 
         // allow and really need to create during auto-detect-overwriting.
         if (!_missing_map.empty()) {
-            RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
+            RETURN_IF_ERROR(
+                    _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
         }
     } else {
         RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
@@ -467,20 +463,19 @@ void VRowDistribution::_reset_row_part_tablet_ids(
 }
 
 Status VRowDistribution::generate_rows_distribution(
-        vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
-        int64_t& filtered_rows, bool& has_filtered_rows,
+        Block& input_block, std::shared_ptr<Block>& block,
         std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) {
     auto input_rows = input_block.rows();
     _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);
 
-    int64_t prev_filtered_rows =
-            _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
+    // the batching block stores the values of `input_block`, so we just do all of this again.
+    bool has_filtered_rows = false;
     // validate may filter some rows. pass for setting _skip
     RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
             _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));
     // batching block rows which need new partitions. deal together at finish.
     if (!_batching_block) [[unlikely]] {
-        std::unique_ptr<Block> tmp_block = block->create_same_struct_block(0);
+        std::unique_ptr<Block> tmp_block = input_block.create_same_struct_block(0);
         _batching_block = MutableBlock::create_unique(std::move(*tmp_block));
     }
 
@@ -498,7 +493,7 @@ Status VRowDistribution::generate_rows_distribution(
             // we just calc left range here. leave right to FE to avoid dup calc.
             RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx));
 
-            VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(0, 1);
+            VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1);
 
             DCHECK(result_idx != -1);
             partition_cols_idx.push_back(result_idx);
@@ -511,20 +506,18 @@ Status VRowDistribution::generate_rows_distribution(
 
     Status st = Status::OK();
     if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
         // when overwrite, no auto create partition allowed.
-        st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
-                                                            has_filtered_rows, row_part_tablet_ids,
-                                                            rows_stat_val);
+        st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
+                                                            partition_cols_idx, has_filtered_rows,
+                                                            row_part_tablet_ids, rows_stat_val);
     } else if (_vpartition->is_auto_partition() && !_deal_batched) {
-        st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
-                                                            has_filtered_rows, row_part_tablet_ids,
-                                                            rows_stat_val);
+        st = _generate_rows_distribution_for_auto_partition(input_block, block.get(),
+                                                            partition_cols_idx, has_filtered_rows,
+                                                            row_part_tablet_ids, rows_stat_val);
     } else { // not auto partition
         st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
                                                                 row_part_tablet_ids);
     }
-    filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() -
-                    prev_filtered_rows;
 
     return st;
 }
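
The substantive change in vrow_distribution.cpp: rows whose partition does not exist yet are now parked in `_batching_block` in their original, pre-conversion form (copied from `input_block` rather than from the converted `block`). A hedged sketch of the replay this enables, assuming the parked rows re-enter the normal write path (`to_block()` is the existing MutableBlock accessor; the wrapper function is hypothetical):

    // Once FE has created the missing partitions, the parked ORIGINAL rows are
    // replayed, so validate_and_convert_block() runs for them a second time and
    // regenerates derived values (auto-increment fill, defaults) consistently.
    Status replay_batched(RuntimeState* state) {
        Block batched = _batching_block->to_block(); // same structure as input_block
        return write(state, batched);                // full validate/convert/distribute again
    }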
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 429b67bb068af5..5987a0eadcbf3b 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -127,11 +127,12 @@ class VRowDistribution {
     // mv where clause
     // v1 needs index->node->row_ids - tabletids
     // v2 needs index,tablet->rowids
-    Status generate_rows_distribution(vectorized::Block& input_block,
-                                      std::shared_ptr<vectorized::Block>& block,
-                                      int64_t& filtered_rows, bool& has_filtered_rows,
+    Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                       int64_t& rows_stat_val);
+    // there are two ways we are reminded to deal with the batching block:
+    // 1. in row_distribution, when _batching_rows reaches the threshold, this class sets _deal_batched = true.
+    // 2. in the caller, after the last block and before close, set _deal_batched = true.
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
     size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using #_partitions_need_create.
@@ -139,43 +140,42 @@
     void clear_batching_stats();
 
     // for auto partition
-    std::unique_ptr<MutableBlock> _batching_block;
+    std::unique_ptr<MutableBlock> _batching_block; // same structure as input_block
     bool _deal_batched = false; // If true, send batched block before any block's append.
 
 private:
-    std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> _get_partition_function();
+    std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();
 
-    Status _save_missing_values(std::vector<std::vector<std::string>>& col_strs, int col_size,
+    Status _save_missing_values(const Block& input_block,
+                                std::vector<std::vector<std::string>>& col_strs, int col_size,
                                 Block* block, std::vector<int64_t> filter,
                                 const std::vector<const NullMap*>& col_null_maps);
 
-    void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
-                         std::vector<int64_t>& tablet_ids);
+    void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);
 
-    void _filter_block_by_skip(vectorized::Block* block, RowPartTabletIds& row_part_tablet_id);
+    void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);
 
-    Status _filter_block_by_skip_and_where_clause(vectorized::Block* block,
-                                                  const vectorized::VExprContextSPtr& where_clause,
+    Status _filter_block_by_skip_and_where_clause(Block* block,
+                                                  const VExprContextSPtr& where_clause,
                                                   RowPartTabletIds& row_part_tablet_id);
 
-    Status _filter_block(vectorized::Block* block,
-                         std::vector<RowPartTabletIds>& row_part_tablet_ids);
+    Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);
 
     Status _generate_rows_distribution_for_auto_partition(
-            vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
+            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
            int64_t& rows_stat_val);
     // the whole process to deal missing rows. will call _save_missing_values
-    Status _deal_missing_map(vectorized::Block* block,
+    Status _deal_missing_map(const Block& input_block, Block* block,
                              const std::vector<uint16_t>& partition_cols_idx,
                              int64_t& rows_stat_val);
 
     Status _generate_rows_distribution_for_non_auto_partition(
-            vectorized::Block* block, bool has_filtered_rows,
+            Block* block, bool has_filtered_rows,
             std::vector<RowPartTabletIds>& row_part_tablet_ids);
 
     Status _generate_rows_distribution_for_auto_overwrite(
-            vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
             bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
             int64_t& rows_stat_val);
 
     Status _replace_overwriting_partition();
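
The new header comment names two flush triggers for `_deal_batched`. Trigger 1 is internal, set in `_save_missing_values()` once `_batching_block->rows()` passes `_batch_size`; trigger 2 is the caller's duty at end of input. A sketch of the caller side, using the members this header declares plus the `_send_new_partition_batch()` helper visible in the exchange-sink hunk (the enclosing function and exact helper signature are assumptions):

    Status flush_before_close(RuntimeState* state) {
        _row_distribution._deal_batched = true;           // trigger 2: no more input coming
        if (_row_distribution.need_deal_batching()) {     // true only if _batching_rows > 0
            RETURN_IF_ERROR(_send_new_partition_batch()); // create partitions, replay parked rows
        }
        return Status::OK();
    }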
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 4629236fd5e3bc..2bd2256c7044d6 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1977,8 +1977,6 @@ Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input
     SCOPED_RAW_TIMER(&_send_data_ns);
 
     std::shared_ptr<vectorized::Block> block;
-    bool has_filtered_rows = false;
-    int64_t filtered_rows = 0;
     _number_input_rows += rows;
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
@@ -1989,8 +1987,7 @@ Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input
 
     _row_distribution_watch.start();
     RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
-            _number_input_rows));
+            input_block, block, _row_part_tablet_ids, _number_input_rows));
 
     ChannelDistributionPayloadVec channel_to_payload;
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 2125ad8726cdb5..dbf770856fbe27 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -481,17 +481,13 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
     DorisMetrics::instance()->load_rows->increment(input_rows);
     DorisMetrics::instance()->load_bytes->increment(input_bytes);
 
-    bool has_filtered_rows = false;
-    int64_t filtered_rows = 0;
-
     SCOPED_RAW_TIMER(&_send_data_ns);
     // This is just for passing compilation.
     _row_distribution_watch.start();
     std::shared_ptr<Block> block;
     RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
-            _number_input_rows));
+            input_block, block, _row_part_tablet_ids, _number_input_rows));
 
     RowsForTablet rows_for_tablet;
     _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
 
diff --git a/regression-test/suites/partition_p1/auto_partition/ddl/iot.sql b/regression-test/suites/partition_p1/auto_partition/ddl/iot.sql
new file mode 100644
index 00000000000000..ebfb15139acaa7
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/ddl/iot.sql
@@ -0,0 +1,19 @@
+INSERT OVERWRITE TABLE auto_inc_target PARTITION (*)
+SELECT '2099-12-01' AS close_account_month,
+       ROW_NUMBER() OVER () AS auto_increment_id,
+       0 AS close_account_status,
+       `_id`,
+       main.`transaction_id` AS transaction_id,
+       CURRENT_TIMESTAMP() AS insert_time
+FROM (
+        SELECT `_id`,
+               `transaction_id`
+        FROM auto_inc_src
+        WHERE dt = '2025-12-22'
+    ) main
+    LEFT JOIN (
+        SELECT DISTINCT transaction_id
+        FROM auto_inc_target
+        WHERE close_account_status = 1
+    ) et ON main.transaction_id = et.transaction_id
+WHERE et.transaction_id IS NULL
\ No newline at end of file
diff --git a/regression-test/suites/partition_p1/auto_partition/ddl/src.sql b/regression-test/suites/partition_p1/auto_partition/ddl/src.sql
new file mode 100644
index 00000000000000..7d82f9f7892b90
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/ddl/src.sql
@@ -0,0 +1,11 @@
+CREATE TABLE `auto_inc_src` (
+  `dt` date NOT NULL COMMENT "statistics date",
+  `_id` varchar(255) NULL COMMENT "table ID",
+  `transaction_id` varchar(255) NULL COMMENT "transaction ID",
+  `insert_time` datetime NULL COMMENT "warehouse data update time"
+)
+DUPLICATE KEY(`dt`)
+DISTRIBUTED BY HASH(`transaction_id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/partition_p1/auto_partition/ddl/target.sql b/regression-test/suites/partition_p1/auto_partition/ddl/target.sql
new file mode 100644
index 00000000000000..cd42180dc73592
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/ddl/target.sql
@@ -0,0 +1,15 @@
+CREATE TABLE `auto_inc_target` (
+  `close_account_month` date NOT NULL COMMENT "account closing month",
+  `auto_increment_id` bigint NOT NULL AUTO_INCREMENT(1) COMMENT "auto-increment ID",
+  `close_account_status` int NULL COMMENT "closing status (1 closed / 0 not closed)",
+  `_id` varchar(255) NULL COMMENT "table ID",
+  `transaction_id` varchar(255) NULL COMMENT "transaction ID",
+  `insert_time` datetime NULL COMMENT "warehouse data update time"
+)
+DUPLICATE KEY(`close_account_month`, `auto_increment_id`)
+AUTO PARTITION BY RANGE (date_trunc(`close_account_month`, 'month'))
+()
+DISTRIBUTED BY HASH(`transaction_id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/auto_partition_auto_inc.yaml b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/auto_partition_auto_inc.yaml
new file mode 100644
index 00000000000000..178a1074aeb802
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/auto_partition_auto_inc.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+global-type-constraints:
+  varchar:
+    range: {min: 0, max: 2}
+
+tables:
+  auto_inc_src:
+    dt:
+      range: {min: "2025-11-16", max: "2025-12-22"}
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/auto_partition_auto_inc.groovy b/regression-test/suites/partition_p1/auto_partition/sql/auto_partition_auto_inc.groovy
new file mode 100644
index 00000000000000..16232b629c2328
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/sql/auto_partition_auto_inc.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.io.FileType
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("auto_partition_auto_inc", "p1,nonConcurrent") { // set global vars
+    // get doris-dbgen from s3
+    def dirPath = context.file.parent
+    def fatherPath = context.file.parentFile.parentFile.getPath()
+    def fileName = "doris-dbgen"
+    def fileUrl = "${getS3Url()}/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen"
+    def filePath = Paths.get(dirPath, fileName)
+    if (!Files.exists(filePath)) {
+        new URL(fileUrl).withInputStream { inputStream ->
+            Files.copy(inputStream, filePath)
+        }
+        def file = new File(dirPath + "/" + fileName)
+        file.setExecutable(true)
+    }
+
+    def rows = 200000
+
+    // load data via doris-dbgen
+    def doris_dbgen_create_data = { db_name, tb_name ->
+        def tableName = tb_name
+
+        def jdbcUrl = context.config.jdbcUrl
+        def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+        def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+        def sql_port
+        if (urlWithoutSchema.indexOf("/") >= 0) {
+            // e.g: jdbc:mysql://localhost:8080/?a=b
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+        } else {
+            // e.g: jdbc:mysql://localhost:8080
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+        }
+        String feHttpAddress = context.config.feHttpAddress
+        def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)
+
+        String realDb = db_name
+        String user = context.config.jdbcUser
+        String password = context.config.jdbcPassword
+
+        def cm
+        if (password) {
+            cm = """
+            ${dirPath}/doris-dbgen gen
+                --host ${sql_ip}
+                --sql-port ${sql_port}
+                --user ${user}
+                --password ${password}
+                --database ${realDb}
+                --table ${tableName}
+                --rows ${rows}
+                --http-port ${http_port}
+                --config ${fatherPath}/doris_dbgen_conf/auto_partition_auto_inc.yaml
+            """
+        } else {
+            cm = """
+            ${dirPath}/doris-dbgen gen
+                --host ${sql_ip}
+                --sql-port ${sql_port}
+                --user ${user}
+                --database ${realDb}
+                --table ${tableName}
+                --rows ${rows}
+                --http-port ${http_port}
+                --config ${fatherPath}/doris_dbgen_conf/auto_partition_auto_inc.yaml
+            """
+        }
+
+        logger.info("datagen: " + cm)
+        def proc = cm.execute()
+        def sout = new StringBuilder(), serr = new StringBuilder()
+        proc.consumeProcessOutput(sout, serr)
+        proc.waitForOrKill(1800000)
+        logger.info("std out: " + sout + ", std err: " + serr)
+    }
+
+    def database_name = "regression_test_auto_partition_auto_inc"
+    def table_src = "auto_inc_src"
+    def table_dest = "auto_inc_target"
+
+    sql "set global enable_auto_create_when_overwrite=true;"
+    try {
+        sql """drop database if exists ${database_name};"""
+        sql """create database ${database_name};"""
+        sql """use ${database_name};"""
+        sql """drop table if exists ${table_src};"""
+        sql """drop table if exists ${table_dest};"""
+        sql new File("""${fatherPath}/ddl/src.sql""").text
+        sql new File("""${fatherPath}/ddl/target.sql""").text
+        doris_dbgen_create_data(database_name, table_src)
+        sql """
+            INSERT INTO auto_inc_src
+            (dt,
+            `_id`,
+            `transaction_id`,
+            `insert_time`)
+            SELECT
+                '2025-12-22' AS dt,
+                `_id`,
+                `transaction_id`,
+                `insert_time`
+            FROM auto_inc_src;
+        """
+        sql """
+            INSERT INTO auto_inc_target
+            (close_account_month, close_account_status)
+            VALUES
+            ('2025-11-01', 1),
+            ('2025-12-01', 0);
+        """
+        sql new File("""${fatherPath}/ddl/iot.sql""").text
+    } finally {
+        sql "set global enable_auto_create_when_overwrite=false;"
+    }
+
+    // TEST-BODY
+    def count_src = sql " select count() from ${table_src}; "
+    def count_dest = sql " select count() from ${table_dest}; "
+    assertTrue(count_dest[0][0] > 2)
+}
\ No newline at end of file
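
Taken together, the regression test drives exactly the path the C++ changes fix: an INSERT OVERWRITE into an AUTO PARTITION BY RANGE table whose duplicate key includes an AUTO_INCREMENT column, where the '2099-12-01' rows land in a partition that does not exist yet. A condensed sketch of the backend flow it exercises, expressed with the patched functions (the literal call is the one from generate_rows_distribution(); the commented steps paraphrase the surrounding control flow):

    // enable_auto_create_when_overwrite=true permits partition creation on this path:
    st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
                                                        partition_cols_idx, has_filtered_rows,
                                                        row_part_tablet_ids, rows_stat_val);
    // -> _deal_missing_map(input_block, ...)        finds no partition for 2099-12
    //    -> _save_missing_values(input_block, ...)  parks ORIGINAL rows + partition value
    // -> once the partition is created, the parked rows are replayed, so
    //    auto_increment_id is assigned during re-conversion rather than read back
    //    from a previously converted block whose structure no longer matches.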