Skip to content

Commit d49d620

Browse files
authored
[refine](expr) Use the new execute interface of expr in some places. (#59315)
before ```C++ Block block; block.insert({ColumnUInt8::create(1), std::make_shared<DataTypeUInt8>(), ""}); int result = -1; RETURN_IF_ERROR(ctx->execute(&block, &result)); DCHECK(result != -1); auto column = block.get_by_position(result).column; DCHECK(column->size() == 1); ``` now ```C++ ColumnWithTypeAndName result; RETURN_IF_ERROR(ctx->execute_const_expr(result)); DCHECK(result.column->size() == 1); ```
1 parent 2823d11 commit d49d620

17 files changed

+68
-96
lines changed

be/src/olap/push_handler.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -523,10 +523,8 @@ Status PushBrokerReader::_convert_to_output_block(vectorized::Block* block) {
523523
vectorized::ColumnPtr column_ptr;
524524

525525
auto& ctx = _dest_expr_ctxs[dest_index];
526-
int result_column_id = -1;
527526
// PT1 => dest primitive type
528-
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
529-
column_ptr = _src_block.get_by_position(result_column_id).column;
527+
RETURN_IF_ERROR(ctx->execute(&_src_block, column_ptr));
530528
// column_ptr maybe a ColumnConst, convert it to a normal column
531529
column_ptr = column_ptr->convert_to_full_column_if_const();
532530
DCHECK(column_ptr);

be/src/pipeline/exec/analytic_sink_operator.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -901,10 +901,9 @@ size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos
901901
Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
902902
const vectorized::VExprContextSPtr& expr,
903903
vectorized::IColumn* dst_column, size_t length) {
904-
int result_col_id = -1;
905-
RETURN_IF_ERROR(expr->execute(block, &result_col_id));
906-
DCHECK_GE(result_col_id, 0);
907-
auto column = block->get_by_position(result_col_id).column->convert_to_full_column_if_const();
904+
vectorized::ColumnPtr column;
905+
RETURN_IF_ERROR(expr->execute(block, column));
906+
column = column->convert_to_full_column_if_const();
908907
// iff dst_column is string, maybe overflow of 4G, so need ignore overflow
909908
// the column is used by compare_at self to find the range, it's need convert it when overflow?
910909
dst_column->insert_range_from_ignore_overflow(*column, 0, length);

be/src/pipeline/exec/dict_sink_operator.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "common/status.h"
2121
#include "vec/core/block.h"
22+
#include "vec/core/column_with_type_and_name.h"
2223
#include "vec/functions/complex_hash_map_dictionary.h"
2324
#include "vec/functions/dictionary_factory.h"
2425
#include "vec/functions/dictionary_util.h"
@@ -53,20 +54,21 @@ Status DictSinkLocalState::load_dict(RuntimeState* state) {
5354

5455
for (long key_expr_id : p._key_output_expr_slots) {
5556
auto key_expr_ctx = _output_vexpr_ctxs[key_expr_id];
56-
int key_column_id = -1;
57-
RETURN_IF_ERROR(key_expr_ctx->execute(&input_block, &key_column_id));
58-
key_data.push_back(input_block.get_by_position(key_column_id));
57+
vectorized::ColumnWithTypeAndName key_exec_data;
58+
RETURN_IF_ERROR(key_expr_ctx->execute(&input_block, key_exec_data));
59+
60+
key_data.push_back(key_exec_data);
5961
}
6062

6163
for (size_t i = 0; i < p._value_output_expr_slots.size(); i++) {
6264
auto value_expr_id = p._value_output_expr_slots[i];
6365
auto value_name = p._value_names[i];
6466
auto value_expr_ctx = _output_vexpr_ctxs[value_expr_id];
65-
int value_column_id = -1;
66-
RETURN_IF_ERROR(value_expr_ctx->execute(&input_block, &value_column_id));
67-
auto att_data = input_block.get_by_position(value_column_id);
68-
att_data.name = value_name;
69-
value_data.push_back(att_data);
67+
68+
vectorized::ColumnPtr value_column;
69+
RETURN_IF_ERROR(value_expr_ctx->execute(&input_block, value_column));
70+
auto value_type = value_expr_ctx->execute_type(&input_block);
71+
value_data.push_back({value_column, value_type, value_name});
7072
}
7173

7274
RETURN_IF_ERROR(check_dict_input_data(key_data, value_data, p._skip_null_key));

be/src/pipeline/exec/operator.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,16 +309,16 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
309309
}
310310
vectorized::Block input_block = *origin_block;
311311

312-
std::vector<int> result_column_ids;
313312
size_t bytes_usage = 0;
313+
vectorized::ColumnsWithTypeAndName new_columns;
314314
for (const auto& projections : local_state->_intermediate_projections) {
315-
result_column_ids.resize(projections.size());
315+
new_columns.resize(projections.size());
316316
for (int i = 0; i < projections.size(); i++) {
317-
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
317+
RETURN_IF_ERROR(projections[i]->execute(&input_block, new_columns[i]));
318318
}
319-
320-
bytes_usage += input_block.allocated_bytes();
321-
input_block.shuffle_columns(result_column_ids);
319+
vectorized::Block tmp_block {new_columns};
320+
bytes_usage += tmp_block.allocated_bytes();
321+
input_block.swap(tmp_block);
322322
}
323323

324324
DCHECK_EQ(rows, input_block.rows());

be/src/pipeline/exec/partition_sort_sink_operator.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
3535
auto& p = _parent->cast<PartitionSortSinkOperatorX>();
3636
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
3737
_partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
38-
_partition_columns.resize(p._partition_expr_ctxs.size());
3938
for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) {
4039
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i]));
4140
}
@@ -181,15 +180,13 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
181180

182181
Status PartitionSortSinkOperatorX::_split_block_by_partition(
183182
vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) {
183+
vectorized::ColumnRawPtrs key_columns_raw_ptr(_partition_exprs_num);
184+
vectorized::Columns key_columns(_partition_exprs_num);
184185
for (int i = 0; i < _partition_exprs_num; ++i) {
185-
int result_column_id = -1;
186-
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id));
187-
DCHECK(result_column_id != -1);
188-
local_state._partition_columns[i] =
189-
input_block->get_by_position(result_column_id).column.get();
186+
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, key_columns[i]));
187+
key_columns_raw_ptr[i] = key_columns[i].get();
190188
}
191-
RETURN_IF_ERROR(_emplace_into_hash_table(local_state._partition_columns, input_block,
192-
local_state, eos));
189+
RETURN_IF_ERROR(_emplace_into_hash_table(key_columns_raw_ptr, input_block, local_state, eos));
193190
return Status::OK();
194191
}
195192

be/src/pipeline/exec/partition_sort_sink_operator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
4747
int64_t _sorted_partition_input_rows = 0;
4848
std::vector<PartitionDataPtr> _value_places;
4949
int _num_partition = 0;
50-
std::vector<const vectorized::IColumn*> _partition_columns;
5150
std::unique_ptr<PartitionedHashMapVariants> _partitioned_data;
5251
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
5352
int _partition_exprs_num = 0;

be/src/pipeline/exec/repeat_operator.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "pipeline/exec/operator.h"
2424
#include "vec/common/assert_cast.h"
2525
#include "vec/core/block.h"
26+
#include "vec/core/column_with_type_and_name.h"
2627

2728
namespace doris {
2829
#include "common/compile_check_begin.h"
@@ -190,13 +191,10 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block
190191
intermediate_block = vectorized::Block::create_unique();
191192

192193
for (auto& expr : expr_ctxs) {
193-
int result_column_id = -1;
194-
RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
195-
DCHECK(result_column_id != -1);
196-
input_block->get_by_position(result_column_id).column =
197-
input_block->get_by_position(result_column_id)
198-
.column->convert_to_full_column_if_const();
199-
intermediate_block->insert(input_block->get_by_position(result_column_id));
194+
vectorized::ColumnWithTypeAndName result_data;
195+
RETURN_IF_ERROR(expr->execute(input_block, result_data));
196+
result_data.column = result_data.column->convert_to_full_column_if_const();
197+
intermediate_block->insert(result_data);
200198
}
201199
DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns());
202200
}

be/src/pipeline/exec/union_sink_operator.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "common/status.h"
2525
#include "operator.h"
2626
#include "vec/core/block.h"
27+
#include "vec/core/column_with_type_and_name.h"
2728

2829
namespace doris {
2930
#include "common/compile_check_begin.h"
@@ -166,9 +167,9 @@ class UnionSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<UnionSink
166167
const auto& child_exprs = local_state._child_expr;
167168
vectorized::ColumnsWithTypeAndName colunms;
168169
for (size_t i = 0; i < child_exprs.size(); ++i) {
169-
int result_column_id = -1;
170-
RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
171-
colunms.emplace_back(src_block->get_by_position(result_column_id));
170+
vectorized::ColumnWithTypeAndName result_data;
171+
RETURN_IF_ERROR(child_exprs[i]->execute(src_block, result_data));
172+
colunms.emplace_back(result_data);
172173
}
173174
local_state._child_row_idx += src_block->rows();
174175
*res_block = {colunms};

be/src/vec/common/sort/sorter.cpp

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -154,18 +154,12 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed)
154154
Status Sorter::_prepare_sort_columns(Block& src_block, Block& dest_block, bool reversed) {
155155
if (_materialize_sort_exprs) {
156156
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
157-
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
157+
ColumnsWithTypeAndName columns_data(output_tuple_expr_ctxs.size());
158158
for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
159-
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, &valid_column_ids[i]));
159+
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, columns_data[i]));
160160
}
161161

162-
Block new_block;
163-
for (auto column_id : valid_column_ids) {
164-
if (column_id < 0) {
165-
continue;
166-
}
167-
new_block.insert(src_block.get_by_position(column_id));
168-
}
162+
Block new_block {columns_data};
169163
dest_block.swap(new_block);
170164
}
171165

be/src/vec/exec/format/json/new_json_reader.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,18 +1495,11 @@ Status NewJsonReader::_get_column_default_value(
14951495
if (ctx->root()->node_type() == TExprNodeType::type::NULL_LITERAL) {
14961496
continue;
14971497
}
1498-
// empty block to save default value of slot_desc->col_name()
1499-
Block block;
1500-
// If block is empty, some functions will produce no result. So we insert a column with
1501-
// single value here.
1502-
block.insert({ColumnUInt8::create(1), std::make_shared<DataTypeUInt8>(), ""});
1503-
int result = -1;
1504-
RETURN_IF_ERROR(ctx->execute(&block, &result));
1505-
DCHECK(result != -1);
1506-
auto column = block.get_by_position(result).column;
1507-
DCHECK(column->size() == 1);
1498+
ColumnWithTypeAndName result;
1499+
RETURN_IF_ERROR(ctx->execute_const_expr(result));
1500+
DCHECK(result.column->size() == 1);
15081501
_col_default_value_map.emplace(slot_desc->col_name(),
1509-
column->get_data_at(0).to_string());
1502+
result.column->get_data_at(0).to_string());
15101503
}
15111504
}
15121505
return Status::OK();

0 commit comments

Comments
 (0)